@@ -1,7 +1,10 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/** Helper class to hold configuration default values. */
@@ -36,6 +39,7 @@ public final class Config {
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
static final String FATAL_STATUS_CODES_ENV_VAR_NAME = "FLAGD_FATAL_STATUS_CODES";
/**
* Environment variable to fetch Provider id.
*
@@ -91,6 +95,16 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) {
}
}

static List<String> fallBackToEnvOrDefaultList(String key, List<String> defaultValue) {
try {
return System.getenv(key) != null ? Arrays.stream(System.getenv(key).split(","))
.map(String::trim)
.collect(Collectors.toList()) : defaultValue;
} catch (Exception e) {
return defaultValue;
Contributor: We should print an info/warn that the env vars are invalid

Member Author: Just for this method? Or the other ones too? I'd either leave it or add it in all cases to be consistent

Contributor: Then we should add it everywhere, but in a different PR

Member Author: Alright, sounds good. Should we create a new issue for this or is that overkill?

}
}
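
A minimal sketch of the warning discussed in the thread above, assuming Config is annotated with Lombok's @Slf4j (its import is added in this diff); the log message wording is hypothetical:

static List<String> fallBackToEnvOrDefaultList(String key, List<String> defaultValue) {
    try {
        String raw = System.getenv(key);
        return raw != null
                ? Arrays.stream(raw.split(",")).map(String::trim).collect(Collectors.toList())
                : defaultValue;
    } catch (Exception e) {
        // Hypothetical addition: surface invalid environment values instead of failing silently.
        log.warn("Invalid value for environment variable {}, falling back to default", key, e);
        return defaultValue;
    }
}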

static Resolver fromValueProvider(Function<String, String> provider) {
final String resolverVar = provider.apply(RESOLVER_ENV_VAR);
if (resolverVar == null) {
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd;

import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault;
import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefaultList;
import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider;

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
@@ -122,6 +123,14 @@ public class FlagdOptions {
@Builder.Default
private int retryGracePeriod =
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);

/**
* List of gRPC response status codes for which failed connections are not retried.
* Defaults to an empty list.
*/
@Builder.Default
private List<String> fatalStatusCodes = fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of());
Contributor: Do we really want to retry on every error code per default? How is this handled in our other sdks?

Member Author: No, you're right, I'll rephrase it to "for which the provider transitions into fatal mode upon first connection". The general retry policy is defined here and is the same for all sdks afaik
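
A usage sketch for the new option (the status code names below are hypothetical examples); it can be supplied through the builder or through the FLAGD_FATAL_STATUS_CODES environment variable, which fallBackToEnvOrDefaultList parses as a comma-separated list:

// Builder-based configuration, e.g. inside application setup code:
FlagdOptions options = FlagdOptions.builder()
        .fatalStatusCodes(List.of("UNAUTHENTICATED", "PERMISSION_DENIED"))
        .build();

// Equivalent environment-based configuration, picked up by the @Builder.Default above:
// export FLAGD_FATAL_STATUS_CODES=UNAUTHENTICATED,PERMISSION_DENIED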


/**
* Selector to be used with flag sync gRPC contract.
**/
@@ -5,6 +5,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Hook;
@@ -222,20 +223,25 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
onReady();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
break;

case PROVIDER_ERROR:
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
onError();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
case PROVIDER_STALE:
Contributor: Please update the javadoc above the switch, we now use the STALE state

Member Author: Will do once we agree on a final implementation

if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) {
onStale();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE);
}
break;

case PROVIDER_ERROR:
onError();
break;
default:
log.warn("Unknown event {}", flagdProviderEvent.getEvent());
}
}
}
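
A possible wording for the javadoc update requested above (an assumption, since the existing comment above the switch in onProviderEvent is not shown in this diff):

/**
 * Dispatches incoming provider events. A STALE event starts the reconnect grace period,
 * while an ERROR event is emitted as fatal (ErrorCode.PROVIDER_FATAL).
 */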

private void onError() {
this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build());
}

private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
.flagsChanged(flagdProviderEvent.getFlagsChanged())
@@ -255,7 +261,7 @@ private void onReady() {
ProviderEventDetails.builder().message("connected to flagd").build());
}

private void onError() {
private void onStale() {
log.debug(
"Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.",
gracePeriod);
@@ -270,7 +276,7 @@ private void onError() {
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) {
log.error(
"Provider did not reconnect successfully within {}s. Emitting ERROR event...",
gracePeriod);
@@ -77,6 +77,9 @@ public void init() throws Exception {
storageStateChange.getSyncMetadata()));
log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
break;
case STALE:
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE));
break;
case ERROR:
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
break;
@@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedException {
}
break;
case ERROR:
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
log.warn("Failed to convey STALE status, queue is full");
}
break;
case FATAL:
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
log.warn("Failed to convey ERROR status, queue is full");
}
@@ -1,6 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

/** Satus of the storage. */
/** Status of the storage. */
public enum StorageState {
/** Storage is upto date and working as expected. */
OK,
@@ -3,5 +3,6 @@
/** Payload type emitted by {@link QueueSource}. */
public enum QueuePayloadType {
DATA,
ERROR
ERROR,
FATAL
}
@@ -19,6 +19,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ public class SyncStreamQueueSource implements QueueSource {

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
private final AtomicBoolean successfulSync = new AtomicBoolean(false);
private final int streamDeadline;
private final int deadline;
private final int maxBackoffMs;
@@ -47,6 +49,7 @@ public class SyncStreamQueueSource implements QueueSource {
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final FlagSyncServiceStub flagSyncStub;
private final FlagSyncServiceBlockingStub metadataStub;
private final List<String> fatalStatusCodes;
Contributor: Since we do lots of .contains operations on this data structure, a HashSet might be more performant. How many entries do we expect in this list?

Member Author (@leakonvalinka, Dec 17, 2025): That's hard for me to estimate, what do the others think? The currently defined default is an empty list
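
A sketch of the reviewer's suggestion, not part of this PR: copy the configured codes into a HashSet once at construction time so each .contains check on a stream error is a constant-time lookup (requires java.util.Set and java.util.HashSet imports):

// Hypothetical variant of the field and its initialization in the constructor:
private final Set<String> fatalStatusCodes;

public SyncStreamQueueSource(final FlagdOptions options) {
    // ... existing initialization ...
    this.fatalStatusCodes = new HashSet<>(options.getFatalStatusCodes());
}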


/**
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
@@ -63,6 +66,7 @@ public SyncStreamQueueSource(final FlagdOptions options) {
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
.withWaitForReady();
fatalStatusCodes = options.getFatalStatusCodes();
}

// internal use only
@@ -80,6 +84,7 @@ protected SyncStreamQueueSource(
flagSyncStub = stubMock;
syncMetadataDisabled = options.isSyncMetadataDisabled();
metadataStub = blockingStubMock;
fatalStatusCodes = options.getFatalStatusCodes();
}

/** Initialize sync stream connector. */
@@ -129,23 +134,35 @@ private void observeSyncStream() {
}

log.debug("Initializing sync stream request");
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle);
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle, fatalStatusCodes);
try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
} catch (StatusRuntimeException metaEx) {
if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) {
log.debug("Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode());
enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode()));
} else {
// retry for other status codes
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
}
shouldThrottle.set(true);
continue;
}

try {
syncFlags(observer);
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
successfulSync.set(true);
} catch (StatusRuntimeException ex) {
if (fatalStatusCodes.contains(ex.getStatus().getCode().name()) && !successfulSync.get()) {
log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode());
enqueueFatal(String.format("Fatal: Failed to establish sync stream, not retrying for error %s", ex.getStatus().getCode()));
} else {
// retry for other status codes
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
}
shouldThrottle.set(true);
}
} catch (InterruptedException ie) {
@@ -212,22 +229,34 @@ private void enqueueError(String message) {
enqueueError(outgoingQueue, message);
}

private void enqueueFatal(String message) {
enqueueFatal(outgoingQueue, message);
}

private static void enqueueError(BlockingQueue<QueuePayload> queue, String message) {
if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) {
log.error("Failed to convey ERROR status, queue is full");
}
}

private static void enqueueFatal(BlockingQueue<QueuePayload> queue, String message) {
if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) {
log.error("Failed to convey FATAL status, queue is full");
}
}

private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
private final BlockingQueue<QueuePayload> outgoingQueue;
private final AtomicBoolean shouldThrottle;
private final Awaitable done = new Awaitable();
private final List<String> fatalStatusCodes;

private Struct metadata;

public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle) {
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle, List<String> fatalStatusCodes) {
this.outgoingQueue = outgoingQueue;
this.shouldThrottle = shouldThrottle;
this.fatalStatusCodes = fatalStatusCodes;
}

@Override
Expand All @@ -245,9 +274,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) {
@Override
public void onError(Throwable throwable) {
try {
Status status = Status.fromThrowable(throwable);
String message = throwable != null ? throwable.getMessage() : "unknown";
log.debug("Stream error: {}, will restart", message, throwable);
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
if (fatalStatusCodes.contains(status.getCode().name())) {
enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message));
} else {
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
}

// Set throttling flag to ensure backoff before retry
this.shouldThrottle.set(true);
@@ -60,6 +60,7 @@
public final class RpcResolver implements Resolver {
private static final int QUEUE_SIZE = 5;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean successfulConnection = new AtomicBoolean(false);
private final ChannelConnector connector;
private final Cache cache;
private final ResolveStrategy strategy;
@@ -68,6 +69,7 @@ public final class RpcResolver implements Resolver {
private final Consumer<FlagdProviderEvent> onProviderEvent;
private final ServiceStub stub;
private final ServiceBlockingStub blockingStub;
private final List<String> fatalStatusCodes;

/**
* Resolves flag values using
@@ -89,6 +91,7 @@ public RpcResolver(
this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady();
this.blockingStub =
ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady();
this.fatalStatusCodes = options.getFatalStatusCodes();
}

// testing only
@@ -107,6 +110,7 @@ protected RpcResolver(
this.onProviderEvent = onProviderEvent;
this.stub = mockStub;
this.blockingStub = mockBlockingStub;
this.fatalStatusCodes = options.getFatalStatusCodes();
}

/**
@@ -348,13 +352,20 @@ private void observeEventStream() throws InterruptedException {

Throwable streamException = taken.getError();
if (streamException != null) {
log.debug(
"Exception in event stream connection, streamException {}, will reconnect",
streamException);
this.handleErrorOrComplete();
if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains(
((StatusRuntimeException) streamException).getStatus().getCode().name()) && !successfulConnection.get()) {
log.debug("Fatal error code received: {}", ((StatusRuntimeException) streamException).getStatus().getCode());
this.handleFatalError();
} else {
log.debug(
"Exception in event stream connection, streamException {}, will reconnect",
streamException);
this.handleErrorOrComplete();
}
break;
}

successfulConnection.set(true);
final EventStreamResponse response = taken.getResponse();
log.debug("Got stream response: {}", response);

@@ -410,9 +421,19 @@ private void handleProviderReadyEvent() {
* Handles provider error events by clearing the cache (if enabled) and notifying listeners of the error.
*/
private void handleErrorOrComplete() {
log.debug("Emitting provider error event");
log.debug("Emitting provider stale event");

// complete is an error, logically...even if the server went down gracefully we need to reconnect.
onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE));
}

/**
* Handles fatal error events (i.e. error codes defined in fatalStatusCodes) by transitioning the provider into
* a fatal state.
*/
private void handleFatalError() {
log.debug("Emitting provider error event");

onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
}
}