Skip to content

Help: Queue with ack = true and prefetchCount >1. Multiple handlers #461

@FrostBy

Description

@FrostBy

Hi!
Can anybody help me with the next issue:

  1. I subscribe to a queue with ack = true and prefetchCount >1.
  2. All messages transfers to a balancer, which creates instances of workers, which works with messages.
  3. All messages from a subscription are sent to an array and when any worker is ready for a message, I send it to him from this array.
  4. Once the message is processed, worker acknowledge this message and tries to take another message from the array.
  5. If the array is empty, worker tries to call queue.shift();

So... if i try to call queue.shift() after messageObject.acknowledge, i get

Error: PRECONDITION_FAILED - unknown delivery tag 1

Question: How can i acknowledge messages and then request queue for another bundle of messages

Here some examples of my logic:

   /**
     *
     * @param connection {Connection}
     * @param queue {Queue}
     */
    runQueue(connection, queue) {
        this._connection = connection;
        this._queue = queue;
        queue.subscribe({
            ack:           true,
            prefetchCount: this._availableBots.length
        }, (message, headers, deliveryInfo, ack) => {
            message = JSON.parse(message.data.toString());
            this.balance(message, ack);
        });
    }
    /**
     *
     * @param message {string}
     * @param messageObject {Message}
     */
    balance(message, messageObject) {
        let selectedBot = null;

        for (const bot of this._bots) {
            if (!bot.busy) {
                selectedBot = bot;
                break;
            }
        }
        if (selectedBot) {
            this.run(message, messageObject, selectedBot)
        }
        else {
            this._tasks.push([message, messageObject]);
            if (this._availableBots.length) {
                this.createBot();
            }
        }
    }
 bot.on('free', (messageObject) => {
            console.log('freeBot');
            bot.busy = false;
            if (messageObject) {
                messageObject.acknowledge();
            }
            if (this._tasks.length) {
                let task = this._tasks.shift();
                this.run(task[0], task[1], bot)
            }
            else {
                this._queue.shift(true);
            }
        });

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions