The central part of the Kafka Producer API is the KafkaProducer class. It connects to the Kafka brokers and sends records to them.
Maven dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
</dependencies>
```
Starting ZooKeeper and 3 brokers, then creating the topic:
```shell
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# start 3 brokers
bin/kafka-server-start.sh config/server-0.properties --override delete.topic.enable=true
...

# create the topic: 3 partitions, each replicated on 2 brokers
bin/kafka-topics.sh --create --topic my-topic --zookeeper localhost:2181 --replication-factor 2 --partitions 3
```
Producer:
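```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
// Bootstrap list: the client discovers the rest of the cluster from these brokers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());

// try-with-resources closes the producer, which also flushes buffered records
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    for (int i = 0; i < 150; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", Integer.toString(i), "value" + i);
        // send() is asynchronous; best practice is to check the returned Future or pass a Callback
        producer.send(record);
    }
} catch (Exception e) {
    e.printStackTrace();
}
```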
Sending the Message

- Between the call to send() and the message's arrival at the broker, the record passes through these stages:
record -> serializer -> partitioner -> RecordAccumulator -> Sender (network thread)
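The stages above can be sketched as a toy, single-threaded model in plain Java. It is not the client's real code: the class `ToyProducerPipeline` and its methods are hypothetical. The partitioner follows the idea behind the default partitioner for keyed records (a murmur2 hash of the serialized key, made non-negative, modulo the number of partitions), and a map of per-partition lists stands in for the RecordAccumulator.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ToyProducerPipeline {
    private final int numPartitions;
    // Stand-in for the RecordAccumulator: one pending batch of serialized values per partition
    private final Map<Integer, List<byte[]>> accumulator = new TreeMap<>();

    ToyProducerPipeline(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Stage 1: serializer (UTF-8 bytes, as StringSerializer produces by default)
    static byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // 32-bit murmur2 hash, modeled on the one used by Kafka's default partitioner
    static int murmur2(byte[] data) {
        int length = data.length;
        int m = 0x5bd1e995;
        int r = 24;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3;
        switch (length % 4) {
            case 3: h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Stage 2: partitioner; masking the sign bit keeps the hash non-negative
    int partition(byte[] keyBytes) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Stages 1-3 for one record: serialize, choose a partition, append to that partition's batch
    int send(String key, String value) {
        int p = partition(serialize(key));
        accumulator.computeIfAbsent(p, x -> new ArrayList<>()).add(serialize(value));
        return p;
    }

    // Stage 4: the sender drains a partition's batch (here it is just returned, not shipped)
    List<byte[]> drain(int partition) {
        List<byte[]> batch = accumulator.remove(partition);
        return batch == null ? new ArrayList<>() : batch;
    }
}
```

Records with the same key always hash to the same partition (as long as the partition count does not change), which is what makes per-key ordering possible.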

Micro-batching in Apache Kafka:
- At scale, efficiency is everything.
- Small, fast batches of messages:
— Sending (Producer)
— Writing (Broker)
— Reading (Consumer)
Modern operating system features it relies on:
- Page cache
- Linux sendfile() system call (zero-copy transfer from the page cache to a socket, done in the kernel)
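The sendfile() point can be illustrated from Java: `FileChannel.transferTo` hands the copy to the operating system, and on Linux the JDK can implement it with sendfile(), so bytes move from the page cache to the destination without passing through a user-space buffer. The sketch copies between two temporary files to stay self-contained; a broker serving consumers would transfer to a socket channel instead. The class `ZeroCopyDemo` is hypothetical.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ZeroCopyDemo {
    // Copies the whole source file into target with FileChannel.transferTo
    static long transfer(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target, StandardOpenOption.WRITE,
                     StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long position = 0;
            long size = in.size();
            // transferTo may move fewer bytes than requested, so loop until everything is copied
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }
}
```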
Batching amortizes the constant per-request cost over many messages.
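The amortization argument fits in one formula. Assume, as a simplification rather than a measured model, that every request to the broker carries a fixed overhead C (round trip, request header, system call) and every message adds a variable cost c. Sending n messages in one batch then costs, per message:

```latex
\frac{C + n\,c}{n} = \frac{C}{n} + c
```

As n grows, the fixed share C/n shrinks toward zero and the per-message cost approaches c. That is the gain micro-batching buys, paid for by the time spent waiting for a batch to fill.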
Message Buffering
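- batch.size – maximum size in bytes of a single batch (batches are kept per partition)
- buffer.memory – total memory in bytes the producer can use to buffer records waiting to be sent
- max.block.ms – how long send() may block when buffer.memory is exhausted (or metadata is unavailable) before throwing an exception
- A RecordBatch is sent when either threshold is reached:
— the batch has reached batch.size, or
— linger.ms has elapsed, the second threshold that triggers transmission of a batch that is not yet full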
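The buffering rules above can be modeled in a few lines of plain Java. This is a simplified, hypothetical `ToyAccumulator` with a single batch, not the client's RecordAccumulator: a Semaphore of bytes stands in for buffer.memory (an append waits at most max.block.ms for free space, then fails), and the batch is ready when it has reached batch.size or its first record has waited linger.ms.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ToyAccumulator {
    private final int batchSize;     // plays the role of batch.size (bytes)
    private final long lingerMs;     // plays the role of linger.ms
    private final long maxBlockMs;   // plays the role of max.block.ms
    private final Semaphore memory;  // plays the role of buffer.memory (bytes)
    private int batchBytes = 0;
    private long firstAppendMs = -1;

    ToyAccumulator(int batchSize, long lingerMs, int bufferMemory, long maxBlockMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
        this.maxBlockMs = maxBlockMs;
        this.memory = new Semaphore(bufferMemory);
    }

    // Reserves memory for a record, waiting at most maxBlockMs when the buffer is full
    void append(int recordBytes, long nowMs) throws InterruptedException, TimeoutException {
        if (!memory.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("no buffer space within " + maxBlockMs + " ms");
        }
        if (batchBytes == 0) {
            firstAppendMs = nowMs; // linger.ms is measured from the batch's first record
        }
        batchBytes += recordBytes;
    }

    // Ready when the batch is full OR linger.ms has elapsed since its first record
    boolean ready(long nowMs) {
        return batchBytes > 0 && (batchBytes >= batchSize || nowMs - firstAppendMs >= lingerMs);
    }

    // Simulates the sender shipping the batch: its memory goes back to the pool
    int drain() {
        int sent = batchBytes;
        memory.release(sent);
        batchBytes = 0;
        firstAppendMs = -1;
        return sent;
    }
}
```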
The last step of sending is the transmission of the batched messages to the broker, done by the producer's background sender thread.
Delivery Guarantees:
— Broker acknowledgement (“acks”)
- 0: fire and forget – no acknowledgement; the fastest, but not reliable
- 1: leader acknowledged – a good balance of performance and reliability
- all (or -1): acknowledged by all in-sync replicas – the most reliable, with the highest latency
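The acks settings differ only in when the producer treats a write as successful; in the client the strongest setting is written `all` (or `-1`). A minimal model, assuming a hypothetical `AcksModel` that receives the leader's and followers' write status as plain values (real brokers track this through the in-sync replica set):

```java
class AcksModel {
    enum Acks { NONE, LEADER, ALL } // acks=0, acks=1, acks=all

    // True once the producer would consider the send acknowledged
    static boolean acknowledged(Acks acks, boolean leaderWrote,
                                int inSyncFollowersWrote, int inSyncFollowers) {
        switch (acks) {
            case NONE:
                return true; // fire and forget: nothing is awaited
            case LEADER:
                return leaderWrote; // only the leader's local write is awaited
            case ALL:
                // the leader and every in-sync follower must have the record
                return leaderWrote && inSyncFollowersWrote >= inSyncFollowers;
            default:
                throw new IllegalArgumentException("unknown acks: " + acks);
        }
    }
}
```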
When the broker responds with an error:
- “retries” – how many times the producer will retry sending the message
- “retry.backoff.ms” – how long to wait between retries
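The effect of retries and retry.backoff.ms can be sketched as a loop: a failed send is attempted again up to `retries` more times, sleeping retry.backoff.ms between attempts. The `RetryModel` class and the `BooleanSupplier` standing in for one broker round trip are hypothetical; the real producer re-enqueues the failed batch instead of blocking the caller, and only retries errors it classifies as retriable.

```java
import java.util.function.BooleanSupplier;

class RetryModel {
    // Returns the number of attempts made; throws if the first try and all retries fail
    static int sendWithRetries(BooleanSupplier attempt, int retries, long retryBackoffMs)
            throws InterruptedException {
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts; // broker acknowledged
            }
            if (attempts > retries) {
                throw new IllegalStateException("gave up after " + attempts + " attempts");
            }
            Thread.sleep(retryBackoffMs); // wait retry.backoff.ms before resending
        }
    }
}
```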
Ordering Guarantees
- Message order is preserved only within a partition
— No global order across partitions
— Ordering can get complicated with errors, retries and retry.backoff.ms
- max.in.flight.requests.per.connection – set it to 1 so that a failed message, once retried, cannot be written after a message sent later; HIGH COST in throughput
- Delivery semantics: at-most-once, at-least-once, exactly-once
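Why max.in.flight.requests.per.connection matters can be shown with a toy simulation (hypothetical names, no real networking): batches A and B go to the same partition and A fails on its first attempt. With more than one request in flight, B is written while A waits for its retry, so the log ends up B, A; with a single request in flight, B is not sent until A has succeeded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class InFlightOrdering {
    // Batches named in failOnce fail on their first attempt and are retried.
    // Returns the order in which batches reach the partition log.
    static List<String> simulate(List<String> batches, Set<String> failOnce, int maxInFlight) {
        Deque<String> pending = new ArrayDeque<>(batches);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches without waiting for responses
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            // Responses are processed in request order; successes are appended to the log
            List<String> retry = new ArrayList<>();
            for (String batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    retry.add(batch); // first attempt fails
                } else {
                    log.add(batch);
                }
            }
            // Failed batches go back to the front of the queue, keeping their relative order
            for (int i = retry.size() - 1; i >= 0; i--) {
                pending.addFirst(retry.get(i));
            }
        }
        return log;
    }
}
```

The price of a single in-flight request is throughput: the producer waits a full round trip per request on that connection before sending the next one.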