From e1ca31adf81272869f73f2cefa2779f5cef1a34f Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 15 Dec 2025 12:20:10 -0500 Subject: [PATCH 1/2] refactor: remove channel monitor Signed-off-by: Todd Baert --- .../resolver/common/ChannelConnector.java | 46 +--------- .../connector/sync/SyncStreamQueueSource.java | 1 - .../flagd/resolver/rpc/RpcResolver.java | 2 - .../resolver/common/ChannelConnectorTest.java | 86 ------------------- .../sync/SyncStreamQueueSourceTest.java | 2 - .../flagd/resolver/rpc/RpcResolverTest.java | 1 - 6 files changed, 1 insertion(+), 137 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java index 6261affe7..954c5e01e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java @@ -1,19 +1,14 @@ package dev.openfeature.contrib.providers.flagd.resolver.common; import dev.openfeature.contrib.providers.flagd.FlagdOptions; -import dev.openfeature.sdk.ImmutableStructure; -import dev.openfeature.sdk.ProviderEvent; -import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; -import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** - * A generic GRPC connector that manages connection states, reconnection logic, and event streaming for - * GRPC services. + * A GRPC connector that maintains a managed channel for communication with a flagd server and handles shutdown. */ @Slf4j public class ChannelConnector { @@ -29,11 +24,6 @@ public class ChannelConnector { */ private final long deadline; - /** - * A consumer that handles connection events such as connection loss or reconnection. - */ - private final Consumer onConnectionEvent; - /** * Constructs a new {@code ChannelConnector} instance with the specified options and parameters. * @@ -45,17 +35,6 @@ public ChannelConnector( final FlagdOptions options, final Consumer onConnectionEvent, ManagedChannel channel) { this.channel = channel; this.deadline = options.getDeadline(); - this.onConnectionEvent = onConnectionEvent; - } - - /** - * Initializes the GRPC connection by waiting for the channel to be ready and monitoring its state. - * - * @throws Exception if the channel does not reach the desired state within the deadline - */ - public void initialize() throws Exception { - log.info("Initializing GRPC connection."); - monitorChannelState(ConnectivityState.READY); } /** @@ -71,27 +50,4 @@ public void shutdown() throws InterruptedException { channel.awaitTermination(deadline, TimeUnit.MILLISECONDS); } } - - /** - * Monitors the state of a gRPC channel and triggers the specified callbacks based on state changes. - * - * @param expectedState the initial state to monitor. - */ - private void monitorChannelState(ConnectivityState expectedState) { - channel.notifyWhenStateChanged(expectedState, this::onStateChange); - } - - private void onStateChange() { - ConnectivityState currentState = channel.getState(true); - log.debug("Channel state changed to: {}", currentState); - if (currentState == ConnectivityState.TRANSIENT_FAILURE || currentState == ConnectivityState.SHUTDOWN) { - this.onConnectionEvent.accept(new FlagdProviderEvent( - ProviderEvent.PROVIDER_ERROR, Collections.emptyList(), new ImmutableStructure())); - } - if (currentState != ConnectivityState.SHUTDOWN) { - log.debug("continuing to monitor the grpc channel"); - // Re-register the state monitor to watch for the next state transition. - monitorChannelState(currentState); - } - } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index a3b01f913..f58430c1e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -86,7 +86,6 @@ protected SyncStreamQueueSource( /** Initialize sync stream connector. */ public void init() throws Exception { - channelConnector.initialize(); Thread listener = new Thread(this::observeSyncStream); listener.setDaemon(true); listener.start(); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 1f3101d00..2c5966a72 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -113,8 +113,6 @@ protected RpcResolver( * Initialize RpcResolver resolver. */ public void init() throws Exception { - this.connector.initialize(); - Thread listener = new Thread(() -> { try { observeEventStream(); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java index a1f91d450..62bdbb187 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java @@ -1,20 +1,7 @@ package dev.openfeature.contrib.providers.flagd.resolver.common; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.collect.Lists; -import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.flagd.grpc.evaluation.Evaluation; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc; -import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Server; @@ -22,17 +9,8 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; -import org.mockito.ArgumentCaptor; import org.mockito.MockitoAnnotations; class ChannelConnectorTest { @@ -84,68 +62,4 @@ private void tearDownGrpcServer() throws InterruptedException { testServer.awaitTermination(); } } - - @Test - void whenShuttingDownGrpcConnectorConsumerReceivesDisconnectedEvent() throws Exception { - CountDownLatch sync = new CountDownLatch(1); - ArrayList connectionStateChanges = Lists.newArrayList(); - Consumer testConsumer = event -> { - connectionStateChanges.add(!event.isDisconnected()); - sync.countDown(); - }; - - ChannelConnector instance = new ChannelConnector(FlagdOptions.builder().build(), testConsumer, testChannel); - - instance.initialize(); - // when shutting grpc connector - instance.shutdown(); - - // then consumer received DISCONNECTED and CONNECTED event - boolean finished = sync.await(10, TimeUnit.SECONDS); - Assertions.assertTrue(finished); - Assertions.assertEquals(Lists.newArrayList(DISCONNECTED), connectionStateChanges); - } - - @ParameterizedTest - @EnumSource(ConnectivityState.class) - void testMonitorChannelState(ConnectivityState state) throws Exception { - ManagedChannel channel = mock(ManagedChannel.class); - - // Set up the expected state - ConnectivityState expectedState = ConnectivityState.IDLE; - when(channel.getState(anyBoolean())).thenReturn(state); - - // Capture the callback - ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Runnable.class); - doNothing().when(channel).notifyWhenStateChanged(any(), callbackCaptor.capture()); - - Consumer testConsumer = spy(Consumer.class); - - ChannelConnector instance = new ChannelConnector(FlagdOptions.builder().build(), testConsumer, channel); - - instance.initialize(); - - // Simulate state change - callbackCaptor.getValue().run(); - - // Verify the callbacks based on the state - switch (state) { - case READY: - verify(channel, times(2)).notifyWhenStateChanged(any(), any()); - verify(testConsumer, never()).accept(any()); - break; - case TRANSIENT_FAILURE: - verify(channel, times(2)).notifyWhenStateChanged(any(), any()); - verify(testConsumer).accept(any()); - break; - case SHUTDOWN: - verify(channel, times(1)).notifyWhenStateChanged(any(), any()); - verify(testConsumer).accept(any()); - break; - default: - verify(channel, times(2)).notifyWhenStateChanged(any(), any()); - verify(testConsumer, never()).accept(any()); - break; - } - } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java index 9116b8142..ccc979a9f 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java @@ -7,7 +7,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -48,7 +47,6 @@ public void setup() throws Exception { when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance()); mockConnector = mock(ChannelConnector.class); - doNothing().when(mockConnector).initialize(); // Mock the initialize method stub = mock(FlagSyncServiceStub.class); when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java index 4b7acb569..119f9e2e6 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java @@ -44,7 +44,6 @@ public void init() throws Exception { blockingStub = mock(ServiceBlockingStub.class); mockConnector = mock(ChannelConnector.class); - doNothing().when(mockConnector).initialize(); // Mock the initialize method stub = mock(ServiceStub.class); when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub); From 528210b3f3da3d7372da206e16901ade8a84a775 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 15 Dec 2025 12:33:10 -0500 Subject: [PATCH 2/2] fixup: related cleanup Signed-off-by: Todd Baert --- .../resolver/common/ChannelConnector.java | 5 +- .../resolver/process/InProcessResolver.java | 6 +- .../connector/sync/SyncStreamQueueSource.java | 6 +- .../flagd/resolver/rpc/RpcResolver.java | 2 +- .../resolver/common/ChannelConnectorTest.java | 65 ------------------- .../process/InProcessResolverTest.java | 6 +- 6 files changed, 10 insertions(+), 80 deletions(-) delete mode 100644 providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java index 954c5e01e..0369fd7b6 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java @@ -3,7 +3,6 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import io.grpc.ManagedChannel; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -28,11 +27,9 @@ public class ChannelConnector { * Constructs a new {@code ChannelConnector} instance with the specified options and parameters. * * @param options the configuration options for the GRPC connection - * @param onConnectionEvent a consumer to handle connection events * @param channel the managed channel for the GRPC connection */ - public ChannelConnector( - final FlagdOptions options, final Consumer onConnectionEvent, ManagedChannel channel) { + public ChannelConnector(final FlagdOptions options, ManagedChannel channel) { this.channel = channel; this.deadline = options.getDeadline(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e54c938cf..e02cdd59b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -52,7 +52,7 @@ public class InProcessResolver implements Resolver { * connection/stream */ public InProcessResolver(FlagdOptions options, Consumer onConnectionEvent) { - this.flagStore = new FlagStore(getConnector(options, onConnectionEvent)); + this.flagStore = new FlagStore(getConnector(options)); this.onConnectionEvent = onConnectionEvent; this.operator = new Operator(); this.scope = options.getSelector(); @@ -147,14 +147,14 @@ public ProviderEvaluation objectEvaluation(String key, Value defaultValue .build(); } - static QueueSource getConnector(final FlagdOptions options, Consumer onConnectionEvent) { + static QueueSource getConnector(final FlagdOptions options) { if (options.getCustomConnector() != null) { return options.getCustomConnector(); } return options.getOfflineFlagSourcePath() != null && !options.getOfflineFlagSourcePath().isEmpty() ? new FileQueueSource(options.getOfflineFlagSourcePath(), options.getOfflinePollIntervalMs()) - : new SyncStreamQueueSource(options, onConnectionEvent); + : new SyncStreamQueueSource(options); } private ProviderEvaluation resolve(Class type, String key, EvaluationContext ctx) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index f58430c1e..a033fac3b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -4,7 +4,6 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -24,7 +23,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; /** @@ -53,14 +51,14 @@ public class SyncStreamQueueSource implements QueueSource { /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. */ - public SyncStreamQueueSource(final FlagdOptions options, Consumer onConnectionEvent) { + public SyncStreamQueueSource(final FlagdOptions options) { streamDeadline = options.getStreamDeadlineMs(); deadline = options.getDeadline(); selector = options.getSelector(); providerId = options.getProviderId(); maxBackoffMs = options.getRetryBackoffMaxMs(); syncMetadataDisabled = options.isSyncMetadataDisabled(); - channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options)); + channelConnector = new ChannelConnector(options, ChannelBuilder.nettyChannel(options)); flagSyncStub = FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady(); metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel()) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 2c5966a72..afb06120b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -84,7 +84,7 @@ public RpcResolver( this.strategy = ResolveFactory.getStrategy(options); this.options = options; incomingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - this.connector = new ChannelConnector(options, onProviderEvent, ChannelBuilder.nettyChannel(options)); + this.connector = new ChannelConnector(options, ChannelBuilder.nettyChannel(options)); this.onProviderEvent = onProviderEvent; this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady(); this.blockingStub = diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java deleted file mode 100644 index 62bdbb187..000000000 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java +++ /dev/null @@ -1,65 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common; - -import dev.openfeature.flagd.grpc.evaluation.Evaluation; -import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Server; -import io.grpc.netty.NettyServerBuilder; -import io.grpc.stub.StreamObserver; -import java.io.IOException; -import java.net.ServerSocket; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.mockito.MockitoAnnotations; - -class ChannelConnectorTest { - - private ManagedChannel testChannel; - private Server testServer; - private static final boolean CONNECTED = true; - private static final boolean DISCONNECTED = false; - - private final ServiceGrpc.ServiceImplBase testServiceImpl = new ServiceGrpc.ServiceImplBase() { - @Override - public void eventStream( - Evaluation.EventStreamRequest request, - StreamObserver responseObserver) { - // noop - } - }; - - @BeforeEach - void setUp() throws Exception { - MockitoAnnotations.openMocks(this); - setupTestGrpcServer(); - } - - private void setupTestGrpcServer() throws IOException { - var testSocket = new ServerSocket(0); - var port = testSocket.getLocalPort(); - testSocket.close(); - - testServer = - NettyServerBuilder.forPort(port).addService(testServiceImpl).build(); - testServer.start(); - - if (testChannel == null) { - testChannel = ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext() - .build(); - } - } - - @AfterEach - void tearDown() throws Exception { - tearDownGrpcServer(); - } - - private void tearDownGrpcServer() throws InterruptedException { - if (testServer != null) { - testServer.shutdownNow(); - testServer.awaitTermination(); - } - } -} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index ffd898447..7c4820a17 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -70,9 +70,9 @@ public void connectorSetup() { .build(); // then - assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getConnector(forGrpcOptions, e -> {})); - assertInstanceOf(FileQueueSource.class, InProcessResolver.getConnector(forOfflineOptions, e -> {})); - assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions, e -> {})); + assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getConnector(forGrpcOptions)); + assertInstanceOf(FileQueueSource.class, InProcessResolver.getConnector(forOfflineOptions)); + assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions)); } @Test