Kafka

I’ll cover Kafka in detail, with an introduction to its programming model, and will try to cover most of its architecture. So here it goes:-

We need Kafka when building a real-time processing system, as Kafka is a high-performance, publish-subscribe-based messaging system with highly scalable properties. Traditional systems are unable to process such large volumes of data and are mainly used for offline analysis; Kafka addresses the real-time needs of a software solution, that is to say, it unifies offline and online data processing and routes it to multiple consumers quickly.

Below are the characteristics of Kafka:-

Persistent messaging: – TBs’ worth of messages are persisted on disk and replicated within the cluster to prevent data loss.
High throughput: – Kafka is designed to handle hundreds of MB of reads and writes per second on commodity hardware, from a large number of clients.
Distributed: – Kafka is cluster-centric and can grow transparently without downtime.
Multiple client support: – It integrates easily with Java, .NET, PHP, Ruby, and Python.
Real time: – Messages produced by a producer are immediately visible to consumer threads.

We could build a similar solution with the Java Message Service (JMS), but JMS has its own limitations:-

JMS performance is poor when dealing with large volumes of streaming data.
JMS does not scale well horizontally.
In JMS, the broker bears the responsibility of tracking how many messages have been consumed by each consumer, which is a major drawback.

Note: – Similar tools to Kafka include RabbitMQ, Flume, and ActiveMQ.

A Kafka cluster can be installed as a single node with one or more brokers, or as multiple nodes with multiple brokers.

Note: – I skipped Kafka installation and setup guidelines, as good references for each cluster type are already available on the Kafka website; I therefore assume Kafka is already installed and your current directory is “/usr/hdp/current/kafka-broker”.

With the diagram below you can understand the components:-

Producer: – Producers publish data to topics by choosing the appropriate partition within the topic.
ZooKeeper: – ZooKeeper is designed to store coordination data: status information, configuration, location information, and so on.
Consumer: – Consumers are the applications or processes that subscribe to topics and process the feed of published messages.
Broker: – A Kafka cluster consists of one or more servers, each of which may run one or more server processes, called brokers. Topics are created within the context of broker processes.
Topic: – A topic is a category or feed name to which messages are published by message producers. Each message in a partition is assigned a unique sequential ID called the offset.
[Figure: single-node Kafka architecture showing the producer, ZooKeeper, broker with topics, and consumer]
A producer publishes messages to a Kafka topic (you can think of it as a messaging queue). Kafka topics are created on a Kafka broker, which acts as a Kafka server and can store messages if required. Consumers then subscribe to one or more Kafka topics to get the messages.

In Kafka topics, every partition is mapped to a logical log file, represented as a set of segment files of equal size. Every message within a partition is assigned a unique sequential number called the offset, which identifies the message within that partition. Each partition is replicated across a configurable number of servers. In a Kafka cluster, each server plays a dual role: it acts as a leader for some of its partitions and as a follower for others. This balances the load within the Kafka cluster.

Now let’s cover some of Kafka’s programming model on the command line. I am running Kafka in standalone mode, but the real power of Kafka is unlocked in cluster mode, with replication and appropriately partitioned topics. Cluster mode gives us parallelism and data safety even when a Kafka node goes down:-

First we create a Kafka topic; in my case the Kafka bin files are at “/usr/hdp/current/kafka-broker/bin”. The kafka-topics.sh utility creates the topic with the requested number of partitions and replication factor and prints a message on success. It also takes the ZooKeeper server information, in this case localhost:2181.

[root@sanbox]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Test

The preceding command creates a topic named “Test” with a replication factor of 1 and a single partition. We need to mention the ZooKeeper host and port number as well. The number of partitions determines the degree of parallelism that can be achieved on the consumer side.
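For reference, newer Kafka client libraries (0.11+) can also create topics programmatically through the AdminClient API. Below is a minimal sketch, assuming those newer client jars rather than the version shipped on this sandbox:-

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667"); // the broker, not ZooKeeper
        try (AdminClient admin = AdminClient.create(props)) {
            // topic name, number of partitions, replication factor - same values as the command above
            NewTopic topic = new NewTopic("Test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get(); // blocks until the topic exists
        }
    }
}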

Now let’s list the topics running on my Kafka server using the command below:-

[root@sanbox]#bin/kafka-topics.sh --list --zookeeper localhost:2181

Test

Test2

We can find out more about a topic using the command below:-

[root@sanbox]#bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Test

Topic:Test PartitionCount:1 ReplicationFactor:1 Configs:

Topic: Test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Now start a producer to send messages to the broker; in my case the Kafka broker is listening on port 6667:-

[root@sanbox]#bin/kafka-console-producer.sh --broker-list localhost:6667 --topic Test

Now let’s start a consumer to consume messages in another window:-

[root@sanbox]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Test --from-beginning

After running this command, any message typed at the producer end will appear in the consumer window.

Note: Here we are using a single producer connected to a single broker. If we need to use multiple brokers, we can pass a list in the broker-list argument, for example:-

[root@sanbox]#bin/kafka-console-producer.sh --broker-list localhost:6667,localhost:6668 --topic replicated-kafkatopic

There is one more architecture, called “multiple nodes – multiple brokers”, shown below:-

[Figure: multiple nodes – multiple brokers Kafka cluster architecture]
You can write client/custom code for producers and consumers in several programming languages, including Java, Python, Scala, and JRuby, but Java is the official client for the Kafka broker.
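Since Java is the official client, here is a minimal sketch of a producer using the new Java producer API (Kafka 0.8.2+); the broker address and the “Test” topic are the ones used above, and everything else is standard boilerplate:-

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667"); // Kafka broker, not ZooKeeper
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // send() is asynchronous; records are batched and shipped in the background
        producer.send(new ProducerRecord<>("Test", "Hello from the Java producer"));
        producer.close(); // flushes any outstanding records before exiting
    }
}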

We can start multiple Kafka brokers by writing one configuration file per broker, i.e. multiple copies of server.properties.
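For example, to run a second broker on the same node, copy the file and change at least the three values below (the file name, port, and log path here are just illustrative); every broker needs a unique broker.id, its own listening port, and its own log directory:-

cp config/server.properties config/server-1.properties

# in config/server-1.properties:
broker.id=1
port=6668
log.dirs=/tmp/kafka-logs-1

bin/kafka-server-start.sh config/server-1.properties

Note that on newer Kafka versions the listening port is configured through the listeners property instead of port.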

Now let’s have a look at the server.properties file and discuss some of its default values:-

default.replication.factor=1 > Sets the default replication factor for automatically created topics
num.replica.fetchers=1 > Sets the number of fetcher threads used to replicate messages from a leader
and many more …

In the same way, we have a producer.properties file that contains some of the default producer parameters.

Another important configuration file, consumer.properties in the config folder, contains the default values for consumers.

Now let’s look at debugging Kafka with the tools that come with the Kafka build:-

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker: – Shows information on a consumer group and its offsets.
bin/kafka-run-class.sh kafka.tools.DumpLogSegments: – Debugs the Kafka log data, for example to understand how much of the log has been written and what the status of the various segments is.
bin/kafka-run-class.sh kafka.tools.ExportZkOffsets: – Useful when we want to take a backup of the offsets saved in ZooKeeper.
bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool: – Used when we want to migrate Kafka data from a lower version to a higher version.
bin/kafka-run-class.sh kafka.tools.MirrorMaker: – Used when we are running two different instances of Kafka and want to replicate data from one to the other.
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK: – Resets the offset of a consumer in ZooKeeper while our Kafka instance is up and running.

In the next blog I’ll discuss integrating Kafka with Java and Python, and we will write a simple producer and consumer in both languages.
Before that, let’s review a few of the things above and run through them again:-

In Window 1, run the command below:-

echo “This is my first message” | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic new_messages_recvd --new-producer

In Window 2, run the command below to see the messages arriving from Window 1:-

kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic new_messages_recvd

Now let us move a step further to understand what an offset is, something I skipped completely in my previous article.

Kafka maintains offsets per consumer group. A group represents a particular use case for consuming a topic. If we have two identical jobs running in parallel on a given topic to split the workload, they share the same group ID and collectively maintain the topic offsets.

Groups are managed by the high-level consumer, which takes care of the major aspects of consuming topics: it manages the offsets per partition per topic, including parallel consumption by a group. ZooKeeper is responsible for storing this information.
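The console consumer used in this article is that older, ZooKeeper-based high-level consumer. For comparison, a minimal sketch of a group consumer with the newer Java consumer API (Kafka 0.9+), which commits offsets to Kafka itself rather than to ZooKeeper, might look like this:-

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667");
        props.put("group.id", "hungry_hippo2");     // consumers sharing this ID split the partitions
        props.put("auto.offset.reset", "earliest"); // same idea as --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("new_messages_recvd"));
        while (true) { // poll forever; offsets are auto-committed for the group by default
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
        }
    }
}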
Let us post a message to the producer on our new_messages_recvd topic:-

echo “This is my Mukesh message” | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic new_messages_recvd --new-producer

Create a custom properties file named “consumer.properties” and add the values below to it:-

group.id=hungry_hippo2

consumer.timeout.ms=5000

Now pass this properties file to the consumer using the command below and watch the message appear:-

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic new_messages_recvd --from-beginning --consumer.config consumer.properties

To see the offset details, we need to go into ZooKeeper using the commands below:-

zookeeper-shell.sh sandbox.hortonworks.com:2181

ls /consumers

[console-consumer-2529, console-consumer-31045, hungry_hippo, console-consumer-11236]

ls /consumers/hungry_hippo2/offsets/new_messages_recvd

[0]

get /consumers/hungry_hippo2/offsets/new_messages_recvd/0

2

This is how Kafka manages offsets for a consumer group; in our case, 2 is our offset.

Let us proceed further into partitions: a topic can be divided into partitions, which may be distributed across brokers. Partitions shard data across the brokers and enable both parallelism and sequencing. Kafka is very fast partly because it reads data sequentially from disk. Within a single partition, a consumer receives messages in the order they were written; across partitions of a topic there is no global ordering guarantee.

We can key our messages; the default key value is null. The default routing scheme distributes messages among partitions based on their keys, and this routing scheme can be overridden with a custom implementation.
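A minimal sketch of keyed messages with the Java producer is below. Records sharing a key always land in the same partition under the default key-hash routing; the partitioner.class hook shown in the comment is the override point on newer clients (0.9+), and com.example.MyPartitioner is a hypothetical class name:-

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // To override the default key-hash routing, implement
        // org.apache.kafka.clients.producer.Partitioner and set:
        // props.put("partitioner.class", "com.example.MyPartitioner"); // hypothetical class

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("new_messages_recvd", "KEY1", "A keyed message"));
        producer.send(new ProducerRecord<>("new_messages_recvd", "KEY1", "Same key, same partition"));
        producer.close();
    }
}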

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic new_messages_recvd --property print.key=true --property key.separator=, --from-beginning

The command above passes two new properties to the consumer: print.key tells it to print the message key, and key.separator specifies the separator between key and value.

Now let us supply a key and a value at the producer using the command below:-

echo “KEY1,A keyed message” | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic new_messages_recvd --property parse.key=true --property key.separator=, --new-producer

Topics have some basic configuration properties that can change their behavior. Many topic-related settings also exist at the broker level as defaults, and these settings primarily control how messages are retained.

A message can live forever, for a fixed period, or until the logs reach a size threshold, after which old log segments are deleted; alternatively, only the latest version for each key can be kept. Log compaction is what maintains the latest version of a message with a given key. Before we start using it, we first need to enable the log cleaner in the broker’s server.properties:-

log.cleaner.enable=true
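Compaction itself is then switched on per topic with the cleanup.policy=compact topic config (for example via --config cleanup.policy=compact when creating or altering the topic). As a sketch with the newer AdminClient API again (0.11+ client jars, and “user-profiles” is a hypothetical topic name), this is how a compacted topic could be created programmatically:-

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("user-profiles", 1, (short) 1); // hypothetical topic
            // keep only the latest message per key once the cleaner runs
            topic.configs(Collections.singletonMap("cleanup.policy", "compact"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}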

Now some cleanup stuff:-

To alter a topic:-

kafka-topics.sh --zookeeper sandbox.hortonworks.com:2181 --alter --topic new_messages_recvd --partitions 40 --config delete.retention.ms=10000 --deleteConfig retention.ms

To delete a topic, we use the command below:-

kafka-topics.sh --zookeeper sandbox.hortonworks.com:2181 --delete --topic new_messages_recvd

A graceful shutdown is sometimes very important, so use the script below:-

bin/kafka-server-stop.sh

Mirroring data between Kafka clusters: – Sometimes we need to copy data from multiple Kafka clusters into a single one.
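The MirrorMaker tool listed earlier does exactly this. A sketch of an invocation, assuming a consumer config pointing at the source cluster and a producer config pointing at the target cluster (both file names are illustrative):-

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config source-cluster.consumer.properties --producer.config target-cluster.producer.properties --whitelist=".*"

The --whitelist argument is a regular expression selecting the topics to mirror; “.*” mirrors everything.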
