Skip to content

Conversation

@Nikita-Shupletsov
Copy link
Contributor

@Nikita-Shupletsov Nikita-Shupletsov commented Dec 4, 2025

While we are trying to delete members from a group, they may leave the
group themselves. Which would lead to a failure of the tool, even though
not having that member in the group is the desired state.

@github-actions github-actions bot added triage PRs from the community tools clients labels Dec 4, 2025

RemoveMembersFromConsumerGroupResult(KafkaFuture<Map<MemberIdentity, Errors>> future,
public RemoveMembersFromConsumerGroupResult(KafkaFuture<Map<MemberIdentity, Errors>> future,
Set<MemberToRemove> memberInfos) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm -- can we make this constructor public w/o a KIP?

nit: fix indention for second parmeter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks. as it's done only for the test, I will modify it

+ "Make sure to stop all running application instances before running the reset tool."
+ " You can use option '--force' to remove active members from the group.");
int retries = 0;
while (true) {
Copy link
Member

@mjsax mjsax Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use while(retries++ < 3) ?

Copy link
Contributor Author

@Nikita-Shupletsov Nikita-Shupletsov Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because I wanted to still throw the exception if we are still getting the exception after the retries. we can do while(retries++ < 3), but we would need to either store the exception or throw something generic, which would hide the original exception. so I decided to go this way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. The diff was hard to read. I see now.

} catch (ExecutionException ee) {
// If the group ID is not found, this is not an error case
if (ee.getCause() instanceof GroupIdNotFoundException) {
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would break be cleaner? Atm, there is no code after the loop, to no difference, but if we add code after the loop in the future, using return here might skip this code accidentally?

Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @Nikita-Shupletsov.

I have mostly questions, as I'm following along to learn more about this area.

Thanks!

if (!members.isEmpty()) {
if (force) {
System.out.println("Force deleting all active members in the group: " + groupId);
adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, but out of curiosity, is there any reason not to pass in the members retrieved in line 193/194 into the RemoveMembersFromConsumerGroupOptions constructor on line 198? That seems like it would save an extra lookup. But for a tool, I guess it's not as critical that it be as efficient as possible 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering it, but if we look at the implementation:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L4175-L4178
it passes groupInstanceId if it exists or memberId if it doesn't.
there is no way to replicate this behavior with https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java, as it doesn't accept memberId, only groupInstanceId. and it's a public class, so changing it would require a KIP, afaik

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense.

It seems a bit odd because while MemberToRemove doesn't support member IDs, all removeMembersFromConsumerGroup() does with the MemberRemoves is to convert each of them into a MemberIdentity which does support member IDs 🤷‍♂️

Thanks for the explanation 👍

}
// if a member is unknown, it may mean that it left the group itself. Retrying to confirm.
if (ee.getCause() instanceof KafkaException ke && ke.getCause() instanceof UnknownMemberIdException) {
if (retries++ < 3) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding some kind of constant like NUM_RETRIES or something.


assertThrows(IllegalStateException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, false));

verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the force parameter is set to false, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. if it's not force, instead of removing the members, we just fail. as there weren't any tests for that method, I just added this one as well.

@github-actions github-actions bot removed the triage PRs from the community label Dec 4, 2025
if (ee.getCause() instanceof GroupIdNotFoundException) {
break;
}
// if a member is unknown, it may mean that it left the group itself. Retrying to confirm.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably missing something, but I don't see how retrying 3 times will solve the issue here? Is the hope that on subsequent retries describeConsumerGroups won't return the member that left?

Copy link
Contributor Author

@Nikita-Shupletsov Nikita-Shupletsov Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removeMembersFromConsumerGroup fetches all members in the group and sends a request to delete them. if any of them fail, RemoveMembersFromConsumerGroupResult#all will throw an exception.
so, effectively, broker first said something existed, but then said it doesn't.

if we retry, we will repeat the same actions again. and at this point it's not expected to receive the same member(because the broker said it doesn't exit), that failed to get deleted as it was unknown. so even if we skipped some deletion for whatever reason a retry will try to delete it again

@mjsax mjsax added streams and removed clients labels Dec 5, 2025

private RemoveMembersFromConsumerGroupResult createRemoveMembersFromConsumerGroupResult(final KafkaFuture<Map<LeaveGroupRequestData.MemberIdentity, Errors>> future,
final Set<MemberToRemove> memberInfos) throws Exception {
final Constructor<RemoveMembersFromConsumerGroupResult> constructor = RemoveMembersFromConsumerGroupResult.class.getDeclaredConstructor(KafkaFuture.class, Set.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you explore using MockAdminClient (which atm does not support removeMembersFromConsumerGroup(...) though, but we could maybe add it with this PR), to avoid using reflections?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, but it looked like a lot of changes. if you think it's worth it, I would suggest looking at it separately if you don't mind

Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there's some linting errors in StreamsResetterTest that are preventing test runs. You can run ./gradlew check -x test locally to perform the same checks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants