At which point does a Kafka consumer start to read messages? It depends on the following scenarios.
Basic concepts
- Kafka can have several topics, and each topic can have several partitions
- Each partition keeps an offset for each message
- Offsets start at 0 and increase by 1 as new messages are published (a partition’s first and last offsets can be inspected programmatically; see the sketch after this list)
- The Kafka server keeps the next offset to read for each consumer group, per partition
- Kafka doesn’t keep any offset for an unknown consumer group
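As a quick illustration of these per-partition offsets, a partition’s first retained offset and its log-end offset (the offset the next published message will get) can be queried from a consumer. This is only a minimal sketch; it assumes the same localhost broker and the test topic that are used in the example later in this post.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookup {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition partition = new TopicPartition("consumer-test-topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // first offset still retained in the partition (0 unless old log segments were deleted)
            Map<TopicPartition, Long> beginning = consumer.beginningOffsets(Collections.singletonList(partition));
            // offset that the next published message will get (the log-end offset)
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(partition));
            System.out.println("beginning offset : " + beginning.get(partition));
            System.out.println("log-end offset   : " + end.get(partition));
        }
    }
}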
How a consumer’s starting offset is decided
- If a consumer group’s next offset is not kept on Kafka, the consumer reads from the latest offset (that is, from the next message published after the consumer started) – the consumer’s subscribing option is “auto.offset.reset = latest” (the default value)
- If the consumer’s subscribing option is “auto.offset.reset = earliest”, it reads from the earliest available offset (which depends on Kafka’s log retention policy)
- If a consumer group’s next offset is kept on Kafka, the consumer reads from that offset regardless of auto.offset.reset (this value appears as CURRENT-OFFSET in the example below)
- If a new consumer is added to an existing group, it starts reading from the latest committed offset of its assigned partitions once rebalancing among the consumers is finished (a message is consumed by only one consumer in a group)
- A consumer can also move to any offset by calling seek(), seekToBeginning() or seekToEnd() (refer to the Consumer Javadoc; a short sketch follows this list)
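A minimal sketch of the seek methods mentioned above. It assumes the consumer already has the partition assigned (for example after an initial poll() when using subscribe(), or via assign()); the topic name and the offset value are only illustrative.

import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekExamples {

    // moves an already assigned consumer to different positions on partition 0 of the test topic
    static void moveConsumer(KafkaConsumer<String, String> consumer) {
        TopicPartition partition = new TopicPartition("consumer-test-topic", 0);

        // jump back to the first offset still retained in the partition
        consumer.seekToBeginning(Collections.singletonList(partition));

        // jump forward to the log-end offset (only newly published messages will be read)
        consumer.seekToEnd(Collections.singletonList(partition));

        // jump to an arbitrary offset, e.g. 10; the next poll() starts reading there
        consumer.seek(partition, 10L);
    }
}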
Example
A sample consumer app.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    private String topicName = "consumer-test-topic";
    private KafkaConsumer<String, String> kConsumer;
    private String bootstrapServers = "localhost:9092";
    private String groupName = "testGroup01";
    private String consumerId = groupName + "-consumer01";

    public static void main(String[] args) throws Exception {
        TestConsumer testConsumer = new TestConsumer();
        testConsumer.consumeMain();
    }

    private void consumeMain() throws Exception {
        init();
        while (true) {
            // poll() returns records starting from the group's current offset
            ConsumerRecords<String, String> consumerRecords = kConsumer.poll(Duration.ofMillis(100));
            if (consumerRecords == null || consumerRecords.isEmpty()) continue;
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                printRecord(consumerRecord);
            }
            // commit the offsets of the records just processed (enable.auto.commit is false)
            kConsumer.commitSync();
        }
    }

    private void printRecord(ConsumerRecord<String, String> consumerRecord) {
        String key = consumerRecord.key();
        String value = consumerRecord.value();
        long offset = consumerRecord.offset();
        int partition = consumerRecord.partition();
        System.out.println(String.format(
                "%s received message info - partition : %d, offset : %d, key : %s, value length : %s",
                this.consumerId, partition, offset, key, value.length()));
    }

    private void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", this.groupName);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("client.id", this.consumerId);
        this.kConsumer = new KafkaConsumer<>(props);
        this.kConsumer.subscribe(Arrays.asList(topicName));
    }
}
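One note on the configuration above: because enable.auto.commit is set to false, the group’s offset stored on Kafka only advances when commitSync() is called after a batch has been processed, so the CURRENT-OFFSET shown by the admin tool below reflects the last committed position rather than the last record returned by poll().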
For the test, I created a simple topic with one partition.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic consumer-test-topic
Before the consumer connects for the first time, no offset info exists for the group.
[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe
Error: Consumer group 'testGroup01' does not exist.
But after the initial connection, the status changes.
[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                  HOST        CLIENT-ID
consumer-test-topic  0          -               0               -    testGroup01-consumer01-b37fb0e0-f704-45ff-9108-c704a00c915c  /127.0.0.1  testGroup01-consumer01
The above result shows that the log-end offset is 0 (no message is in the topic yet). After some messages are published (for example with a small producer like the sketch below), the result changes.
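For reference, the test messages could be published with a minimal producer along these lines (a sketch only; the keys, values and message count are illustrative, and the console producer works just as well):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // publish 10 messages; each one gets the next offset in the partition
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("consumer-test-topic", "key-" + i, "value-" + i));
            }
            producer.flush();
        }
    }
}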
[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                  HOST        CLIENT-ID
consumer-test-topic  0          10              10              0    testGroup01-consumer01-b37fb0e0-f704-45ff-9108-c704a00c915c  /127.0.0.1  testGroup01-consumer01
The above result shows that the next offset to read is 10 and the test consumer is fully caught up (the LAG value is the gap between LOG-END-OFFSET and CURRENT-OFFSET).
But if, for some reason, the consumer didn’t consume all messages, the result looks as follows.
[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                  HOST        CLIENT-ID
consumer-test-topic  0          10              20              10   testGroup01-consumer01-b37fb0e0-f704-45ff-9108-c704a00c915c  /127.0.0.1  testGroup01-consumer01
The above result shows that there are 10 messages left to catch up on.
Suppose the test client stopped at this point. When it starts again, it will consume from offset 10, which is the CURRENT-OFFSET value.