AMQP: retrying message consumption with a delay - when ACK and REJECT are not enough


This post describes a technique called delayed retry, implemented in alpha-amqp-consumer and inspired by this blog post.

When REJECT might not be enough

A message rejected with requeue=true goes back to the head of its queue, so it is immediately delivered again to an available consumer. That is not always the desired behavior, especially when the message contains data that triggers an error in the worker. In that case the message is consumed over and over again (hundreds or thousands of times a minute), triggering an error every time, and you can't even delete it from the queue. The only fix is to stop all consumers, remove the message and restart the consumers. That's not super friendly.
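To make the loop concrete, here is a toy in-memory model of a single consumer with prefetch 1 working through a queue whose first message always fails. It is not RabbitMQ and not alpha-amqp-consumer: `ToyQueue` and `worker` are hypothetical, and reject(requeue=true) is modelled simply as putting the message back at the head.

```javascript
// Toy model (not RabbitMQ): one consumer, prefetch 1.
// Index 0 of `messages` is the head of the queue.
class ToyQueue {
  constructor(messages) {
    this.messages = [...messages];
  }

  // Deliver the head message to `handler`, which returns 'ack' or 'requeue'.
  deliverNext(handler) {
    if (this.messages.length === 0) return false;
    const message = this.messages.shift();
    if (handler(message) === 'requeue') {
      // reject(requeue=true): the message goes straight back to the head.
      this.messages.unshift(message);
    }
    return true;
  }
}

// Hypothetical worker that always fails on the message carrying bad data.
function worker(message) {
  return message.body === 'bad data' ? 'requeue' : 'ack';
}

const queue = new ToyQueue([{ body: 'bad data' }, { body: 'ok 1' }, { body: 'ok 2' }]);
const deliveries = [];
for (let i = 0; i < 5; i++) {
  queue.deliverNext((message) => {
    deliveries.push(message.body);
    return worker(message);
  });
}
// All five deliveries are 'bad data'; 'ok 1' and 'ok 2' are never reached.
console.log(deliveries);
```

The two healthy messages behind the poison one never get a turn, which is exactly the hot loop described above.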

Delayed retry

That's why delayed retry was invented. With alpha-amqp-consumer you can simply tell the consumer to retry consuming a message after X milliseconds. In the meantime the message is removed from the original queue and stored in "retry-queue". Forget killing consumers or channels to get rid of "invalid" messages: just review the messages in "retry-queue" and remove them when needed.

Delayed retry also makes monitoring a bit easier: just track the number of messages in retry-queue and send yourself an email notification when it is greater than 0.
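A minimal monitoring sketch, assuming the amqplib client, whose `channel.checkQueue(name)` resolves to `{ queue, messageCount, consumerCount }`. The `checkRetryQueue` function and the `notify` callback (for example something that sends an email) are hypothetical names.

```javascript
// Sketch of a retry-queue monitor. `channel` is assumed to be an amqplib
// channel; `notify` is a hypothetical async callback (e.g. sends an email).
async function checkRetryQueue(channel, queueName, notify) {
  // Passive check: reports how many messages are ready in the queue.
  const { messageCount } = await channel.checkQueue(queueName);
  if (messageCount > 0) {
    await notify(`${messageCount} message(s) waiting in ${queueName}`);
  }
  return messageCount;
}

// Possible usage (assumes amqplib is installed and a broker is reachable;
// `sendEmail` is hypothetical):
//   const amqp = require('amqplib');
//   const connection = await amqp.connect('amqp://amqp.broker');
//   const channel = await connection.createChannel();
//   setInterval(() => checkRetryQueue(channel, 'retry-queue', sendEmail), 60000);
```

Since nothing consumes from the retry queue, every message in it counts as ready, so `messageCount` is the number of messages waiting to be retried. If the queue does not exist, the broker closes the channel with a 404 error instead of returning a count.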

Another big advantage of this technique is easier debugging. Without it, to see the content of a problematic message you need to either stop all consumers or write the message content to your logs. Delayed retry is a more elegant solution, because the problematic message waits in retry-queue where you can inspect it.
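One way to inspect parked messages programmatically, sketched with amqplib: `channel.get(queue, { noAck })` resolves to a message or `false`, and `channel.nack(message, allUpTo, requeue)` returns a message to its queue. `peekRetryQueue` is a hypothetical helper and the `limit` default is an arbitrary choice.

```javascript
// Sketch: look at messages parked in the retry queue and put them back.
// `channel` is assumed to be an amqplib channel.
async function peekRetryQueue(channel, queueName, limit = 10) {
  const fetched = [];
  // Keep every fetched message unacknowledged so the next get() returns the
  // following message instead of the same one again.
  for (let i = 0; i < limit; i++) {
    const message = await channel.get(queueName, { noAck: false });
    if (message === false) break; // queue is empty
    fetched.push(message);
  }
  // Return everything to the queue: nack(message, allUpTo = false, requeue = true).
  for (const message of fetched) {
    channel.nack(message, false, true);
  }
  // The routing key shows which queue the message will eventually return to.
  return fetched.map((message) => ({
    routingKey: message.fields.routingKey,
    body: message.content.toString(),
  }));
}
```

Nothing is consumed for good: every fetched message is requeued, so the retry flow continues as before.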

How to use it

Let me use async/await for readability.

const { connect } = require('alpha-amqp-consumer');

(async () => {
    const manager = await connect('amqp://amqp.broker?heartbeat=10');
    // configure the retry topology
    await manager.setupDelayedRetryTopology({
        exchange: {
            pre: 'pre-retry',
            post: 'post-retry'
        },
        queue: 'messages-to-retry'
    });

    manager.consume((message) => {
        // consume the message
    }, {
        queue: 'queue-name-to-consume',
        resultHandler(context, error, result) {
            if (error) {
                // consumption failed, consume the message again after 10 seconds
                context.retry(10000);
            } else {
                // everything is fine, ack the message
                context.ack();
            }
        }
    });
})().catch((error) => {
    // connecting or setting up the topology failed
    console.error(error);
});
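To show what a retry boils down to at the AMQP level, here is a sketch with plain amqplib (it is not alpha-amqp-consumer's source): publish a copy of the message to the pre-retry exchange with a per-message TTL, then reject the original without requeueing. `retryLater` is a hypothetical name; `channel.publish(exchange, routingKey, content, options)` and `channel.reject(message, requeue)` are amqplib calls. Only `contentType` and `headers` are copied from the original properties, to keep the sketch short.

```javascript
// Sketch of a delayed retry with plain amqplib (NOT alpha-amqp-consumer code).
// `message` was delivered from `queueName` on `channel` with manual acks.
function retryLater(channel, message, queueName, delayMs) {
  // Copy goes to the pre-retry exchange; the routing key is the original
  // queue name, and `expiration` (per-message TTL in ms, as a string)
  // decides how long the copy waits in the retry queue.
  channel.publish('pre-retry', queueName, message.content, {
    contentType: message.properties.contentType,
    headers: message.properties.headers,
    persistent: true,
    expiration: String(delayMs),
  });
  // Drop the original: reject without requeue.
  channel.reject(message, false);
}

// Possible usage inside an amqplib consumer callback:
//   retryLater(channel, msg, 'foo-queue', 10000);
```

Publishing before rejecting means a crash between the two calls produces a duplicate rather than a lost message; with a confirm channel you could also wait for the broker's confirmation before rejecting.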

How it works

Let's start with the diagram.

It contains the following entities:

  • foo-queue, an example queue
  • pre-retry, an exchange of type "topic"
  • retry-queue, which stores messages waiting to be retried
  • post-retry, an exchange of type "direct"

alpha-amqp-consumer binds the entities above as follows:

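  • pre-retry routes messages to retry-queue with the binding key "*" (in a topic exchange "*" matches exactly one dot-separated word, so this covers any routing key without dots, such as "foo-queue")
  • retry-queue has its dead-letter exchange set to post-retry
  • foo-queue is bound to post-retry with the binding key "foo-queue" (a direct exchange requires an exact match)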
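The bindings above can be declared with amqplib roughly as follows. This is a sketch, not alpha-amqp-consumer's own code: `setupRetryTopology` is a hypothetical name, and `deadLetterExchange` is amqplib's option for the `x-dead-letter-exchange` queue argument.

```javascript
// Sketch: declare the retry topology with an amqplib channel.
// Names follow the article: foo-queue, pre-retry, retry-queue, post-retry.
async function setupRetryTopology(channel, originalQueue) {
  await channel.assertQueue(originalQueue, { durable: true });
  await channel.assertExchange('pre-retry', 'topic', { durable: true });
  await channel.assertExchange('post-retry', 'direct', { durable: true });
  // Expired messages are dead-lettered to post-retry.
  await channel.assertQueue('retry-queue', { durable: true, deadLetterExchange: 'post-retry' });
  // "*" matches routing keys made of exactly one word, e.g. "foo-queue".
  await channel.bindQueue('retry-queue', 'pre-retry', '*');
  // Direct exchange: the routing key must equal the binding key.
  await channel.bindQueue(originalQueue, 'post-retry', originalQueue);
}

// Possible usage: await setupRetryTopology(channel, 'foo-queue');
```

Each additional consumed queue only needs its own `bindQueue` on post-retry; the pre-retry exchange and retry-queue are shared.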

When you tell alpha-amqp-consumer to retry a message, the following happens:

  1. alpha-amqp-consumer makes a copy of the original message with a TTL equal to the retry delay and publishes it to the pre-retry exchange with routing key "foo-queue". The original message is rejected with requeue=false.
  2. The copy is routed to retry-queue.
  3. Once its TTL expires, the message is dead-lettered to the post-retry exchange, still carrying its routing key "foo-queue".
  4. post-retry routes it back to the original queue, foo-queue.
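The four steps can be traced in a toy in-memory model. It is not RabbitMQ or alpha-amqp-consumer code: every function name is hypothetical, time is simulated in milliseconds, and only the routing rules listed above are modelled.

```javascript
// Toy model of the retry flow (hypothetical names, simulated time in ms).
const queues = { 'foo-queue': [], 'retry-queue': [] };

// pre-retry (topic): binding key "*" matches routing keys made of one word.
function publishToPreRetry(message) {
  if (!message.routingKey.includes('.')) queues['retry-queue'].push(message);
}

// post-retry (direct): binding key "foo-queue" -> queue foo-queue, exact match.
const postRetryBindings = { 'foo-queue': 'foo-queue' };
function publishToPostRetry(message) {
  const target = postRetryBindings[message.routingKey];
  if (target) queues[target].push(message); // unroutable messages are dropped
}

// Steps 1-2: publish a copy whose TTL equals the delay. The original is
// already off foo-queue and is rejected without requeue, so it is gone.
function retry(message, delayMs, now) {
  publishToPreRetry({ ...message, expiresAt: now + delayMs });
}

// Steps 3-4: expired messages are dead-lettered to post-retry with their
// original routing key. Like RabbitMQ, only the head of the queue is checked.
function expireRetryQueue(now) {
  const retryQueue = queues['retry-queue'];
  while (retryQueue.length > 0 && retryQueue[0].expiresAt <= now) {
    const { expiresAt, ...deadLettered } = retryQueue.shift();
    publishToPostRetry(deadLettered);
  }
}

// Usage: a consumer fails on a message and asks for a retry in 10 seconds.
queues['foo-queue'].push({ routingKey: 'foo-queue', body: 'bad data' });
const failed = queues['foo-queue'].shift();
retry(failed, 10000, 0);
expireRetryQueue(5000);  // too early: the copy is still in retry-queue
expireRetryQueue(10000); // TTL reached: the message is back in foo-queue
console.log(queues);
```

Checking only the head of retry-queue mirrors a documented RabbitMQ caveat: messages with a per-message TTL are expired only when they reach the head of the queue, so a short delay queued behind a longer one waits until the longer one expires.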

And yes, this is tricky and looks stupid, but if it's stupid and it works, then it's not stupid :)

Your feedback is precious

Tell me what you think about it: leave a comment below or ping me on Twitter.

Łukasz Kużyński - Wookieb
Thoughts and tips about programming and related subjects to make your job easier and more pleasant so you won't burn out quickly.
Copyright © Łukasz Kużyński - Wookieb 2023 • All rights reserved.