Kafka for Beginners (Part 1)

Apache Kafka® is a distributed streaming platform. What exactly does that mean?
The official introduction explains it: https://kafka.apache.org/intro

Jumping directly to the practical part:

I will use Docker images to start a Zookeeper instance, Elasticsearch, and Kafka with a default topic.
For how to start these with Docker, see http://thebadengineer.com/docker-compose/

//The only change is in the environment variable of the Kafka container that defines the local topic
KAFKA_CREATE_TOPICS: "myTestTopic:1:1"   //topic name:number of partitions:replication factor
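
To make the three fields of that value concrete, here is a minimal sketch that splits one entry into its parts. The class name TopicSpec is hypothetical and not part of the tutorial project, and the sketch assumes only the three-field form shown above:

```java
// Illustrative only: splits one KAFKA_CREATE_TOPICS entry into its three fields.
// TopicSpec is a hypothetical helper, not part of the tutorial project.
final class TopicSpec {
    final String name;
    final int partitions;
    final int replicationFactor;

    private TopicSpec(String name, int partitions, int replicationFactor) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }

    // Parses "name:partitions:replicationFactor", e.g. "myTestTopic:1:1".
    static TopicSpec parse(String entry) {
        String[] parts = entry.trim().split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected name:partitions:replicationFactor, got: " + entry);
        }
        return new TopicSpec(parts[0],
                Integer.parseInt(parts[1]),
                Integer.parseInt(parts[2]));
    }

    public static void main(String[] args) {
        TopicSpec spec = parse("myTestTopic:1:1");
        System.out.println(spec.name + " partitions=" + spec.partitions
                + " replicationFactor=" + spec.replicationFactor);
    }
}
```

So "myTestTopic:1:1" asks for a topic named myTestTopic with one partition and a replication factor of one.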

After starting all the containers, verify that they are running with docker ps.

Create Consumer class:

 package KafkaConsumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class ConsumerMain {
    private final static String TOPIC = "myTestTopic"; //topic to be consumed from
    private final static String BOOTSTRAP_SERVERS =   //Kafka broker address (host:port), not the Zookeeper URL
            "127.0.0.1:9092";

  //Function to populate the properties of the consumer
    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // Create the consumer using props.
        final Consumer<Long, String> consumer =
                new KafkaConsumer<>(props);

        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }

    public static void main(String[] args) {
        Consumer<Long, String> consumer = createConsumer();  //create the consumer instance
        while (true) {
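            //wait up to 1 second for new records; poll(Duration) needs kafka-clients 2.0+ and replaces the deprecated poll(long)
            ConsumerRecords<Long, String> consumerRecords = consumer.poll(java.time.Duration.ofMillis(1000));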
            for (ConsumerRecord<Long, String> record : consumerRecords) {  //prints it
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            }
            consumer.commitAsync();
        }
        //consumer.close();  //unreachable after the infinite loop; call it once you add a way to exit the loop
    }
}

Create Producer Class:

 package KafkaConsumer;


import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.json.JSONObject;

import java.io.IOException;
import java.net.URL;
import java.util.Properties;

class ProducerMain {
    private static final String TEST_TOPIC = "myTestTopic";  //topic name to post the data
    public static final String broker = "127.0.0.1:9092";   //Kafka broker address (host:port), not the Zookeeper URL

    public static void main(String[] args) throws IOException {
        Properties prop = setKafkaProp();  //build the producer properties
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        String data = getJsonFromFile("data1.json"); //read the json from file
        ProducerRecord<String, String> record  = new ProducerRecord<String, String>(TEST_TOPIC, new JSONObject(data).toString());
        try {
            producer.send(record);  //asynchronous: the record is queued and sent in the background
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();  //waits for queued records to be sent before closing
        }
        System.out.println("Sent data: " + data);
    }

    //Function to read the json and convert it into string
    private static String getJsonFromFile(String file) throws IOException {
        URL url = Resources.getResource(file);
        String json = Resources.toString(url, Charsets.UTF_8);
        return json;
    }
    //Builds the producer properties
    public static Properties setKafkaProp() {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        //records are sent without a key, so the key serializer only ever sees null
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class.getName());
        //no group.id here: it is a consumer-only setting
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        return prop;
    }
}

data1.json

 {
  "id": "trial21",
  "UserId": "8aefa65b",
  "SomeArrays": [
    "the.badEngineer1",
    "the.badEngineer2"
  ],
  "SomeParameter": true,
  "addOperation":true
}

Now our producer and consumer classes are ready to produce and consume data. Before running them, we need to tell the consumer class where Zookeeper and the bootstrap servers are, and which Kafka topic to consume from.
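
One way to pass such values in from a run configuration is through environment variables. The following is a minimal sketch, not the project's actual mechanism: the class ConsumerSettings and the variable names KAFKA_BOOTSTRAP_SERVERS and KAFKA_TOPIC are hypothetical, and the defaults are the values hard-coded in ConsumerMain above.

```java
import java.util.Map;

// Illustrative only: resolves consumer settings from environment variables.
// The class and the variable names are assumptions made for this sketch.
final class ConsumerSettings {
    final String bootstrapServers;
    final String topic;

    private ConsumerSettings(String bootstrapServers, String topic) {
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
    }

    // Falls back to the tutorial's hard-coded values when a variable is not set.
    static ConsumerSettings fromEnv(Map<String, String> env) {
        String servers = env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092");
        String topic = env.getOrDefault("KAFKA_TOPIC", "myTestTopic");
        return new ConsumerSettings(servers, topic);
    }

    public static void main(String[] args) {
        ConsumerSettings settings = fromEnv(System.getenv());
        System.out.println("bootstrap servers: " + settings.bootstrapServers);
        System.out.println("topic: " + settings.topic);
    }
}
```

With this approach the same class can run against a different broker or topic without recompiling, simply by changing the variables in the run configuration.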

Fill in the edit configuration as below:

Click Apply and run the consumer class. It will not print anything at first, since it loops while waiting for messages from the topic.

No configuration is needed to run the producer class. Simply run the ProducerMain class.

If everything runs fine, you will get the following output:

In the ConsumerMain class output, you will find that the data has been read from the topic and printed to the console.
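
To show the shape of each printed line, here is a small sketch that applies the same format string as ConsumerMain to sample values. The class RecordLineFormat is hypothetical, the value is a shortened placeholder for the JSON payload, and the offset is illustrative. Since ProducerMain sends its record without a key, the key is null here; the partition is 0 because the topic has a single partition.

```java
// Illustrative only: reproduces the printf format used in ConsumerMain.
// RecordLineFormat is a hypothetical helper; the sample values are placeholders.
final class RecordLineFormat {
    // key may be null when the producer sends a record without a key
    static String format(Long key, String value, int partition, long offset) {
        return String.format("Consumer Record:(%d, %s, %d, %d)\n",
                key, value, partition, offset);
    }

    public static void main(String[] args) {
        // prints: Consumer Record:(null, {"id":"trial21"}, 0, 0)
        System.out.print(format(null, "{\"id\":\"trial21\"}", 0, 0L));
    }
}
```

The four fields are, in order, the record key, the record value, the partition, and the offset within that partition.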

Finally, we have created a basic producer and consumer entirely in Java code. In the project I worked on, I didn't have to set any properties myself, as I had an API I could call to populate all of them.
Also, I had to avoid Kafka's auto-commit feature, which I will cover in later tutorials.
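
As a preview, auto-commit is controlled by the consumer setting enable.auto.commit, which Kafka enables by default. Below is a minimal sketch of consumer properties with it switched off, using the plain configuration key names so that it compiles without the Kafka client library. The class ManualCommitProps is hypothetical, and the other values are copied from ConsumerMain above.

```java
import java.util.Properties;

// Illustrative only: consumer properties with auto-commit disabled.
// ManualCommitProps is a hypothetical helper, not part of the tutorial project.
final class ManualCommitProps {
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "KafkaExampleConsumer");
        // With auto-commit off, offsets are committed only when the application
        // does so explicitly, e.g. via commitAsync() as in ConsumerMain.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("enable.auto.commit=" + props.getProperty("enable.auto.commit"));
    }
}
```

In ConsumerMain the same setting could be added to the existing Properties object before the consumer is created, alongside the deserializer settings.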

Full Project : https://github.com/hi2saif/KafkaProducerConsumerSample