The central part of the producer API is the KafkaProducer class. It connects to a Kafka broker and sends records to it.
Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
//starting zookeeper, 3 brokers, creating topic
//zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
//3 brokers
bin/kafka-server-start.sh config/server-0.properties --override delete.topic.enable=true
...
//topic
bin/kafka-topics.sh --create --topic my-topic --zookeeper localhost:2181 --replication-factor 2 --partitions 3
Producer:
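import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
// Two of the three brokers are enough to bootstrap; the client discovers the rest
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// try-with-resources closes the producer, which waits for buffered records to be sent
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    for (int i = 0; i < 150; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", Integer.toString(i), "value" + i);
        producer.send(record); // asynchronous; best practice is to handle failures (try..catch)
    }
} catch (Exception e) {
    e.printStackTrace();
}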
Sending the Message
- Between the call to send() and the message arriving at the broker, the record passes through these steps:
record -> serializer -> partitioner -> recordAccumulator -> send
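The serializer and partitioner steps can be sketched with the JDK alone. This is a minimal illustration, not Kafka's code: the class and method names are made up for this example, and Arrays.hashCode stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: mimics the serializer -> partitioner steps, not Kafka's actual code. */
class PartitionSketch {

    /** Serializer step: turn the key into bytes (UTF-8, as a string serializer would). */
    static byte[] serialize(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Partitioner step: hash the serialized key and map it onto a partition.
     * Assumption: Arrays.hashCode is a stand-in for the hash Kafka really uses (murmur2).
     */
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // matches --partitions 3 used when creating my-topic
        for (int i = 0; i < 5; i++) {
            String key = Integer.toString(i);
            int partition = partitionFor(serialize(key), numPartitions);
            System.out.println("key " + key + " -> partition " + partition);
        }
    }
}
```

The point of hashing the key is that records with the same key always land in the same partition, which is what makes per-key ordering possible.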
Micro-batching in Apache Kafka:
- At scale, efficiency is everything.
- Small, fast batches of messages:
– Sending (Producer)
– Writing (Broker)
– Reading (Consumer)
Modern operating system functions:
- Pagecache
- Linux sendfile() system call (kernel)
Amortization of the constant cost: the fixed overhead of each request is shared by all messages in the batch
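A small calculation shows why batching pays off. The numbers below are hypothetical and only illustrate the trend: a fixed overhead per request is divided by the batch size, while the per-message cost stays constant.

```java
/** Illustrative cost model; the class name and the numbers are assumptions for this example. */
class BatchCost {

    /**
     * Average cost per message when a fixed per-request overhead is shared by a batch.
     * All quantities are in the same hypothetical unit (for example microseconds).
     */
    static double costPerMessage(double fixedCostPerRequest, double variableCostPerMessage, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return fixedCostPerRequest / batchSize + variableCostPerMessage;
    }

    public static void main(String[] args) {
        // Hypothetical: 100 units of overhead per request, 1 unit per message.
        for (int n : new int[] {1, 10, 100}) {
            System.out.println("batch of " + n + " -> " + costPerMessage(100.0, 1.0, n) + " per message");
        }
    }
}
```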
Message Buffering
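- batch.size – max number of bytes that can be buffered in a single batch (batches are kept per partition)
- buffer.memory – max total memory the producer can use to buffer records waiting to be sent
- max.block.ms – when buffer.memory is exhausted, how many ms send() will block before throwing an exception
- in RecordBatch –
– when the batch reaches batch.size, its records are sent
– linger.ms is the second threshold that triggers transmission: how long to wait for more records before sending a batch that is not yet full
The last part of sending is when the batched messages are sent to the broker.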
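The two thresholds can be sketched as a toy batch that becomes ready when it is full or when its oldest record has waited long enough. This is a simplified model with made-up names, not Kafka's RecordAccumulator; the clock is passed in as a parameter so the behaviour stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one per-partition batch; a simplification, not Kafka's implementation. */
class BatchSketch {
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms
    private final List<byte[]> records = new ArrayList<>();
    private int bufferedBytes = 0;
    private long firstAppendMs = -1;

    BatchSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Adds a record; nowMs is supplied by the caller instead of reading the system clock. */
    void append(byte[] record, long nowMs) {
        if (records.isEmpty()) {
            firstAppendMs = nowMs; // the linger timer starts with the first record
        }
        records.add(record);
        bufferedBytes += record.length;
    }

    /** Ready when the batch is full OR the oldest record has waited at least lingerMs. */
    boolean readyToSend(long nowMs) {
        if (records.isEmpty()) {
            return false;
        }
        boolean full = bufferedBytes >= batchSizeBytes;
        boolean lingered = nowMs - firstAppendMs >= lingerMs;
        return full || lingered;
    }

    /** Hands the buffered records over and resets the batch. */
    List<byte[]> drain() {
        List<byte[]> out = new ArrayList<>(records);
        records.clear();
        bufferedBytes = 0;
        firstAppendMs = -1;
        return out;
    }

    public static void main(String[] args) {
        BatchSketch batch = new BatchSketch(16, 5); // hypothetical: 16-byte batches, 5 ms linger
        batch.append(new byte[10], 0);
        System.out.println("t=1 ready? " + batch.readyToSend(1));
        System.out.println("t=5 ready? " + batch.readyToSend(5));
    }
}
```

A larger batch.size with a small non-zero linger.ms trades a little latency for fewer, bigger requests.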
Delivery Guarantees:
– Broker acknowledgement ("acks")
- 0: fire and forget – no acks, the fastest, not reliable
- 1: leader acknowledged – good balance of performance and reliability
- all (or -1): acknowledged by all in-sync replicas – the slowest, the most reliable
Broker responds with error:
- "retries" – how many times the producer will resend a message that failed
- "retry.backoff.ms" – how long to wait between retries
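The acknowledgement and retry settings are plain producer properties. A minimal sketch of a configuration that favours reliability follows; the class name and the values for retries and retry.backoff.ms are illustrative assumptions, not recommendations. Plain string keys are used so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

/** Illustrative producer settings that favour durability over latency. */
class ReliableProducerConfig {

    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");             // wait for all in-sync replicas
        props.put("retries", "3");            // assumed value: resend a failed message up to 3 times
        props.put("retry.backoff.ms", "100"); // assumed value: wait 100 ms between attempts
        return props;
    }

    public static void main(String[] args) {
        // The resulting Properties object can be passed to the KafkaProducer constructor.
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```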
Ordering Guarantees
- Messages are ordered by partition – order is preserved only within a partition
– No global order across partitions
- Can get complicated with errors – retries, retry.backoff.ms
- max.in.flight.requests.per.connection – set it to 1 so that a failed message cannot be resent after a later message has already been written; HIGH COST in throughput
- Delivery semantics: at-most-once, at-least-once, exactly-once
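How retries can reorder messages is easiest to see in a small simulation. This is a simplified model with made-up names, not the producer's real networking code: batches for one partition are sent in groups of up to maxInFlight, a batch that fails is put back at the front of the queue, and the method returns the order in which batches end up in the partition log.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy simulation of retries on a single partition; not Kafka's actual sender logic. */
class InFlightSketch {

    /**
     * @param batches          number of batches, numbered 0..batches-1 in send order
     * @param maxInFlight      plays the role of max.in.flight.requests.per.connection
     * @param failFirstAttempt batches whose first attempt fails and must be retried
     * @return the order in which batches are appended to the partition log
     */
    static List<Integer> deliver(int batches, int maxInFlight, Set<Integer> failFirstAttempt) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < batches; i++) {
            pending.add(i);
        }
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches before looking at any response.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            List<Integer> retry = new ArrayList<>();
            for (int batch : inFlight) {
                if (failFirstAttempt.contains(batch) && alreadyFailed.add(batch)) {
                    retry.add(batch); // first attempt failed: resend later
                } else {
                    log.add(batch);   // written to the partition log
                }
            }
            // Failed batches go back to the front of the queue, keeping their relative order.
            for (int i = retry.size() - 1; i >= 0; i--) {
                pending.addFirst(retry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Set<Integer> failing = Collections.singleton(0); // batch 0 fails once
        System.out.println("maxInFlight=2: " + deliver(4, 2, failing));
        System.out.println("maxInFlight=1: " + deliver(4, 1, failing));
    }
}
```

With two requests in flight, batch 1 is written while batch 0 is still waiting for its retry, so the log starts 1, 0. With one request in flight the original order is kept, at the price of sending strictly one batch at a time.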