Skip to content

Commit fce4695

Browse files
Add optin setting to await MinPoolSize population
1 parent 4098fda commit fce4695

File tree

2 files changed

+94
-37
lines changed

2 files changed

+94
-37
lines changed

internal/integration/unified/client_entity.go

Lines changed: 86 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type clientEntity struct {
5959
ignoredCommands map[string]struct{}
6060
observeSensitiveCommands *bool
6161
numConnsCheckedOut int32
62+
latestDesc event.TopologyDescription
63+
connsPerServer map[string]int
6264

6365
// These should not be changed after the clientEntity is initialized
6466
observedEvents map[monitoringEventType]struct{}
@@ -75,29 +77,6 @@ type clientEntity struct {
7577
logQueue chan orderedLogMessage
7678
}
7779

78-
// awaitMinimumPoolSize waits for the client's connection pool to reach the
79-
// specified minimum size. This is a best effort operation that times out after
80-
// some predefined amount of time to avoid blocking tests indefinitely.
81-
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
82-
// Don't spend longer than 500ms awaiting minPoolSize.
83-
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
84-
defer cancel()
85-
86-
ticker := time.NewTicker(100 * time.Millisecond)
87-
defer ticker.Stop()
88-
89-
for {
90-
select {
91-
case <-awaitCtx.Done():
92-
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
93-
case <-ticker.C:
94-
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
95-
return nil
96-
}
97-
}
98-
}
99-
}
100-
10180
func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
10281
// The "configureFailPoint" command should always be ignored.
10382
ignoredCommands := map[string]struct{}{
@@ -118,6 +97,7 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
11897
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
11998
entityMap: em,
12099
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
100+
connsPerServer: make(map[string]int),
121101
}
122102
entity.setRecordEvents(true)
123103

@@ -226,8 +206,15 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
226206
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
227207
}
228208

229-
if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
230-
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
209+
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
210+
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
211+
212+
if err := func() error {
213+
awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(*entityOptions.AwaitMinPoolSizeMS)*time.Millisecond)
214+
defer cancel()
215+
216+
return awaitMinimumPoolSize(awaitCtx, entity, *clientOpts.MinPoolSize)
217+
}(); err != nil {
231218
return nil, err
232219
}
233220
}
@@ -476,6 +463,27 @@ func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandF
476463
}
477464
}
478465

466+
func (c *clientEntity) resetEventHistory() {
467+
c.eventProcessMu.Lock()
468+
defer c.eventProcessMu.Unlock()
469+
470+
c.pooled = nil
471+
c.serverDescriptionChanged = nil
472+
c.serverHeartbeatStartedEvent = nil
473+
c.serverHeartbeatSucceeded = nil
474+
c.serverHeartbeatFailedEvent = nil
475+
c.topologyDescriptionChanged = nil
476+
c.topologyOpening = nil
477+
c.topologyClosed = nil
478+
}
479+
480+
func (c *clientEntity) latestTopology() event.TopologyDescription {
481+
c.eventProcessMu.RLock()
482+
defer c.eventProcessMu.RUnlock()
483+
484+
return c.latestDesc
485+
}
486+
479487
func getPoolEventDocument(evt *event.PoolEvent, eventType monitoringEventType) bson.Raw {
480488
bsonBuilder := bsoncore.NewDocumentBuilder().
481489
AppendString("name", string(eventType)).
@@ -506,12 +514,21 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
506514
return
507515
}
508516

509-
// Update the connection counter. This happens even if we're not storing any events.
517+
// Update the connection counter. This happens even if we're not storing any
518+
// events.
510519
switch evt.Type {
511520
case event.ConnectionCheckedOut:
512521
atomic.AddInt32(&c.numConnsCheckedOut, 1)
513522
case event.ConnectionCheckedIn:
514523
atomic.AddInt32(&c.numConnsCheckedOut, -1)
524+
case event.ConnectionReady:
525+
c.eventProcessMu.Lock()
526+
c.connsPerServer[evt.Address]++
527+
c.eventProcessMu.Unlock()
528+
case event.ConnectionClosed:
529+
c.eventProcessMu.Lock()
530+
c.connsPerServer[evt.Address]--
531+
c.eventProcessMu.Unlock()
515532
}
516533

