Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
Expand Down Expand Up @@ -116,6 +118,8 @@ public class StreamsResetter {
+ "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that "
+ "you run this once with \"--dry-run\" to preview your changes before making them.\n\n";

private static final int MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES = 3;

private final List<String> allTopics = new LinkedList<>();

public static void main(final String[] args) {
Expand Down Expand Up @@ -149,7 +153,7 @@ public int execute(final String[] args, final Properties config) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerValue);

try (Admin adminClient = Admin.create(properties)) {
maybeDeleteActiveConsumers(groupId, adminClient, options);
maybeDeleteActiveConsumers(groupId, adminClient, options.hasForce());

allTopics.clear();
allTopics.addAll(adminClient.listTopics().names().get(60, TimeUnit.SECONDS));
Expand Down Expand Up @@ -177,30 +181,42 @@ public int execute(final String[] args, final Properties config) {
}
}

private void maybeDeleteActiveConsumers(final String groupId,
final Admin adminClient,
final StreamsResetterOptions options)
// visible for testing
void maybeDeleteActiveConsumers(final String groupId,
final Admin adminClient,
final boolean force)
throws ExecutionException, InterruptedException {
final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
Set.of(groupId),
new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
try {
final List<MemberDescription> members =
new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
if (!members.isEmpty()) {
if (options.hasForce()) {
System.out.println("Force deleting all active members in the group: " + groupId);
adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
} else {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
+ "and has following members: " + members + ". "
+ "Make sure to stop all running application instances before running the reset tool."
+ " You can use option '--force' to remove active members from the group.");
int retries = 0;
while (true) {
Copy link
Member

@mjsax mjsax Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use while(retries++ < 3) ?

Copy link
Contributor Author

@Nikita-Shupletsov Nikita-Shupletsov Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because I wanted to still throw the exception if we are still getting the exception after the retries. we can do while(retries++ < 3), but we would need to either store the exception or throw something generic, which would hide the original exception. so I decided to go this way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. The diff was hard to read. I see now.

final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
Set.of(groupId),
new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
try {
final List<MemberDescription> members =
new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
if (!members.isEmpty()) {
if (force) {
System.out.println("Force deleting all active members in the group: " + groupId);
adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, but out of curiosity, is there any reason not to pass in the members retrieved in line 193/194 into the RemoveMembersFromConsumerGroupOptions constructor on line 198? That seems like it would save an extra lookup. But for a tool, I guess it's not as critical that it be as efficient as possible 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering it, but if we look at the implementation:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L4175-L4178
it passes groupInstanceId if it exists or memberId if it doesn't.
there is no way to replicate this behavior with https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java, as it doesn't accept memberId, only groupInstanceId. and it's a public class, so changing it would require a KIP, afaik

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense.

It seems a bit odd because while MemberToRemove doesn't support member IDs, all removeMembersFromConsumerGroup() does with the MemberRemoves is to convert each of them into a MemberIdentity which does support member IDs 🤷‍♂️

Thanks for the explanation 👍

} else {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
+ "and has following members: " + members + ". "
+ "Make sure to stop all running application instances before running the reset tool."
+ " You can use option '--force' to remove active members from the group.");
}
}
break;
} catch (ExecutionException ee) {
// If the group ID is not found, this is not an error case
if (ee.getCause() instanceof GroupIdNotFoundException) {
break;
}
// if a member is unknown, it may mean that it left the group itself. Retrying to confirm.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably missing something, but I don't see how retrying 3 times will solve the issue here? Is the hope that on subsequent retries describeConsumerGroups won't return the member that left?

Copy link
Contributor Author

@Nikita-Shupletsov Nikita-Shupletsov Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removeMembersFromConsumerGroup fetches all members in the group and sends a request to delete them. if any of them fail, RemoveMembersFromConsumerGroupResult#all will throw an exception.
so, effectively, broker first said something existed, but then said it doesn't.

if we retry, we will repeat the same actions again. and at this point it's not expected to receive the same member(because the broker said it doesn't exit), that failed to get deleted as it was unknown. so even if we skipped some deletion for whatever reason a retry will try to delete it again

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, didn't know removeMembersFromConsumerGroup fetched and deleted members.

if (ee.getCause() instanceof KafkaException ke && ke.getCause() instanceof UnknownMemberIdException) {
if (retries++ < MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES) {
continue;
}
}
}
} catch (ExecutionException ee) {
// If the group ID is not found, this is not an error case
if (!(ee.getCause() instanceof GroupIdNotFoundException)) {
throw ee;
}
}
Expand Down
118 changes: 117 additions & 1 deletion tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,33 @@
*/
package org.apache.kafka.tools;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.protocol.Errors;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.lang.reflect.Constructor;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
Expand All @@ -41,7 +53,15 @@
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@Timeout(value = 600)
public class StreamsResetterTest {
Expand Down Expand Up @@ -293,6 +313,102 @@ public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
assertEquals(beginningAndEndOffset, position);
}

@Test
public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce() throws Exception {
final String groupId = "groupId";

final Admin adminClient = mock(Admin.class);
final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);

when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
.thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription))));
when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of()))
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of()));
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));

streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);

verify(adminClient, times(2)).removeMembersFromConsumerGroup(eq(groupId), any());
}

@Test
public void shouldFailAfterTooManyRetries() throws Exception {
final String groupId = "groupId";

final Admin adminClient = mock(Admin.class);
final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);

when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
.thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription))));
when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of()));
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));

assertThrows(ExecutionException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true));

verify(adminClient, times(4)).removeMembersFromConsumerGroup(eq(groupId), any());
}

@Test
public void shouldFailIfThereAreMembersAndNotForce() throws Exception {
final String groupId = "groupId";

final Admin adminClient = mock(Admin.class);
final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);

when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
.thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription))));
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));

assertThrows(IllegalStateException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, false));

verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the force parameter is set to false, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. if it's not force, instead of removing the members, we just fail. as there weren't any tests for that method, I just added this one as well.

}

@Test
public void shouldRemoveIfThereAreMembersAndForce() throws Exception {
final String groupId = "groupId";

final Admin adminClient = mock(Admin.class);
final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);

when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
.thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription))));
when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of()));
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));

streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);

verify(adminClient).removeMembersFromConsumerGroup(eq(groupId), any());
}

@Test
public void shouldIgnoreGroupIdNotFoundException() throws Exception {
final String groupId = "groupId";

final Admin adminClient = mock(Admin.class);
final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);

final KafkaFutureImpl<ConsumerGroupDescription> future = new KafkaFutureImpl<>();
future.completeExceptionally(new GroupIdNotFoundException(groupId));
when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
.thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, future)));
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));

streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);

verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any());
}

private RemoveMembersFromConsumerGroupResult createRemoveMembersFromConsumerGroupResult(final KafkaFuture<Map<LeaveGroupRequestData.MemberIdentity, Errors>> future,
final Set<MemberToRemove> memberInfos) throws Exception {
final Constructor<RemoveMembersFromConsumerGroupResult> constructor = RemoveMembersFromConsumerGroupResult.class.getDeclaredConstructor(KafkaFuture.class, Set.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you explore using MockAdminClient (which atm does not support removeMembersFromConsumerGroup(...) though, but we could maybe add it with this PR), to avoid using reflections?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, but it looked like a lot of changes. if you think it's worth it, I would suggest looking at it separately if you don't mind

constructor.setAccessible(true);
return constructor.newInstance(future, memberInfos);
}

private Cluster createCluster(final int numNodes) {
final HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; ++i) {
Expand All @@ -315,4 +431,4 @@ public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(fina
return topicPartitionToOffsetAndTimestamp;
}
}
}
}