Skip to content

Commit 8a0716c

Browse files
Add optin setting to await MinPoolSize population
1 parent d17ef8b commit 8a0716c

File tree

2 files changed

+102
-37
lines changed

2 files changed

+102
-37
lines changed

internal/integration/unified/client_entity.go

Lines changed: 94 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.mongodb.org/mongo-driver/v2/mongo/options"
2525
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
2626
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
27+
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
2728
)
2829

2930
// There are no automated tests for truncation. Given that, setting the
@@ -59,6 +60,8 @@ type clientEntity struct {
5960
ignoredCommands map[string]struct{}
6061
observeSensitiveCommands *bool
6162
numConnsCheckedOut int32
63+
latestDesc event.TopologyDescription
64+
connsPerServer map[string]int
6265

6366
// These should not be changed after the clientEntity is initialized
6467
observedEvents map[monitoringEventType]struct{}
@@ -75,29 +78,6 @@ type clientEntity struct {
7578
logQueue chan orderedLogMessage
7679
}
7780

78-
// awaitMinimumPoolSize waits for the client's connection pool to reach the
79-
// specified minimum size. This is a best effort operation that times out after
80-
// some predefined amount of time to avoid blocking tests indefinitely.
81-
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
82-
// Don't spend longer than 500ms awaiting minPoolSize.
83-
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
84-
defer cancel()
85-
86-
ticker := time.NewTicker(100 * time.Millisecond)
87-
defer ticker.Stop()
88-
89-
for {
90-
select {
91-
case <-awaitCtx.Done():
92-
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
93-
case <-ticker.C:
94-
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
95-
return nil
96-
}
97-
}
98-
}
99-
}
100-
10181
func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
10282
// The "configureFailPoint" command should always be ignored.
10383
ignoredCommands := map[string]struct{}{
@@ -118,6 +98,7 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
11898
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
11999
entityMap: em,
120100
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
101+
connsPerServer: make(map[string]int),
121102
}
122103
entity.setRecordEvents(true)
123104

@@ -226,8 +207,17 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
226207
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
227208
}
228209

229-
if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
230-
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
210+
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
211+
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
212+
213+
if err := func() error {
214+
awaitDur := time.Duration(*entityOptions.AwaitMinPoolSizeMS) * time.Millisecond
215+
216+
awaitCtx, cancel := context.WithTimeout(ctx, awaitDur)
217+
defer cancel()
218+
219+
return awaitMinimumPoolSize(awaitCtx, entity, *clientOpts.MinPoolSize)
220+
}(); err != nil {
231221
return nil, err
232222
}
233223
}
@@ -476,6 +466,27 @@ func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandF
476466
}
477467
}
478468

469+
func (c *clientEntity) resetEventHistory() {
470+
c.eventProcessMu.Lock()
471+
defer c.eventProcessMu.Unlock()
472+
473+
c.pooled = nil
474+
c.serverDescriptionChanged = nil
475+
c.serverHeartbeatStartedEvent = nil
476+
c.serverHeartbeatSucceeded = nil
477+
c.serverHeartbeatFailedEvent = nil
478+
c.topologyDescriptionChanged = nil
479+
c.topologyOpening = nil
480+
c.topologyClosed = nil
481+
}
482+
483+
func (c *clientEntity) latestTopology() event.TopologyDescription {
484+
c.eventProcessMu.RLock()
485+
defer c.eventProcessMu.RUnlock()
486+
487+
return c.latestDesc
488+
}
489+
479490
func getPoolEventDocument(evt *event.PoolEvent, eventType monitoringEventType) bson.Raw {
480491
bsonBuilder := bsoncore.NewDocumentBuilder().
481492
AppendString("name", string(eventType)).
@@ -506,12 +517,21 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
506517
return
507518
}
508519

509-
// Update the connection counter. This happens even if we're not storing any events.
520+
// Update the connection counter. This happens even if we're not storing any
521+
// events.
510522
switch evt.Type {
511523
case event.ConnectionCheckedOut:
512524
atomic.AddInt32(&c.numConnsCheckedOut, 1)
513525
case event.ConnectionCheckedIn:
514526
atomic.AddInt32(&c.numConnsCheckedOut, -1)
527+
case event.ConnectionReady:
528+
c.eventProcessMu.Lock()
529+
c.connsPerServer[evt.Address]++
530+
c.eventProcessMu.Unlock()
531+
case event.ConnectionClosed:
532+
c.eventProcessMu.Lock()
533+
c.connsPerServer[evt.Address]--
534+
c.eventProcessMu.Unlock()
515535
}
516536

