At which point does Kafka consumer start to read

At which point does a Kafka consumer start to read? It depends on the following scenarios.

Basic concept

  • Kafka can have several topics, and each topic can have several partitions
  • Each partition assigns an offset to each message
  • Within a partition, the offset starts from 0 and increases by 1 each time a new message is published
  • The Kafka server keeps the next offset to read for each consumer group, per partition
  • Kafka doesn’t have any offset for an unknown consumer group
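
To make these concepts concrete, here is a tiny in-memory sketch of a single partition. It is not Kafka code: the class PartitionModel and its methods are hypothetical names I made up for illustration, and it only mimics the offset bookkeeping described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, simplified model of one partition (not part of the Kafka API).
final class PartitionModel {
    private final List<String> messages = new ArrayList<>();
    // Next offset to read, stored per consumer group.
    private final Map<String, Long> nextOffsetByGroup = new HashMap<>();

    // Publishing appends the message and returns the offset it was given.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Offset that the next published message will receive.
    long logEndOffset() {
        return messages.size();
    }

    // A group commits the next offset it wants to read.
    void commit(String groupId, long nextOffset) {
        nextOffsetByGroup.put(groupId, nextOffset);
    }

    // Empty when the group is unknown, i.e. nothing has been committed yet.
    OptionalLong committed(String groupId) {
        Long next = nextOffsetByGroup.get(groupId);
        return next == null ? OptionalLong.empty() : OptionalLong.of(next);
    }

    public static void main(String[] args) {
        PartitionModel partition = new PartitionModel();
        System.out.println(partition.append("first"));   // 0
        System.out.println(partition.append("second"));  // 1
        System.out.println(partition.committed("testGroup01").isPresent()); // false
        partition.commit("testGroup01", partition.logEndOffset());
        System.out.println(partition.committed("testGroup01").getAsLong()); // 2
    }
}
```

The point of the model is that the stored value is the next offset to read, not the offset of the last message consumed.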

How the consumer’s starting offset is decided

  • If a consumer group’s next offset is not kept on Kafka, the consumer reads from the latest (that is, from the next message published after the consumer started). This is the behavior of the subscribing option “auto.offset.reset = latest” (default value)
  • If a consumer group’s next offset is not kept on Kafka and the subscribing option is “auto.offset.reset = earliest”, the consumer reads from the first available message (whose offset depends on Kafka’s log retention policy)
  • If a consumer group’s next offset is kept on Kafka, the consumer reads from that offset regardless of auto.offset.reset (its value is CURRENT-OFFSET in the following example)
  • If a new consumer is added to an existing group, it starts reading after rebalancing among the consumers is finished, from the group’s stored offset for each partition assigned to it (a message is consumed by only one consumer in a group)
  • A consumer can move to any offset by calling seek(), seekToBeginning() or seekToEnd() (refer to Consumer javadoc)
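
The rules above can be sketched as a small decision function. This is a simplified model under my own assumptions, not Kafka’s implementation: the class StartOffsetModel and the method resolveStartOffset are hypothetical names, only one partition is considered, and cases such as a stored offset that no longer exists on the server or “auto.offset.reset = none” are left out.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of how the starting position is chosen for one partition.
final class StartOffsetModel {

    /**
     * @param committed       next offset stored for the group, or empty if none is kept
     * @param autoOffsetReset "earliest" or "latest"
     * @param logStartOffset  oldest offset still retained in the partition
     * @param logEndOffset    offset that the next published message will receive
     */
    static long resolveStartOffset(OptionalLong committed, String autoOffsetReset,
                                   long logStartOffset, long logEndOffset) {
        // A stored offset wins, regardless of auto.offset.reset.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No stored offset: fall back to the reset policy.
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                throw new IllegalArgumentException("unsupported policy in this sketch: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Group already has offset 10 stored: resume from 10.
        System.out.println(resolveStartOffset(OptionalLong.of(10), "latest", 0, 20));
        // Unknown group with "latest": start at the log end.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "latest", 0, 20));
        // Unknown group with "earliest": start at the oldest retained message.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "earliest", 5, 20));
    }
}
```

Calling seek(), seekToBeginning() or seekToEnd() overrides whatever this decision would have produced, so it is not part of the model.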

Example

A sample consumer app.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {
    private final String topicName = "consumer-test-topic";
    private final String bootstrapServers = "localhost:9092";
    private final String groupName = "testGroup01";
    private final String consumerId = groupName + "-consumer01";
    private KafkaConsumer<String, String> kConsumer;

    public static void main(String[] args) {
        TestConsumer testConsumer = new TestConsumer();
        testConsumer.consumeMain();
    }

    private void consumeMain() {
        init();
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kConsumer.poll(Duration.ofMillis(100));
            if (consumerRecords.isEmpty()) continue;
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                printRecord(consumerRecord);
            }
            // Store the group's next offset to read on the Kafka server.
            kConsumer.commitSync();
        }
    }

    private void printRecord(ConsumerRecord<String, String> consumerRecord) {
        String key = consumerRecord.key();
        String value = consumerRecord.value();
        long offset = consumerRecord.offset();
        int partition = consumerRecord.partition();
        // A record value may be null, so guard before calling length().
        int valueLength = (value == null) ? 0 : value.length();
        System.out.println(String.format("%s received message info - partition : %d, offset : %d, key : %s, value length : %d", this.consumerId, partition, offset, key, valueLength));
    }

    private void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", this.groupName);
        // Offsets are committed manually with commitSync(), so auto commit is off.
        props.put("enable.auto.commit", "false");
        // Has no effect while auto commit is disabled.
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("client.id", this.consumerId);
        // auto.offset.reset is not set here, so the default ("latest") applies.
        this.kConsumer = new KafkaConsumer<>(props);
        this.kConsumer.subscribe(Arrays.asList(topicName));
    }
}

For the test, I created a simple topic with one partition.

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic consumer-test-topic

When the consumer has not connected yet, no offset info exists.

[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe
Error: Consumer group 'testGroup01' does not exist.

But after initial connection, the status changes.

[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe

TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
consumer-test-topic 0          -               0               -               testGroup01-consumer01-b37fb0e0-f704-45ff-9108-c704a00c915c /127.0.0.1      testGroup01-consumer01

The result above shows that the log end offset is 0 (no message is in the topic) and that no offset has been stored for the group yet (CURRENT-OFFSET is “-”). After some messages are published and consumed, the result changes.

[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe 

TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
consumer-test-topic 0          10              10              0               testGroup01-consumer01-b37fb0e0-f704-45ff-9108-c704a00c915c /127.0.0.1      testGroup01-consumer01

The result above shows that the next offset is 10 and the test consumer is up to date. (The LAG value is the gap between LOG-END-OFFSET and CURRENT-OFFSET.)
But if for some reason the consumer has not consumed all messages, the result is as follows.

[tkstone@localhost bin]$ ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group testGroup01 --describe 

TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
consumer-test-topic 0          10              20              10              testGroup01-consumer01-b37fb0e0-f704-45ff-9108-c704a00c915c /127.0.0.1      testGroup01-consumer01

The result above shows that there are 10 messages left for the consumer to catch up on.
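
The LAG column in these outputs is plain arithmetic on the other two columns. As a minimal sketch (the class LagModel and the method lag are hypothetical names, not part of any Kafka tool), the two outputs above work out like this:

```java
// Hypothetical helper that reproduces the LAG column for one partition.
final class LagModel {

    // LAG = LOG-END-OFFSET - CURRENT-OFFSET
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        System.out.println(lag(10, 10)); // 0: the consumer is up to date
        System.out.println(lag(10, 20)); // 10: ten messages are still unread
    }
}
```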

Suppose that the test client stopped here. If the client starts again, it will consume from offset 10, which is the value of “CURRENT-OFFSET”.
