Acquire and decompress Kafka
$ wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
$ tar xzf kafka_2.11-1.0.0.tgz
Run the following from separate terminals:
cd kafka_2.11-1.0.0
bin/zookeeper-server-start.sh config/zookeeper.properties
cd kafka_2.11-1.0.0
bin/kafka-server-start.sh config/server.properties
In [1]:
!cd ~/software/kafka_2.11-1.0.0; \
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
!cd ~/software/kafka_2.11-1.0.0; \
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
!cd ~/software/kafka_2.11-1.0.0; \
./bin/kafka-topics.sh --list --zookeeper localhost:2181
To delete a topic:
Add the following lines to config/server.properties
: delete.topic.enable=true
In [ ]:
!cd ~/software/kafka_2.11-1.0.0; bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Setup Confluent_Kafka
First, install your own Anaconda to a local directory in your home on Palmetto.
Next, perform the following steps
$ wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh
$ sh Anaconda3-5.0.1-Linux-x86_64.sh
$ export PATH=/home/lngo/software/anaconda3/bin:$PATH
$ conda create --name kafka python=3.6
$ source activate kafka
$ conda install jupyter
$ python -m ipykernel install --prefix=/home/lngo/.local/ --name 'Python-Kafka-3.6'
$ conda install -c conda-forge python-confluent-kafka
$ conda install -c conda-forge tweepy
In [ ]:
import json
import logging
import sys
from datetime import datetime
from http.client import IncompleteRead

from confluent_kafka import Producer
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
# Kafka connection settings: broker address and destination topic.
broker = 'localhost:9092'
topic = 'test'

# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = dict([('bootstrap.servers', broker)])

# Create Producer instance
p = Producer(**conf)
# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
    """Write the delivery outcome of a produced message to stderr.

    err -- delivery error, or a falsy value on success
    msg -- the delivered message (provides key(), topic(), partition())
    """
    if not err:
        # Success path: report key and destination topic/partition.
        sys.stderr.write('%% Message key %s delivered to %s [%d]\n'
                         % (msg.key(), msg.topic(), msg.partition()))
        return
    # Failure path: the message was dropped after exhausting retries.
    sys.stderr.write('%% Message failed delivery: %s\n' % err)
class StdOutListener(StreamListener):
    """Tweepy stream listener that forwards each incoming tweet to Kafka.

    Uses the module-level producer `p` and topic `topic`; delivery results
    are reported through `delivery_callback`.
    """

    def on_data(self, data):
        """Publish one raw tweet (a JSON string) to Kafka, keyed by tweet id.

        Returns True so tweepy keeps the stream open.
        """
        try:
            jsonData = json.loads(data)
            # BUG FIX: `data` is already a JSON string (json.loads above
            # proves it); the original json.dumps(data) double-encoded it
            # into a quoted/escaped string. Forward the payload as-is.
            p.produce(topic, data, key=str(jsonData['id']),
                      callback=delivery_callback)
        except BufferError:
            # Local librdkafka queue is full; drop this message and let
            # poll() below make room by serving delivery callbacks.
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        # Serve queued delivery callbacks (non-blocking).
        p.poll(0)
        return True

    def on_error(self, status):
        """Print stream errors (e.g. HTTP status codes) and keep listening."""
        print(status)
# Read Twitter API credentials from a local cert file (not on github).
# The file holds, one per line: consumer key, consumer secret,
# access token key, access token secret.
# FIX: use a context manager so the file is closed even on error
# (the original left keyFile open).
with open('/home/lngo/.cert/Twitter', 'r') as keyFile:
    CONSUMER_KEY = keyFile.readline().rstrip()
    CONSUMER_SECRET = keyFile.readline().rstrip()
    ACCESS_TOKEN_KEY = keyFile.readline().rstrip()
    ACCESS_TOKEN_SECRET = keyFile.readline().rstrip()

auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET)

# Stream tweets matching the filter until interrupted; reconnect on
# transient network errors.
# FIX: IncompleteRead was never imported in the original, so the except
# clause itself raised NameError; it is now imported from http.client.
while True:
    try:
        stream = Stream(auth, StdOutListener())
        stream.filter(track=['Clemson'])
    except IncompleteRead:
        # Transient chunked-read failure from the streaming API: retry.
        print("Lets try it again")
        continue
    except KeyboardInterrupt:
        stream.disconnect()
        break

# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
In [ ]: