From 364deeca0754499b4c5d5d52a0545c49cf624845 Mon Sep 17 00:00:00 2001 From: Viktor Tsapovskiy Date: Mon, 17 Nov 2025 19:21:23 +0300 Subject: [PATCH] api: replaced Future.done with a sync.Cond This commit reduces allocations. Future.done allocation replaced with - Future.cond (sync.Cond) - Future.finished (bool) Other code use `Future.finished` instead `Future.done == nil` check. Added Future.finish() marks Future as done. Future.WaitChan() now creates channel on demand. Closes #496 --- CHANGELOG.md | 2 ++ MIGRATION.md | 2 ++ connection.go | 31 ++++++++++++++++--------- future.go | 64 ++++++++++++++++++++++++++++++++++++--------------- 4 files changed, 70 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f0b56dfd..cea2d57ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * Added `box.MustNew` wrapper for `box.New` without an error (#448). * Added missing IPROTO feature flags to greeting negotiation (iproto.IPROTO_FEATURE_IS_SYNC, iproto.IPROTO_FEATURE_INSERT_ARROW) (#466). +* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496). ### Changed @@ -28,6 +29,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480). * Removed deprecated `Connection` methods, related interfaces and tests are updated (#479). * Replaced the use of optional types in crud with go-option library (#492). +* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool (#496). ### Fixed diff --git a/MIGRATION.md b/MIGRATION.md index 0065f0b7f..2a31878e9 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -15,6 +15,7 @@ TODO * Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator() methods, ResponseIterator and TimeoutResponseIterator types. * Removed deprecated `Connection` methods, related interfaces and tests are updated. + *NOTE*: due to Future.GetTyped() doesn't decode SelectRequest into structure, substitute Connection.GetTyped() following the example: ```Go var singleTpl = Tuple{} @@ -30,6 +31,7 @@ TODO ).GetTyped(&tpl) singleTpl := tpl[0] ``` +* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool. ## Migration from v1.x.x to v2.x.x diff --git a/connection.go b/connection.go index 0053556ea..d8e94bbe4 100644 --- a/connection.go +++ b/connection.go @@ -936,7 +936,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrRateLimited, "Request is rate limited on client", } - fut.done = nil + fut.finish() return } } @@ -950,7 +950,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrConnectionClosed, "using closed connection", } - fut.done = nil + fut.finish() shard.rmut.Unlock() return case connDisconnected: @@ -958,7 +958,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrConnectionNotReady, "client connection is not ready", } - fut.done = nil + fut.finish() shard.rmut.Unlock() return case connShutdown: @@ -966,7 +966,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { ErrConnectionShutdown, "server shutdown in progress", } - fut.done = nil + fut.finish() shard.rmut.Unlock() return } @@ -995,7 +995,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { runtime.Gosched() select { case conn.rlimit <- struct{}{}: - case <-fut.done: + case <-fut.WaitChan(): if fut.err == nil { panic("fut.done is closed, but err is nil") } @@ -1009,12 +1009,12 @@ func (conn *Connection) newFuture(req Request) (fut *Future) { // is "done" before the response is come. func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) { select { - case <-fut.done: + case <-fut.WaitChan(): case <-ctx.Done(): } select { - case <-fut.done: + case <-fut.WaitChan(): return default: conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w", @@ -1036,7 +1036,12 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { conn.incrementRequestCnt() fut := conn.newFuture(req) - if fut.done == nil { + + fut.mutex.Lock() + is_done := fut.finished + fut.mutex.Unlock() + + if is_done { conn.decrementRequestCnt() return fut } @@ -1059,12 +1064,16 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] shard.bufmut.Lock() - select { - case <-fut.done: + + fut.mutex.Lock() + is_done := fut.finished + fut.mutex.Unlock() + + if is_done { shard.bufmut.Unlock() return - default: } + firstWritten := shard.buf.Len() == 0 if shard.buf.Cap() == 0 { shard.buf.b = make([]byte, 0, 128) diff --git a/future.go b/future.go index 0f882014a..780646b9b 100644 --- a/future.go +++ b/future.go @@ -15,32 +15,39 @@ type Future struct { mutex sync.Mutex resp Response err error + cond sync.Cond + finished bool done chan struct{} } func (fut *Future) wait() { - if fut.done == nil { - return + fut.mutex.Lock() + defer fut.mutex.Unlock() + + for !fut.finished { + fut.cond.Wait() } - <-fut.done } -func (fut *Future) isDone() bool { - if fut.done == nil { - return true - } - select { - case <-fut.done: - return true - default: - return false +func (fut *Future) finish() { + fut.mutex.Lock() + defer fut.mutex.Unlock() + + fut.finished = true + + if fut.done != nil { + close(fut.done) } + + fut.cond.Broadcast() } // NewFuture creates a new empty Future for a given Request. func NewFuture(req Request) (fut *Future) { fut = &Future{} - fut.done = make(chan struct{}) + fut.done = nil + fut.finished = false + fut.cond.L = &fut.mutex fut.req = req return fut } @@ -50,7 +57,7 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error { fut.mutex.Lock() defer fut.mutex.Unlock() - if fut.isDone() { + if fut.finished { return nil } @@ -60,7 +67,14 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error { } fut.resp = resp - close(fut.done) + fut.finished = true + + if fut.done != nil { + close(fut.done) + } + + fut.cond.Broadcast() + return nil } @@ -69,12 +83,18 @@ func (fut *Future) SetError(err error) { fut.mutex.Lock() defer fut.mutex.Unlock() - if fut.isDone() { + if fut.finished { return } fut.err = err - close(fut.done) + fut.finished = true + + if fut.done != nil { + close(fut.done) + } + + fut.cond.Broadcast() } // GetResponse waits for Future to be filled and returns Response and error. @@ -122,8 +142,16 @@ func init() { // WaitChan returns channel which becomes closed when response arrived or error occurred. func (fut *Future) WaitChan() <-chan struct{} { - if fut.done == nil { + fut.mutex.Lock() + defer fut.mutex.Unlock() + + if fut.finished { return closedChan } + + if fut.done == nil { + fut.done = make(chan struct{}) + } + return fut.done }