Log into the Jupyter node (standard PNDA cluster) or the edge node (pico PNDA cluster), and run the commands below to install kafka-python:
sudo su
export PATH=/opt/cloudera/parcels/Anaconda/bin:$PATH
conda install -c conda-forge kafka-python
Run the command below to verify the installation:
conda list | grep 'kafka-python'
kafka-python 1.3.3 py27_0 conda-forge
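You can also verify the installation from Python itself. A minimal sketch (the helper name `kafka_python_version` is mine, not part of kafka-python) that returns the installed version, or None when the package is missing:

```python
import importlib

def kafka_python_version():
    """Return the installed kafka-python version string, or None if absent."""
    try:
        mod = importlib.import_module("kafka")
    except ImportError:
        return None
    # kafka-python exposes its version as kafka.__version__
    return getattr(mod, "__version__", "unknown")

print(kafka_python_version())
```

Running this inside a notebook cell confirms the interpreter Spark's driver uses can actually import the package.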
NOTE: you don't have to install kafka-python on each data node, as it only needs to be visible to the Spark driver process.
In order to make Python packages (e.g. kafka-python) available on data nodes, i.e. to the executor processes, you have two options:
a) repeat the above steps on each data node (recommended only when multiple Spark applications share the same dependencies).
b) ship the Python modules (.egg, .zip, .py) to each node via the Spark session. Below are example steps to build a 'kafka-python' egg distribution.
exit  # log out of the superuser shell
wget https://github.com/dpkp/kafka-python/archive/1.3.3.tar.gz
tar -zxvf 1.3.3.tar.gz
cd kafka-python-1.3.3
python setup.py bdist_egg
ls dist
kafka_python-1.3.3-py2.7.egg
In [ ]:
# Run this cell only if you want to add the Python module to the Spark context and have completed the steps of option b)
sc.addPyFile("/home/ubuntu/kafka-python-1.3.3/dist/kafka_python-1.3.3-py2.7.egg")
Test Environment:
Log on to the Kafka broker node and follow the steps below to create a 'test' topic if it does not already exist:
[NOTE: you will need the ZooKeeper IP address and port (default port: 2181). You can find this information via the Kafka Manager link on the PNDA portal.]
sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper <zookeeper_host_ip>:<zookeeper_service_port> --replication-factor 1 --partitions 1 --topic test
e.g.
sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper 10.0.1.160:2181 --replication-factor 1 --partitions 1 --topic test
Check that the 'test' topic was created successfully:
sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --list --zookeeper <zookeeper_host_ip>:<zookeeper_service_port>
e.g.
sudo /opt/pnda/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --list --zookeeper 10.0.1.160:2181
__consumer_offsets
avro.internal.testbot
test
In [ ]:
kafka_broker = '10.0.1.160:9092'  # replace with your Kafka broker IP (if you have multiple brokers, pick one)
topics = 'test'  # a single topic name; note that ('test') is just the string 'test', not a tuple
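Mistyping the broker address (e.g. leaving out the port) produces confusing connection errors, so it can help to validate it up front. A small sketch, assuming a single '<host>:<port>' string; the helper name `parse_broker` is mine:

```python
def parse_broker(addr):
    """Split a 'host:port' broker string; the port must be an integer."""
    host, _, port = addr.partition(':')
    if not host or not port.isdigit():
        raise ValueError("expected '<host>:<port>', got %r" % addr)
    return host, int(port)

print(parse_broker('10.0.1.160:9092'))  # → ('10.0.1.160', 9092)
```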
In [ ]:
from kafka import KafkaClient, SimpleProducer

# SimpleProducer is deprecated in newer kafka-python releases, but works in 1.3.3
kafka = KafkaClient(kafka_broker)
producer = SimpleProducer(kafka)
producer.send_messages(topics, b'Hello From Kafka-Python Producer')
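send_messages accepts only bytes, so structured payloads need to be serialized first. A minimal sketch using JSON (the helper name `to_kafka_bytes` and the sample record are my own, not part of kafka-python):

```python
import json

def to_kafka_bytes(record):
    """Serialize a dict to UTF-8 JSON bytes, the form send_messages expects."""
    return json.dumps(record, sort_keys=True).encode('utf-8')

payload = to_kafka_bytes({'sensor': 'temp', 'value': 21.5})
print(payload)  # b'{"sensor": "temp", "value": 21.5}'
```

You would then pass `payload` as the message argument, e.g. `producer.send_messages(topics, payload)`.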
In [ ]:
from kafka import KafkaConsumer

# auto_offset_reset='earliest' starts reading from the beginning of the topic
consumer = KafkaConsumer(topics, bootstrap_servers=[kafka_broker], auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)
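Each record's value arrives as raw bytes, so a real consumer usually decodes it before further processing. An offline sketch, using a namedtuple as a stand-in for the consumer record (the `Record` type and `decode_value` helper are mine, added for illustration):

```python
from collections import namedtuple

# Stand-in for the record fields used below
Record = namedtuple('Record', ['topic', 'partition', 'offset', 'value'])

def decode_value(msg, encoding='utf-8'):
    """Message values arrive as bytes; decode before further processing."""
    return msg.value.decode(encoding)

msg = Record('test', 0, 42, b'Hello From Kafka-Python Producer')
print(decode_value(msg))  # Hello From Kafka-Python Producer
```

Inside the consumer loop above, the same call would be `decode_value(msg)` for each message yielded by the iterator.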