Using Kafka-Python with Jupyter Notebook

Kafka-Python Installation

Log into the Jupyter node (standard PNDA cluster) or the edge node (pico PNDA cluster), and run the commands below to install kafka-python:

sudo su

export PATH=/opt/cloudera/parcels/Anaconda/bin:$PATH

conda install -c conda-forge kafka-python

Run the command below to verify the installation:

conda list | grep 'kafka-python'

kafka-python 1.3.3 py27_0 conda-forge
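The grep above simply matches the package's line in the `conda list` output. As a sketch, the same check can be done in Python by splitting that output into columns (the sample output string below is a hypothetical stand-in for the real listing):

```python
# Sketch: extract a package version from `conda list`-style output.
# sample_output is an assumed example matching the listing shown above.
sample_output = """\
# packages in environment:
kafka-python              1.3.3                    py27_0    conda-forge
six                       1.10.0                   py27_0
"""

def find_version(conda_list_output, package):
    """Return the version column for the given package name, or None."""
    for line in conda_list_output.splitlines():
        fields = line.split()
        if fields and fields[0] == package:
            return fields[1]
    return None

print(find_version(sample_output, 'kafka-python'))
```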

NOTE: you don't have to install kafka-python on each data node, as it only needs to be visible to the Spark driver process.

In order to make Python packages (e.g. kafka-python) available on the data nodes (i.e. to the executor processes), you have two options:

a) repeat the above steps on each data node (recommended only when multiple Spark apps share the same dependencies).

b) ship Python modules (.egg, .zip, .py) to each node via the Spark session. Below are example steps to build a 'kafka-python' egg distribution.

exit # logout super user

wget https://github.com/dpkp/kafka-python/archive/1.3.3.tar.gz

tar -zxvf 1.3.3.tar.gz

cd kafka-python-1.3.3

python setup.py bdist_egg

ls dist

kafka_python-1.3.3-py2.7.egg


In [ ]:
# Run this cell only if you want to add the Python module to the Spark context and have run through the steps of option b)
sc.addPyFile("/home/ubuntu/kafka-python-1.3.3/dist/kafka_python-1.3.3-py2.7.egg")

Simple producer test

Test Environment:

  • PNDA release 3.4 (pico)
  • Kafka-Python: version 1.3.3
  • Kafka: version 0.10.0.1
  • Kafka Consumer: kafka console consumer
  • Kafka topic: 'test'

Log on to the Kafka broker node and follow the steps below to create a 'test' topic if it does not exist:

[NOTE: you will need to find out the ZooKeeper IP address and port (default port: 2181). You can get this information from the Kafka Manager portal link on the PNDA portal.]
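If you are unsure whether the ZooKeeper endpoint you found is reachable from your node, a quick connectivity check can be done with the Python standard library (a sketch only; the host and port below are the example values used in this guide, replace them with yours):

```python
import socket

def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
        sock.close()
        return True
    except (socket.error, OSError):
        return False

# Example: probe the ZooKeeper endpoint used in the commands below
print(port_open('10.0.1.160', 2181))
```

This only verifies TCP reachability, not that ZooKeeper itself is healthy.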

sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper <zookeeper_host_ip>:<zookeeper_service_port> --replication-factor 1 --partitions 1 --topic test

e.g.

sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper 10.0.1.160:2181 --replication-factor 1 --partitions 1 --topic test

Check that the 'test' topic was created successfully:

sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --list --zookeeper <zookeeper_host_ip>:<zookeeper_service_port>

e.g.

sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --list --zookeeper 10.0.1.160:2181

__consumer_offsets
avro.internal.testbot
test

Define the test Kafka broker and topic


In [ ]:
kafka_broker = '10.0.1.160:9092' # replace with your kafka broker ip:port (if you have multiple brokers, pick one)
topics = 'test' # topic name; both the producer and consumer below accept a plain string
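A Python gotcha worth knowing when defining topics: parentheses alone do not create a tuple, so `('test')` is just the string `'test'`. That happens to work here because `send_messages` and `KafkaConsumer` both accept a plain topic-name string, but a genuine one-element tuple needs a trailing comma:

```python
# Parentheses alone do not make a tuple -- a trailing comma does.
topics_str = ('test')     # this is just the string 'test'
topics_tuple = ('test',)  # this is a one-element tuple

print(type(topics_str).__name__)    # str
print(type(topics_tuple).__name__)  # tuple
```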

Simple producer test


In [ ]:
from kafka import SimpleProducer
from kafka import KafkaClient

# NOTE: SimpleProducer/KafkaClient are deprecated in kafka-python 1.x in favour of KafkaProducer,
# but they still work in version 1.3.3 used here
kafka = KafkaClient(kafka_broker)
producer = SimpleProducer(kafka)
producer.send_messages(topics, b'Hello From Kafka-Python Producer')

Simple consumer test


In [ ]:
from kafka import KafkaConsumer

consumer = KafkaConsumer(topics, bootstrap_servers=[kafka_broker], auto_offset_reset='earliest')
# NOTE: this loop blocks and polls forever; interrupt the kernel to stop it
for msg in consumer:
    print(msg.value)
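Note that `msg.value` is delivered as raw bytes, which is why the output is prefixed with `b'...'`. To display it as text, decode it (assuming the producer sent UTF-8, as in the example above):

```python
# Kafka message values are bytes; decode to text for display.
raw = b'Hello From Kafka-Python Producer'  # same payload as the producer cell
text = raw.decode('utf-8')
print(text)
```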

The End