Kafka, a pragmatic guide

What

Create a Kafka cluster coordinated by a ZooKeeper Ensemble (a.k.a. cluster). Then test it with a Python project.

I’m not a Kafka expert… I’m just learning and these are my notes…

How

We will use a set of Docker containers to build the cluster on a single host, but the setup can be moved to a distributed multi-host environment.

This is what we will create:

  • A 3 node Zookeeper ensemble (a.k.a. cluster)
  • A 3 node Kafka cluster
  • A docker network to join them all

Let’s do it

Network

Create the docker network

sudo docker network create -d bridge metamorphosis

Zookeeper

Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.

Kafka relies on it to coordinate work.

By default the image used here exposes 2181, 2888 and 3888. As you will see, we will map three different host ports to each instance's 2181.

We will create three ZooKeeper instances named:

  • zookeeper1
  • zookeeper2
  • zookeeper3

Open three terminal emulators and run these commands:

sudo docker run -it --name zookeeper1 -e ZOO_MY_ID=1 -e ZOO_SERVERS="server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888" --network metamorphosis --restart always --publish 2181:2181 zookeeper:3.4

sudo docker run -it --name zookeeper2 -e ZOO_MY_ID=2 -e ZOO_SERVERS="server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888" --network metamorphosis --restart always --publish 2182:2181 zookeeper:3.4

sudo docker run -it --name zookeeper3 -e ZOO_MY_ID=3 -e ZOO_SERVERS="server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888" --network metamorphosis --restart always --publish 2183:2181 zookeeper:3.4

They are run with the -it option so we can see the logs; once you move this to a production environment, switch the option to -d.

  • ZOO_SERVERS lists the addresses the ZooKeeper instances use to connect to each other.
  • ZOO_MY_ID is the ID of each instance inside the ensemble.

In each terminal you should see the instance's startup logs and, once all three are up, the ensemble electing a leader.
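If you want a quick check from the host as well, ZooKeeper answers four-letter commands such as ruok on its client port (they are enabled by default on the 3.4 image). Here is a minimal Python sketch using the host ports we published above:

import socket

# Ask each ZooKeeper instance "are you ok?" through the ports we mapped
# on the host (2181, 2182, 2183 -> each container's 2181).
for port in (2181, 2182, 2183):
    with socket.create_connection(('localhost', port), timeout=5) as s:
        s.sendall(b'ruok')
        print(port, s.recv(16).decode())  # a healthy instance answers 'imok'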

Kafka

Custom Docker Image

We will use a custom image based on the java:8 Docker image and the Kafka binary distribution.

So, here is our Dockerfile:

FROM java:8
MAINTAINER Juan Matias Kungfu de la Camara Beovide <juan.delacamara@3xmgroup.com>
ENV KAFKA_RELEASE_ARCHIVE="kafka_2.11-1.1.0.tgz"

RUN mkdir /kafka /data /logs

RUN apt-get update && apt-get upgrade -y

# Copy Kafka binary distribution 
COPY ${KAFKA_RELEASE_ARCHIVE} /tmp

WORKDIR /tmp

# Install Kafka to /kafka 
RUN tar -zx -C /kafka --strip-components=1 -f ${KAFKA_RELEASE_ARCHIVE} && \
  rm -rf ${KAFKA_RELEASE_ARCHIVE}
COPY start.sh /kafka/start.sh

# Set up a user to run Kafka 
RUN groupadd kafka && \
  useradd -d /kafka -g kafka -s /bin/false kafka && \
  chown -R kafka:kafka /kafka /data /logs && \
  chmod ug+rx /kafka/start.sh

USER kafka 

ENV PATH /kafka/bin:$PATH

WORKDIR /kafka

CMD ["./start.sh"]

Looking at this file we can see we need two extra files:

  • The Kafka binary distribution (kafka_2.11-1.1.0.tgz in this case)
  • The start.sh file (we will create it soon)

You can download the Kafka binary from the Apache Kafka site.

Now, the start.sh file:

#!/bin/bash

echo "Replacing env values into server.properties"
echo "    BROKER_ID = $BROKER_ID"
sed -i 's/broker.id=0/broker.id='"$BROKER_ID"'/g' config/server.properties
echo "    KAFKA_NODE_NAME:BROKER_PORT = $KAFKA_NODE_NAME:$BROKER_PORT"
echo "    BROKER_PORT_I = $BROKER_PORT_I"
echo " " >> config/server.properties
echo "advertised.listeners = OUTSIDE://$KAFKA_NODE_NAME:$BROKER_PORT,INSIDE://:$BROKER_PORT_I" >> config/server.properties
echo "listeners = OUTSIDE://:$BROKER_PORT,INSIDE://:$BROKER_PORT_I" >> config/server.properties
echo "listener.security.protocol.map = OUTSIDE:PLAINTEXT,INSIDE:PLAINTEXT"  >> config/server.properties
echo "inter.broker.listener.name = INSIDE" >> config/server.properties
echo "    log.dirs"
sed -i 's/log.dirs=.*/log.dirs=\/data/g' config/server.properties
echo "    ZOOKEEPER_CONNECT = $ZOOKEEPER_CONNECT"
sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect='"$ZOOKEEPER_CONNECT"'/g' config/server.properties

echo "Starting server"
./bin/kafka-server-start.sh config/server.properties

Let’s go through this file.

First, we replace/add values in config/server.properties, which is Kafka's config file. This way we can set these values at run time as Docker parameters: the broker id, ports, listeners (where to listen for connections), log.dirs (where to store data files) and zookeeper.connect (where the ZooKeeper instances are listening).

At the end we call the Kafka start script, passing the config file as a parameter.

Put these files on a directory and build the docker image.

sudo docker build -t kafka-cockroach-edition .

Let’s run to the castle

First we need to create a directory for each container/Kafka instance we want to create. This will allow us to have persistent data.

So, for three instances create these three little pigs… I mean these dirs:

/<your_current_path>/data/data1
/<your_current_path>/data/data2
/<your_current_path>/data/data3

Sure, you can name them as you want (Moe, Curly and Larry will be fine), but you will need to reference them correctly afterwards.

Now we need to set some env vars; for this we will use these three env files:

envfile1

BROKER_ID=1
BROKER_PORT=9091
BROKER_PORT_I=9094
BROKER_SERVER=
KAFKA_NODE_QTY=3
ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_DATA_DIR=/data
KAFKA_TOPICS=
KAFKA_CREATE_TOPICS=0
KAFKA_NODE_NAME=kafka1

envfile2

BROKER_ID=2
BROKER_PORT=9092
BROKER_PORT_I=9095
BROKER_SERVER=
KAFKA_NODE_QTY=3
ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_DATA_DIR=/data
KAFKA_TOPICS=
KAFKA_CREATE_TOPICS=0
KAFKA_NODE_NAME=kafka2

envfile3

BROKER_ID=3
BROKER_PORT=9093
BROKER_PORT_I=9096
BROKER_SERVER=
KAFKA_NODE_QTY=3
ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_DATA_DIR=/data
KAFKA_TOPICS=
KAFKA_CREATE_TOPICS=0
KAFKA_NODE_NAME=kafka3

And now run the Kafka instances:

sudo docker run -it --expose 9091 --expose 9094 -p 9091:9091 --env-file ./envfile1 -v /$(pwd)/data/data1:/data:Z --network=metamorphosis --name kafka1 kafka-cockroach-edition 

sudo docker run -it --expose 9092 --expose 9095 -p 9092:9092 --env-file ./envfile2 -v /$(pwd)/data/data2:/data:Z --network=metamorphosis --name kafka2 kafka-cockroach-edition 

sudo docker run -it --expose 9093 --expose 9096 -p 9093:9093 --env-file ./envfile3 -v /$(pwd)/data/data3:/data:Z --network=metamorphosis --name kafka3 kafka-cockroach-edition 

Notes

  • I’m using the third parameter of the -v option (Z) since I’m working on an SELinux-enabled distro

Wait until the instances are up and running.

You should see the ZooKeeper instances' logs in three terminals and the Kafka brokers' logs in the other three.

Create topic

So let’s create a topic. To do this get inside one of the Kafka instances.

sudo docker exec -it kafka1 bash

This will give you access to the container’s bash.

While you are in the container create the topic:

./bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 2 --partitions 6 --topic test

zookeeper1:2181 could be any of the zookeeper instances. (Try it)

We are creating a topic called test with 6 partitions and a replication factor of 2.
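As a side note, the same topic could also be created from Python with kafka-python's admin client instead of kafka-topics.sh. This is just a sketch for later, once kafka-python is installed and the /etc/hosts mapping from the Python section below is in place:

from kafka.admin import KafkaAdminClient, NewTopic

# Sketch only: create the 'test' topic (6 partitions, replication factor 2)
# by talking to the brokers instead of going through ZooKeeper.
admin = KafkaAdminClient(bootstrap_servers='kafka1:9091,kafka2:9092,kafka3:9093')
admin.create_topics([NewTopic(name='test', num_partitions=6, replication_factor=2)])
admin.close()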

Access any other Kafka container…:

sudo docker exec -it kafka2 bash

…and check that the topic is there:

./bin/kafka-topics.sh --list --zookeeper zookeeper2:2181 

You can even describe the topic:

./bin/kafka-topics.sh --describe --topic test --zookeeper zookeeper3:2181

This should output something like this (the leader and replica assignments may differ):

Topic:test  PartitionCount:6    ReplicationFactor:2 Configs:
    Topic: test Partition: 0    Leader: 3   Replicas: 3,2   Isr: 3,2
    Topic: test Partition: 1    Leader: 1   Replicas: 1,3   Isr: 1,3
    Topic: test Partition: 2    Leader: 2   Replicas: 2,1   Isr: 2,1
    Topic: test Partition: 3    Leader: 3   Replicas: 3,1   Isr: 3,1
    Topic: test Partition: 4    Leader: 1   Replicas: 1,2   Isr: 1,2
    Topic: test Partition: 5    Leader: 2   Replicas: 2,3   Isr: 2,3

Test The Test Topic, Mr T

Let’s publish some messages and subscribe to the topic so we can read them.

So far you have two terminals connected to kafka1 and kafka2 (where you created the topic).

In kafka2 (it could be any of the instances anyway) run the following command:

./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9091 --from-beginning --topic test

This will leave you connected to the topic, waiting for messages.

In kafka1 run this command:

./bin/kafka-console-producer.sh --broker-list kafka1:9091,kafka2:9092,kafka3:9093 --topic test

This will show you a prompt where you can type messages (type and press Enter).

Now type something into the producer and let the magic happen… your message shows up in the consumer… mind explosion, dude…

Exit both producer and consumer with Ctrl+C.

Time to enchant the python, Franz

We will use package kafka-python, so let’s install it:

pip install --user kafka-python
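
A quick way to confirm the package is importable:

# Just confirm kafka-python is installed and importable.
import kafka
print(kafka.__version__)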

If you remember our solution design, we have three Kafka ports bound to host ports. But we have an issue here.

We have published each Kafka instance under its Docker container name. Recall this line from start.sh:

echo "advertised.listeners = OUTSIDE://$KAFKA_NODE_NAME:$BROKER_PORT,INSIDE://:$BROKER_PORT_I" >> config/server.properties

The advertised listener is built from KAFKA_NODE_NAME, so from outside the Docker network we will only be able to reach the brokers as kafka1:9091, kafka2:9092 or kafka3:9093.

So, we have two ways to connect to these instances from Python running on the host:

  • Set names on /etc/hosts
  • Change the listener names to the host name

Just for this test… let’s use the first solution. Edit your /etc/hosts and add this line:

127.0.0.1   kafka1 kafka2 kafka3
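
With that in place, here is a minimal sketch to verify we can reach the cluster from the host (assuming the topic created earlier still exists):

from kafka import KafkaConsumer

# Connect through the names we just mapped in /etc/hosts and list the
# topics the cluster knows about; 'test' should be in the set.
consumer = KafkaConsumer(bootstrap_servers='kafka1:9091,kafka2:9092,kafka3:9093')
print(consumer.topics())
consumer.close()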

The producer

Ok, now we have a little Python script to send data:

#!/usr/bin/env python

import time, random
import os 
import json
from kafka import KafkaProducer

producer_qty=3

childs=[]

# SEND DATA FUNCTION
def send_data(cid):
  # SET TOPIC AND SERVERS
  topic='test'
  servers='kafka1:9091,kafka2:9092,kafka3:9093'

  random.seed(cid)
  time2sleep=random.randint(5,10)

  print("Creating producer (timer: {})...".format(time2sleep))
  producer = KafkaProducer(bootstrap_servers=servers)

  # SEND DATA TO FRANZ
  while True:
    print("@{}   Preparing data:".format(cid))
    d = { 'title': 'my test 001',
      'name' : 'wall-e',
      'unusefuldata': 
    {
      'data': time.time(),
      'producer': cid
     }
    }
    print("@{}   {}".format(cid,json.dumps(d, indent=4)))

    print("@{}   Sending message...".format(cid))
    sent = producer.send(topic, json.dumps(d, indent=4).encode('utf-8'))
    print("@{}   Waiting results...".format(cid))
    result = sent.get(timeout=60)
    print("@{}   Result: {}".format(cid,result))
    time.sleep(time2sleep)

# READ FILE FUNCTION
def do_send():

  for i in range(0,producer_qty):
    print("Creating new fork")
    newpid = os.fork()
    if newpid == 0:
      print("    Forked... calling consume function...")
      send_data(os.getpid())
    else:
      print("parent: {}, child: {}\n".format(os.getpid(), newpid))
      childs.append(newpid)
  while True:
    print("Childs running: {}".format(childs))
    time.sleep(10)

# OK, WELL, THE "MAIN" IF OF EVERY PYTHON (ALMOST)
if __name__ == "__main__":
  do_send()

Let’s step through the code.

We have two functions. 

do_send

Creates the producer processes and waits… you can change how many processes will be forked by modifying the producer_qty var.

send_data

This is where the magic takes place…

First we set the topic and the servers to connect to, then prepare the data, connect to Kafka and send the message. This part is pretty much self-explanatory.
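
The result returned by sent.get() is a RecordMetadata object telling us where the message ended up; here is a small sketch (same topic and servers as above):

from kafka import KafkaProducer

# Send one message and inspect the RecordMetadata returned by get():
# it tells us which partition the message landed on and at what offset.
producer = KafkaProducer(bootstrap_servers='kafka1:9091,kafka2:9092,kafka3:9093')
metadata = producer.send('test', b'hello').get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)
producer.close()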

The consumers

Partitions

While on one hand we can have m producers sending data to Kafka, on the other hand only a limited number of consumers per consumer group can actively read from a topic. More on this now.

Remember we have created the topic with 6 partitions…

./bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 2 --partitions 6 --topic test

… so it is as if we had six queues under the same topic name (test). When a producer sends a message with no key, Kafka by default picks a partition in a round-robin fashion (if the message has a key, the hash of the key decides the partition). We can have m producers.

On the consumer side, up to x consumers (x being the number of partitions) can be subscribed and active under the same consumer group. It works this way:

  • If we have one subscriber, Kafka will deliver to it the messages from all the partitions
  • If we have two subscribers, each one will effectively be subscribed to x/2 partitions
  • If we have three, each one will be subscribed to x/3 partitions (whole partitions, of course); in general, with y subscribers each one will be subscribed to roughly x/y partitions
  • Because of this, the maximum useful y is x; any new subscriber beyond y = x will remain idle until one of the active ones dies (see the sketch below)
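
To make the arithmetic concrete, here is a toy sketch of spreading x partitions over y consumers in a round-robin style (this is only an illustration; Kafka's actual assignor is pluggable and a bit smarter):

# Toy illustration of x partitions spread over y consumers (round-robin style).
def assign(partitions, consumers):
  assignment = {c: [] for c in range(consumers)}
  for p in range(partitions):
    assignment[p % consumers].append(p)
  return assignment

print(assign(6, 1))  # {0: [0, 1, 2, 3, 4, 5]}
print(assign(6, 2))  # {0: [0, 2, 4], 1: [1, 3, 5]}
print(assign(6, 3))  # {0: [0, 3], 1: [1, 4], 2: [2, 5]}
print(assign(6, 7))  # consumer 6 gets no partitions at all
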
Replication

By the way, another topic to consider is replication. Regardless of the number of partitions, replication works at the cluster level, replicating each partition across the nodes.

Remember when we asked for a topic description:

Topic:test  PartitionCount:6    ReplicationFactor:2 Configs:
    Topic: test Partition: 0    Leader: 3   Replicas: 3,2   Isr: 3,2
    Topic: test Partition: 1    Leader: 1   Replicas: 1,3   Isr: 1,3
    Topic: test Partition: 2    Leader: 2   Replicas: 2,1   Isr: 2,1
    Topic: test Partition: 3    Leader: 3   Replicas: 3,1   Isr: 3,1
    Topic: test Partition: 4    Leader: 1   Replicas: 1,2   Isr: 1,2
    Topic: test Partition: 5    Leader: 2   Replicas: 2,3   Isr: 2,3

Here we have a description of each partition: which node is the leader for it, which nodes its replicas live on, and which replicas are in sync (Isr).

Let’s start consuming, D10S

Here is our Python script:

#!/usr/bin/env python
from kafka import KafkaConsumer
import os
import time

topic='test'
servers='kafka1:9091,kafka2:9092,kafka3:9093'
consumer_id=1
consumer_qty=3

childs=[]

def consume(cid):
  print("    Starting conumser # {}".format(cid))
  consumer = KafkaConsumer(topic, group_id='grp_'+topic+'_1', bootstrap_servers=servers)
  for msg in consumer:
    print ("Consumidor {}: {}".format(cid,msg))

if __name__ == '__main__':
  for i in range(0,consumer_qty):
    print("Creating new fork")
    newpid = os.fork()
    if newpid == 0:
      print("    Forked... calling consume function...")
      consume(os.getpid())
    else:
      print("parent: {}, child: {}\n".format(os.getpid(), newpid))
      childs.append(newpid)
  while True:
    print("Childs running: {}".format(childs))
    time.sleep(10)

This will create 3 consumers (modify consumer_qty to change this). So, with 6 partitions, each consumer should end up subscribed to two of them.
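
If you want to see which partitions a consumer actually got, kafka-python exposes assignment(); here is a small sketch (the assignment is only known after the consumer has joined the group, hence the poll() first):

from kafka import KafkaConsumer

# Join the same consumer group as the script above and print the set of
# TopicPartition objects this consumer was assigned after the rebalance.
consumer = KafkaConsumer('test', group_id='grp_test_1',
                         bootstrap_servers='kafka1:9091,kafka2:9092,kafka3:9093')
consumer.poll(timeout_ms=5000)
print(consumer.assignment())
consumer.close()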

Let’s play a little bit

So, some tests you can play with:

  • Kill one ZooKeeper instance and see how everything keeps working (e.g. use sudo docker kill zookeeper1)
  • How many instances can you kill before your brokers stop working?
  • Do the same with the Kafka instances (e.g. use sudo docker kill kafka1)
  • Modify the consumer to run 7 processes… take note of the last pid: is that last process receiving messages? (It shouldn’t)

References

https://devopscube.com/multi-broker-kafka-cluster-beginners-guide/
https://hub.docker.com/_/zookeeper/
https://kafka-python.readthedocs.io/en/master/
https://github.com/dpkp/kafka-python
https://towardsdatascience.com/getting-started-with-apache-kafka-in-python-604b3250aa05
https://sookocheff.com/post/kafka/kafka-in-a-nutshell/