Achieving Order Guarantee in Kafka with Partitioning

One of the most important features of Kafka is that it can load-balance messages across a distributed cluster while still guaranteeing their ordering, which would otherwise not be possible with a traditional queue.

Let's first try to understand the problem statement.

Let us assume we have a topic where messages are sent and there is a consumer who is consuming these messages.
If there is only one consumer, it would receive the messages in the order in which they are in the queue, or in the order in which they are sent.

Now, in order to achieve higher performance, we need to process the messages faster and hence we introduce multiple instances of the consumer application.

This leads to a problem if the messages carry any state.

Let's try to understand this with an example:

Suppose that for a particular message id we have 3 events:
First: CREATE
Second: UPDATE
Third: DELETE
We require that a message's "UPDATE" or "DELETE" event be processed ONLY after its "CREATE" event. Now, if 2 separate instances receive the same message's "CREATE" and "UPDATE" at almost the same time, there is a chance that the instance holding the "UPDATE" event will try to process it before the other instance has finished processing the "CREATE" event. This is a problem because the consumer would try to update a message that has not yet been created; it would throw an exception, and the update could be lost.
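To make the failure mode concrete, here is a minimal in-memory sketch. The EventStore class and its apply method are hypothetical names made up for this illustration and have nothing to do with the Kafka API; the store simply rejects an UPDATE or DELETE for an id that has not been created yet.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store, used only to illustrate the ordering problem.
final class EventStore {
    private final Map<String, String> rows = new HashMap<>();

    // Applies one event; returns false when the event arrives out of order.
    boolean apply(String id, String event) {
        switch (event) {
            case "CREATE":
                rows.put(id, "created");
                return true;
            case "UPDATE":
                if (!rows.containsKey(id)) {
                    return false; // nothing to update yet
                }
                rows.put(id, "updated");
                return true;
            case "DELETE":
                return rows.remove(id) != null; // false if the row does not exist
            default:
                throw new IllegalArgumentException("Unknown event: " + event);
        }
    }

    public static void main(String[] args) {
        EventStore inOrder = new EventStore();
        System.out.println(inOrder.apply("42", "CREATE")); // true
        System.out.println(inOrder.apply("42", "UPDATE")); // true

        EventStore outOfOrder = new EventStore();
        System.out.println(outOfOrder.apply("42", "UPDATE")); // false: UPDATE arrived before CREATE
        System.out.println(outOfOrder.apply("42", "CREATE")); // true, but the earlier update is lost
    }
}
```

With a single consumer the second scenario cannot happen; with several independent consumers it can, unless all events of one id are routed to the same consumer.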

Possible solutions

The first solution that comes to mind is optimistic locking on the database. This would prevent the inconsistent update, but the resulting exception scenarios would still have to be handled afterwards. It is not a very straightforward approach and might introduce more locking and concurrency issues.

An easier solution would be to make sure that the messages/events of a particular id always go to a particular instance, so that they are processed in order. In this case CREATE will always be executed before UPDATE, as that was the original order in which they were sent.

This is where Kafka comes in handy.

Kafka has the concept of “partitions” within the topics which could provide both ordering guarantees and load balancing over a pool of consumer processes.

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition.
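As a rough mental model, a partition can be pictured as an append-only list in which the offset of a message is its position in the list. The PartitionLog class below is a hypothetical, heavily simplified sketch of that idea (real partitions are persisted and replicated by the brokers), not Kafka code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one partition: an append-only list where
// the offset of a message is simply its position in the list.
final class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns the offset assigned to it.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Reads the message stored at the given offset.
    String read(long offset) {
        return messages.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLog partition = new PartitionLog();
        System.out.println(partition.append("CREATE")); // 0
        System.out.println(partition.append("UPDATE")); // 1
        System.out.println(partition.append("DELETE")); // 2
        System.out.println(partition.read(1));          // UPDATE
    }
}
```

Because messages are only ever appended, reading the log from a lower offset to a higher one always replays them in the order in which they were written.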

So, a topic has multiple partitions, each maintaining its own offsets.
To make sure that the events of a particular id always go to a particular instance, we bind each consumer to a particular partition and then make sure that all the events of that id always go to the same partition. They are then always consumed by the same consumer instance.

To achieve this partitioning, the Kafka client API provides us with 2 ways:
1) Define a key for each message, which the default partitioning logic uses to choose the partition.
2) Write a Partitioner class to define our own partitioning logic.

Let's explore the first one:

Default partitioning logic

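The default partitioning strategy is hash(key) % numPartitions, where the hash is computed over the serialized key. If the key is null, the producer chooses the partition on its own (the exact strategy depends on the client version), so no per-key ordering can be expected. So, if we want a certain attribute to be the partitioning key, we need to pass it as the key in the ProducerRecord constructor when sending the message from a producer.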
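The formula can be sketched in a few lines. This is only an illustration under a loud assumption: it uses String.hashCode() as a stand-in hash, whereas the Kafka producer hashes the serialized key bytes with its own hash function, so the partition numbers printed here will generally not match what Kafka picks. KeyPartitionSketch and partitionFor are hypothetical names.

```java
// Hypothetical sketch of key-based partition selection. It uses
// String.hashCode() as a stand-in hash; Kafka's default partitioner hashes
// the serialized key bytes differently, so real partition numbers will differ.
final class KeyPartitionSketch {

    // Maps a non-null key to a partition in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit so the result of the modulo is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"ABCD", "PQ1234@1211111111111", "ZX12345OOO", "ABCD"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The property that matters for ordering is that the mapping is deterministic: the same key always yields the same partition as long as the number of partitions stays the same. Two different keys may still end up on the same partition.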

Let's see an example:

NOTE: To run this example, we need the following:
1. A running Zookeeper (at localhost:2181)
2. A running Kafka broker (at localhost:9092)
3. A topic named "TRADING-INFO" with 3 partitions. (For simplicity, we can have just a single broker.)
To complete the above three, follow the documentation here.

Let's assume we are sending trade information on the "TRADING-INFO" topic, which is consumed by a consumer.

1. Trade class

(Note: I have used Lombok here)

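import lombok.Builder;
import lombok.Data;

// Lombok generates the getters, setters and builder for this class.
@Data
@Builder
public class Trade {
    // Numeric id, so that it serializes as a JSON number (e.g. "id":1).
    private int id;
    private String securityId;
    private String fundShortName;
    private String value;
}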

2. Kafka client dependency

In order to make a Kafka Producer we need to include the Kafka dependency :

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.0.0</version>
        </dependency>

3. Kafka Producer

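import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Producer {

    // ObjectMapper is thread-safe once configured, so one instance is shared by all tasks.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws InterruptedException {
        final String TOPIC = "TRADING-INFO";
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(getProducerProperties());

        // Each task sends five trades for one securityId, which is used as the record key.
        Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5);
        Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10);
        Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15);

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(task1);
        executorService.submit(task2);
        executorService.submit(task3);

        // Wait for the tasks to finish, then close the producer so that buffered records are flushed.
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        kafkaProducer.close();
    }

    private static void sendTradeToTopic(String topic, KafkaProducer<String, String> kafkaProducer,
                                         String securityId, int idStart, int idEnd) {
        for (int i = idStart; i <= idEnd; i++) {
            Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build();
            try {
                String s = MAPPER.writeValueAsString(trade);
                // The securityId is the key, so all trades of one security go to the same partition.
                kafkaProducer.send(new ProducerRecord<>(topic, trade.getSecurityId(), s));
                System.out.println("Sending to " + topic + " msg : " + s);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    }

    private static Properties getProducerProperties() {
        Properties props = new Properties();
        String KAFKA_SERVER_IP = "localhost:9092";
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }

}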

4. Consumer

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class TConsumer {

    public static void main(String[] args) {
        final String TOPIC = "TRADING-INFO";
        // All instances share this group id, so the partitions are divided among them.
        final String CONSUMER_GROUP_ID = "consumer-group";
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID));
        kafkaConsumer.subscribe(Arrays.asList(TOPIC));

        // Poll forever and print the value of every record received.
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            consumerRecords.forEach(e -> {
                System.out.println(e.value());
            });
        }
    }

    private static Properties getConsumerProperties(String consumerGroupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", consumerGroupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }
}

As we have 3 partitions, we will run 3 instances of the consumer.

Now we run the producer, whose threads produce messages for 3 different securityId values, which is our key here. We will see that a particular consumer instance always handles a particular securityId and hence is able to process its messages in order.

Outputs

Consumer 1:

{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"}

Consumer 2:

{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}

Consumer 3:

{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}

So, here the 3 securityId values produced hashes that mapped to different partitions, so the trades were distributed across the partitions and each type of trade always went to a particular instance. Note that distinct keys can also end up on the same partition; what matters for ordering is that a given key always maps to the same partition.

If we do not want to use the default partitioning logic because our scenario is more complex, we need to implement our own Partitioner. In the next blog I will explain how to use it and how it works.
