Retry mechanism in an event based architecture

In this article I would like to explain what kind of problems can we encounter when working with Apache Kafka and how we can address them.

Apache Kafka is a distributed streaming platform, it is used for building real-time data pipelines, because of persistence of topics it can be used for processing historical message events;

In order to improve scalability, topics consists of many partitions. A topic consists from 1 or more partitions. Consumers are grouped in Consumer Groups. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.

More about kafka partitions and consumer groups: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

Message processing resilience

Consuming messages and processing them is straightforward until it comes to failures, a message processing may fail for different reasons, for example an external dependency is down or your application has some internal defect. Some of problems can be resolved in future, for external dependencies we can retry after a period of time.

First retry approach

We can just retry message processing indefinitely with some fixed delay between subsequent retries. Sample pseudo-code of the consumer might look like this:

void consumeMainTopicWithSimpleRetry() {
    while (true) {
        boolean processedSuccessfully = true;
        Message message = takeNextMessage("main_topic");
        do {
            try {
                process(message);
            } catch (Exception ex) {
                waitSomeTime();
                processedSuccessfully = false;
                LOGGER.warn("Message processing failure. Will try once again.", ex);
            }
        } while (!processedSuccessfully);
    }
}

Non-blocking retry logic

In some streaming system it is possible to skip message processing and return back to it later but not in Kafka, once pointer is moved from current message we cannot go back.

Suppose that consumer offset is commit after we process current message, in such a way it is not possible to take next message until we process current one. If processing single message fails constantly it stops system from handling next messages. It is obvious we would like to avoid such scenario because very often failure of one message handling does not imply failure of next messages handling.

Moreover, after a period of time, processing of failed message may succeed, for example system that we depend one is back in service.

For processing later messages we need a retry mechanism, for example publish message to another topic where a consumer listens and processes messages after a period of time.

Let’s call the new topic the ‘retry_topic’. The consumer of the ‘retry_topic’ will receive the message from the Kafka and then will wait some predefined time, for example one hour, before starting the message processing. This way we can postpone next attempts of the message processing without any impact on the ‘main_topic’ consumer. If processing in the retry_topic consumer fails we just have to give up and store the message in the failed_topic for further manual handling of this problem. The main_topic’consumer code may look like this:

void consumeMainTopicWithPostponedRetry() {
    while (true) {
        Message message = takeNextMessage("main_topic");
        try {
            process(message);
        } catch (Exception ex) {
            publishTo("retry_topic");
            LOGGER.warn("Message processing failure. Will try once again in the future.", ex);
        }
    }
}

retry_topic consumer logic:

void consumeRetryTopic() {
    while (true) {
        Message message = takeNextMessage("retry_topic");
        try {
            process(message);
            waitSomeLongerTime();
        } catch (Exception ex) {
            publishTo("failed_topic");
            LOGGER.warn("Message processing failure. Will skip it.", ex);
        }
    }
}

Flexible non-blocking retry logic

The aforementioned approach looks good but it can be improved to support several retries at different period of times. The depending system may be done for a longer period of time and our software is an important piece and we are trying hard to process all messages.

It is important not to flood external time and not to speed CPU time waiting for it, we may try processing after a longer period of time, let’s improve our retry mechanism:

Assuming we want to have the following retrying strategy:

  • Every 5 minutes — 2 times
  • Then after 30 minutes — 3 times
  • Then after 1 hour only one time
  • Then we skip the message

We can represent it as a sequence with elements: 5m, 5m, 30m, 30m, 30m, 1h – the number of retries that our system supports equals sequence size.

We will create 3 separate topics to support described strategy:

  • retry_5m_topic — for retry in 5 minutes
  • retry_30m_topic — for retry in 30 minutes
  • retry_1h_topic — for retry in 1 hour

The message routing algorithm is very similar like in the previous approach. It only extends it from 1 to 3 available delay values and allows to retry predefined number of times.

Let’s discuss a simple scenario. One new message was written to the topic main_topic. When processing fails, then we want to try once more time in 5 minutes, since 5m is the first value in the Retries Sequence. How can we do it? We should write a new message to the retry_5m_topic that wraps the failed message and adds 2 fields:

  • retry_number with value 1
  • retry_timestamp with value calculated as now + 5 minutes

This means that main_topic delegates responsibility for retry to other topic, it becomes free and can process other messages. The retry_5m_topic will receive message immediately, read the retry_timestamp, compare with current time, decide if it will sleep for a period of time or will proceed to processing immediately. Sleep will block current thread until it is right time for message processing.

If message processing passes after first retry then we are good to continue with processing of next message, in case it failed, we want to retry for few more times, we take current message, increment retry_number and publish it back to retry_5m_topic.

You can notice that on each message processing failure, the copy of the message will be routed to one of retry_5m_topic, retry_30m_topic or retry_1h_topic topics. The very important thing is not to mix messages in one topic with retry_timestamp property calculated from different delay values.

Messages are time sorted in each topic and in every topic there is only one type of delayed message – for example in retry_5m_topic only 5m delayed messages.

If we reach the last element in the Retries Sequence it means that it was the last attempt. Now it’s time to say “stop”. We should decide if we can continue processing or we just say „stop” and write failed message to a failed_topic.

Summary

Implementation of postponing message processing in case of some failures, is not a trivial thing to do. Have in mind that:

  • Messages can be consumed from topic partitions in sequential order only
  • You cannot skip messages and come back to them later
  • If you want to postpone processing of some messages you can republish them to separate topics, one for each delay value
  • Processing failed messages can be achieved by cloning the message and republishing it to one of retry topics with updated information about attempt number and next retry timestamp
  • Consumers of retry topics should block the thread unless it is time to process the message
  • Messages in retry topics are naturally organized in the chronological order, sorted by the retry_timestamp field
  • When blocking for a long period of time, Kafka may throw away your consumer from group, it is important to send the heart beat signal when waiting to proceed with message processing.

Original article: https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

Lasă un răspuns

Adresa ta de email nu va fi publicată. Câmpurile obligatorii sunt marcate cu *

Acest site folosește Akismet pentru a reduce spamul. Află cum sunt procesate datele comentariilor tale.