diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java index 417826437..a2ae3e9ea 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java @@ -1,7 +1,10 @@ package dev.openfeature.contrib.providers.flagd; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType; +import java.util.Arrays; +import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; /** Helper class to hold configuration default values. */ @@ -36,6 +39,7 @@ public final class Config { static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS"; static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS"; static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR"; + static final String FATAL_STATUS_CODES_ENV_VAR_NAME = "FLAGD_FATAL_STATUS_CODES"; /** * Environment variable to fetch Provider id. * @@ -91,6 +95,16 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) { } } + static List fallBackToEnvOrDefaultList(String key, List defaultValue) { + try { + return System.getenv(key) != null ? Arrays.stream(System.getenv(key).split(",")) + .map(String::trim) + .collect(Collectors.toList()) : defaultValue; + } catch (Exception e) { + return defaultValue; + } + } + static Resolver fromValueProvider(Function provider) { final String resolverVar = provider.apply(RESOLVER_ENV_VAR); if (resolverVar == null) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index 17e86e6d1..1005c4a4c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -1,6 +1,7 @@ package dev.openfeature.contrib.providers.flagd; import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault; +import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefaultList; import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -122,6 +123,14 @@ public class FlagdOptions { @Builder.Default private int retryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD); + + /** + * List of grpc response status codes for which failed connections are not retried. + * Defaults to empty list + */ + @Builder.Default + private List fatalStatusCodes = fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of()); + /** * Selector to be used with flag sync gRPC contract. **/ diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 4ce6e06ee..88cd71fa7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -5,6 +5,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache; +import dev.openfeature.sdk.ErrorCode; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; import dev.openfeature.sdk.Hook; @@ -222,20 +223,25 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { onReady(); syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); break; - - case PROVIDER_ERROR: - if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) { - onError(); - syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR); + case PROVIDER_STALE: + if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) { + onStale(); + syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE); } break; - + case PROVIDER_ERROR: + onError(); + break; default: log.warn("Unknown event {}", flagdProviderEvent.getEvent()); } } } + private void onError() { + this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); + } + private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { this.emitProviderConfigurationChanged(ProviderEventDetails.builder() .flagsChanged(flagdProviderEvent.getFlagsChanged()) @@ -255,7 +261,7 @@ private void onReady() { ProviderEventDetails.builder().message("connected to flagd").build()); } - private void onError() { + private void onStale() { log.debug( "Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.", gracePeriod); @@ -270,7 +276,7 @@ private void onError() { if (!errorExecutor.isShutdown()) { errorTask = errorExecutor.schedule( () -> { - if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) { + if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) { log.error( "Provider did not reconnect successfully within {}s. Emitting ERROR event...", gracePeriod); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e02cdd59b..6d3fbe34b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -77,6 +77,9 @@ public void init() throws Exception { storageStateChange.getSyncMetadata())); log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; + case STALE: + onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + break; case ERROR: onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); break; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index eaa3dfa5f..a01f93c23 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc } break; case ERROR: + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { + log.warn("Failed to convey STALE status, queue is full"); + } + break; + case FATAL: if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { log.warn("Failed to convey ERROR status, queue is full"); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java index c47670b7d..d6b8b30c5 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java @@ -1,6 +1,6 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage; -/** Satus of the storage. */ +/** Status of the storage. */ public enum StorageState { /** Storage is upto date and working as expected. */ OK, diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java index 93675fb60..74e02912e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java @@ -3,5 +3,6 @@ /** Payload type emitted by {@link QueueSource}. */ public enum QueuePayloadType { DATA, - ERROR + ERROR, + FATAL } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index a033fac3b..1118dc7e0 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -19,6 +19,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ public class SyncStreamQueueSource implements QueueSource { private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); + private final AtomicBoolean successfulSync = new AtomicBoolean(false); private final int streamDeadline; private final int deadline; private final int maxBackoffMs; @@ -47,6 +49,7 @@ public class SyncStreamQueueSource implements QueueSource { private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final FlagSyncServiceStub flagSyncStub; private final FlagSyncServiceBlockingStub metadataStub; + private final List fatalStatusCodes; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -63,6 +66,7 @@ public SyncStreamQueueSource(final FlagdOptions options) { FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady(); metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel()) .withWaitForReady(); + fatalStatusCodes = options.getFatalStatusCodes(); } // internal use only @@ -80,6 +84,7 @@ protected SyncStreamQueueSource( flagSyncStub = stubMock; syncMetadataDisabled = options.isSyncMetadataDisabled(); metadataStub = blockingStubMock; + fatalStatusCodes = options.getFatalStatusCodes(); } /** Initialize sync stream connector. */ @@ -129,23 +134,35 @@ private void observeSyncStream() { } log.debug("Initializing sync stream request"); - SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle); + SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle, fatalStatusCodes); try { observer.metadata = getMetadata(); - } catch (Exception metaEx) { - // retry if getMetadata fails - String message = metaEx.getMessage(); - log.debug("Metadata request error: {}, will restart", message, metaEx); - enqueueError(String.format("Error in getMetadata request: %s", message)); + } catch (StatusRuntimeException metaEx) { + if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { + log.debug("Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); + enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode())); + } else { + // retry for other status codes + String message = metaEx.getMessage(); + log.debug("Metadata request error: {}, will restart", message, metaEx); + enqueueError(String.format("Error in getMetadata request: %s", message)); + } shouldThrottle.set(true); continue; } try { syncFlags(observer); - } catch (Exception ex) { - log.error("Unexpected sync stream exception, will restart.", ex); - enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + successfulSync.set(true); + } catch (StatusRuntimeException ex) { + if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { + log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); + enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", ex.getStatus().getCode())); + } else { + // retry for other status codes + log.error("Unexpected sync stream exception, will restart.", ex); + enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + } shouldThrottle.set(true); } } catch (InterruptedException ie) { @@ -212,22 +229,34 @@ private void enqueueError(String message) { enqueueError(outgoingQueue, message); } + private void enqueueFatal(String message) { + enqueueFatal(outgoingQueue, message); + } + private static void enqueueError(BlockingQueue queue, String message) { if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) { log.error("Failed to convey ERROR status, queue is full"); } } + private static void enqueueFatal(BlockingQueue queue, String message) { + if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) { + log.error("Failed to convey FATAL status, queue is full"); + } + } + private static class SyncStreamObserver implements StreamObserver { private final BlockingQueue outgoingQueue; private final AtomicBoolean shouldThrottle; private final Awaitable done = new Awaitable(); + private final List fatalStatusCodes; private Struct metadata; - public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle) { + public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle, List fatalStatusCodes) { this.outgoingQueue = outgoingQueue; this.shouldThrottle = shouldThrottle; + this.fatalStatusCodes = fatalStatusCodes; } @Override @@ -245,9 +274,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) { @Override public void onError(Throwable throwable) { try { + Status status = Status.fromThrowable(throwable); String message = throwable != null ? throwable.getMessage() : "unknown"; log.debug("Stream error: {}, will restart", message, throwable); - enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + if (fatalStatusCodes.contains(status.getCode())) { + enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message)); + } else { + enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + } // Set throttling flag to ensure backoff before retry this.shouldThrottle.set(true); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index afb06120b..1f601151e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -60,6 +60,7 @@ public final class RpcResolver implements Resolver { private static final int QUEUE_SIZE = 5; private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean successfulConnection = new AtomicBoolean(false); private final ChannelConnector connector; private final Cache cache; private final ResolveStrategy strategy; @@ -68,6 +69,7 @@ public final class RpcResolver implements Resolver { private final Consumer onProviderEvent; private final ServiceStub stub; private final ServiceBlockingStub blockingStub; + private final List fatalStatusCodes; /** * Resolves flag values using @@ -89,6 +91,7 @@ public RpcResolver( this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady(); this.blockingStub = ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady(); + this.fatalStatusCodes = options.getFatalStatusCodes(); } // testing only @@ -107,6 +110,7 @@ protected RpcResolver( this.onProviderEvent = onProviderEvent; this.stub = mockStub; this.blockingStub = mockBlockingStub; + this.fatalStatusCodes = options.getFatalStatusCodes(); } /** @@ -348,13 +352,20 @@ private void observeEventStream() throws InterruptedException { Throwable streamException = taken.getError(); if (streamException != null) { - log.debug( - "Exception in event stream connection, streamException {}, will reconnect", - streamException); - this.handleErrorOrComplete(); + if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains( + ((StatusRuntimeException) streamException).getStatus().getCode().name()) && !successfulConnection.get()) { + log.debug("Fatal error code received: {}", ((StatusRuntimeException) streamException).getStatus().getCode()); + this.handleFatalError(); + } else { + log.debug( + "Exception in event stream connection, streamException {}, will reconnect", + streamException); + this.handleErrorOrComplete(); + } break; } + successfulConnection.set(true); final EventStreamResponse response = taken.getResponse(); log.debug("Got stream response: {}", response); @@ -410,9 +421,19 @@ private void handleProviderReadyEvent() { * Handles provider error events by clearing the cache (if enabled) and notifying listeners of the error. */ private void handleErrorOrComplete() { - log.debug("Emitting provider error event"); + log.debug("Emitting provider stale event"); // complete is an error, logically...even if the server went down gracefully we need to reconnect. + onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + } + + /** + * Handles fatal error events (i.e. error codes defined in fatalStatusCodes) by transitioning the provider into + * fatal state + */ + private void handleFatalError() { + log.debug("Emitting provider error event"); + onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index 27806f955..90d082292 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -1,6 +1,7 @@ package dev.openfeature.contrib.providers.flagd.e2e.steps; import static io.restassured.RestAssured.when; +import static org.assertj.core.api.Assertions.assertThat; import dev.openfeature.contrib.providers.flagd.Config; import dev.openfeature.contrib.providers.flagd.FlagdOptions; @@ -9,10 +10,12 @@ import dev.openfeature.contrib.providers.flagd.e2e.State; import dev.openfeature.sdk.FeatureProvider; import dev.openfeature.sdk.OpenFeatureAPI; +import dev.openfeature.sdk.ProviderState; import io.cucumber.java.After; import io.cucumber.java.AfterAll; import io.cucumber.java.BeforeAll; import io.cucumber.java.en.Given; +import io.cucumber.java.en.Then; import io.cucumber.java.en.When; import java.io.File; import java.io.IOException; @@ -31,6 +34,7 @@ public class ProviderSteps extends AbstractSteps { public static final int UNAVAILABLE_PORT = 9999; + public static final int FORBIDDEN_PORT = 9212; static ComposeContainer container; static Path sharedTempDir; @@ -49,6 +53,7 @@ public static void beforeAll() throws IOException { .withExposedService("flagd", 8015, Wait.forListeningPort()) .withExposedService("flagd", 8080, Wait.forListeningPort()) .withExposedService("envoy", 9211, Wait.forListeningPort()) + .withExposedService("envoy", FORBIDDEN_PORT, Wait.forListeningPort()) .withStartupTimeout(Duration.ofSeconds(45)); container.start(); } @@ -85,6 +90,10 @@ public void setupProvider(String providerType) throws InterruptedException { } wait = false; break; + case "forbidden": + state.builder.port(container.getServicePort("envoy", FORBIDDEN_PORT)); + wait = false; + break; case "socket": this.state.providerType = ProviderType.SOCKET; String socketPath = @@ -188,4 +197,9 @@ public void the_flag_was_modded() { .then() .statusCode(200); } + + @Then("the client should be in {} state") + public void the_client_should_be_in_fatal_state(String clientState) { + assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase())); + } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java index 7dca50533..a89f8560e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java @@ -4,7 +4,10 @@ import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType; import dev.openfeature.sdk.Value; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; public final class Utils { @@ -37,6 +40,9 @@ public static Object convert(String value, String type) throws ClassNotFoundExce } case "CacheType": return CacheType.valueOf(value.toUpperCase()).getValue(); + case "StringList": + return value.isEmpty() ? List.of() : Arrays.stream(value.split(",")).map(String::trim).collect( + Collectors.toList()); case "Object": return Value.objectToValue(new ObjectMapper().readValue(value, Object.class)); } diff --git a/providers/flagd/test-harness b/providers/flagd/test-harness index b62f5dbe8..9b73b3a95 160000 --- a/providers/flagd/test-harness +++ b/providers/flagd/test-harness @@ -1 +1 @@ -Subproject commit b62f5dbe860ecf4f36ec757dfdc0b38f7b3dec6e +Subproject commit 9b73b3a95cd9e0885937d244b118713b26374b1d