Apache Kafka Producer

The central part of the Kafka producer API is the KafkaProducer class. It connects to one or more Kafka brokers and publishes records to topics.

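Maven dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
    </dependencies>
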
# starting ZooKeeper, 3 brokers, creating the topic
# ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 3 brokers
bin/kafka-server-start.sh config/server-0.properties --override delete.topic.enable=true
...

# topic
bin/kafka-topics.sh --create --topic my-topic --zookeeper localhost:2181 --replication-factor 2 --partitions 3

Producer:

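import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// try-with-resources closes the producer when the loop is done
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    for (int i = 0; i < 150; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", Integer.toString(i), "value" + i);
        // send() is asynchronous: delivery errors are reported to the callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            }
        });
    }
} catch (Exception e) {
    e.printStackTrace();
}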

Sending the Message

  • Between the call to send() and the message arriving at the broker, the record passes through:

record -> serializer -> partitioner -> recordAccumulator -> send
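
The stages above can be sketched with plain Java collections. This is a toy model, not the client's code: the class and method names are hypothetical, and Arrays.hashCode is only a stand-in for the hash the real partitioner uses (Kafka's default partitioner hashes the serialized key with murmur2).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SendPipelineSketch {
    // Serializer step: turn a String key or value into bytes.
    static byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Partitioner step: non-negative hash of the key bytes modulo the partition count.
    // Arrays.hashCode is a stand-in here; it is NOT the hash Kafka uses.
    static int partition(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Accumulator step: group serialized values per partition until they are sent.
    static Map<Integer, List<byte[]>> accumulate(int records, int numPartitions) {
        Map<Integer, List<byte[]>> batches = new TreeMap<>();
        for (int i = 0; i < records; i++) {
            byte[] key = serialize(Integer.toString(i));
            byte[] value = serialize("value" + i);
            batches.computeIfAbsent(partition(key, numPartitions), p -> new ArrayList<>()).add(value);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Same shape as the producer loop: 150 records, 3 partitions.
        accumulate(150, 3).forEach((p, batch) ->
            System.out.println("partition " + p + ": " + batch.size() + " records"));
    }
}
```

Because the partition depends only on the key bytes, records with the same key always land in the same partition.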

Micro-batching in Apache Kafka:

  • At scale, efficiency is everything.
  • Small, fast batches of messages:
    – Sending (Producer)
    – Writing (Broker)
    – Reading (Consumer)

Modern operating system features:

  • Page cache
  • Linux sendfile() system call (kernel)

Amortization of the constant cost: the fixed overhead of each request is spread over all the messages in a batch.
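
A small arithmetic sketch of that amortization. The cost figures are hypothetical numbers picked for illustration, not measurements, and the class name is made up.

```java
class BatchAmortizationSketch {
    // Average cost of one message when a fixed per-request cost is shared by the whole batch.
    static double averageCost(double fixedCostPerRequest, double variableCostPerMessage, int batchSize) {
        return fixedCostPerRequest / batchSize + variableCostPerMessage;
    }

    public static void main(String[] args) {
        // Hypothetical costs in microseconds: 100 per request, 1 per message.
        for (int batchSize : new int[] {1, 10, 100}) {
            System.out.println("batch of " + batchSize + ": " + averageCost(100.0, 1.0, batchSize));
        }
    }
}
```

With these assumed numbers the fixed part dominates for single messages and becomes almost negligible for batches of 100.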

Message Buffering

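  • batch.size – max number of bytes that can be buffered in a single batch (one batch per partition)
  • buffer.memory – max total memory the producer can use to buffer records waiting to be sent
  • max.block.ms – when buffer.memory is exhausted, how many ms send() may block before it throws an exception
  • in RecordBatch –
    – when the batch reaches batch.size, the records are sent
    – linger.ms is the second threshold that triggers transmission: how long a batch waits for more records before it is sent anyway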
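
A minimal sketch of these settings and of the two send thresholds. The property names are the real producer settings. The values are illustrative, and readyToSend is a hypothetical, simplified rule rather than the client's actual logic.

```java
import java.util.Properties;

class BufferingConfigSketch {
    // Producer buffering settings as plain properties (values are illustrative).
    static Properties bufferingConfig() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384");       // max bytes in one batch
        props.setProperty("linger.ms", "5");            // max wait for a batch to fill
        props.setProperty("buffer.memory", "33554432"); // total bytes for all buffered records
        props.setProperty("max.block.ms", "60000");     // max time send() may block when the buffer is full
        return props;
    }

    // Simplified rule: a batch is sent when it is full OR it has waited linger.ms.
    static boolean readyToSend(int batchBytes, long waitedMs, Properties props) {
        int batchSize = Integer.parseInt(props.getProperty("batch.size"));
        long lingerMs = Long.parseLong(props.getProperty("linger.ms"));
        return batchBytes >= batchSize || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        Properties props = bufferingConfig();
        System.out.println(readyToSend(16384, 0, props)); // full batch
        System.out.println(readyToSend(100, 5, props));   // linger.ms elapsed
        System.out.println(readyToSend(100, 1, props));   // neither threshold reached
    }
}
```

The same Properties object could be merged into the producer configuration shown earlier. A larger linger.ms trades a little latency for fuller batches.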

The last part of sending is when the batched messages are sent to the broker.

Delivery Guarantees:
– Broker acknowledgement ("acks")

  • 0: fire and forget – no acks, the fastest, not reliable
  • 1: leader acknowledged – good balance of performance and reliability
  • all (or -1): acknowledged by all in-sync replicas – the slowest, the most reliable

If the broker responds with an error:

  • "retries" – how many times the producer will try to resend the message
  • "retry.backoff.ms" – how long the producer waits between retries
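
The meaning of the two retry settings can be sketched as a loop. This is a toy model with hypothetical names. The real producer retries internally on its own sender thread, so application code does not write such a loop.

```java
import java.util.function.BooleanSupplier;

class RetrySketch {
    // Tries once, then up to `retries` more times, pausing retryBackoffMs between attempts.
    // Returns the number of attempts used, or -1 if every attempt failed.
    static int sendWithRetries(BooleanSupplier attempt, int retries, long retryBackoffMs) {
        for (int tried = 1; tried <= retries + 1; tried++) {
            if (attempt.getAsBoolean()) {
                return tried;
            }
            if (tried <= retries) {
                try {
                    Thread.sleep(retryBackoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker that rejects the first two attempts.
        int attempts = sendWithRetries(() -> ++calls[0] >= 3, 3, 10);
        System.out.println("attempts used: " + attempts);
    }
}
```

With retries set to 0 the first error is final, which is the at-most-once end of the trade-off.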

Ordering Guarantees

  • Message order by partition – order is preserved only within a partition
    – No global order across partitions
    – Can get complicated with errors
  • retries, retry.backoff.ms – a retried message can arrive after a later message that succeeded
  • max.in.flight.requests.per.connection – set to 1 to prevent a failed message
    from being sent after another message, HIGH COST in throughput
  • At-most-once, at-least-once, exactly-once
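
Why retries and in-flight requests interact can be seen in a small simulation. It is a deliberately simplified model with hypothetical names: batches are sent in windows of at most maxInFlight, and one batch is rejected on its first attempt and retried.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class InFlightOrderingSketch {
    // Batch ids 1..batchCount are sent in order, at most maxInFlight at a time.
    // The batch with id failOnce is rejected on its first attempt and retried afterwards.
    // Returns the order in which batches end up in the partition log.
    static List<Integer> brokerLog(int batchCount, int maxInFlight, int failOnce) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int id = 1; id <= batchCount; id++) {
            pending.add(id);
        }
        List<Integer> log = new ArrayList<>();
        boolean alreadyFailed = false;
        while (!pending.isEmpty()) {
            // Take the next window of in-flight requests.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            // The broker handles them in send order; the rejected batch is set aside.
            Integer retry = null;
            for (int id : inFlight) {
                if (id == failOnce && !alreadyFailed) {
                    alreadyFailed = true;
                    retry = id;
                } else {
                    log.add(id);
                }
            }
            // The retry goes back to the front of the queue.
            if (retry != null) {
                pending.addFirst(retry);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(brokerLog(3, 2, 1)); // two requests in flight
        System.out.println(brokerLog(3, 1, 1)); // one request in flight
    }
}
```

In this model brokerLog(3, 2, 1) yields [2, 1, 3], because batch 2 is written while batch 1 waits for its retry. brokerLog(3, 1, 1) keeps [1, 2, 3], at the price of never overlapping requests.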
