From 31b51f308cdee2f84e4ddd94791b1ffe8e0e8bb6 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Wed, 10 Dec 2025 14:55:45 -0500 Subject: [PATCH 1/7] feat: add option to rebuild gRPC connection on error Signed-off-by: Todd Baert --- .../contrib/providers/flagd/Config.java | 2 + .../contrib/providers/flagd/FlagdOptions.java | 10 +++ .../resolver/process/InProcessResolver.java | 21 ++++- .../connector/sync/SyncStreamQueueSource.java | 77 +++++++++++++++--- .../process/InProcessResolverTest.java | 41 +++++++++- .../sync/SyncStreamQueueSourceTest.java | 81 +++++++++++++++++-- 6 files changed, 208 insertions(+), 24 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java index 417826437..1a3e1d352 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java @@ -20,6 +20,7 @@ public final class Config { static final int DEFAULT_MAX_CACHE_SIZE = 1000; static final int DEFAULT_OFFLINE_POLL_MS = 5000; static final long DEFAULT_KEEP_ALIVE = 0; + static final String DEFAULT_REINITIALIZE_ON_ERROR = "false"; static final String RESOLVER_ENV_VAR = "FLAGD_RESOLVER"; static final String HOST_ENV_VAR_NAME = "FLAGD_HOST"; @@ -51,6 +52,7 @@ public final class Config { static final String KEEP_ALIVE_MS_ENV_VAR_NAME = "FLAGD_KEEP_ALIVE_TIME_MS"; static final String TARGET_URI_ENV_VAR_NAME = "FLAGD_TARGET_URI"; static final String STREAM_RETRY_GRACE_PERIOD = "FLAGD_RETRY_GRACE_PERIOD"; + static final String REINITIALIZE_ON_ERROR_ENV_VAR_NAME = "FLAGD_REINITIALIZE_ON_ERROR"; static final String RESOLVER_RPC = "rpc"; static final String RESOLVER_IN_PROCESS = "in-process"; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index 17e86e6d1..4cda34df4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -204,6 +204,16 @@ public class FlagdOptions { @Builder.Default private String defaultAuthority = fallBackToEnvOrDefault(Config.DEFAULT_AUTHORITY_ENV_VAR_NAME, null); + /** + * !EXPERIMENTAL! + * Whether to reinitialize the channel (TCP connection) after the grace period is exceeded. + * This can help recover from connection issues by creating fresh connections. + * Particularly useful for troubleshooting network issues related to proxies or service meshes. + */ + @Builder.Default + private boolean reinitializeOnError = Boolean.parseBoolean( + fallBackToEnvOrDefault(Config.REINITIALIZE_ON_ERROR_ENV_VAR_NAME, Config.DEFAULT_REINITIALIZE_ON_ERROR)); + /** * Builder overwrite in order to customize the "build" method. * diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e02cdd59b..7cc648b8b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -41,6 +41,7 @@ public class InProcessResolver implements Resolver { private final Consumer onConnectionEvent; private final Operator operator; private final String scope; + private final QueueSource queueSource; /** * Resolves flag values using @@ -52,7 +53,8 @@ public class InProcessResolver implements Resolver { * connection/stream */ public InProcessResolver(FlagdOptions options, Consumer onConnectionEvent) { - this.flagStore = new FlagStore(getConnector(options)); + this.queueSource = getQueueSource(options); + this.flagStore = new FlagStore(queueSource); this.onConnectionEvent = onConnectionEvent; this.operator = new Operator(); this.scope = options.getSelector(); @@ -94,6 +96,21 @@ public void init() throws Exception { stateWatcher.start(); } + /** + * Called when the provider enters error state after grace period. + * Attempts to reinitialize the sync connector if enabled. + */ + @Override + public void onError() { + if (queueSource instanceof SyncStreamQueueSource) { + SyncStreamQueueSource syncConnector = (SyncStreamQueueSource) queueSource; + if (syncConnector.getStreamQueue() != null) { + // Only reinitialize if option is enabled + syncConnector.reinitializeChannelComponents(); + } + } + } + /** * Shutdown in-process resolver. * @@ -147,7 +164,7 @@ public ProviderEvaluation objectEvaluation(String key, Value defaultValue .build(); } - static QueueSource getConnector(final FlagdOptions options) { + static QueueSource getQueueSource(final FlagdOptions options) { if (options.getCustomConnector() != null) { return options.getCustomConnector(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index a033fac3b..7d2feaa08 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -4,6 +4,7 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; +import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -23,15 +24,18 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + import lombok.extern.slf4j.Slf4j; /** - * Implements the {@link QueueSource} contract and emit flags obtained from flagd sync gRPC contract. + * Implements the {@link QueueSource} contract and emit flags obtained from + * flagd sync gRPC contract. */ @Slf4j @SuppressFBWarnings( value = {"EI_EXPOSE_REP"}, - justification = "Random is used to generate a variation & flag configurations require exposing") + justification = "We need to expose the BlockingQueue to allow consumers to read from it") public class SyncStreamQueueSource implements QueueSource { private static final int QUEUE_SIZE = 5; @@ -43,13 +47,16 @@ public class SyncStreamQueueSource implements QueueSource { private final String selector; private final String providerId; private final boolean syncMetadataDisabled; - private final ChannelConnector channelConnector; + private final boolean reinitializeOnError; + private final FlagdOptions options; private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final FlagSyncServiceStub flagSyncStub; - private final FlagSyncServiceBlockingStub metadataStub; + private volatile ChannelConnector channelConnector; + private volatile FlagSyncServiceStub flagSyncStub; + private volatile FlagSyncServiceBlockingStub metadataStub; /** - * Creates a new SyncStreamQueueSource responsible for observing the event stream. + * Creates a new SyncStreamQueueSource responsible for observing the event + * stream. */ public SyncStreamQueueSource(final FlagdOptions options) { streamDeadline = options.getStreamDeadlineMs(); @@ -58,11 +65,9 @@ public SyncStreamQueueSource(final FlagdOptions options) { providerId = options.getProviderId(); maxBackoffMs = options.getRetryBackoffMaxMs(); syncMetadataDisabled = options.isSyncMetadataDisabled(); - channelConnector = new ChannelConnector(options, ChannelBuilder.nettyChannel(options)); - flagSyncStub = - FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady(); - metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel()) - .withWaitForReady(); + reinitializeOnError = options.isReinitializeOnError(); + this.options = options; + initializeChannelComponents(); } // internal use only @@ -79,7 +84,51 @@ protected SyncStreamQueueSource( maxBackoffMs = options.getRetryBackoffMaxMs(); flagSyncStub = stubMock; syncMetadataDisabled = options.isSyncMetadataDisabled(); + reinitializeOnError = options.isReinitializeOnError(); metadataStub = blockingStubMock; + this.options = options; + } + + /** Initialize channel connector and stubs. */ + private synchronized void initializeChannelComponents() { + ChannelConnector newConnector = + new ChannelConnector(options, ChannelBuilder.nettyChannel(options)); + FlagSyncServiceStub newFlagSyncStub = + FlagSyncServiceGrpc.newStub(newConnector.getChannel()).withWaitForReady(); + FlagSyncServiceBlockingStub newMetadataStub = + FlagSyncServiceGrpc.newBlockingStub(newConnector.getChannel()).withWaitForReady(); + + // Atomic assignment of all components + channelConnector = newConnector; + flagSyncStub = newFlagSyncStub; + metadataStub = newMetadataStub; + } + + /** Reinitialize channel connector and stubs on error. */ + public synchronized void reinitializeChannelComponents() { + if (!reinitializeOnError || shutdown.get()) { + return; + } + + log.info("Reinitializing channel gRPC components in attempt to restore stream..."); + ChannelConnector oldConnector = channelConnector; + + try { + // Create new channel components first + initializeChannelComponents(); + } catch (Exception e) { + log.error("Failed to reinitialize channel components", e); + return; + } + + // Shutdown old connector after successful reinitialization + if (oldConnector != null) { + try { + oldConnector.shutdown(); + } catch (Exception e) { + log.debug("Error shutting down old channel connector during reinitialization", e); + } + } } /** Initialize sync stream connector. */ @@ -156,7 +205,8 @@ private void observeSyncStream() { log.info("Shutdown invoked, exiting event stream listener"); } - // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584 + // TODO: remove the metadata call entirely after + // https://github.com/open-feature/flagd/issues/1584 private Struct getMetadata() { if (syncMetadataDisabled) { return null; @@ -177,7 +227,8 @@ private Struct getMetadata() { return null; } catch (StatusRuntimeException e) { - // In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we + // In newer versions of flagd, metadata is part of the sync stream. If the + // method is unimplemented, we // can ignore the error if (e.getStatus() != null && Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) { diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index 7c4820a17..175a9bfbc 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -17,6 +17,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import dev.openfeature.contrib.providers.flagd.Config; import dev.openfeature.contrib.providers.flagd.FlagdOptions; @@ -51,6 +56,36 @@ import org.junit.jupiter.api.Test; class InProcessResolverTest { + @Test + void onError_reinitializesOnlyIfOptionTrue() throws Exception { + // Setup: option true, should call reinitializeChannelComponents + FlagdOptions options = FlagdOptions.builder().reinitializeOnError(true).build(); + SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class); + // Mock getStreamQueue to return a non-null queue + when(mockConnector.getStreamQueue()).thenReturn(new LinkedBlockingQueue<>()); + InProcessResolver resolver = new InProcessResolver(options, e -> {}); + // Inject mock connector + java.lang.reflect.Field queueSourceField = InProcessResolver.class.getDeclaredField("queueSource"); + queueSourceField.setAccessible(true); + queueSourceField.set(resolver, mockConnector); + resolver.onError(); + + verify(mockConnector, times(1)).reinitializeChannelComponents(); + } + + @Test + void onError_doesNotReinitializeIfOptionFalse() throws Exception { + // Setup: option false, should NOT call reinitializeChannelComponents + FlagdOptions options = FlagdOptions.builder().reinitializeOnError(false).build(); + SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class); + InProcessResolver resolver = new InProcessResolver(options, e -> {}); + // Inject mock connector + java.lang.reflect.Field queueSourceField = InProcessResolver.class.getDeclaredField("queueSource"); + queueSourceField.setAccessible(true); + queueSourceField.set(resolver, mockConnector); + resolver.onError(); + verify(mockConnector, never()).reinitializeChannelComponents(); + } @Test public void connectorSetup() { @@ -70,9 +105,9 @@ public void connectorSetup() { .build(); // then - assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getConnector(forGrpcOptions)); - assertInstanceOf(FileQueueSource.class, InProcessResolver.getConnector(forOfflineOptions)); - assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions)); + assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getQueueSource(forGrpcOptions)); + assertInstanceOf(FileQueueSource.class, InProcessResolver.getQueueSource(forOfflineOptions)); + assertInstanceOf(MockConnector.class, InProcessResolver.getQueueSource(forCustomConnectorOptions)); } @Test diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java index ccc979a9f..d26f57613 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java @@ -32,6 +32,67 @@ import org.mockito.stubbing.Answer; class SyncStreamQueueSourceTest { + @Test + void reinitializeChannelComponents_reinitializesWhenEnabled() { + FlagdOptions options = FlagdOptions.builder().reinitializeOnError(true).build(); + ChannelConnector initialConnector = mock(ChannelConnector.class); + FlagSyncServiceStub initialStub = mock(FlagSyncServiceStub.class); + FlagSyncServiceBlockingStub initialBlockingStub = mock(FlagSyncServiceBlockingStub.class); + SyncStreamQueueSource queueSource = + new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); + + // Save references + ChannelConnector oldConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + queueSource.reinitializeChannelComponents(); + ChannelConnector newConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + // Should have replaced channelConnector + assertNotNull(newConnector); + org.junit.jupiter.api.Assertions.assertNotSame(oldConnector, newConnector); + } + + @Test + void reinitializeChannelComponents_doesNothingWhenDisabled() { + FlagdOptions options = FlagdOptions.builder().reinitializeOnError(false).build(); + ChannelConnector initialConnector = mock(ChannelConnector.class); + FlagSyncServiceStub initialStub = mock(FlagSyncServiceStub.class); + FlagSyncServiceBlockingStub initialBlockingStub = mock(FlagSyncServiceBlockingStub.class); + SyncStreamQueueSource queueSource = + new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); + + ChannelConnector oldConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + queueSource.reinitializeChannelComponents(); + ChannelConnector newConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + // Should NOT have replaced channelConnector + org.junit.jupiter.api.Assertions.assertSame(oldConnector, newConnector); + } + + @Test + void reinitializeChannelComponents_doesNothingWhenShutdown() throws InterruptedException { + FlagdOptions options = FlagdOptions.builder().reinitializeOnError(true).build(); + ChannelConnector initialConnector = mock(ChannelConnector.class); + FlagSyncServiceStub initialStub = mock(FlagSyncServiceStub.class); + FlagSyncServiceBlockingStub initialBlockingStub = mock(FlagSyncServiceBlockingStub.class); + SyncStreamQueueSource queueSource = + new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); + + queueSource.shutdown(); + ChannelConnector oldConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + queueSource.reinitializeChannelComponents(); + ChannelConnector newConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + // Should NOT have replaced channelConnector + org.junit.jupiter.api.Assertions.assertSame(oldConnector, newConnector); + } + // Helper to access private fields via reflection + private static Object getPrivateField(Object instance, String fieldName) { + try { + java.lang.reflect.Field field = instance.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(instance); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private ChannelConnector mockConnector; private FlagSyncServiceBlockingStub blockingStub; private FlagSyncServiceStub stub; @@ -41,6 +102,7 @@ class SyncStreamQueueSourceTest { private CountDownLatch latch; // used to wait for observer to be initialized @BeforeEach + @SuppressWarnings("deprecation") public void setup() throws Exception { blockingStub = mock(FlagSyncServiceBlockingStub.class); when(blockingStub.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStub); @@ -52,29 +114,35 @@ public void setup() throws Exception { when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub); doAnswer((Answer) invocation -> { Object[] args = invocation.getArguments(); - observer = (StreamObserver) args[1]; + @SuppressWarnings("unchecked") + StreamObserver obs = (StreamObserver) args[1]; + observer = obs; latch.countDown(); return null; }) .when(stub) - .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize + .syncFlags(any(SyncFlagsRequest.class), any()); // Mock the initialize syncErrorStub = mock(FlagSyncServiceStub.class); when(syncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(syncErrorStub); doAnswer((Answer) invocation -> { Object[] args = invocation.getArguments(); - observer = (StreamObserver) args[1]; + @SuppressWarnings("unchecked") + StreamObserver obs = (StreamObserver) args[1]; + observer = obs; latch.countDown(); throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND); }) .when(syncErrorStub) - .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize + .syncFlags(any(SyncFlagsRequest.class), any()); // Mock the initialize asyncErrorStub = mock(FlagSyncServiceStub.class); when(asyncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(asyncErrorStub); doAnswer((Answer) invocation -> { Object[] args = invocation.getArguments(); - observer = (StreamObserver) args[1]; + @SuppressWarnings("unchecked") + StreamObserver obs = (StreamObserver) args[1]; + observer = obs; latch.countDown(); // Start a thread to call onError after a short delay @@ -91,7 +159,7 @@ public void setup() throws Exception { return null; }) .when(asyncErrorStub) - .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize + .syncFlags(any(SyncFlagsRequest.class), any()); // Mock the initialize } @Test @@ -166,6 +234,7 @@ void onNextEnqueuesDataPayload() throws Exception { } @Test + @SuppressWarnings("deprecation") void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception { // disable GetMetadata call SyncStreamQueueSource queueSource = new SyncStreamQueueSource( From a60a5b67ece026ccb9257713e597dfa50940e7cc Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Wed, 10 Dec 2025 15:40:25 -0500 Subject: [PATCH 2/7] fixup: feedback Signed-off-by: Todd Baert --- .../resolver/process/InProcessResolver.java | 6 +-- .../connector/sync/SyncStreamQueueSource.java | 48 ++++++++++++------- .../process/InProcessResolverTest.java | 30 ++++-------- .../sync/SyncStreamQueueSourceTest.java | 44 ++++++++--------- 4 files changed, 63 insertions(+), 65 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index 7cc648b8b..c898aef3a 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -104,10 +104,8 @@ public void init() throws Exception { public void onError() { if (queueSource instanceof SyncStreamQueueSource) { SyncStreamQueueSource syncConnector = (SyncStreamQueueSource) queueSource; - if (syncConnector.getStreamQueue() != null) { - // Only reinitialize if option is enabled - syncConnector.reinitializeChannelComponents(); - } + // only reinitialize if option is enabled + syncConnector.reinitializeChannelComponents(); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 7d2feaa08..56a770aa9 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -50,9 +50,24 @@ public class SyncStreamQueueSource implements QueueSource { private final boolean reinitializeOnError; private final FlagdOptions options; private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private volatile ChannelConnector channelConnector; - private volatile FlagSyncServiceStub flagSyncStub; - private volatile FlagSyncServiceBlockingStub metadataStub; + private volatile GrpcComponents grpcComponents; + + /** + * Container for gRPC components to ensure atomicity during reinitialization. + * All three components are updated together to prevent consumers from seeing + * an inconsistent state where components are from different channel instances. + */ + private static class GrpcComponents { + final ChannelConnector channelConnector; + final FlagSyncServiceStub flagSyncStub; + final FlagSyncServiceBlockingStub metadataStub; + + GrpcComponents(ChannelConnector connector, FlagSyncServiceStub stub, FlagSyncServiceBlockingStub blockingStub) { + this.channelConnector = connector; + this.flagSyncStub = stub; + this.metadataStub = blockingStub; + } + } /** * Creates a new SyncStreamQueueSource responsible for observing the event @@ -80,13 +95,12 @@ protected SyncStreamQueueSource( deadline = options.getDeadline(); selector = options.getSelector(); providerId = options.getProviderId(); - channelConnector = connectorMock; maxBackoffMs = options.getRetryBackoffMaxMs(); - flagSyncStub = stubMock; syncMetadataDisabled = options.isSyncMetadataDisabled(); reinitializeOnError = options.isReinitializeOnError(); - metadataStub = blockingStubMock; this.options = options; + //this.onConnectionEvent = null; + this.grpcComponents = new GrpcComponents(connectorMock, stubMock, blockingStubMock); } /** Initialize channel connector and stubs. */ @@ -98,10 +112,8 @@ private synchronized void initializeChannelComponents() { FlagSyncServiceBlockingStub newMetadataStub = FlagSyncServiceGrpc.newBlockingStub(newConnector.getChannel()).withWaitForReady(); - // Atomic assignment of all components - channelConnector = newConnector; - flagSyncStub = newFlagSyncStub; - metadataStub = newMetadataStub; + // atomic assignment of all components as a single unit + grpcComponents = new GrpcComponents(newConnector, newFlagSyncStub, newMetadataStub); } /** Reinitialize channel connector and stubs on error. */ @@ -111,20 +123,20 @@ public synchronized void reinitializeChannelComponents() { } log.info("Reinitializing channel gRPC components in attempt to restore stream..."); - ChannelConnector oldConnector = channelConnector; + GrpcComponents oldComponents = grpcComponents; try { - // Create new channel components first + // create new channel components first initializeChannelComponents(); } catch (Exception e) { log.error("Failed to reinitialize channel components", e); return; } - // Shutdown old connector after successful reinitialization - if (oldConnector != null) { + // shutdown old connector after successful reinitialization + if (oldComponents != null && oldComponents.channelConnector != null) { try { - oldConnector.shutdown(); + oldComponents.channelConnector.shutdown(); } catch (Exception e) { log.debug("Error shutting down old channel connector during reinitialization", e); } @@ -155,7 +167,7 @@ public void shutdown() throws InterruptedException { log.debug("Shutdown already in progress or completed"); return; } - this.channelConnector.shutdown(); + grpcComponents.channelConnector.shutdown(); } /** Contains blocking calls, to be used concurrently. */ @@ -212,7 +224,7 @@ private Struct getMetadata() { return null; } - FlagSyncServiceBlockingStub localStub = metadataStub; + FlagSyncServiceBlockingStub localStub = grpcComponents.metadataStub; if (deadline > 0) { localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); @@ -240,7 +252,7 @@ private Struct getMetadata() { } private void syncFlags(SyncStreamObserver streamObserver) { - FlagSyncServiceStub localStub = flagSyncStub; // don't mutate the stub + FlagSyncServiceStub localStub = grpcComponents.flagSyncStub; // don't mutate the stub if (streamDeadline > 0) { localStub = localStub.withDeadlineAfter(streamDeadline, TimeUnit.MILLISECONDS); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index 175a9bfbc..34c660702 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -18,10 +18,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import dev.openfeature.contrib.providers.flagd.Config; import dev.openfeature.contrib.providers.flagd.FlagdOptions; @@ -57,36 +55,26 @@ class InProcessResolverTest { @Test - void onError_reinitializesOnlyIfOptionTrue() throws Exception { - // Setup: option true, should call reinitializeChannelComponents - FlagdOptions options = FlagdOptions.builder().reinitializeOnError(true).build(); + void onError_delegatesToQueueSource() throws Exception { + // given + FlagdOptions options = FlagdOptions.builder().build(); // option value doesn't matter here SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class); - // Mock getStreamQueue to return a non-null queue - when(mockConnector.getStreamQueue()).thenReturn(new LinkedBlockingQueue<>()); InProcessResolver resolver = new InProcessResolver(options, e -> {}); + // Inject mock connector java.lang.reflect.Field queueSourceField = InProcessResolver.class.getDeclaredField("queueSource"); queueSourceField.setAccessible(true); queueSourceField.set(resolver, mockConnector); + + // when resolver.onError(); + // then + // InProcessResolver should always delegate to the queue source. + // The decision to re-initialize or not is handled within SyncStreamQueueSource. verify(mockConnector, times(1)).reinitializeChannelComponents(); } - @Test - void onError_doesNotReinitializeIfOptionFalse() throws Exception { - // Setup: option false, should NOT call reinitializeChannelComponents - FlagdOptions options = FlagdOptions.builder().reinitializeOnError(false).build(); - SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class); - InProcessResolver resolver = new InProcessResolver(options, e -> {}); - // Inject mock connector - java.lang.reflect.Field queueSourceField = InProcessResolver.class.getDeclaredField("queueSource"); - queueSourceField.setAccessible(true); - queueSourceField.set(resolver, mockConnector); - resolver.onError(); - verify(mockConnector, never()).reinitializeChannelComponents(); - } - @Test public void connectorSetup() { // given diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java index d26f57613..22c4a4aa3 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java @@ -41,13 +41,13 @@ void reinitializeChannelComponents_reinitializesWhenEnabled() { SyncStreamQueueSource queueSource = new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); - // Save references - ChannelConnector oldConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + // save reference to old GrpcComponents + Object oldComponents = getPrivateField(queueSource, "grpcComponents"); queueSource.reinitializeChannelComponents(); - ChannelConnector newConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); - // Should have replaced channelConnector - assertNotNull(newConnector); - org.junit.jupiter.api.Assertions.assertNotSame(oldConnector, newConnector); + Object newComponents = getPrivateField(queueSource, "grpcComponents"); + // should have replaced grpcComponents + assertNotNull(newComponents); + org.junit.jupiter.api.Assertions.assertNotSame(oldComponents, newComponents); } @Test @@ -59,11 +59,11 @@ void reinitializeChannelComponents_doesNothingWhenDisabled() { SyncStreamQueueSource queueSource = new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); - ChannelConnector oldConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + Object oldComponents = getPrivateField(queueSource, "grpcComponents"); queueSource.reinitializeChannelComponents(); - ChannelConnector newConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); - // Should NOT have replaced channelConnector - org.junit.jupiter.api.Assertions.assertSame(oldConnector, newConnector); + Object newComponents = getPrivateField(queueSource, "grpcComponents"); + // should NOT have replaced grpcComponents + org.junit.jupiter.api.Assertions.assertSame(oldComponents, newComponents); } @Test @@ -76,13 +76,13 @@ void reinitializeChannelComponents_doesNothingWhenShutdown() throws InterruptedE new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); queueSource.shutdown(); - ChannelConnector oldConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); + Object oldComponents = getPrivateField(queueSource, "grpcComponents"); queueSource.reinitializeChannelComponents(); - ChannelConnector newConnector = (ChannelConnector) getPrivateField(queueSource, "channelConnector"); - // Should NOT have replaced channelConnector - org.junit.jupiter.api.Assertions.assertSame(oldConnector, newConnector); + Object newComponents = getPrivateField(queueSource, "grpcComponents"); + // should NOT have replaced grpcComponents + org.junit.jupiter.api.Assertions.assertSame(oldComponents, newComponents); } - // Helper to access private fields via reflection + // helper to access private fields via reflection private static Object getPrivateField(Object instance, String fieldName) { try { java.lang.reflect.Field field = instance.getClass().getDeclaredField(fieldName); @@ -121,7 +121,7 @@ public void setup() throws Exception { return null; }) .when(stub) - .syncFlags(any(SyncFlagsRequest.class), any()); // Mock the initialize + .syncFlags(any(SyncFlagsRequest.class), any()); // mock the initialize syncErrorStub = mock(FlagSyncServiceStub.class); when(syncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(syncErrorStub); @@ -134,7 +134,7 @@ public void setup() throws Exception { throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND); }) .when(syncErrorStub) - .syncFlags(any(SyncFlagsRequest.class), any()); // Mock the initialize + .syncFlags(any(SyncFlagsRequest.class), any()); // mock the initialize asyncErrorStub = mock(FlagSyncServiceStub.class); when(asyncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(asyncErrorStub); @@ -145,10 +145,10 @@ public void setup() throws Exception { observer = obs; latch.countDown(); - // Start a thread to call onError after a short delay + // start a thread to call onError after a short delay new Thread(() -> { try { - Thread.sleep(10); // Wait 100ms before calling onError + Thread.sleep(10); // wait 10ms before calling onError observer.onError(new StatusRuntimeException(io.grpc.Status.INTERNAL)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -159,7 +159,7 @@ public void setup() throws Exception { return null; }) .when(asyncErrorStub) - .syncFlags(any(SyncFlagsRequest.class), any()); // Mock the initialize + .syncFlags(any(SyncFlagsRequest.class), any()); // mock the initialize } @Test @@ -180,7 +180,7 @@ void syncInitError_DoesNotBusyWait() throws Exception { QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); assertEquals(QueuePayloadType.ERROR, payload.getType()); - Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for reties + Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for retries // should have retried the stream (2 calls); initial + 1 retry // it's very important that the retry count is low, to confirm no busy-loop @@ -205,7 +205,7 @@ void asyncInitError_DoesNotBusyWait() throws Exception { QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); assertEquals(QueuePayloadType.ERROR, payload.getType()); - Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for reties + Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for retries // should have retried the stream (2 calls); initial + 1 retry // it's very important that the retry count is low, to confirm no busy-loop From c68054943457fa800f7553c10af99fab2781d46e Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Wed, 10 Dec 2025 15:46:45 -0500 Subject: [PATCH 3/7] fixup: apply suggestions from code review Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Todd Baert --- .../process/storage/connector/sync/SyncStreamQueueSource.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 56a770aa9..93f61f209 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -4,7 +4,6 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -24,7 +23,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; @@ -99,7 +97,6 @@ protected SyncStreamQueueSource( syncMetadataDisabled = options.isSyncMetadataDisabled(); reinitializeOnError = options.isReinitializeOnError(); this.options = options; - //this.onConnectionEvent = null; this.grpcComponents = new GrpcComponents(connectorMock, stubMock, blockingStubMock); } From cdec4c393a0eda9034b4473917ec57e48d2ee35a Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Fri, 12 Dec 2025 09:18:43 -0500 Subject: [PATCH 4/7] fixup: better test cleanup Signed-off-by: Todd Baert --- .../sync/SyncStreamQueueSourceTest.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java index 22c4a4aa3..ce50a4d20 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java @@ -33,7 +33,7 @@ class SyncStreamQueueSourceTest { @Test - void reinitializeChannelComponents_reinitializesWhenEnabled() { + void reinitializeChannelComponents_reinitializesWhenEnabled() throws InterruptedException { FlagdOptions options = FlagdOptions.builder().reinitializeOnError(true).build(); ChannelConnector initialConnector = mock(ChannelConnector.class); FlagSyncServiceStub initialStub = mock(FlagSyncServiceStub.class); @@ -41,17 +41,21 @@ void reinitializeChannelComponents_reinitializesWhenEnabled() { SyncStreamQueueSource queueSource = new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); - // save reference to old GrpcComponents - Object oldComponents = getPrivateField(queueSource, "grpcComponents"); - queueSource.reinitializeChannelComponents(); - Object newComponents = getPrivateField(queueSource, "grpcComponents"); - // should have replaced grpcComponents - assertNotNull(newComponents); - org.junit.jupiter.api.Assertions.assertNotSame(oldComponents, newComponents); + try { + // save reference to old GrpcComponents + Object oldComponents = getPrivateField(queueSource, "grpcComponents"); + queueSource.reinitializeChannelComponents(); + Object newComponents = getPrivateField(queueSource, "grpcComponents"); + // should have replaced grpcComponents + assertNotNull(newComponents); + org.junit.jupiter.api.Assertions.assertNotSame(oldComponents, newComponents); + } finally { + queueSource.shutdown(); + } } @Test - void reinitializeChannelComponents_doesNothingWhenDisabled() { + void reinitializeChannelComponents_doesNothingWhenDisabled() throws InterruptedException { FlagdOptions options = FlagdOptions.builder().reinitializeOnError(false).build(); ChannelConnector initialConnector = mock(ChannelConnector.class); FlagSyncServiceStub initialStub = mock(FlagSyncServiceStub.class); @@ -59,11 +63,15 @@ void reinitializeChannelComponents_doesNothingWhenDisabled() { SyncStreamQueueSource queueSource = new SyncStreamQueueSource(options, initialConnector, initialStub, initialBlockingStub); - Object oldComponents = getPrivateField(queueSource, "grpcComponents"); - queueSource.reinitializeChannelComponents(); - Object newComponents = getPrivateField(queueSource, "grpcComponents"); - // should NOT have replaced grpcComponents - org.junit.jupiter.api.Assertions.assertSame(oldComponents, newComponents); + try { + Object oldComponents = getPrivateField(queueSource, "grpcComponents"); + queueSource.reinitializeChannelComponents(); + Object newComponents = getPrivateField(queueSource, "grpcComponents"); + // should NOT have replaced grpcComponents + org.junit.jupiter.api.Assertions.assertSame(oldComponents, newComponents); + } finally { + queueSource.shutdown(); + } } @Test From 368d6408160b24c2408851364cf99d6e84c27fea Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Fri, 12 Dec 2025 12:13:03 -0500 Subject: [PATCH 5/7] Apply suggestion from @toddbaert Signed-off-by: Todd Baert --- .../process/storage/connector/sync/SyncStreamQueueSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 93f61f209..f4bedcc86 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -119,7 +119,7 @@ public synchronized void reinitializeChannelComponents() { return; } - log.info("Reinitializing channel gRPC components in attempt to restore stream..."); + log.info("Reinitializing channel gRPC components in attempt to restore stream."); GrpcComponents oldComponents = grpcComponents; try { From 26230b5a55dcf2ba8e2cf5833be1dde098c55c64 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Wed, 17 Dec 2025 11:33:34 -0500 Subject: [PATCH 6/7] fixup: style Signed-off-by: Todd Baert --- .../process/storage/connector/sync/SyncStreamQueueSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index f4bedcc86..ad2c85ec4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -23,7 +23,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import lombok.extern.slf4j.Slf4j; /** From f70b2aebf70433c744d08827386cab9acab862e3 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Wed, 17 Dec 2025 12:53:02 -0500 Subject: [PATCH 7/7] fixup: spotless Signed-off-by: Todd Baert --- .../process/storage/connector/sync/SyncStreamQueueSource.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index ad2c85ec4..3c1058566 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -101,8 +101,7 @@ protected SyncStreamQueueSource( /** Initialize channel connector and stubs. */ private synchronized void initializeChannelComponents() { - ChannelConnector newConnector = - new ChannelConnector(options, ChannelBuilder.nettyChannel(options)); + ChannelConnector newConnector = new ChannelConnector(options, ChannelBuilder.nettyChannel(options)); FlagSyncServiceStub newFlagSyncStub = FlagSyncServiceGrpc.newStub(newConnector.getChannel()).withWaitForReady(); FlagSyncServiceBlockingStub newMetadataStub =