Friday, 18 December 2015

Apache Kafka – Java Producer Example with Multibroker & Partition

In this post I will demonstrate how to implement a Java producer that connects to multiple brokers, and how to produce messages to different partitions in a topic.

I have also published a couple of other posts about Kafka. If you are new and would like to learn Kafka from scratch, I would recommend walking through the posts below first.

Prerequisite 
I am assuming that you already have Kafka set up in your local environment. If not, you can set up Kafka in a Windows environment by following this link.

Setup Multibroker and Topic with Partition
1. First you need to start the ZooKeeper server. To start it, execute the command below. <kafka_dir> needs to be replaced with the location where you have installed Kafka.
<kafka_dir>\bin\windows\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2. Go to the <kafka_dir>\config\server.properties file and make a copy of it at the same location, say 'first-broker-server.properties'.

3. You just need to change a couple of properties in first-broker-server.properties to set up the first broker.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# The port the socket server listens on, it should be unique for each broker

port=9092

# A comma separated list of directories under which to store log files

log.dirs=<kafka_dir>/kafka-logs/first-broker-server

# Zookeeper connection string. This is the host and port where your ZooKeeper server is running.

zookeeper.connect=localhost:2181
4. Go to <kafka_dir>\config\server.properties file and make another copy of it at same location say ‘second-broker-server.properties’.

5. Now change the properties in second-broker-server.properties for second broker.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# The port the socket server listens on, it should be unique for each broker

port=9093

# A comma separated list of directories under which to store log files

log.dirs=<kafka_dir>/kafka-logs/second-broker-server

# Zookeeper connection string. This is the host and port where your ZooKeeper server is running.

zookeeper.connect=localhost:2181
6. Now you need to start both brokers. To start them, execute the commands below:

    Start first broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\first-broker-server.properties
    Start second broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\second-broker-server.properties
7. Now create the topic 'EmployeeLoginEventTopic' (the topic used by the example code below) with 2 partitions and a replication factor of 2.
<kafka_dir>\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic EmployeeLoginEventTopic
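You can verify the partition and replica assignment by describing the topic with the same kafka-topics tool (this assumes ZooKeeper is still running on localhost:2181; replace <topic_name> with the topic you just created):

```shell
<kafka_dir>\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic <topic_name>
```

The output lists each partition with its leader broker and replica set, so you can confirm that both brokers (id 1 and 2) are hosting replicas.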
Java Producer Example with Multibroker And Partition
Now let's write a Java producer that connects to both brokers. We already created the topic 'EmployeeLoginEventTopic' with 2 partitions. In this example we will see how to send messages to a specific partition in a topic.


In this example program, I have tried to simulate sending employee login events to different Kafka brokers. An auto-generated employeeId will be used as the key, and messages will be sent to different partitions in the format 'EmployeeID:<employeeId>, LoginTime: <currentDate&Time>'.


First of all, you need to understand which properties are required to initialize the producer:
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092,localhost:9093"); //1
props.put("serializer.class", "kafka.serializer.StringEncoder"); //2
props.put("partitioner.class", "com.nvexample.kafka.partition.PartitionerExample"); //3
props.put("request.required.acks", "1"); //4
  • In the first property, you mention the list of Kafka brokers the producer will connect to.
  • In the second property, the serializer class for the message needs to be mentioned. You can use the default class, i.e. 'kafka.serializer.StringEncoder'.
  • For the third property, you need to implement the 'kafka.producer.Partitioner' interface. In that implementation you write the logic that decides which partition a message should be sent to, based on the message key.
  • Set the fourth property to '1' if you want to make sure the producer is acknowledged when a message has been received by the broker successfully.
Partitioner Class Implementation: 
package com.nvexample.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class PartitionerExample implements Partitioner {

       public PartitionerExample(VerifiableProperties props) {
       }
       public int partition(Object employeeIdStr, int numOfPartitions) {
             int partition = 0;
             String stringKey = (String) employeeIdStr;
             Integer intKey = Integer.parseInt(stringKey);
             if (intKey > 0) {
                    partition = intKey % numOfPartitions;               
             }
             System.out.println("Returning partition number [" + partition + "] " +
                           "for key ["+employeeIdStr+"]");
             return partition;  
       }
}

In this implementation class, we take the key 'employeeIdStr' and perform a modulo operation with the number of partitions configured for the topic. This partitioning logic ensures that messages with the same key are always sent to the same partition; in other words, all login events for the same employeeId are served by the same partition.
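To see this routing in isolation, here is a small standalone sketch (my own illustration, not part of the Kafka API) that mirrors the modulo arithmetic of PartitionerExample for a topic with 2 partitions:

```java
// Standalone sketch of the modulo routing used in PartitionerExample above.
// No Kafka required; the class and method names here are illustrative only.
public class PartitionDemo {

    // Same arithmetic as PartitionerExample.partition(): key modulo partition count
    static int partitionFor(String employeeId, int numOfPartitions) {
        int intKey = Integer.parseInt(employeeId);
        int partition = 0;
        if (intKey > 0) {
            partition = intKey % numOfPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        // With 2 partitions, employee 7 always maps to partition 1
        System.out.println(partitionFor("7", 2)); // prints 1
        System.out.println(partitionFor("7", 2)); // prints 1 again - stable routing
        System.out.println(partitionFor("4", 2)); // prints 0
    }
}
```

Because the partition is a pure function of the key, replaying the same employeeId always yields the same partition number, which is exactly what preserves per-employee ordering.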

Multibroker Producer Example:
package com.nvexample.kafka.partition;

import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerWithPartitionExample {
      
       private static Producer<String, String> producer;
       public final static String brokerList = "localhost:9092,localhost:9093";
       public final static String PARTITIONER_IMPLEMENTATION_CLASS
                           = "com.nvexample.kafka.partition.PartitionerExample";
       private static final String TOPIC = "EmployeeLoginEventTopic";
      
       public void initialize() {
             Properties props = new Properties();
             props.put("metadata.broker.list", brokerList);
             props.put("serializer.class", "kafka.serializer.StringEncoder");
             props.put("partitioner.class", PARTITIONER_IMPLEMENTATION_CLASS);
             props.put("request.required.acks", "1");
             ProducerConfig config = new ProducerConfig(props);
             producer = new Producer<String, String>(config);
       }
       public void publish(String key, String message) {
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                           TOPIC, key, message);
             producer.send(data);
       }
       public void closeProducer() {
             producer.close();
       }
       public static void main(String[] args) {
             ProducerWithPartitionExample producerWithPartition
                       = new ProducerWithPartitionExample();
             // Initialize the producer with required properties
             producerWithPartition.initialize();           
             // Publish message to brokers
             Random rnd = new Random();
             for (long employeeLogInEvent = 0; employeeLogInEvent < 10;
                                                             employeeLogInEvent++) {
                    String employeeId = String.valueOf(rnd.nextInt(10));
                     String msg = "EmployeeID:" + employeeId +
                                    ", LoginTime: " + new Date();
                    producerWithPartition.publish(employeeId, msg);
             }           
             // Close the connection between broker and producer
             producerWithPartition.closeProducer();
       }
}

In this example, we are sending the employee login event as the message, along with the employeeId as the key. Even if you've defined a partitioner class, when a key is not sent along with the message Kafka assigns the message to a random partition.

Java Consumer Example 
This is the consumer program. On starting this program, the consumer will connect to the brokers via ZooKeeper and start consuming messages published on 'EmployeeLoginEventTopic'.

   package com.nvexample.kafka;

   import java.util.*;
   import kafka.consumer.Consumer;
   import kafka.consumer.ConsumerConfig;
   import kafka.consumer.ConsumerIterator;
   import kafka.consumer.KafkaStream;
   import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {
       private ConsumerConnector consumerConnector = null;
       private final String topic = "EmployeeLoginEventTopic";

       public void initialize() {
             Properties props = new Properties();
              props.put("zookeeper.connect", "localhost:2181");
              props.put("group.id", "testgroup");
              props.put("zookeeper.session.timeout.ms", "400");
              props.put("zookeeper.sync.time.ms", "300");
              props.put("auto.commit.interval.ms", "1000");
             ConsumerConfig conConfig = new ConsumerConfig(props);
             consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
       }

       public void consume() {
             //Key = topic name, Value = No. of threads for topic
             Map<String, Integer> topicCount = new HashMap<String, Integer>();       
              topicCount.put(topic, new Integer(1));
            
             //ConsumerConnector creates the message stream for each topic
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                   consumerConnector.createMessageStreams(topicCount);         
            
              // Get the Kafka stream for our topic
             List<KafkaStream<byte[], byte[]>> kStreamList =
                                                  consumerStreams.get(topic);
             // Iterate stream using ConsumerIterator
             for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
                   
                     while (consumerIte.hasNext())
                            System.out.println("Message consumed from topic ["
                                          + topic + "] : "
                                          + new String(consumerIte.next().message()));
             }
             //Shutdown the consumer connector
             if (consumerConnector != null)   consumerConnector.shutdown();          
       }

       public static void main(String[] args) throws InterruptedException {
             KafkaConsumer kafkaConsumer = new KafkaConsumer();
             // Configure Kafka consumer
             kafkaConsumer.initialize();
             // Start consumption
             kafkaConsumer.consume();
       }
   }



Output of Producer and Consumer Program
Execute the consumer program first and then the producer program. You will get the following output. Notice in the producer program's console that the same partition number is returned for the same key. This confirms that messages for the same employee id are sent to the same partition.

ProducerWithPartitionExample.java Program Console Output:



KafkaConsumer.java Program Console Output:


I hope this post helped you learn the Java Kafka producer with the multibroker and partition flavor. It would be great if you leave your feedback on this post.