What
Create a Kafka cluster coordinated by a ZooKeeper Ensemble (a.k.a. cluster). Then test it with a Python project.
I’m not a Kafka expert… I’m just learning and these are my notes…
How
We will use a set of Docker containers to build the cluster on just one host, but it can be moved to a distributed multi-host environment.
This is what we will create:
- A 3 node Zookeeper ensemble (a.k.a. cluster)
- A 3 node Kafka cluster
- A docker network to join them all

Let’s do it

Network
Create the docker network
sudo docker network create -d bridge metamorphosis
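You can verify it was created with:
sudo docker network inspect metamorphosis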
Zookeeper
Apache ZooKeeper is an open-source server which enables highly reliable distributed coordination.
Kafka relies on it to coordinate work.
By default the image used here exposes ports 2181, 2888 and 3888. As you will see, we will map three different host ports to port 2181.
We will create the three ZooKeeper instances named:
zookeeper1, zookeeper2, zookeeper3
Open three terminal emulators and run one of these commands in each:
sudo docker run -it --name zookeeper1 -e ZOO_MY_ID=1 -e ZOO_SERVERS="server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888" --network metamorphosis --restart always --publish 2181:2181 zookeeper:3.4
sudo docker run -it --name zookeeper2 -e ZOO_MY_ID=2 -e ZOO_SERVERS="server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888" --network metamorphosis --restart always --publish 2182:2181 zookeeper:3.4
sudo docker run -it --name zookeeper3 -e ZOO_MY_ID=3 -e ZOO_SERVERS="server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888" --network metamorphosis --restart always --publish 2183:2181 zookeeper:3.4
They are run with the -it option so we can see the logs; once you move this to a production environment, switch it to -d.
- ZOO_SERVERS is the list of addresses the ZooKeeper instances use to connect to each other.
- ZOO_MY_ID is the ID of each instance within the ensemble.
You should see each instance logging as the ensemble starts up and elects a leader.
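To verify the ensemble actually formed, you can ask each instance for its mode with ZooKeeper's stat four-letter command (this assumes nc is installed on the host); one instance should answer leader and the other two follower:
for port in 2181 2182 2183; do echo stat | nc localhost $port | grep Mode; done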

Kafka
Custom Docker Image
We will use a custom image based on the java:8 Docker image and the Kafka binary distribution.
So, here is our Dockerfile:
FROM java:8
MAINTAINER Juan Matias Kungfu de la Camara Beovide <juan.delacamara@3xmgroup.com>
ENV KAFKA_RELEASE_ARCHIVE="kafka_2.11-1.1.0.tgz"
RUN mkdir /kafka /data /logs
RUN apt-get update && apt-get upgrade -y
# Copy Kafka binary distribution
COPY ${KAFKA_RELEASE_ARCHIVE} /tmp
WORKDIR /tmp
# Install Kafka to /kafka
RUN tar -zx -C /kafka --strip-components=1 -f ${KAFKA_RELEASE_ARCHIVE} && \
    rm -rf ${KAFKA_RELEASE_ARCHIVE}
COPY start.sh /kafka/start.sh
# Set up a user to run Kafka
RUN groupadd kafka && \
    useradd -d /kafka -g kafka -s /bin/false kafka && \
    chown -R kafka:kafka /kafka /data /logs && \
    chmod ug+rx /kafka/start.sh
USER kafka
ENV PATH /kafka/bin:$PATH
WORKDIR /kafka
CMD ["./start.sh"]
From this file we can see that we need two extra files:
- The Kafka binary distribution (kafka_2.11-1.1.0.tgz in this case)
- The start.sh file (we will create it soon)
You can download the Kafka binary from the Apache Kafka site.
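For example, the version used in this tutorial can be fetched from the Apache archive (check the site for the current link, as URLs may change):
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz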
Now, the start.sh file:
#!/bin/bash
echo "Replacing env values into server.properties"
echo " BROKER_ID = $BROKER_ID"
sed -i 's/broker.id=0/broker.id='"$BROKER_ID"'/g' config/server.properties
echo " KAFKA_NODE_NAME:BROKER_PORT = $KAFKA_NODE_NAME:$BROKER_PORT"
echo " BROKER_PORT_I = $BROKER_PORT_I"
echo " " >> config/server.properties
echo "advertised.listeners = OUTSIDE://$KAFKA_NODE_NAME:$BROKER_PORT,INSIDE://:$BROKER_PORT_I" >> config/server.properties
echo "listeners = OUTSIDE://:$BROKER_PORT,INSIDE://:$BROKER_PORT_I" >> config/server.properties
echo "listener.security.protocol.map = OUTSIDE:PLAINTEXT,INSIDE:PLAINTEXT" >> config/server.properties
echo "inter.broker.listener.name = INSIDE" >> config/server.properties
echo " log.dirs"
sed -i 's/log.dirs=.*/log.dirs=\/data/g' config/server.properties
echo " ZOOKEEPER_CONNECT = $ZOOKEEPER_CONNECT"
sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect='"$ZOOKEEPER_CONNECT"'/g' config/server.properties
echo "Starting server"
./bin/kafka-server-start.sh config/server.properties
Let’s go through this file.
First, we replace/add values in config/server.properties, the Kafka config file. This way we can set these values later, at run time, as Docker parameters: the broker ID, ports, listeners (where to listen for connections), log.dirs (where to store data files) and zookeeper.connect (where the ZooKeeper instances are listening).
At the end we call the Kafka start script, passing the config file as a parameter.
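For instance, with the values we will use below for the first broker (BROKER_ID=1, KAFKA_NODE_NAME=kafka1, BROKER_PORT=9091, BROKER_PORT_I=9094), the relevant lines of config/server.properties end up like this:
broker.id=1
advertised.listeners = OUTSIDE://kafka1:9091,INSIDE://:9094
listeners = OUTSIDE://:9091,INSIDE://:9094
listener.security.protocol.map = OUTSIDE:PLAINTEXT,INSIDE:PLAINTEXT
inter.broker.listener.name = INSIDE
log.dirs=/data
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181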
Put these files in a directory and build the Docker image:
sudo docker build -t kafka-cockroach-edition .
Let’s run to the castle

First we need to create a directory for each container/Kafka instance we want to create. This will allow us to have persistent data.
So, for three instances create these three little pigs… I mean these dirs:
/<your_current_path>/data/data1
/<your_current_path>/data/data2
/<your_current_path>/data/data3
Sure, you can name them as you want (Moe, Curly and Larry will be fine), but you need to reference them correctly afterwards.
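For example, from your working directory:
mkdir -p data/data1 data/data2 data/data3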
Now we need to set some environment variables; for this we will use these three env files:
envfile1
BROKER_ID=1
BROKER_PORT=9091
BROKER_PORT_I=9094
BROKER_SERVER=
KAFKA_NODE_QTY=3
ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_DATA_DIR=/data
KAFKA_TOPICS=
KAFKA_CREATE_TOPICS=0
KAFKA_NODE_NAME=kafka1
envfile2
BROKER_ID=2
BROKER_PORT=9092
BROKER_PORT_I=9095
BROKER_SERVER=
KAFKA_NODE_QTY=3
ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_DATA_DIR=/data
KAFKA_TOPICS=
KAFKA_CREATE_TOPICS=0
KAFKA_NODE_NAME=kafka2
envfile3
BROKER_ID=3
BROKER_PORT=9093
BROKER_PORT_I=9096
BROKER_SERVER=
KAFKA_NODE_QTY=3
ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_DATA_DIR=/data
KAFKA_TOPICS=
KAFKA_CREATE_TOPICS=0
KAFKA_NODE_NAME=kafka3
And now run the Kafka instances:
sudo docker run -it --expose 9091 --expose 9094 -p 9091:9091 --env-file ./envfile1 -v /$(pwd)/data/data1:/data:Z --network=metamorphosis --name kafka1 kafka-cockroach-edition
sudo docker run -it --expose 9092 --expose 9095 -p 9092:9092 --env-file ./envfile2 -v /$(pwd)/data/data2:/data:Z --network=metamorphosis --name kafka2 kafka-cockroach-edition
sudo docker run -it --expose 9093 --expose 9096 -p 9093:9093 --env-file ./envfile3 -v /$(pwd)/data/data3:/data:Z --network=metamorphosis --name kafka3 kafka-cockroach-edition
Notes
- I’m using the third field of the -v option (Z) since I’m working on an SELinux-enabled distro
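From another terminal you can confirm all six containers are up:
sudo docker ps --format '{{.Names}}\t{{.Status}}'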
Wait until instances are up and running.
You should see all six sets of logs: the ZooKeeper instances on the left, the Kafka ones on the right.
Create topic
So let’s create a topic. To do this, get inside one of the Kafka instances:
sudo docker exec -it kafka1 bash
This will give you access to the container’s bash.
While you are in the container, create the topic:
./bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 2 --partitions 6 --topic test
zookeeper1:2181 could be any of the ZooKeeper instances. (Try it!)
We are creating a topic called test with 6 partitions and a replication factor of 2.
Access any other Kafka container…:
sudo docker exec -it kafka2 bash
…and check the topic
./bin/kafka-topics.sh --list --zookeeper zookeeper2:2181
You can even describe the topic:
./bin/kafka-topics.sh --describe --topic test --zookeeper zookeeper3:2181
This should output something like this (the leader and replica assignments may vary):
Topic:test PartitionCount:6 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: test Partition: 1 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test Partition: 3 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test Partition: 4 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 5 Leader: 2 Replicas: 2,3 Isr: 2,3
Test The Test Topic, Mr T

Let’s publish some messages and subscribe to the topic so we can read them.
So far you have two terminals connected to kafka1 and kafka2 (where you created the topic).
In kafka2 (it could be any of the instances anyway) run the following command:
./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9091 --from-beginning --topic test
This leaves you connected to the topic, waiting for messages.

In kafka1 run this command:
./bin/kafka-console-producer.sh --broker-list kafka1:9091,kafka2:9092,kafka3:9093 --topic test
This will show you a prompt where you can add messages (type and press Enter).

Now type something into the producer and let the magic happen… your message shows up in the consumer… mind explosion, dude…

Exit both producer and consumer with Ctrl+C.
Time to enchant the python, Franz
We will use the kafka-python package, so let’s install it:
pip install --user kafka-python
If you remember our solution design, we have three Kafka ports bound to host ports. But we have an issue here.
We have advertised each Kafka instance under its Docker container name. Recall this line from start.sh:
echo "advertised.listeners = OUTSIDE://$KAFKA_NODE_NAME:$BROKER_PORT,INSIDE://:$BROKER_PORT_I" >> config/server.properties
The advertised listeners are set to KAFKA_NODE_NAME, so we will only be able to connect to Kafka as kafka1:9091, kafka2:9092 or kafka3:9093.
So, we have two ways to connect to these instances from Python running on the host:
- Set the names in /etc/hosts
- Change the listener names to the host name
Just for this test… let’s use the first solution. Edit your /etc/hosts and add this line:
127.0.0.1 kafka1 kafka2 kafka3
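With that in place, a quick sanity check that the names resolve and each broker answers on its port (assuming nc is available):
for i in 1 2 3; do nc -zv kafka$i 909$i; done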
The producer
Ok, now we have a little Python script to send data:

#!/usr/bin/env python
import time, random
import os
import json
from kafka import KafkaProducer

producer_qty = 3
childs = []

# SEND DATA FUNCTION
def send_data(cid):
    # SET TOPIC AND SERVERS
    topic = 'test'
    servers = 'kafka1:9091,kafka2:9092,kafka3:9093'
    random.seed(cid)
    time2sleep = random.randint(5, 10)
    print("Creating producer (timer: {})...".format(time2sleep))
    producer = KafkaProducer(bootstrap_servers=servers)
    # SEND DATA TO FRANZ
    while True:
        print("@{} Preparing data:".format(cid))
        d = {
            'title': 'my test 001',
            'name': 'wall-e',
            'unusefuldata': {
                'data': time.time(),
                'producer': cid
            }
        }
        print("@{} {}".format(cid, json.dumps(d, indent=4)))
        print("@{} Sending message...".format(cid))
        sent = producer.send(topic, json.dumps(d, indent=4).encode('utf-8'))
        print("@{} Waiting results...".format(cid))
        result = sent.get(timeout=60)
        print("@{} Result: {}".format(cid, result))
        time.sleep(time2sleep)

# FORK THE PRODUCER PROCESSES
def do_send():
    for i in range(0, producer_qty):
        print("Creating new fork")
        newpid = os.fork()
        if newpid == 0:
            print(" Forked... calling send function...")
            send_data(os.getpid())
        else:
            print("parent: {}, child: {}\n".format(os.getpid(), newpid))
            childs.append(newpid)
    # The parent stays alive reporting its children
    while True:
        print("Childs running: {}".format(childs))
        time.sleep(10)

# OK, WELL, THE "MAIN" IF OF EVERY PYTHON (ALMOST)
if __name__ == "__main__":
    do_send()
Let’s step through the code.
We have two functions.
do_send
Creates the producer processes and waits… you can change how many processes are forked by modifying the producer_qty variable.
send_data
This is where the magic takes place…
First we set the topic and the servers to connect to, then prepare the data and send it to Kafka. This part is self-explanatory.
The consumers
Partitions
While on one hand we can have m producers sending data to Kafka, on the other we can only have a few consumers. More on this now.
The general picture: the broker hosts the topic we created, and internally that topic is split into partitions. Remember we have created 6 partitions…
./bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 2 --partitions 6 --topic test
… so it is as if we had six queues under the same topic name (test). When a producer sends a message, Kafka by default sends it to one partition in a round-robin fashion (theoretically, needs more research). We can have m producers.
On the consumer side we can have up to x consumers subscribed and active under the same consumer_group (x being the number of partitions). It works this way (there is a console example right after this list):
- If we have one subscriber, Kafka takes messages from the partitions (in a round-robin fashion, theoretically; needs more research) and sends them all to that consumer
- If we have two subscribers, each one is effectively subscribed to x/2 partitions
- If we have 3, each one is subscribed to x/3 partitions (always integer numbers, of course), and in general with y subscribers each one is subscribed to x/y partitions
- Due to the last point the maximum y for subscribers is x; any new subscriber beyond y=x will remain inactive until one of the active ones dies
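You can watch this behavior with the console consumer; a sketch, assuming the cluster above (the group name demo-group is arbitrary). Run the same command in two terminals, inside any Kafka container:
./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9091 --topic test --consumer-property group.id=demo-group
Then produce a few messages: each consumer should only print the messages that land on its assigned half of the partitions.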
Replication

By the way, another issue is replication. Regardless of the number of partitions, replication works at the cluster level, replicating each partition across the nodes.
Remember when we asked for a topic description:
Topic:test PartitionCount:6 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: test Partition: 1 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test Partition: 3 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test Partition: 4 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 5 Leader: 2 Replicas: 2,3 Isr: 2,3
Here we have a description of each partition: which node is the leader for that partition and which nodes its replicas live on.
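You can watch replication react: stop one broker and describe the topic again; the stopped broker’s ID should drop out of the Isr (in-sync replicas) column, and it rejoins when you start it back up. For example (the docker commands on the host, the describe inside a surviving container):
sudo docker stop kafka3
./bin/kafka-topics.sh --describe --topic test --zookeeper zookeeper1:2181
sudo docker start kafka3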
Let’s start consuming, D10S
Here is our Python script:
#!/usr/bin/env python
from kafka import KafkaConsumer
import os
import time

topic = 'test'
servers = 'kafka1:9091,kafka2:9092,kafka3:9093'
consumer_qty = 3
childs = []

# CONSUME MESSAGES FUNCTION
def consume(cid):
    print(" Starting consumer # {}".format(cid))
    # All consumers join the same group, so Kafka splits the partitions among them
    consumer = KafkaConsumer(topic, group_id='grp_' + topic + '_1', bootstrap_servers=servers)
    for msg in consumer:
        print("Consumer {}: {}".format(cid, msg))

if __name__ == '__main__':
    for i in range(0, consumer_qty):
        print("Creating new fork")
        newpid = os.fork()
        if newpid == 0:
            print(" Forked... calling consume function...")
            consume(os.getpid())
        else:
            print("parent: {}, child: {}\n".format(os.getpid(), newpid))
            childs.append(newpid)
    # The parent stays alive reporting its children
    while True:
        print("Childs running: {}".format(childs))
        time.sleep(10)
This will create 3 consumers (modify consumer_qty to change this). So, with 6 partitions, each consumer should end up subscribed to two of them.
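Once the consumers are running, you can verify the partition assignment from inside any Kafka container (the group name grp_test_1 comes from the group_id built in the script):
./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9091 --describe --group grp_test_1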
Let’s play a little bit
So, some tests you can play with:
- Kill one ZooKeeper instance and see how everything keeps working (e.g. use sudo docker kill zookeeper1)
- How many instances can you kill before your brokers stop working?
- Do the same with the Kafka instances (e.g. use sudo docker kill kafka1)
- Modify the consumer to run 7 processes and take note of the last PID. Is that last process receiving messages? (It shouldn’t be.)
References
https://devopscube.com/multi-broker-kafka-cluster-beginners-guide/
https://hub.docker.com/_/zookeeper/
https://kafka-python.readthedocs.io/en/master/
https://github.com/dpkp/kafka-python
https://towardsdatascience.com/getting-started-with-apache-kafka-in-python-604b3250aa05
https://sookocheff.com/post/kafka/kafka-in-a-nutshell/