Kafka tip – keeping the order of delivery

Kafka guarantees the order of delivery only within a partition. That is, the order is not preserved across the partitions of a topic. Here are two ways to keep the order of delivery in a topic.
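
To make the per-partition guarantee concrete, here is a small sketch in plain Java that needs no broker. The class name PartitionOrderSketch, the round-robin spreading, and the partition-by-partition read order are all assumptions for illustration; a real consumer may interleave partitions in any order.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionOrderSketch {

    // Spread messages 0..count-1 over the partitions round-robin (an illustrative choice).
    public static List<List<Integer>> spread(int count, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int msg = 0; msg < count; msg++) {
            partitions.get(msg % numPartitions).add(msg);
        }
        return partitions;
    }

    // Read one partition fully, then the next; this is only one possible interleaving.
    public static List<Integer> readPartitionByPartition(List<List<Integer>> partitions) {
        List<Integer> seen = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            seen.addAll(partition);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = spread(6, 3);
        System.out.println("Per partition: " + partitions);
        System.out.println("As consumed:   " + readPartitionByPartition(partitions));
    }
}
```

Each inner list stays sorted, which mirrors the per-partition guarantee, while the combined read order is no longer the order in which the messages were produced.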

1. One partition for a Topic

Kafka supports this out of the box: with a single partition, every message in the topic is delivered in the order it was written.

<Pros>

  • Simple solution

<Cons>

  • Within a consumer group, a partition is consumed by only one consumer, which limits scale-out
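
The scale-out limit can be illustrated with a simplified assignment sketch in plain Java. The class AssignmentSketch and its round-robin rule are hypothetical stand-ins for Kafka's real assignment strategies; the only property carried over is that each partition has exactly one owner within a consumer group.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {

    // Hand out partitions to consumers round-robin; each partition gets exactly one owner.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numConsumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // One partition, three consumers: two consumers receive nothing.
        System.out.println(assign(1, 3));
        // Ten partitions, three consumers: the work is shared.
        System.out.println(assign(10, 3));
    }
}
```

With a single partition, adding consumers to the group does not add throughput, because the extra consumers have nothing to read.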

2. Custom partitioner

The partition number is decided by the producer’s partitioner. By default, org.apache.kafka.clients.producer.internals.DefaultPartitioner is used. When a message has a key, DefaultPartitioner picks the partition from the hash value of that key.
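
As a rough sketch of the idea, hashing a key to a partition looks like the following. The Kafka client applies a murmur2 hash to the serialized key bytes; this example substitutes Arrays.hashCode purely to stay dependency-free, so the partition numbers it prints are not the ones a real producer would choose. The class name KeyHashSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyHashSketch {

    // Stand-in hash: Arrays.hashCode is NOT murmur2, so results differ from a real producer.
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the remainder is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"dept1.key-1", "dept1.key-2", "dept1.key-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 10));
        }
    }
}
```

The same key always maps to the same partition, but the whole key is hashed, so nothing ties two keys that share a prefix to the same partition.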

Although it is difficult to keep the order of delivery for all messages, we can split them into groups, each of which needs ordered delivery. In this case, we can partition by the group information.

Suppose that messages are generated by several departments and the order is important within a department.

In this case,

  1. Prefix the message key with the dept code
  2. Apply a custom partitioner that decides the partition based on the dept code

The following is the sample code.

package test.producer;

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class TestPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // Do nothing
    }

    @Override
    public void close() {
        // Do nothing
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        String strKey = (String) key;
        // The key is expected to look like "<dept code>.<rest>", e.g. "dept1.key-1"
        String partitionToken = strKey.substring(0, strKey.indexOf('.')); // dept1 ~ dept5
        // hashCode() can be negative, so use floorMod to stay within 0..numPartitions-1
        return Math.floorMod(partitionToken.hashCode(), numPartitions);
    }
}

The partitioner above extracts the prefix from the key and chooses the partition from it.
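
One detail matters in any partitioner that reduces a hash with the % operator: String.hashCode() can return a negative number, and Java's % keeps the sign of its left operand, which would produce an invalid partition number. A minimal sketch of the difference, using a hypothetical class name and a hand-picked negative value in place of a real hash:

```java
public class SignedModuloSketch {

    // Plain remainder: the result carries the sign of the hash.
    public static int unsafeIndex(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Floor modulus: the result is always in the range 0..numPartitions-1.
    public static int safeIndex(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int hash = -7; // stand-in for a negative hashCode()
        System.out.println(unsafeIndex(hash, 10)); // prints -7
        System.out.println(safeIndex(hash, 10));   // prints 3
    }
}
```

The dept1 to dept5 prefixes used in this post happen to have positive hash codes, so the problem only appears with other key prefixes.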

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestProducer {

    private Producer<String, String> producer;

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom partitioner instead of the default one
        props.put("partitioner.class", "test.producer.TestPartitioner");

        TestProducer test = new TestProducer();
        test.producer = new KafkaProducer<>(props);

        // Two messages per department; the key prefix is the dept code
        test.sendAMessage("dept1.key-1", "value-1");
        test.sendAMessage("dept1.key-2", "value-2");
        test.sendAMessage("dept2.key-3", "value-3");
        test.sendAMessage("dept2.key-4", "value-4");
        test.sendAMessage("dept3.key-5", "value-5");
        test.sendAMessage("dept3.key-6", "value-6");
        test.sendAMessage("dept4.key-7", "value-7");
        test.sendAMessage("dept4.key-8", "value-8");
        test.sendAMessage("dept5.key-9", "value-9");
        test.sendAMessage("dept5.key-10", "value-10");

        test.producer.close();
    }

    private void sendAMessage(String key, String value) throws Exception {
        Future<RecordMetadata> metaData = producer.send(new ProducerRecord<>("test-topic", key, value));
        // Block until the send completes, then report the partition that was used
        int partition = metaData.get().partition();
        System.out.println(String.format("Partition %d for key %s", partition, key));
    }
}

The custom partitioner is set with the “partitioner.class” property.

<Messages with default partitioner>

Partition 6 for key dept1.key-1
Partition 4 for key dept1.key-2
Partition 8 for key dept2.key-3
Partition 6 for key dept2.key-4
Partition 5 for key dept3.key-5
Partition 7 for key dept3.key-6
Partition 9 for key dept4.key-7
Partition 3 for key dept4.key-8
Partition 2 for key dept5.key-9
Partition 8 for key dept5.key-10

<Messages with custom partitioner>

Partition 8 for key dept1.key-1
Partition 8 for key dept1.key-2
Partition 9 for key dept2.key-3
Partition 9 for key dept2.key-4
Partition 0 for key dept3.key-5
Partition 0 for key dept3.key-6
Partition 1 for key dept4.key-7
Partition 1 for key dept4.key-8
Partition 2 for key dept5.key-9
Partition 2 for key dept5.key-10

You can see that messages with the same dept code are sent to the same partition.
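
The partition numbers in the custom partitioner listing can be reproduced without a broker by applying the same prefix-hash rule in plain Java. This is a sketch under one assumption: the topic has 10 partitions, which is inferred from the listings (partition numbers up to 9) and not stated in the post. The class name DeptPartitionCheck is hypothetical.

```java
public class DeptPartitionCheck {

    // Same rule as the custom partitioner: hash the dept code in front of the first '.'.
    public static int partitionFor(String key, int numPartitions) {
        String dept = key.substring(0, key.indexOf('.'));
        return Math.floorMod(dept.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"dept1.key-1", "dept1.key-2", "dept2.key-3", "dept5.key-10"};
        for (String key : keys) {
            // Assumes 10 partitions; prints partitions 8, 8, 9 and 2 for these keys
            System.out.println(String.format("Partition %d for key %s", partitionFor(key, 10), key));
        }
    }
}
```

Because only the dept code is hashed, every key of a department maps to the same partition regardless of what follows the dot.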
