@LucioFranco
Fixes a critical bug where the message cache size calculation could result in 0, causing "Consume was already called" errors and stopping message processing.

Root Cause

When consumption was slow or message rate was low, #messageCacheMaxSize calculated to 0. JavaScript's truthy check (0 && callback is falsy) caused consume(0, callback) to call _consumeLoop() instead of _consumeNum(), starting a persistent background loop that blocked all other workers.
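
The falsy-zero dispatch can be reproduced in isolation. The snippet below is a simplified stand-in, not the library's actual consume() implementation: `dispatchTarget` is a hypothetical helper that only reports which internal method would be chosen, assuming the guard has the shape `number && callback`.

```javascript
// Hypothetical helper: mirrors the shape of the guard described above.
// It returns the name of the method that would run instead of calling it.
function dispatchTarget(number, callback) {
  // `0 && callback` evaluates to 0, which is falsy, so a count of 0
  // falls through to the loop branch even though a callback was given.
  if (number && callback) {
    return '_consumeNum';
  }
  return '_consumeLoop';
}

const noop = () => {};
console.log(dispatchTarget(10, noop)); // _consumeNum
console.log(dispatchTarget(0, noop));  // _consumeLoop
```
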

Solution

Added Math.max(1, ...) to ensure cache size is always at least 1, plus defensive validation in the consume() method.
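
A minimal sketch of both parts of the fix, under stated assumptions: the function names `clampedCacheSize` and `assertValidConsumeCount` and the constant name `WINDOW_MS` are hypothetical, and the factor 1500 with Math.round is taken from the example in this description rather than from the diff itself.

```javascript
// Hypothetical name; the value 1500 comes from the example in this description.
const WINDOW_MS = 1500;

// Clamp the computed size so it can never drop below 1.
function clampedCacheSize(messagesPerMillisecondSingleWorker) {
  const raw = Math.round(WINDOW_MS * messagesPerMillisecondSingleWorker);
  return Math.max(1, raw);
}

// Defensive check of the kind consume() could apply before dispatching.
// Rejects 0, negatives, non-integers, and non-numbers.
function assertValidConsumeCount(number) {
  if (!Number.isInteger(number) || number < 1) {
    throw new RangeError(`consume() count must be a positive integer, got ${number}`);
  }
}

console.log(clampedCacheSize(0.0001)); // 1 (was 0 before clamping)
console.log(clampedCacheSize(0.01));   // 15
```

With the clamp in place the validation should never fire for the cache size, but it still guards against other callers passing 0.
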

When This Occurs

The bug triggers when messagesPerMillisecondSingleWorker < 0.000333, which happens with:

  • Slow eachMessage/eachBatch callbacks (DB writes, HTTP calls, etc.)
  • Low message volumes across multiple partitions
  • High partitionsConsumedConcurrently settings
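
The 0.000333 threshold follows from rounding: Math.round(1500 * rate) is 0 whenever 1500 * rate is below 0.5, that is, when rate is below 0.5 / 1500. The check below is illustrative only; `roundsToZero` is a hypothetical helper, and the factor 1500 is assumed from the example in this description.

```javascript
// Rates below this value round to a cache size of 0.
const threshold = 0.5 / 1500;

// Hypothetical helper: true when the unclamped calculation yields 0.
function roundsToZero(rate) {
  return Math.round(1500 * rate) === 0;
}

console.log(threshold.toFixed(6));  // 0.000333
console.log(roundsToZero(0.0003));  // true
console.log(roundsToZero(0.0004));  // false
```
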

Example

Processing 3 messages across 5 partitions taking 6 seconds:

messagesPerMillisecondSingleWorker = 3 / 5 / 6000  // = 0.0001
Math.round(1500 * 0.0001)  // = Math.round(0.15) = 0, bug triggers

Fixes #415

@LucioFranco LucioFranco requested a review from a team as a code owner December 10, 2025 22:47
Copilot AI review requested due to automatic review settings December 10, 2025 22:47
Copilot AI left a comment

Pull request overview

This PR fixes a critical bug where the message cache size calculation could incorrectly evaluate to 0, causing message processing to halt with "Consume was already called" errors. The fix ensures the cache size is always at least 1 by adding 1 inside the Math.ceil() call before performing the calculation.

Key Changes:

  • Modified cache size calculation to guarantee minimum value of 1
  • Prevents division-by-zero-like behavior when message consumption is slow or message rates are low

Development

Successfully merging this pull request may close these issues.

Error: Consume was already called + TypeError: messages is not iterable
