Kafka guarantees the order of delivery only within a partition. That is, order is not preserved across the partitions of a topic. Here are two ways to keep the order of delivery in a topic.
1. One partition for a Topic
This is what Kafka supports out of the box: with a single partition, every message in the topic is delivered in order.
<Pros>
- Simple solution
<Cons>
- Within a consumer group, a partition is consumed by only one consumer, which limits scale-out
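To make the scale-out limitation concrete, here is a minimal sketch in plain Java with no Kafka client involved. It hands out partitions to the consumers of one group round-robin, which is only a simplified stand-in for Kafka's real assignors. The class name, method name and consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: this is NOT Kafka's assignor. It models just
// one rule: within a consumer group, each partition is owned by one consumer.
final class AssignmentSketch {

    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String consumer : consumers) {
            owned.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Every partition goes to exactly one consumer of the group
            String owner = consumers.get(p % consumers.size());
            owned.get(owner).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("consumer-a", "consumer-b", "consumer-c");
        System.out.println("1 partition:   " + assign(1, group));
        System.out.println("10 partitions: " + assign(10, group));
    }
}
```

With a single partition, only consumer-a gets any work and the other two consumers stay idle, no matter how many consumers are added to the group.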
2. Custom partitioner
The partition number is decided by the producer’s partitioner. By default, org.apache.kafka.clients.producer.internals.DefaultPartitioner is used. For a record with a non-null key, DefaultPartitioner chooses the partition from the hash value of the key.
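The idea of key-based partitioning can be sketched in plain Java. This is a simplified illustration, not Kafka's code: the real partitioner hashes the serialized key bytes with murmur2, while this sketch substitutes Arrays.hashCode as a stand-in hash. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of "partition = hash(key) mod number of partitions".
final class KeyHashSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes); // stand-in hash, NOT murmur2
        int positive = hash & 0x7fffffff;     // clear the sign bit so the result is never negative
        return positive % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10; // assumed partition count for the demo
        for (String key : new String[] {"dept1.key-1", "dept1.key-1", "dept1.key-2"}) {
            System.out.println(String.format("Partition %d for key %s", partitionFor(key, numPartitions), key));
        }
    }
}
```

The same key always maps to the same partition, but two keys that merely share a prefix, such as dept1.key-1 and dept1.key-2, are hashed independently and can land on different partitions.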
Although it is difficult to keep the order of delivery across all messages, we can often split the messages into groups, each of which needs ordering only internally. In this case, we can partition by the group.
Suppose that messages are generated by several departments and the order is important only within a department.
In this case,
- Prefix each message key with the dept code
- Apply a custom partitioner which decides the partition based on the dept code
The following is the sample code.
package test.producer;

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class TestPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // Do nothing
    }

    @Override
    public void close() {
        // Do nothing
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        String strKey = (String) key;
        // Keys look like "dept1.key-1"; the part before the first '.' is the dept code (dept1 ~ dept5).
        // If the key has no '.', fall back to the whole key instead of throwing.
        int dot = strKey.indexOf('.');
        String partitionToken = dot < 0 ? strKey : strKey.substring(0, dot);
        // hashCode() can be negative; floorMod keeps the result within 0..numPartitions-1
        return Math.floorMod(partitionToken.hashCode(), numPartitions);
    }
}
The partitioner above extracts the prefix from the key and chooses the partition based on it.
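One detail to watch in any partitioner that computes hashCode() % numPartitions: String.hashCode() can be negative, and Java's % operator keeps the sign of the dividend, so the result could be a negative and therefore invalid partition number. The minimal sketch below uses hypothetical names and a hand-picked hash value to compare % with Math.floorMod, which always returns a value from 0 to numPartitions - 1 when numPartitions is positive.

```java
// Hypothetical sketch comparing two ways of reducing a hash to a partition number.
final class NonNegativePartitionSketch {

    // Plain remainder: takes the sign of the hash, so it can be negative
    static int withRemainder(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // floorMod: result has the sign of numPartitions, so it is never negative here
    static int withFloorMod(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int hash = -7;          // hand-picked negative hash value for the demo
        int numPartitions = 10; // assumed partition count
        System.out.println(withRemainder(hash, numPartitions)); // prints -7
        System.out.println(withFloorMod(hash, numPartitions));  // prints 3
    }
}
```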
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;

public class TestProducer {
    private Producer<String, String> producer;

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "test.producer.TestPartitioner");

        TestProducer test = new TestProducer();
        test.producer = new KafkaProducer<>(props);
        test.sendAMessage("dept1.key-1", "value-1");
        test.sendAMessage("dept1.key-2", "value-2");
        test.sendAMessage("dept2.key-3", "value-3");
        test.sendAMessage("dept2.key-4", "value-4");
        test.sendAMessage("dept3.key-5", "value-5");
        test.sendAMessage("dept3.key-6", "value-6");
        test.sendAMessage("dept4.key-7", "value-7");
        test.sendAMessage("dept4.key-8", "value-8");
        test.sendAMessage("dept5.key-9", "value-9");
        test.sendAMessage("dept5.key-10", "value-10");
        test.producer.close();
    }

    private void sendAMessage(String key, String value) throws Exception {
        Future<RecordMetadata> metaData = producer.send(new ProducerRecord<>("test-topic", key, value));
        int partition = metaData.get().partition();
        System.out.println(String.format("Partition %d for key %s", partition, key));
    }
}
The custom partitioner is set with the “partitioner.class” property.
<Message with default partitioner>
Partition 6 for key dept1.key-1
Partition 4 for key dept1.key-2
Partition 8 for key dept2.key-3
Partition 6 for key dept2.key-4
Partition 5 for key dept3.key-5
Partition 7 for key dept3.key-6
Partition 9 for key dept4.key-7
Partition 3 for key dept4.key-8
Partition 2 for key dept5.key-9
Partition 8 for key dept5.key-10
<Message with custom partitioner>
Partition 8 for key dept1.key-1
Partition 8 for key dept1.key-2
Partition 9 for key dept2.key-3
Partition 9 for key dept2.key-4
Partition 0 for key dept3.key-5
Partition 0 for key dept3.key-6
Partition 1 for key dept4.key-7
Partition 1 for key dept4.key-8
Partition 2 for key dept5.key-9
Partition 2 for key dept5.key-10
You can see that messages with the same dept code are sent to the same partition.
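The partition numbers in the custom-partitioner output can be reproduced without a broker. The sketch below assumes the topic has 10 partitions, which is inferred from the partition numbers 0 to 9 in the output rather than stated anywhere. It repeats the prefix-hash calculation in plain Java, and the class name is hypothetical.

```java
// Hypothetical, broker-free re-computation of the dept-based partition choice.
final class DeptPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // The dept code is the part of the key before the first '.'
        int dot = key.indexOf('.');
        String deptCode = dot < 0 ? key : key.substring(0, dot);
        return Math.floorMod(deptCode.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 10; // assumption inferred from the sample output
        String[] keys = {
            "dept1.key-1", "dept1.key-2", "dept2.key-3", "dept2.key-4", "dept3.key-5",
            "dept3.key-6", "dept4.key-7", "dept4.key-8", "dept5.key-9", "dept5.key-10"
        };
        for (String key : keys) {
            System.out.println(String.format("Partition %d for key %s", partitionFor(key, numPartitions), key));
        }
    }
}
```

With 10 partitions this yields partitions 8, 9, 0, 1 and 2 for dept1 through dept5, matching the custom-partitioner output listed above.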