Kafka Tutorial: Writing a Kafka Producer in Java
In this tutorial, we are going to create a simple Java example that creates a Kafka producer. You will create a new replicated Kafka topic called my-example-topic, then create a Kafka producer that uses this topic to send records. You will send records with the Kafka producer, first synchronously and later asynchronously.

Before you start
Prerequisites to this tutorial are Kafka from the command line and Kafka clustering and failover basics.
This tutorial is part of a series. If you are not sure what Kafka is, you should start with What is Kafka?. If you are unfamiliar with the architecture of Kafka then we suggest reading Kafka Architecture, Kafka Topics Architecture, Kafka Producer Architecture and Kafka Consumer Architecture.
Create Replicated Kafka Topic
Next, you need to create a replicated topic.
~/kafka-training/lab3/create-topic.sh
#!/usr/bin/env bash
cd ~/kafka-training
## Create topics
kafka/bin/kafka-topics.sh --create \
    --replication-factor 3 \
    --partitions 13 \
    --topic my-example-topic \
    --zookeeper localhost:2181

## List created topics
kafka/bin/kafka-topics.sh --list \
    --zookeeper localhost:2181
Above we create a topic named my-example-topic with 13 partitions and a replication factor of 3. Then we list the Kafka topics.
Run create-topic.sh as follows.

Output from running create-topic.sh
~/kafka-training/lab3
$ ./create-topic.sh
Created topic "my-example-topic".
__consumer_offsets
my-example-topic
my-failsafe-topic
Gradle Build Script
For this example, we use Gradle to build the project.
~/kafka-training/lab3/solution/build.gradle
group 'cloudurable-kafka'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.kafka:kafka-clients:0.10.2.0'
    compile 'ch.qos.logback:logback-classic:1.2.2'
}
Notice that we import the jar file kafka-clients:0.10.2.0. Apache Kafka uses SLF4J for logging, so to set up logging we use Logback (ch.qos.logback:logback-classic:1.2.2) as the SLF4J backend.

Construct a Kafka Producer
To create a Kafka producer, you will need to pass it a list of bootstrap servers (a list of Kafka brokers). You will also specify a client.id that uniquely identifies this producer client. In this example, we are going to send messages with ids. The message body is a string, so we need a record value serializer, as we will send the message body in the Kafka record's value field. The message id (a long) will be sent as the Kafka record's key. You will need to specify a key serializer and a value serializer, which Kafka will use to encode the message id as a Kafka record key and the message body as the Kafka record value.

Common Kafka imports and constants
Next, we will import the Kafka packages and define a constant for the topic and a constant for the list of bootstrap servers that the producer will connect to.
KafkaProducerExample.java - imports and constants
~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java
package com.cloudurable.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";
    ...
}
Notice that KafkaProducerExample imports LongSerializer, which gets configured as the Kafka record key serializer, and StringSerializer, which gets configured as the record value serializer. The constant BOOTSTRAP_SERVERS is set to localhost:9092,localhost:9093,localhost:9094, which is the three Kafka servers that we started up in the last lesson. Go ahead and make sure all three Kafka servers are running. The constant TOPIC is set to the replicated Kafka topic that we just created.

Create Kafka Producer to send records
Now that we have imported the Kafka classes and defined some constants, let's create a Kafka producer.
KafkaProducerExample.java - Create Producer to send Records
~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java
public class KafkaProducerExample {
    ...

    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
To create a Kafka producer, you use java.util.Properties and define certain properties that we pass to the constructor of a KafkaProducer.

Above, KafkaProducerExample.createProducer sets the BOOTSTRAP_SERVERS_CONFIG (“bootstrap.servers”) property to the list of broker addresses we defined earlier. The BOOTSTRAP_SERVERS_CONFIG value is a comma-separated list of host/port pairs that the producer uses to establish an initial connection to the Kafka cluster. The producer makes use of all servers in the cluster no matter which ones we list here. This list only specifies the initial Kafka brokers used to discover the full set of servers of the Kafka cluster. If a server in this list is down, the producer will just go to the next broker in the list to discover the full topology of the Kafka cluster.
The CLIENT_ID_CONFIG (“client.id”) is an id to pass to the server when making requests, so the server can track the source of requests beyond just IP/port, by passing a producer name for things like server-side request logging.
The KEY_SERIALIZER_CLASS_CONFIG (“key.serializer”) is a Kafka Serializer class for Kafka record keys that implements the Kafka Serializer interface. Notice that we set this to LongSerializer, as the message ids in our example are longs.
The VALUE_SERIALIZER_CLASS_CONFIG (“value.serializer”) is a Kafka Serializer class for Kafka record values that implements the Kafka Serializer interface. Notice that we set this to StringSerializer, as the message bodies in our example are strings.
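Both LongSerializer and StringSerializer ship with the kafka-clients jar, so you rarely write your own for simple types. For a sense of what the Serializer interface looks like, here is a minimal sketch of a custom value serializer; the class name UpperCaseStringSerializer is hypothetical, not part of Kafka:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example: upper-cases each value before encoding it as UTF-8 bytes.
public class UpperCaseStringSerializer implements Serializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}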
Send records synchronously with Kafka Producer
Kafka provides a synchronous send method to send a record to a topic. Let's use this method to send some message ids and messages to the Kafka topic we created earlier.
KafkaProducerExample.java - Send Records Synchronously
~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java
public class KafkaProducerExample {
    ...

    static void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index,
                                "Hello Mom " + index);

                RecordMetadata metadata = producer.send(record).get();

                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), elapsedTime);
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }
    ...
}
The above just iterates through a for loop, creating a ProducerRecord with an example message ("Hello Mom " + index) as the record value and the for loop index as the record key. For each iteration, runProducer calls the send method of the producer (RecordMetadata metadata = producer.send(record).get()). The send method returns a Java Future.
The returned RecordMetadata has the partition where the record was written and the offset of the record in that partition.
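Since send returns a Java Future<RecordMetadata>, you can also bound how long you are willing to block. Here is a minimal sketch; the helper name sendWithTimeout and the 10-second timeout are our own choices for illustration (it needs java.util.concurrent.Future and java.util.concurrent.TimeUnit imports):

static RecordMetadata sendWithTimeout(Producer<Long, String> producer,
                                      ProducerRecord<Long, String> record) throws Exception {
    // send() returns at once with a Future; get(timeout) blocks until the broker
    // acks, or throws TimeoutException if the ack does not arrive in time.
    Future<RecordMetadata> future = producer.send(record);
    return future.get(10, TimeUnit.SECONDS);
}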
Notice the call to flush and close. Kafka will auto-flush on its own, but you can also call flush explicitly, which will send the accumulated records now. It is polite to close the connection when we are done.
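Since Producer extends java.io.Closeable, another option is to let try-with-resources do the closing; a sketch of the same loop in that style (runProducerAutoClose is our own name, not from the lab):

// Variant of runProducer: try-with-resources closes the producer (flushing
// any buffered records) even if a send fails part-way through.
static void runProducerAutoClose(final int sendMessageCount) throws Exception {
    long time = System.currentTimeMillis();
    try (Producer<Long, String> producer = createProducer()) {
        for (long index = time; index < time + sendMessageCount; index++) {
            producer.send(new ProducerRecord<>(TOPIC, index, "Hello Mom " + index)).get();
        }
    }
}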
Running the Kafka Producer
Next you define the main method.

KafkaProducerExample.java - Running the Producer
~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java
public static void main(String... args) throws Exception {
    if (args.length == 0) {
        runProducer(5);
    } else {
        runProducer(Integer.parseInt(args[0]));
    }
}
The main method just calls runProducer.

Send records asynchronously with Kafka Producer
Kafka provides an asynchronous send method to send a record to a topic. Let’s use this method to send some message ids and messages to the Kafka topic we created earlier. The big difference here will be that we use a lambda expression to define a callback.
KafkaProducerExample.java - Send Records Asynchronously with Kafka Producer
~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java
// Note: this version also needs java.util.concurrent.CountDownLatch
// and java.util.concurrent.TimeUnit imports.
static void runProducer(final int sendMessageCount) throws InterruptedException {
    final Producer<Long, String> producer = createProducer();
    long time = System.currentTimeMillis();
    final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);

    try {
        for (long index = time; index < time + sendMessageCount; index++) {
            final ProducerRecord<Long, String> record =
                    new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);
            producer.send(record, (metadata, exception) -> {
                long elapsedTime = System.currentTimeMillis() - time;
                if (metadata != null) {
                    System.out.printf("sent record(key=%s value=%s) " +
                                    "meta(partition=%d, offset=%d) time=%d\n",
                            record.key(), record.value(), metadata.partition(),
                            metadata.offset(), elapsedTime);
                } else {
                    exception.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await(25, TimeUnit.SECONDS);
    } finally {
        producer.flush();
        producer.close();
    }
}
Notice the use of a CountDownLatch so we can send all N messages and then wait for them all to send.
Async Interface Callback and Async Send Method
Kafka defines a Callback interface that you use for asynchronous operations. The callback interface allows code to execute when the request is complete. The callback executes in a background I/O thread, so it should be fast (don't block it). onCompletion(RecordMetadata metadata, Exception exception) gets called when the asynchronous operation completes. The metadata gets set (not null) if the operation was a success, and the exception gets set (not null) if the operation had an error.

The async send method is used to send a record to a topic, and the provided callback gets called when the send is acknowledged. The send method is asynchronous and returns immediately once the record has been stored in the buffer of records waiting to be sent to the Kafka broker. The send method allows sending many records in parallel without blocking to wait for the response after each one. Since the send call is asynchronous, it returns a Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes, and then return the metadata for the record or throw any exception that occurred while sending the record.
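If you prefer not to use a lambda, the same callback can be written as a named class implementing Callback; a minimal sketch (PrintingCallback is our own name for illustration):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Equivalent to the lambda used above: print metadata on success,
// print the stack trace on failure.
public class PrintingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.printf("meta(partition=%d, offset=%d)%n",
                    metadata.partition(), metadata.offset());
        }
    }
}
// Usage: producer.send(record, new PrintingCallback());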
Conclusion Kafka Producer example
We created a simple example that creates a Kafka Producer. First, we created a new replicated Kafka topic; then we created a Kafka Producer in Java that uses the Kafka replicated topic to send records. We sent records with the Kafka Producer using both the async and sync send methods.
We hope you enjoyed this article. Please provide feedback. See our Kafka training, Kafka consulting, Kafka support, and help setting up Kafka clusters in AWS.
Review Kafka Producer
What does the Callback lambda do?
The callback gets notified when the request is complete.
What will happen if the first server is down in the bootstrap list? Can the producer still connect to the other Kafka brokers in the cluster?
The producer will try to contact the next broker in the list. Any of the brokers, once contacted, will let the producer know about the entire Kafka cluster. The producer will connect as long as at least one of the brokers in the list is running. If you have 100 brokers and two of the three servers in the bootstrap list are down, the producer can still use the 98 remaining brokers.
When would you use Kafka async send vs. sync send?
If you were already using an async code base (Akka, QBit, Reakt, Vert.x), and you wanted to send records quickly without blocking on each send.
Why do you need two serializers for a Kafka record?
One of the serializers is for the Kafka record key, and the other is for the Kafka record value.
Kafka Tutorial: Writing a Kafka Consumer in Java
In this tutorial, you are going to create a simple Kafka Consumer. This consumer consumes messages from the Kafka Producer you wrote in the last tutorial. The tutorial demonstrates how to process records from a Kafka topic with a Kafka Consumer.
It also describes how Kafka Consumers in the same group divide up and share partitions, while each consumer group gets its own copy of the same data.
Construct a Kafka Consumer
Just like we did with the producer, you need to specify bootstrap servers. You also need to define a group.id that identifies which consumer group this consumer belongs to. Then you need to designate a Kafka record key deserializer and a record value deserializer. Finally, you need to subscribe the consumer to the topic you created in the producer tutorial.
Kafka Consumer imports and constants
Next, you import the Kafka packages and define a constant for the topic and a constant for the list of bootstrap servers that the consumer will connect to.
KafkaConsumerExample.java - imports and constants
~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java
package com.cloudurable.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";
    ...
}
Notice that KafkaConsumerExample imports LongDeserializer, which gets configured as the Kafka record key deserializer, and StringDeserializer, which gets set up as the record value deserializer. The constant BOOTSTRAP_SERVERS gets set to localhost:9092,localhost:9093,localhost:9094, which is the three Kafka servers that we started up in the last lesson. Go ahead and make sure all three Kafka servers are running. The constant TOPIC gets set to the replicated Kafka topic that you created in the last tutorial.

Create Kafka Consumer using Topic to Receive Records
Now that you have imported the Kafka classes and defined some constants, let's create the Kafka consumer.
KafkaConsumerExample.java - Create Consumer to process Records
~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java
public class KafkaConsumerExample {
    ...

    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // Create the consumer using props.
        final Consumer<Long, String> consumer =
                new KafkaConsumer<>(props);

        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
    ...
}
To create a Kafka consumer, you use java.util.Properties and define certain properties that we pass to the constructor of a KafkaConsumer.

Above, KafkaConsumerExample.createConsumer sets the BOOTSTRAP_SERVERS_CONFIG (“bootstrap.servers”) property to the list of broker addresses we defined earlier. The BOOTSTRAP_SERVERS_CONFIG value is a comma-separated list of host/port pairs that the consumer uses to establish an initial connection to the Kafka cluster. Just like the producer, the consumer makes use of all servers in the cluster no matter which ones we list here.
The GROUP_ID_CONFIG (“group.id”) identifies the consumer group of this consumer.
The KEY_DESERIALIZER_CLASS_CONFIG (“key.deserializer”) is a Kafka Deserializer class for Kafka record keys that implements the Kafka Deserializer interface. Notice that we set this to LongDeserializer, as the message ids in our example are longs.
The VALUE_DESERIALIZER_CLASS_CONFIG (“value.deserializer”) is a Kafka Deserializer class for Kafka record values that implements the Kafka Deserializer interface. Notice that we set this to StringDeserializer, as the message bodies in our example are strings.
Notice that you need to subscribe the consumer to the topic: consumer.subscribe(Collections.singletonList(TOPIC)). The subscribe method takes a list of topics to subscribe to, and this list will replace the current subscriptions, if any.
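For example, to have one consumer read from two topics, you would pass both names in a single call; a sketch (the second topic name is hypothetical):

// Hypothetical: subscribe to two topics in one call. A later subscribe call
// would replace this subscription rather than add to it.
consumer.subscribe(java.util.Arrays.asList("my-example-topic", "my-other-topic"));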
Process messages from Kafka with Consumer
Now, let's process some records with our Kafka Consumer.
KafkaConsumerExample.java - Process records from Consumer
~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java
public class KafkaConsumerExample {
    ...

    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;
        int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(1000);

            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }
}
Notice you use ConsumerRecords, which is a group of records from a Kafka topic partition. The ConsumerRecords class is a container that holds a list of ConsumerRecord(s) per partition for a particular topic. There is one ConsumerRecord list for every topic partition returned by a call to consumer.poll().

Notice that if you receive records (consumerRecords.count() != 0), the runConsumer method calls consumer.commitAsync(), which commits the offsets returned by the last call to consumer.poll(…) for all the subscribed topic partitions.
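commitAsync also has an overload that takes an OffsetCommitCallback, which is useful if you want to at least log failed commits; a minimal sketch:

// Variant: report offset-commit failures instead of silently dropping them.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Offset commit failed: " + exception);
    }
});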
Kafka Consumer Poll method
The poll method returns fetched records based on the current partition offset. The poll method is a blocking call that waits up to the specified number of milliseconds (the 1000 in consumer.poll(1000)). If no records are available after the time period specified, the poll method returns an empty ConsumerRecords.
When new records become available, the poll method returns straight away.
The poll method is not thread safe and is not meant to be called from multiple threads. You can control the maximum number of records returned by a single poll with props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);.
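In this example, that property would go in createConsumer alongside the other properties; a one-line sketch (the cap of 100 is an arbitrary illustration):

// In createConsumer(), with the other props.put(...) calls:
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // at most 100 records per poll()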
Running the Kafka Consumer
Next you define the main method.

KafkaConsumerExample.java - Running the Consumer
~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java
public class KafkaConsumerExample {

    public static void main(String... args) throws Exception {
        runConsumer();
    }
}
The main method just calls runConsumer.

Try running the consumer and producer
Run the consumer from your IDE. Then run the producer from the last tutorial from your IDE. You should see the consumer get the records that the producer sent.
Logging set up for Kafka
If you don’t set up logging well, it might be hard to see the consumer get the messages.
Kafka, like most Java libs these days, uses SLF4J. You can use Kafka with Log4j, Logback, or JDK logging. We used Logback in our Gradle build (compile 'ch.qos.logback:logback-classic:1.2.2').

~/kafka-training/lab4/solution/src/main/resources/logback.xml
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="org.apache.kafka" level="INFO"/>
    <logger name="org.apache.kafka.common.metrics" level="INFO"/>

    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
Notice that we set org.apache.kafka to INFO; otherwise we would get a lot of log messages. You should run it set to DEBUG once and read through the log messages, which gives you a flavor of what Kafka is doing under the covers. Leave org.apache.kafka.common.metrics set to INFO, or what Kafka is doing under the covers gets drowned out by metrics logging.

Try this: Three Consumers in same group and one Producer sending 25 messages
Run the consumer example three times from your IDE. Then change the producer to send 25 records instead of 5. Then run the producer once from your IDE. What happens? The consumers should share the messages.
Producer Output
sent record(key=1495048417121 value=..) meta(partition=6, offset=16) time=118
sent record(key=1495048417131 value=..) meta(partition=6, offset=17) time=120
sent record(key=1495048417133 value=..) meta(partition=12, offset=17) time=120
sent record(key=1495048417140 value=..) meta(partition=12, offset=18) time=121
sent record(key=1495048417143 value=..) meta(partition=12, offset=19) time=121
sent record(key=1495048417123 value=..) meta(partition=0, offset=19) time=121
sent record(key=1495048417126 value=..) meta(partition=0, offset=20) time=121
sent record(key=1495048417134 value=..) meta(partition=0, offset=21) time=122
sent record(key=1495048417122 value=..) meta(partition=3, offset=19) time=122
sent record(key=1495048417127 value=..) meta(partition=3, offset=20) time=122
sent record(key=1495048417139 value=..) meta(partition=3, offset=21) time=123
sent record(key=1495048417142 value=..) meta(partition=3, offset=22) time=123
sent record(key=1495048417136 value=..) meta(partition=10, offset=19) time=127
sent record(key=1495048417144 value=..) meta(partition=1, offset=26) time=128
sent record(key=1495048417125 value=..) meta(partition=5, offset=22) time=128
sent record(key=1495048417138 value=..) meta(partition=5, offset=23) time=128
sent record(key=1495048417128 value=..) meta(partition=8, offset=21) time=129
sent record(key=1495048417124 value=..) meta(partition=11, offset=18) time=129
sent record(key=1495048417130 value=..) meta(partition=11, offset=19) time=129
sent record(key=1495048417132 value=..) meta(partition=11, offset=20) time=130
sent record(key=1495048417141 value=..) meta(partition=11, offset=21) time=130
sent record(key=1495048417145 value=..) meta(partition=11, offset=22) time=131
sent record(key=1495048417129 value=..) meta(partition=2, offset=24) time=132
sent record(key=1495048417135 value=..) meta(partition=2, offset=25) time=132
sent record(key=1495048417137 value=..) meta(partition=2, offset=26) time=132
Notice the producer sends 25 messages.
Consumer 0 in same group
Consumer Record:(1495048417121, Hello Mom 1495048417121, 6, 16)
Consumer Record:(1495048417131, Hello Mom 1495048417131, 6, 17)
Consumer Record:(1495048417125, Hello Mom 1495048417125, 5, 22)
Consumer Record:(1495048417138, Hello Mom 1495048417138, 5, 23)
Consumer Record:(1495048417128, Hello Mom 1495048417128, 8, 21)
Consumer 1 in same group
Consumer Record:(1495048417123, Hello Mom 1495048417123, 0, 19)
Consumer Record:(1495048417126, Hello Mom 1495048417126, 0, 20)
Consumer Record:(1495048417134, Hello Mom 1495048417134, 0, 21)
Consumer Record:(1495048417144, Hello Mom 1495048417144, 1, 26)
Consumer Record:(1495048417122, Hello Mom 1495048417122, 3, 19)
Consumer Record:(1495048417127, Hello Mom 1495048417127, 3, 20)
Consumer Record:(1495048417139, Hello Mom 1495048417139, 3, 21)
Consumer Record:(1495048417142, Hello Mom 1495048417142, 3, 22)
Consumer Record:(1495048417129, Hello Mom 1495048417129, 2, 24)
Consumer Record:(1495048417135, Hello Mom 1495048417135, 2, 25)
Consumer Record:(1495048417137, Hello Mom 1495048417137, 2, 26)
Consumer 2 in same group
Consumer Record:(1495048417136, Hello Mom 1495048417136, 10, 19)
Consumer Record:(1495048417133, Hello Mom 1495048417133, 12, 17)
Consumer Record:(1495048417140, Hello Mom 1495048417140, 12, 18)
Consumer Record:(1495048417143, Hello Mom 1495048417143, 12, 19)
Consumer Record:(1495048417124, Hello Mom 1495048417124, 11, 18)
Consumer Record:(1495048417130, Hello Mom 1495048417130, 11, 19)
Consumer Record:(1495048417132, Hello Mom 1495048417132, 11, 20)
Consumer Record:(1495048417141, Hello Mom 1495048417141, 11, 21)
Consumer Record:(1495048417145, Hello Mom 1495048417145, 11, 22)
Can you answer these questions?
Which consumer owns partition 10?
How many ConsumerRecords objects did Consumer 0 get?
What is the next offset from Partition 11 that Consumer 2 should get?
Why does each consumer get unique messages?
Which consumer owns partition 10?
Consumer 2 owns partition 10.
How many ConsumerRecords objects did Consumer 0 get?
3
What is the next offset from Partition 11 that Consumer 2 should get?
22
Why does each consumer get unique messages?
Each gets its share of partitions for the topic.
Try this: Three Consumers in different Consumer group and one Producer sending 5 messages
Modify the consumer so that each consumer process has a unique group id.
Stop all consumer and producer processes from the last run.
Then execute the consumer example three times from your IDE. Then change the producer to send five records instead of 25. Then run the producer once from your IDE. What happens? The consumers should each get a copy of the messages.
First, let's modify the Consumer to make its group id unique, as follows:
KafkaConsumerExample - Make the Consumer group id unique
~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java
public class KafkaConsumerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";

    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumer" +
                        System.currentTimeMillis());
        ...
    }
    ...
}
Notice that to make the group id unique you just add System.currentTimeMillis() to it.

Producer Output
sent record(key=1495049585396 value=..) meta(partition=7, offset=30) time=134
sent record(key=1495049585392 value=..) meta(partition=4, offset=24) time=138
sent record(key=1495049585393 value=..) meta(partition=4, offset=25) time=139
sent record(key=1495049585395 value=..) meta(partition=4, offset=26) time=139
sent record(key=1495049585394 value=..) meta(partition=11, offset=25) time=140
Notice the producer sends five messages.
Consumer 0 in own group
Consumer Record:(1495049585396, Hello Mom 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello Mom 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello Mom 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello Mom 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello Mom 1495049585395, 4, 26)
Consumer 1 in unique consumer group
Consumer Record:(1495049585396, Hello Mom 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello Mom 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello Mom 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello Mom 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello Mom 1495049585395, 4, 26)
Consumer 2 in its own consumer group
Consumer Record:(1495049585396, Hello Mom 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello Mom 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello Mom 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello Mom 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello Mom 1495049585395, 4, 26)
Can you answer these questions?
Which consumer owns partition 10?
How many ConsumerRecords objects did Consumer 0 get?
What is the next offset from Partition 11 that Consumer 2 should get?
Why does each consumer get the same messages?
Which consumer owns partition 10?
They all do! Since each consumer is in its own consumer group, and there is only one consumer in each group, each consumer we ran owns all of the partitions.
How many ConsumerRecords objects did Consumer 0 get?
3
What is the next offset from Partition 11 that Consumer 2 should get?
26
Why does each consumer get the same messages?
They do because they are each in their own consumer group, and each consumer group is a subscription to the topic.
Conclusion Kafka Consumer example
You created a simple example that creates a Kafka consumer to consume messages from the Kafka Producer you created in the last tutorial. We used the replicated Kafka topic from the producer lab. You created a Kafka Consumer that uses the topic to receive messages. The Kafka consumer uses the poll method to get N records at a time.
Consumers in the same group divide up and share partitions, as we demonstrated by running three consumers in the same group and one producer. Each consumer group gets a copy of the same data. More precisely, each consumer group maintains its own offset per topic partition.
Review Kafka Consumer
How did we demonstrate Consumers in a Consumer Group dividing up topic partitions and sharing them?
We ran three consumers in the same consumer group, and then sent 25 messages from the producer. We saw that each consumer owned a set of partitions.
How did we demonstrate Consumers in different Consumer Groups each getting their own offsets?
We ran three consumers each in its own unique consumer group, and then sent 5 messages from the producer. We saw that each consumer owned every partition.
How many records does poll get?
However many you set with props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); in the properties that you pass to KafkaConsumer.

Does a call to poll ever get records from two different partitions?
A single call to poll can return records from more than one partition, but each ConsumerRecord list inside the returned ConsumerRecords holds records from only one partition.
Related content
- Kafka Tutorial: Kafka Producer
- Kafka Architecture
- What is Kafka?
- Kafka Topic Architecture
- Kafka Consumer Architecture
- Kafka Producer Architecture
- Kafka and Schema Registry
- Kafka and Avro
- Kafka Tutorial Slides
- Kafka from the command line
- Kafka clustering and failover basics
We hope you enjoyed this article. Please provide feedback. See our Kafka training, Kafka consulting, Kafka support, and help setting up Kafka clusters in AWS.