AMQP retry message consumption with delay - when ACK and REJECT are not enough
This blog post describes a technique named delayed-retry, implemented in alpha-amqp-consumer and inspired by this blog post.
When REJECT might not be enough
A rejected (and requeued) message remains at the head of the queue. That means that once a consumer rejects a message, that message is immediately sent to an available consumer. This is not always the desired behavior, especially when a message contains data that triggers an error in the worker. In that case the message is consumed over and over again (hundreds or thousands of times a minute), triggering an error every time, and you can't even delete it from the queue. The only way to fix it is to stop all consumers, remove the message and restart the consumers. That's not super friendly.
Delayed retry
That's why delayed retry was invented. With alpha-amqp-consumer you can simply ask to retry message consumption after X milliseconds. For that time the message is removed from the original queue and stored in "retry-queue". Forget killing consumers/channels to remove "invalid" messages; just review the messages in "retry-queue" and remove them when needed.
Delayed retry also makes monitoring a bit easier. Just track the number of messages in retry-queue and send an email notification when it is greater than 0.
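A minimal sketch of such a check could look like the one below. It is not part of alpha-amqp-consumer: it assumes a `channel` object that behaves like an amqplib channel (whose `checkQueue` resolves to an object with a `messageCount` field), and both `checkRetryQueue` and the `notify` callback are hypothetical names used only for illustration.

```javascript
// Sketch only: `channel` is assumed to behave like an amqplib channel.
// `notify` is a hypothetical callback that sends the email (or any other alert).
async function checkRetryQueue(channel, notify, queueName = 'retry-queue') {
    // checkQueue reports how many messages are currently waiting in the queue
    const { messageCount } = await channel.checkQueue(queueName);

    if (messageCount > 0) {
        await notify(`${messageCount} message(s) waiting in ${queueName}`);
    }

    return messageCount;
}
```

You could run it periodically, for example from `setInterval`, and pass in whatever queue name you used when setting up the retry topology.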
Another big advantage of this technique is debugging. Without it, in order to see the content of a problematic message you need to either stop all consumers or write the content of the message to the logs. Delayed retry is a more elegant solution.
How to use it
Let me use async/await for readability.
const connect = require('alpha-amqp-consumer').connect;

(async () => {
    const manager = await connect('amqp://amqp.broker?heartbeat=10');

    // configure retry topology
    await manager.setupDelayedRetryTopology({
        exchange: {
            pre: 'pre-retry',
            post: 'post-retry'
        },
        queue: 'messages-to-retry'
    });

    manager.consume((message) => {
        // consume the message
    }, {
        queue: 'queue-name-to-consume',
        resultHandler(context, error, result) {
            if (error) {
                // some fatal error, consume message again after 10 seconds
                context.retry(10000);
            } else {
                // everything is fine, ack message
                context.ack();
            }
        }
    });
})();
How it works
Let's start with the diagram
It contains the following entities:
- foo-queue which is an example queue
- pre-retry exchange of "topic" type
- retry-queue that will store our messages to retry
- post-retry exchange of "direct" type
alpha-amqp-consumer binds the above entities in the following way:
- pre-retry sends all messages to retry-queue (bind routing key "*")
- retry-queue has dead letter exchange set to post-retry exchange
- post-retry is bound to foo-queue with "foo-queue" pattern
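To make the bindings above more concrete, here is a rough sketch of how the same topology could be declared by hand. It is not the library's actual implementation: it assumes a `channel` object that behaves like an amqplib channel, the function name `setupRetryTopology` is hypothetical, and the `durable` flags are my own assumption.

```javascript
// Sketch only: `channel` is assumed to behave like an amqplib channel.
// Entity names mirror the ones used in this post.
const PRE_RETRY = 'pre-retry';
const POST_RETRY = 'post-retry';
const RETRY_QUEUE = 'retry-queue';

async function setupRetryTopology(channel, targetQueue) {
    // Messages enter through the topic exchange and leave through the direct one
    await channel.assertExchange(PRE_RETRY, 'topic', { durable: true });
    await channel.assertExchange(POST_RETRY, 'direct', { durable: true });

    // Messages that expire in retry-queue are dead-lettered to post-retry
    await channel.assertQueue(RETRY_QUEUE, {
        durable: true,
        deadLetterExchange: POST_RETRY
    });

    // pre-retry forwards every single-word routing key to retry-queue
    await channel.bindQueue(RETRY_QUEUE, PRE_RETRY, '*');

    // post-retry routes a message back to the queue named by its routing key
    await channel.bindQueue(targetQueue, POST_RETRY, targetQueue);
}
```

Calling `setupRetryTopology(channel, 'foo-queue')` would declare the entities from the diagram, assuming foo-queue itself already exists.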
Now when you tell alpha-amqp-consumer to retry the message, the following actions take place:
- alpha-amqp-consumer makes a copy of the original message with TTL equal to the retry delay and publishes it to the pre-retry exchange with routing key "foo-queue". The original message gets rejected with requeue=false
- The message automatically lands in retry-queue
- Once the message expires, it's dead-lettered to the post-retry exchange
- The message lands again in the original queue
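The first step is the only one the consumer performs itself; the rest is done by the broker. Below is a simplified sketch of that step, not the library's real code. It assumes `channel` behaves like an amqplib channel and `message` looks like a message delivered by its `consume()`; the function name `retryLater` is hypothetical, and only a couple of message properties are copied to keep it short.

```javascript
// Sketch only: `channel` is assumed to behave like an amqplib channel and
// `message` like a message delivered by channel.consume().
function retryLater(channel, message, queueName, delayMs) {
    // Publish a copy to pre-retry; the per-message TTL is passed as `expiration`.
    // The routing key is the original queue name, so that after dead-lettering
    // post-retry can route the message back to that queue.
    channel.publish('pre-retry', queueName, message.content, {
        headers: message.properties.headers,
        contentType: message.properties.contentType,
        expiration: String(delayMs)
    });

    // Drop the original message without requeueing it
    channel.reject(message, false);
}
```

For example, `retryLater(channel, message, 'foo-queue', 10000)` would send the message back to foo-queue after roughly 10 seconds.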
And yes, this is very tricky and stupid but if it's stupid and it works then it's not stupid :)
Your feedback is precious
Tell me what you think about it: leave a comment below or ping me on Twitter.