Apache Kafka Consumer

Consuming messages is similar to producing them, thanks to the abstraction offered by the Kafka client library. There are two methods for consuming messages: subscribe(List<String>) can be used to consume messages from one or more topics, while assign(List<TopicPartition>) should be used when we are interested in consuming messages only from specific partitions.


//start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

//start 3 kafka brokers, one per server properties file, e.g.:
bin/kafka-server-start.sh config/server-0.properties --override delete.topic.enable=true

//create a topic with 3 partitions and replication factor 1
bin/kafka-topics.sh --create --topic my-topic-c --zookeeper localhost:2181 --partitions 3 --replication-factor 1

//a second topic; the first one will later be altered, this one is used for comparison
bin/kafka-topics.sh --create --topic my-topic-c1 --zookeeper localhost:2181 --partitions 3 --replication-factor 1
//consumer will use subscribe() method
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {
 public static void main(String[] args) {
  Properties properties = new Properties();
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093");
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

  //subscribe to one or more topics; partitions are assigned automatically
  consumer.subscribe(Arrays.asList("my-topic-c", "my-topic-c1"));

  try {
   while (true) {
    //poll blocks for at most 100 ms when no records are available
    ConsumerRecords<String, String> records = consumer.poll(100L);

    for (ConsumerRecord<String, String> record : records) {
     System.out.println(String.format("Topic: %s, Partition %s, Offset %s, Key %s, Value %s",
      record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
   }
  } catch (Exception e) {
   System.out.println(e.getMessage());
  } finally {
   consumer.close();
  }
 }
}
//consumer will use assign() method
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerPartition {
 public static void main(String[] args) {
  Properties properties = new Properties();
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093");
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  //group.id is optional when partitions are assigned manually
  //properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

  //assign specific partitions instead of subscribing to whole topics
  List<TopicPartition> topicPartitionList = new ArrayList<>();
  topicPartitionList.add(new TopicPartition("my-topic-c", 0));
  topicPartitionList.add(new TopicPartition("my-topic-c1", 2));

  consumer.assign(topicPartitionList);

  try {
   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100L);

    for (ConsumerRecord<String, String> record : records) {
     System.out.println(String.format("Topic: %s, Partition %s, Offset %s, Key %s, Value %s",
      record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
   }
  } catch (Exception e) {
   System.out.println(e.getMessage());
  } finally {
   consumer.close();
  }
 }
}

Generating data

We can use the kafka-producer-perf-test.sh tool to generate some messages and observe how our consumers behave.

//will generate 50 records of 1 byte, 10 per second, using StringSerializer for key and value
bin/kafka-producer-perf-test.sh --topic my-topic-c --num-records 50 --record-size 1 --throughput 10 --producer-props \
bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer

We should observe the behavior of our consumers: the one using subscribe() will receive messages from all 3 partitions, while the one using assign() will receive messages only from the partitions it was explicitly assigned (0 and 2).

When the number of partitions of a topic is increased, the consumer that is subscribed to the topic should start receiving messages from the newly added partition as well.

bin/kafka-topics.sh --alter --topic my-topic-c --partitions 4 --zookeeper localhost:2181 
bin/kafka-topics.sh --describe --topic my-topic-c --zookeeper localhost:2181  

Kafka Consumer Polling

SubscriptionState – holds the content of the collection passed to subscribe(); it is the source of truth for the topics the consumer is subscribed to and it manages the consumer's offsets.

ConsumerCoordinator – using metadata about the cluster, it takes responsibility for coordinating the consumer (that is, for coordinating the SubscriptionState), with two main responsibilities:

  • being aware of dynamic/automatic partition reassignment and notifying the SubscriptionState object of assignment changes,
  • committing offsets to the cluster, which causes the update of the subscription state.

Fetcher – the object responsible for most of the communication between the consumer and the cluster. To start fetching it has to know which topics or partitions it should ask for, and it gets this information from the SubscriptionState object.

ConsumerNetworkClient – handles the TCP/IP communication with the cluster.

The polling timeout is the number of milliseconds the network client will spend polling the cluster for messages before returning; when no records are available, poll() blocks for at most this amount of time.

The Consumer Offset in Detail

The offset is the critical detail that allows consumers to operate independently. How Kafka manages the consumer offset is an important thing to understand.

There are different categories of offsets; when an individual consumer is reading from a partition it has to establish what it has already read (the snippet after this list shows how to inspect them):

  • last committed offset – the last offset the consumer has committed for the partition; we are looking at it from the partition's point of view.
  • current position – as the consumer reads messages it tracks its position, which advances with the consumer.
  • log-end offset – the offset of the end of the message log for the partition.
  • un-committed offsets – the gap between the last committed offset and the current position; it is important to understand what creates this gap and how to close it.
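
To make these categories concrete, here is a minimal sketch that reads each of them through the consumer API. It assumes a consumer that already has partition 0 of my-topic-c assigned (as in the assign() example above) and a client version that provides endOffsets().

TopicPartition tp = new TopicPartition("my-topic-c", 0);

OffsetAndMetadata committed = consumer.committed(tp);                 //last committed offset (null if nothing was committed yet)
long position = consumer.position(tp);                                //current position of this consumer
long logEnd = consumer.endOffsets(Collections.singleton(tp)).get(tp); //log-end offset of the partition

long unCommitted = committed == null ? position : position - committed.offset(); //size of the un-committed gap
System.out.println(String.format("committed=%s, position=%d, log-end=%d, un-committed=%d",
 committed, position, logEnd, unCommitted));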

Properties:
– enable.auto.commit – gives Kafka the responsibility to decide when un-committed offsets should be marked as committed. It is a blind setting, because Kafka doesn't actually know when a message has been fully processed and can safely be committed; by default the commit happens every 5000 ms (auto.commit.interval.ms).

Offset Behavior

Read != Committed
Offset commit behavior is configurable

  • enable.auto.commit = true (default)
  • auto.commit.interval.ms = 5000 (default)
  • auto.offset.reset = "latest" (default) / "earliest" / "none" – see the snippet below
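
As a short sketch, these are plain consumer properties and can be set next to the ones already used in the examples above; the values are only illustrative:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      //let Kafka commit offsets automatically
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); //auto-commit every 5 seconds
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   //start from the beginning when there is no committed offset (default is "latest")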

As long as there is a gap there is an inconsistency.
The consumer tracks the offsets, while Kafka stores the committed offsets in the
__consumer_offsets topic (by default on 50 partitions).

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets

ConsumerCoordinator – produces the messages for the "__consumer_offsets" topic.

Offset Management

  • Automatic vs Manual
  • enable.auto.commit = false – full control of offset commits.
  • API commitSync() and commitAsync()

commitSync() – use it when you want precise control and high consistency: you don't want to receive new records before you have processed the current ones. It should be invoked after the processing loop, once the whole batch is done.

Synchronous – blocks until it receives a response from the cluster; retries until success or an unrecoverable error.

  • retry.backoff.ms (default 100ms).
try {
 for (ConsumerRecord<String, String> record : records) {
  //process each record in the batch...
 }
 //commit only when you know you are done, after the batch is processed
 consumer.commitSync();
} catch (CommitFailedException e) {
 log.error("there's not much else we can do at this point...", e);
}

commitAsync() – you may not know exactly whether a commit succeeded or not, and it does not automatically retry, which can lead to possible duplication of records. With a callback it is possible to determine the status of the commit; because it is asynchronous, the performance can be better.

try {
 for (ConsumerRecord<String, String> record : records) {
  //process each record in the batch...
 }
 //commit when you know you are done, after the batch is processed
 consumer.commitAsync(); //fire-and-forget, not recommended
 //better: pass a callback so the outcome of the commit can be inspected
 consumer.commitAsync(new OffsetCommitCallback() {
  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
   //do something, e.g. log when exception != null
  }
 });
} catch (CommitFailedException e) {
 log.error("there's not much else we can do at this point...", e);
}

What are the reasons for taking control of the offsets?

  • Consistency control
  • Deciding when a message is "done"
  • Atomicity – being able to treat the steps of message processing as a single atomic operation
  • Exactly-once vs. at-least-once – it is possible to have duplicates; how much that matters depends on the consumer/application

Scaling-out Consumers

In order to handle a high number of records it is necessary to increase the number of consumers.

In Kafka, if we need more messages produced, the solution is to add more producers.
If we need more message retention and redundancy, we add more brokers.
If we need more metadata management capacity, we add more ZooKeeper members.

To scale the ability to read and process messages beyond a single consumer, the solution is to organize consumers into Consumer Groups.

A consumer group is a collection of consumers working together as a team; the only thing required to join a consumer to a Consumer Group is to set the same group.id when starting the consumer.

When a consumer is part of a consumer group, the work of processing the messages of an entire topic is distributed across all group members.

A consumer group enables a higher level of parallelism and throughput.
It can also increase the level of redundancy, and with consumers working in parallel the overall performance is improved.

The GroupCoordinator takes care of assigning partitions to the group members. Group members send heartbeats to the GroupCoordinator so that it knows they are still alive.

  • heartbeat.interval.ms = 3000
  • session.timeout.ms = 30000 – the total amount of time the GroupCoordinator will wait for a heartbeat before considering a consumer dead (see the snippet below).
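
As a minimal sketch, both settings are ordinary consumer properties and can be added next to the ones used in the consumer examples above:

properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); //how often this consumer sends a heartbeat to the GroupCoordinator
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");   //no heartbeat for this long => the consumer is considered dead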

Consumer Group Rebalance

When a consumer is assigned a new partition it has to know where to start reading; the SubscriptionState object is populated with the offset committed by the previous consumer.

The new consumer will start reading from the last committed offset.
If the previous consumer died in the middle of processing records and did not have time to commit, then when Kafka rebalances there is a chance that the new consumer will receive an already processed record.

Group Coordinator

  • Evenly balances available consumers across partitions
  • At most a 1:1 consumer-to-partition ratio
  • Can't avoid over-provisioning – extra consumers beyond the number of partitions stay idle
  • Initiates the rebalancing protocol
  • Rebalances when a consumer fails.

Demo

  • three consumers with same group id
  • consuming a single topic with three partitions

Look for:

  • shared topic consumption
  • adding an additional consumer
  • adding an additional topic
  • forcing rebalance
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer1 {
 public static void main(String[] args) {
  Properties properties = new Properties();
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093");
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  //all members of the group share the same group.id
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

  consumer.subscribe(Arrays.asList("my-big-topic"));

  try {
   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100L);

    for (ConsumerRecord<String, String> record : records) {
     System.out.println(String.format("Topic: %s, Partition %s, Offset %s, Key %s, Value %s",
      record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
   }
  } catch (Exception e) {
   System.out.println(e.getMessage());
  } finally {
   consumer.close();
  }
 }
}

Start 3 brokers and create a topic with 3 partitions

bin/kafka-topics.sh --create --topic my-big-topic --partitions 3 --replication-factor 1 --zookeeper localhost:2181

Start 3 consumers and then start one producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
 public static void main(String[] args) {
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());

  try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
   for (int i = 0; i < 100; i++) {
    //no key is set, so records are spread across the partitions of the topic
    ProducerRecord<String, String> record = new ProducerRecord<>("my-big-topic", "value" + i);
    producer.send(record); //best practice: wrap in try..catch and check the result
   }
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

Observe the partition of the messages received by each consumer; as we have 3 consumers and 3 partitions, each consumer will receive messages from exactly one partition.

Next, start one more consumer, wait for the rebalance and re-run the producer; observe the messages received by the consumers (one consumer will not receive any message, since there are now more consumers than partitions).

Next, add one more partition to "my-big-topic" and check the result:

bin/kafka-topics.sh --alter --topic my-big-topic --partitions 4 --zookeeper localhost:2181
bin/kafka-topics.sh --describe --topic my-big-topic --zookeeper localhost:2181

After the rebalance, when the producer is run again, all consumers should start receiving messages.

Now it's time to kill 2 consumers, wait for the rebalance and re-run the producer – we should observe that the two consumers that are still alive start receiving messages from all 4 partitions.

Consumer Configuration

  • Consumer performance and efficiency
    fetch.min.bytes – the minimum number of bytes that should be accumulated before a poll returns, analogous to batch.size on the producer
    fetch.max.wait.ms – analogous to linger.ms on the producer
    max.partition.fetch.bytes – the maximum number of bytes that a poll can retrieve per partition per cycle
    max.poll.records – the maximum number of records per poll cycle (see the sketch after this list)
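
A short sketch of how these could be tuned on the consumers used earlier; the values below are only illustrative, not recommendations:

properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");              //wait until at least 1 KB is available...
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");             //...but no longer than 500 ms
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); //at most 1 MB per partition per fetch
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");              //at most 500 records per poll()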

Advanced topics

  • Consumer position control API
    – seek()
    – seekToBeginning()
    – seekToEnd()
  • Flow control
    – pause()
    – resume()
  • Rebalance listeners – will notify you when a rebalance occurs (see the sketch below).
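
A minimal sketch of these APIs, using the subscribe()-based consumer and the my-topic-c topic from the examples above; the callback bodies are just placeholders:

//position control: jump to the beginning of the currently assigned partitions
consumer.seekToBeginning(consumer.assignment());

//flow control: temporarily stop fetching from a partition, then continue later
TopicPartition tp = new TopicPartition("my-topic-c", 0);
consumer.pause(Collections.singleton(tp));
consumer.resume(Collections.singleton(tp));

//rebalance listener: notified when partitions are revoked/assigned during a rebalance
consumer.subscribe(Arrays.asList("my-topic-c"), new ConsumerRebalanceListener() {
 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  //e.g. commit offsets for the partitions we are about to lose
 }

 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  //e.g. seek to a stored position for the newly assigned partitions
 }
});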