517534
eventType := monitoringEventTypeFromPoolEvent(evt)
@@ -529,6 +546,15 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
529546
}
530547
}
531548

549+
// connsReady returns the number of ready connections for the given server
550+
// address.
551+
func (c *clientEntity) connsReady(serverAddr string) int {
552+
c.eventProcessMu.RLock()
553+
defer c.eventProcessMu.RUnlock()
554+
555+
return c.connsPerServer[serverAddr]
556+
}
557+
532558
func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) {
533559
c.eventProcessMu.Lock()
534560
defer c.eventProcessMu.Unlock()
@@ -601,6 +627,8 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
601627
return
602628
}
603629

630+
c.latestDesc = evt.NewDescription
631+
604632
if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
605633
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
606634
}
@@ -724,3 +752,35 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
724752
}
725753
return nil
726754
}
755+
756+
// awaitMinimumPoolSize waits for the client's connection pool to reach the
757+
// specified minimum size. This is a best effort operation that times out after
758+
// some predefined amount of time to avoid blocking tests indefinitely.
759+
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
760+
ticker := time.NewTicker(100 * time.Millisecond)
761+
defer ticker.Stop()
762+
763+
for {
764+
select {
765+
case <-ctx.Done():
766+
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
767+
case <-ticker.C:
768+
ready := true
769+
for _, server := range entity.latestTopology().Servers {
770+
if entity.connsReady(server.Addr.String()) < int(minPoolSize) {
771+
ready = false
772+
773+
// If any server has less than minPoolSize connections, continue
774+
// waiting.
775+
break
776+
}
777+
}
778+
779+
if ready {
780+
entity.resetEventHistory()
781+
782+
return nil
783+
}
784+
}
785+
}
786+
}

internal/integration/unified/entity.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ import (
2323
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
2424
)
2525

26-
var (
27-
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
28-
ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
29-
)
26+
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
27+
var ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
3028

3129
var (
3230
tlsCAFile = os.Getenv("CSFLE_TLS_CA_FILE")
@@ -83,11 +81,10 @@ type entityOptions struct {
8381

8482
ClientEncryptionOpts *clientEncryptionOpts `bson:"clientEncryptionOpts"`
8583

86-
// If true, the unified spec runner must wait for the connection pool to be
87-
// populated for all servers according to the minPoolSize option. If false,
88-
// not specified, or if minPoolSize equals 0, there is no need to wait for any
89-
// specific pool state.
90-
AwaitMinPoolSize bool `bson:"awaitMinPoolSize"`
84+
// Maximum duration (in milliseconds) that the test runner MUST wait for each
85+
// connection pool to be populated with minPoolSize. Any CMAP and SDAM events
86+
// that occur before the pool is populated will be ignored.
87+
AwaitMinPoolSizeMS *int `bson:"awaitMinPoolSizeMS"`
9188
}
9289

9390
func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
@@ -106,7 +103,8 @@ func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
106103
// newCollectionEntityOptions constructs an entity options object for a
107104
// collection.
108105
func newCollectionEntityOptions(id string, databaseID string, collectionName string,
109-
opts *dbOrCollectionOptions) *entityOptions {
106+
opts *dbOrCollectionOptions,
107+
) *entityOptions {
110108
options := &entityOptions{
111109
ID: id,
112110
DatabaseID: databaseID,
@@ -598,7 +596,6 @@ func getKmsCredential(kmsDocument bson.Raw, credentialName string, envVar string
598596
return "", fmt.Errorf("unable to get environment value for %v. Please set the CSFLE environment variable: %v", credentialName, envVar)
599597
}
600598
return os.Getenv(envVar), nil
601-
602599
}
603600

604601
func (em *EntityMap) addClientEncryptionEntity(entityOptions *entityOptions) error {

0 commit comments

Comments
 (0)