
@nadberezny

Purpose of the Change
This PR fixes a critical stability issue in the FanOutKinesisShardSubscription where the connector could enter a "zombie" state—appearing active but not consuming events—following a failed subscription attempt (ResourceInUseException).

It also resolves potential duplicates caused by non-thread-safe handling of the shard starting position.

More details can be found in the JIRA ticket description:
https://issues.apache.org/jira/browse/FLINK-38814

- Call `void error(String var1, Throwable var2)` instead of `void error(String var1, Object... var2)`.
The `FanOutKinesisShardSubscription` could enter a zombie state where it appeared active but was not consuming events. This occurred due to incorrect handling of the `ResourceInUseException` during the subscription phase.

Problem:
When the Kinesis SDK threw a `ResourceInUseException`, the code incorrectly released the `waitForSubscriptionLatch`. This signaled "success" to the waiting thread, even though no `onSubscribe` callback had occurred.

Root Cause:
1. The waiting thread woke up and attempted to call `requestRecords()` on a null `subscription` object, throwing a `NullPointerException`.
2. The `CompletableFuture.runAsync` lambda only caught `InterruptedException`. The `NullPointerException` (a RuntimeException) killed the thread silently.
3. The `subscriptionActive` flag remained true, causing the main poll loop to wait indefinitely for events that would never arrive.
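
The three steps above can be reproduced with plain JDK classes. The following is a minimal sketch, not the connector's code: `ZombieSubscriptionSketch`, its `Subscription` interface, and `reproduce()` are hypothetical stand-ins, and only the field names mirror the ones mentioned in this description. Note that with `CompletableFuture.runAsync` the `NullPointerException` is captured in the returned future; it goes unnoticed because nothing inspects that future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the subscription class; illustrates the failure mode only.
class ZombieSubscriptionSketch {
    interface Subscription {
        void request(long n);
    }

    final CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
    final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
    volatile Subscription subscription; // stays null because onSubscribe never ran

    CompletableFuture<Void> activate() {
        subscriptionActive.set(true);
        // Buggy error path: the failure releases the latch as if subscribing had succeeded.
        waitForSubscriptionLatch.countDown();
        return CompletableFuture.runAsync(() -> {
            try {
                if (waitForSubscriptionLatch.await(1, TimeUnit.SECONDS)) {
                    subscription.request(1); // throws NullPointerException
                }
            } catch (InterruptedException e) {
                // Only this exception type is handled; the NPE escapes the lambda.
                Thread.currentThread().interrupt();
                subscriptionActive.set(false);
            }
        });
    }

    // Returns true when the sketch ends up "active" although the async task has died.
    static boolean reproduce() {
        ZombieSubscriptionSketch sketch = new ZombieSubscriptionSketch();
        CompletableFuture<Void> future = sketch.activate();
        try {
            future.join();
        } catch (CompletionException e) {
            // Wraps the NullPointerException; real callers never looked at the future.
        }
        return future.isCompletedExceptionally() && sketch.subscriptionActive.get();
    }
}
```

In this sketch `reproduce()` returns `true`: the task has failed, yet `subscriptionActive` still reports an active subscription, which is the state a poll loop would wait on forever.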

Changes:
- Updated the `.exceptionally()` block to treat `ResourceInUseException` as a failure (calling `terminateSubscription`) rather than a success.
- Added a catch-all `catch(Throwable t)` block to the async activation thread to ensure unexpected errors are logged and handled rather than swallowed.
- Added a safety check to ensure `subscription` is not null before requesting records.
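
A minimal sketch of how these three changes fit together, again using plain JDK classes rather than the actual diff. `GuardedSubscriptionSketch`, the `subscribeFailure` parameter, the `requested` counter, and the simulated `onSubscribe` are assumptions made for illustration; `terminateSubscription` is named after the method mentioned above, but its body here is a guess at the intent (record the first failure and clear the active flag).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the subscription class; illustrates the fix pattern only.
class GuardedSubscriptionSketch {
    interface Subscription {
        void request(long n);
    }

    final CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
    final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
    final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    final AtomicLong requested = new AtomicLong();
    volatile Subscription subscription;

    // Assumed behavior: keep the first failure and mark the subscription inactive.
    void terminateSubscription(Throwable t) {
        subscriptionException.compareAndSet(null, t);
        subscriptionActive.set(false);
    }

    // subscribeFailure == null simulates a successful onSubscribe callback.
    CompletableFuture<Void> activate(Throwable subscribeFailure) {
        subscriptionActive.set(true);
        if (subscribeFailure != null) {
            terminateSubscription(subscribeFailure); // failure is treated as failure
        } else {
            subscription = requested::addAndGet;     // simulated onSubscribe
        }
        waitForSubscriptionLatch.countDown();
        return CompletableFuture.runAsync(() -> {
            try {
                if (!waitForSubscriptionLatch.await(1, TimeUnit.SECONDS)) {
                    terminateSubscription(new IllegalStateException("subscribe timed out"));
                    return;
                }
                Subscription current = subscription;
                if (current == null) {
                    // Safety check: never request records without a subscription.
                    terminateSubscription(new IllegalStateException("no subscription"));
                    return;
                }
                current.request(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                terminateSubscription(e);
            } catch (Throwable t) {
                // Catch-all so unexpected errors are recorded instead of swallowed.
                terminateSubscription(t);
            }
        });
    }
}
```

With this shape, a failed subscribe leaves `subscriptionActive` false and the original exception available to the caller, so a poll loop can react instead of waiting indefinitely.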
@nadberezny nadberezny changed the title [Flink-388814][Connector/Kinesis] Kinesis Source in EFO - Fix idle consumer on subscription failure [Flink-38814][Connector/Kinesis] Kinesis Source in EFO - Fix idle consumer on subscription failure Dec 18, 2025
@nadberezny nadberezny changed the title [Flink-38814][Connector/Kinesis] Kinesis Source in EFO - Fix idle consumer on subscription failure [FLINK-38814][Connector/Kinesis] Kinesis Source in EFO - Fix idle consumer on subscription failure Dec 18, 2025
@AndyShoeshim

java.util.concurrent.TimeoutException: Timeout when subscribing to shard shardId-000000000033 with starting position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, startingMarker=49670023483232461788730612813735157811693693431491592722}

Can this exception also be related to this issue? I have a use case similar to the one described in the issue, in which the consumer gets into a zombie-like state.