517537
eventType := monitoringEventTypeFromPoolEvent(evt)
@@ -529,6 +549,20 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
529549
}
530550
}
531551

552+
// connsReady returns the number of ready connections for the given server
553+
// address. If the server is not data-bearing, this method will return -1.
554+
func (c *clientEntity) connsReady(server event.ServerDescription) int {
555+
c.eventProcessMu.RLock()
556+
defer c.eventProcessMu.RUnlock()
557+
558+
if server.Kind == description.ServerKindRSArbiter.String() ||
559+
server.Kind == description.ServerKindRSGhost.String() {
560+
return -1
561+
}
562+
563+
return c.connsPerServer[server.Addr.String()]
564+
}
565+
532566
func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) {
533567
c.eventProcessMu.Lock()
534568
defer c.eventProcessMu.Unlock()
@@ -601,6 +635,8 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
601635
return
602636
}
603637

638+
c.latestDesc = evt.NewDescription
639+
604640
if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
605641
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
606642
}
@@ -724,3 +760,35 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
724760
}
725761
return nil
726762
}
763+
764+
// awaitMinimumPoolSize waits for the client's connection pool to reach the
765+
// specified minimum size. This is a best effort operation that times out after
766+
// some predefined amount of time to avoid blocking tests indefinitely.
767+
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
768+
ticker := time.NewTicker(100 * time.Millisecond)
769+
defer ticker.Stop()
770+
771+
for {
772+
select {
773+
case <-ctx.Done():
774+
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
775+
case <-ticker.C:
776+
ready := true
777+
for _, server := range entity.latestTopology().Servers {
778+
if r := entity.connsReady(server); r >= 0 && r < int(minPoolSize) {
779+
ready = false
780+
781+
// If any server has less than minPoolSize connections, continue
782+
// waiting.
783+
break
784+
}
785+
}
786+
787+
if ready {
788+
entity.resetEventHistory()
789+
790+
return nil
791+
}
792+
}
793+
}
794+
}

internal/integration/unified/entity.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ import (
2323
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
2424
)
2525

26-
var (
27-
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
28-
ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
29-
)
26+
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
27+
var ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
3028

3129
var (
3230
tlsCAFile = os.Getenv("CSFLE_TLS_CA_FILE")
@@ -83,11 +81,10 @@ type entityOptions struct {
8381

8482
ClientEncryptionOpts *clientEncryptionOpts `bson:"clientEncryptionOpts"`
8583

86-
// If true, the unified spec runner must wait for the connection pool to be
87-
// populated for all servers according to the minPoolSize option. If false,
88-
// not specified, or if minPoolSize equals 0, there is no need to wait for any
89-
// specific pool state.
90-
AwaitMinPoolSize bool `bson:"awaitMinPoolSize"`
84+
// Maximum duration (in milliseconds) that the test runner MUST wait for each
85+
// connection pool to be populated with minPoolSize. Any CMAP and SDAM events
86+
// that occur before the pool is populated will be ignored.
87+
AwaitMinPoolSizeMS *int `bson:"awaitMinPoolSizeMS"`
9188
}
9289

9390
func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
@@ -106,7 +103,8 @@ func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
106103
// newCollectionEntityOptions constructs an entity options object for a
107104
// collection.
108105
func newCollectionEntityOptions(id string, databaseID string, collectionName string,
109-
opts *dbOrCollectionOptions) *entityOptions {
106+
opts *dbOrCollectionOptions,
107+
) *entityOptions {
110108
options := &entityOptions{
111109
ID: id,
112110
DatabaseID: databaseID,
@@ -598,7 +596,6 @@ func getKmsCredential(kmsDocument bson.Raw, credentialName string, envVar string
598596
return "", fmt.Errorf("unable to get environment value for %v. Please set the CSFLE environment variable: %v", credentialName, envVar)
599597
}
600598
return os.Getenv(envVar), nil
601-
602599
}
603600

604601
func (em *EntityMap) addClientEncryptionEntity(entityOptions *entityOptions) error {

0 commit comments

Comments
 (0)