Kafka Producer for Twitter

Acquire and decompress Kafka

$ wget http://download.nextag.com/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
$ tar xzf kafka_2.11-1.0.0.tgz

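Run the following in two separate terminals. Start ZooKeeper in the first terminal:

cd kafka_2.11-1.0.0
bin/zookeeper-server-start.sh config/zookeeper.properties

Once ZooKeeper is running, start the Kafka broker in the second terminal:

cd kafka_2.11-1.0.0
bin/kafka-server-start.sh config/server.properties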
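
Before creating topics, it can help to confirm that both services are accepting connections. The following is a minimal sketch, assuming the stock config files are in use, so that ZooKeeper listens on port 2181 and the broker on port 9092 of localhost; the helper name port_is_open is made up for this example.

```python
import socket

def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False

# Assumed default ports: ZooKeeper 2181, Kafka broker 9092
for name, port in [('ZooKeeper', 2181), ('Kafka broker', 9092)]:
    state = 'reachable' if port_is_open('localhost', port) else 'not reachable'
    print('%s on localhost:%d is %s' % (name, port, state))
```

An open port only shows that something is listening there; it does not prove the broker has finished registering with ZooKeeper.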

In [1]:
!cd ~/software/kafka_2.11-1.0.0; \
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
!cd ~/software/kafka_2.11-1.0.0; \
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
!cd ~/software/kafka_2.11-1.0.0; \
    ./bin/kafka-topics.sh --list --zookeeper localhost:2181


Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Created topic "test".
__consumer_offsets
test

To delete a topic, first add the following line to config/server.properties and restart the Kafka broker so that the setting takes effect:

delete.topic.enable=true

Then run the delete command:


In [ ]:
!cd ~/software/kafka_2.11-1.0.0 && bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
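
Because the delete command has no effect unless delete.topic.enable is true, it may be worth checking the broker config before running it. The sketch below assumes the install location used in the cells above and handles only simple key=value lines; read_properties and deletion_explicitly_enabled are hypothetical helper names, not part of Kafka.

```python
import os

def read_properties(path):
    """Parse simple key=value lines from a Java-style .properties file."""
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            # Skip blanks, comments, and anything that is not key=value
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            props[key.strip()] = value.strip()
    return props

def deletion_explicitly_enabled(path):
    """True only if the file itself sets delete.topic.enable=true."""
    return read_properties(path).get('delete.topic.enable', '').lower() == 'true'

# Assumed location, matching the notebook cells above
config = os.path.expanduser('~/software/kafka_2.11-1.0.0/config/server.properties')
if os.path.exists(config):
    print('delete.topic.enable=true is set:', deletion_explicitly_enabled(config))
else:
    print('Config file not found:', config)
```

The check reports only what the file says. If the key is absent, the broker falls back to its built-in default, which this sketch does not try to determine.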

Set up confluent_kafka

Install your own copy of Anaconda into a local directory under your home directory on Palmetto, then create a conda environment containing Jupyter, confluent-kafka, and tweepy. The full sequence of steps is:

$ wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh
$ sh Anaconda3-5.0.1-Linux-x86_64.sh
$ export PATH=/home/lngo/software/anaconda3/bin:$PATH
$ conda create --name kafka python=3.6
$ source activate kafka
$ conda install jupyter
$ python -m ipykernel install --prefix=/home/lngo/.local/ --name 'Python-Kafka-3.6'
$ conda install -c conda-forge python-confluent-kafka
$ conda install -c conda-forge tweepy
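
Once the environment is active, a quick check from Python can confirm that both packages are importable before the producer cell is run. This is a minimal sketch; missing_packages is a helper name made up for this example.

```python
import importlib.util

def missing_packages(names):
    """Return the subset of top-level module names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Import names of the two packages installed above
required = ['confluent_kafka', 'tweepy']
missing = missing_packages(required)
if missing:
    print('Not installed in this environment:', ', '.join(missing))
else:
    print('All required packages are importable')
```

If a package is reported missing inside Jupyter, the notebook is probably running a different kernel from the 'Python-Kafka-3.6' one registered above.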

In [ ]:
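from confluent_kafka import Producer
import sys

import logging
import json

from datetime import datetime
# Raised when the streaming connection drops mid-read (caught in the loop below)
from http.client import IncompleteRead

# Note: StreamListener exists in tweepy 3.x; it was removed in tweepy 4.0
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream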

broker = 'localhost:9092'
topic = 'test'

# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker}

# Create Producer instance
p = Producer(**conf)

# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message key %s delivered to %s [%d]\n' % (msg.key(), msg.topic(), msg.partition()))

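class StdOutListener(StreamListener):
    def on_data(self, data):
        try:
            jsonData = json.loads(data)
            # data is already a JSON string, so send it as-is; passing it
            # through json.dumps() again would double-encode the payload.
            p.produce(topic, data, key=str(jsonData['id']), callback=delivery_callback)
        except KeyError:
            # Not a tweet (no top-level 'id', e.g. a stream notice): skip it
            pass
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        # Serve delivery callbacks from earlier produce() calls
        p.poll(0)
        # Returning True keeps the stream open
        return True
    def on_error(self, status):
        print(status)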
        
# Read the Twitter API credentials (file not on GitHub).
# The file holds one value per line, in the order read below.
with open('/home/lngo/.cert/Twitter', 'r') as keyFile:
    CONSUMER_KEY = keyFile.readline().rstrip()
    CONSUMER_SECRET = keyFile.readline().rstrip()
    ACCESS_TOKEN_KEY = keyFile.readline().rstrip()
    ACCESS_TOKEN_SECRET = keyFile.readline().rstrip()

auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET)

while True:
    try:
        stream = Stream(auth, StdOutListener())
        stream.filter(track=['Clemson'])
    except IncompleteRead:
        print("Let's try it again")
        continue
    except KeyboardInterrupt:
        stream.disconnect()
        break
        
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()


% Message key b'935205074590224384' delivered to test [0]
% Message key b'935205079136854016' delivered to test [0]
% Message key b'935205080797794305' delivered to test [0]
% Message key b'935205098933968896' delivered to test [0]
% Message key b'935205117766356993' delivered to test [0]
% Message key b'935205119498575873' delivered to test [0]
% Message key b'935205153896050688' delivered to test [0]
% Message key b'935205171415498754' delivered to test [0]
% Message key b'935205197533638657' delivered to test [0]
% Message key b'935205199123304449' delivered to test [0]
% Message key b'935205232283389952' delivered to test [0]
% Message key b'935205246539915264' delivered to test [0]
% Message key b'935205261689712641' delivered to test [0]
% Message key b'935205265716236288' delivered to test [0]
% Message key b'935205294899998720' delivered to test [0]
% Message key b'935205296452132865' delivered to test [0]
% Message key b'935205309475360771' delivered to test [0]
% Message key b'935205313095045120' delivered to test [0]
% Message key b'935205317293600769' delivered to test [0]
% Message key b'935205321462702081' delivered to test [0]
% Message key b'935205322825850880' delivered to test [0]
% Message key b'935205327385124864' delivered to test [0]
% Message key b'935205327942938624' delivered to test [0]
% Message key b'935205339435237378' delivered to test [0]
% Message key b'935205342799155200' delivered to test [0]
% Message key b'935205353523957760' delivered to test [0]
% Message key b'935205376177434632' delivered to test [0]
% Message key b'935205381952999424' delivered to test [0]
% Message key b'935205399883632640' delivered to test [0]
% Message key b'935205413171101702' delivered to test [0]
% Message key b'935205413166944256' delivered to test [0]
% Message key b'935205416258228225' delivered to test [0]
% Message key b'935205417831010304' delivered to test [0]
% Message key b'935205453230899201' delivered to test [0]
% Message key b'935205468833796096' delivered to test [0]
% Message key b'935205498130857984' delivered to test [0]
% Message key b'935205508046344192' delivered to test [0]
% Message key b'935205517550604296' delivered to test [0]
% Message key b'935205526115438592' delivered to test [0]
% Message key b'935205528502001664' delivered to test [0]
% Message key b'935205532604026881' delivered to test [0]
% Message key b'935205588153372677' delivered to test [0]
% Message key b'935205608579649536' delivered to test [0]
% Message key b'935205612719394816' delivered to test [0]
% Message key b'935205616397750273' delivered to test [0]
% Message key b'935205628116701184' delivered to test [0]
% Message key b'935205643526471680' delivered to test [0]
% Message key b'935205648320618497' delivered to test [0]
% Message key b'935205654138155011' delivered to test [0]
% Message key b'935205675587731459' delivered to test [0]
% Message key b'935205678666387456' delivered to test [0]
% Message key b'935205688162377728' delivered to test [0]
% Message key b'935205698090225664' delivered to test [0]
% Message key b'935205713550413825' delivered to test [0]
% Message key b'935205732898824192' delivered to test [0]
% Message key b'935205735293734912' delivered to test [0]
% Message key b'935205745385029634' delivered to test [0]
% Message key b'935205787303063552' delivered to test [0]
% Message key b'935205787915505664' delivered to test [0]
% Message key b'935205790524301312' delivered to test [0]
% Message key b'935205795431673856' delivered to test [0]
% Message key b'935205802729791488' delivered to test [0]
% Message key b'935205802880708609' delivered to test [0]
% Message key b'935205807599378434' delivered to test [0]
% Message key b'935205817292177410' delivered to test [0]
% Message key b'935205831448068096' delivered to test [0]
% Message key b'935205883268788224' delivered to test [0]
% Message key b'935205915900481536' delivered to test [0]
% Message key b'935205920119906310' delivered to test [0]
% Message key b'935205928147800064' delivered to test [0]
% Message key b'935205953653432320' delivered to test [0]
% Message key b'935205983005085696' delivered to test [0]
% Message key b'935205991129468928' delivered to test [0]
% Message key b'935205991477579778' delivered to test [0]
% Message key b'935205996326203397' delivered to test [0]
% Message key b'935206004593168384' delivered to test [0]
% Message key b'935206038902575104' delivered to test [0]
% Message key b'935206057512751104' delivered to test [0]
% Message key b'935206060511686656' delivered to test [0]
% Message key b'935206082615619586' delivered to test [0]
% Message key b'935206092161867778' delivered to test [0]
% Message key b'935206124722192384' delivered to test [0]
% Message key b'935206162487799808' delivered to test [0]
% Message key b'935206162844274690' delivered to test [0]
% Message key b'935206170410868743' delivered to test [0]
% Message key b'935206197682163717' delivered to test [0]
% Message key b'935206200328753153' delivered to test [0]
% Message key b'935206220834754560' delivered to test [0]
% Message key b'935206235875528704' delivered to test [0]
% Message key b'935206239507795969' delivered to test [0]
% Message key b'935206255404179459' delivered to test [0]
% Message key b'935206264547790849' delivered to test [0]
% Message key b'935206302191669248' delivered to test [0]
% Message key b'935206312555810816' delivered to test [0]
% Message key b'935206317253451776' delivered to test [0]
% Message key b'935206329957978113' delivered to test [0]
% Message key b'935206330905882624' delivered to test [0]
% Message key b'935206337193107456' delivered to test [0]
% Message key b'935206351952805889' delivered to test [0]
% Message key b'935206366591049728' delivered to test [0]
% Message key b'935206377479458824' delivered to test [0]
% Message key b'935206392071376896' delivered to test [0]
% Message key b'935206403563769858' delivered to test [0]
% Message key b'935206428750630917' delivered to test [0]
% Message key b'935206439441846273' delivered to test [0]
% Message key b'935206470110629889' delivered to test [0]
% Message key b'935206470676811776' delivered to test [0]
% Message key b'935206474506211329' delivered to test [0]
% Message key b'935206507267919873' delivered to test [0]
% Message key b'935206523776663552' delivered to test [0]
% Message key b'935206536502276096' delivered to test [0]
% Message key b'935206541619355648' delivered to test [0]
% Message key b'935206545658466304' delivered to test [0]
% Message key b'935206552595779584' delivered to test [0]
% Message key b'935206562330824704' delivered to test [0]
% Message key b'935206565229088770' delivered to test [0]
% Message key b'935206588339568640' delivered to test [0]
% Message key b'935206600725422080' delivered to test [0]
% Message key b'935206603891994624' delivered to test [0]
% Message key b'935206612670836738' delivered to test [0]
% Message key b'935206616454123521' delivered to test [0]
% Message key b'935206627891974150' delivered to test [0]
% Message key b'935206629791944705' delivered to test [0]
% Message key b'935206634225270791' delivered to test [0]
% Message key b'935206663442886656' delivered to test [0]
% Message key b'935206694312972288' delivered to test [0]
% Message key b'935206696573636609' delivered to test [0]
% Message key b'935206714420408320' delivered to test [0]
% Message key b'935206716161052673' delivered to test [0]
% Message key b'935206771685072896' delivered to test [0]
% Message key b'935206786583457792' delivered to test [0]
% Message key b'935206798314954757' delivered to test [0]
% Message key b'935206815981363200' delivered to test [0]
% Message key b'935206816576888832' delivered to test [0]
% Message key b'935206816845197313' delivered to test [0]
% Message key b'935206830741053440' delivered to test [0]
% Message key b'935206841537187841' delivered to test [0]
% Message key b'935206845203042305' delivered to test [0]
% Message key b'935206848818446337' delivered to test [0]
% Message key b'935206851330879488' delivered to test [0]
% Message key b'935206881823461376' delivered to test [0]
% Message key b'935206895136198656' delivered to test [0]
% Message key b'935206895199096833' delivered to test [0]
% Message key b'935206900018401280' delivered to test [0]
% Message key b'935206901675110403' delivered to test [0]
% Message key b'935206924009857025' delivered to test [0]
% Message key b'935206929311334401' delivered to test [0]
% Message key b'935206941814599681' delivered to test [0]
% Message key b'935206955114786816' delivered to test [0]
% Message key b'935206976509894659' delivered to test [0]
% Message key b'935206985976418305' delivered to test [0]
% Message key b'935206993261772800' delivered to test [0]
% Message key b'935207012488630272' delivered to test [0]
% Message key b'935207034152140800' delivered to test [0]
% Message key b'935207085767307264' delivered to test [0]
% Message key b'935207100556328960' delivered to test [0]
% Message key b'935207119455956992' delivered to test [0]
% Message key b'935207120777220096' delivered to test [0]
% Message key b'935207129153236992' delivered to test [0]
% Message key b'935207140138070017' delivered to test [0]
% Message key b'935207160363012102' delivered to test [0]
% Message key b'935207162632122368' delivered to test [0]
% Message key b'935207170622263296' delivered to test [0]
% Message key b'935207173969403907' delivered to test [0]
% Message key b'935207176515223552' delivered to test [0]
% Message key b'935207178314665985' delivered to test [0]
% Message key b'935207188003516416' delivered to test [0]
% Message key b'935207192017457154' delivered to test [0]
% Message key b'935207204940066816' delivered to test [0]
% Message key b'935207211097141248' delivered to test [0]
% Message key b'935207221046194176' delivered to test [0]
% Message key b'935207230722285569' delivered to test [0]
% Message key b'935207248225284096' delivered to test [0]
% Message key b'935207265912664065' delivered to test [0]
% Message key b'935207305242730497' delivered to test [0]
% Message key b'935207307193081859' delivered to test [0]
% Message key b'935207361295372288' delivered to test [0]
% Message key b'935207378974277632' delivered to test [0]
% Message key b'935207387220398080' delivered to test [0]
% Message key b'935207388130328578' delivered to test [0]
% Message key b'935207425170464768' delivered to test [0]
% Message key b'935207439485624320' delivered to test [0]
% Message key b'935207439888273409' delivered to test [0]
% Message key b'935207455977590784' delivered to test [0]
% Message key b'935207474210181120' delivered to test [0]
% Message key b'935207491062894592' delivered to test [0]
% Message key b'935207548784906245' delivered to test [0]
% Message key b'935207557513310208' delivered to test [0]
% Message key b'935207575448096769' delivered to test [0]
% Message key b'935207623716110336' delivered to test [0]
% Message key b'935207629068087297' delivered to test [0]
% Message key b'935207635111985152' delivered to test [0]
% Message key b'935207647996936193' delivered to test [0]
% Message key b'935207652908568576' delivered to test [0]
% Message key b'935207664060993536' delivered to test [0]
% Message key b'935207694717411331' delivered to test [0]
% Message key b'935207711767019520' delivered to test [0]
% Message key b'935207718633263104' delivered to test [0]
% Message key b'935207720575225857' delivered to test [0]
% Message key b'935207731174223874' delivered to test [0]
% Message key b'935207744067555329' delivered to test [0]
% Message key b'935207764011470848' delivered to test [0]
% Message key b'935207795829497856' delivered to test [0]
% Message key b'935207797813399552' delivered to test [0]
% Message key b'935207815223959552' delivered to test [0]
% Message key b'935207831535579141' delivered to test [0]
% Message key b'935207838649110528' delivered to test [0]
% Message key b'935207854063194112' delivered to test [0]
% Message key b'935207858014003200' delivered to test [0]
% Message key b'935207882106294273' delivered to test [0]
% Message key b'935207887751843840' delivered to test [0]
% Message key b'935207887743406081' delivered to test [0]
% Message key b'935207923176955904' delivered to test [0]
% Message key b'935207966185283586' delivered to test [0]
% Message key b'935207993259503617' delivered to test [0]
% Message key b'935207995511844869' delivered to test [0]
% Message key b'935208000867979264' delivered to test [0]
% Message key b'935208006559596544' delivered to test [0]
% Message key b'935208007742435328' delivered to test [0]
% Message key b'935208011056013314' delivered to test [0]
% Message key b'935208013748727812' delivered to test [0]
% Message key b'935208066366271489' delivered to test [0]
% Message key b'935208070984142855' delivered to test [0]
% Message key b'935208082308763650' delivered to test [0]
% Message key b'935208088789049344' delivered to test [0]
% Message key b'935208092157075456' delivered to test [0]
% Message key b'935208101053116416' delivered to test [0]
% Message key b'935208102680498176' delivered to test [0]
% Message key b'935208105729765376' delivered to test [0]
% Message key b'935208139032494081' delivered to test [0]
% Message key b'935208145093349376' delivered to test [0]
% Message key b'935208151955267585' delivered to test [0]
% Message key b'935208173677445120' delivered to test [0]
% Message key b'935208178542895104' delivered to test [0]
% Message key b'935208236705337344' delivered to test [0]
% Message key b'935208249334337536' delivered to test [0]
% Message key b'935208268581847040' delivered to test [0]
% Message key b'935208276379217920' delivered to test [0]
% Message key b'935208303679991810' delivered to test [0]
% Message key b'935208308499197958' delivered to test [0]
% Message key b'935208311561080834' delivered to test [0]
% Message key b'935208316237701120' delivered to test [0]
% Message key b'935208328879398913' delivered to test [0]
% Message key b'935208349670498306' delivered to test [0]
% Message key b'935208350832320512' delivered to test [0]
% Message key b'935208360097517568' delivered to test [0]
% Message key b'935208363520077824' delivered to test [0]
% Message key b'935208370667229185' delivered to test [0]
% Message key b'935208389478703105' delivered to test [0]
% Message key b'935208397942730752' delivered to test [0]
% Message key b'935208400065060864' delivered to test [0]
% Message key b'935208426191425536' delivered to test [0]
% Message key b'935208432608608261' delivered to test [0]
% Message key b'935208441509015552' delivered to test [0]
% Message key b'935208448769347586' delivered to test [0]
% Message key b'935208459020177408' delivered to test [0]
% Message key b'935208491756720128' delivered to test [0]
% Message key b'935208509771218946' delivered to test [0]
% Message key b'935208528763080704' delivered to test [0]
% Message key b'935208549986205698' delivered to test [0]
% Message key b'935208591753203712' delivered to test [0]
% Message key b'935208597864280066' delivered to test [0]
% Message key b'935208617070006272' delivered to test [0]
% Message key b'935208617048985600' delivered to test [0]
% Message key b'935208681490153473' delivered to test [0]
% Message key b'935208714067415040' delivered to test [0]
% Message key b'935208717334798337' delivered to test [0]
% Message key b'935208752294367232' delivered to test [0]
% Message key b'935208756534829058' delivered to test [0]
% Message key b'935208766848544769' delivered to test [0]
% Message key b'935208777846083584' delivered to test [0]
% Message key b'935208778429009920' delivered to test [0]
% Message key b'935208786494705665' delivered to test [0]
% Message key b'935208790839975938' delivered to test [0]
% Message key b'935208792932761601' delivered to test [0]
% Message key b'935208800046518274' delivered to test [0]
% Message key b'935208807051022337' delivered to test [0]
% Message key b'935208817977065473' delivered to test [0]
% Message key b'935208837862363136' delivered to test [0]
% Message key b'935208845097488384' delivered to test [0]
% Message key b'935208850101342209' delivered to test [0]
% Message key b'935208856476581889' delivered to test [0]
% Message key b'935208870926016512' delivered to test [0]
% Message key b'935208872360464384' delivered to test [0]
% Message key b'935208874021449729' delivered to test [0]
% Message key b'935208883898916864' delivered to test [0]
% Message key b'935208889842323456' delivered to test [0]
% Message key b'935208924873031680' delivered to test [0]
% Message key b'935208925946957826' delivered to test [0]
% Message key b'935208934150983682' delivered to test [0]
% Message key b'935208944439570439' delivered to test [0]
% Message key b'935208954078089217' delivered to test [0]
% Message key b'935208955231469568' delivered to test [0]
% Message key b'935208966509928448' delivered to test [0]
% Message key b'935208994687279105' delivered to test [0]
% Message key b'935209027855966208' delivered to test [0]
% Message key b'935209038459146240' delivered to test [0]
% Message key b'935209111729463296' delivered to test [0]
% Message key b'935209131836956674' delivered to test [0]
% Message key b'935209135557218304' delivered to test [0]
% Message key b'935209148739878912' delivered to test [0]
% Message key b'935209148995833862' delivered to test [0]
% Message key b'935209159510937600' delivered to test [0]
% Message key b'935209173637369856' delivered to test [0]
% Message key b'935209182604742659' delivered to test [0]
% Message key b'935209199038095360' delivered to test [0]
% Message key b'935209200384466947' delivered to test [0]
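
The keys in the output above are tweet ids, taken from the id field of each streamed message. A minimal sketch of that mapping is shown below. It is kept separate from Kafka and tweepy so it can be tried offline. The function name to_kafka_record and both sample messages are illustrative assumptions, not captured stream data.

```python
import json

def to_kafka_record(raw):
    """Return (key, value) for a raw streaming message, or None if it is not a tweet."""
    try:
        payload = json.loads(raw)
    except ValueError:
        # Not valid JSON (e.g. a keep-alive line)
        return None
    if not isinstance(payload, dict) or 'id' not in payload:
        # Stream notices carry no top-level 'id'
        return None
    # Key: tweet id as a string; value: the original JSON text, unchanged
    return str(payload['id']), raw

# Hypothetical sample messages for illustration only
tweet = '{"id": 935205074590224384, "text": "Go Tigers"}'
notice = '{"limit": {"track": 5}}'

print(to_kafka_record(tweet))
print(to_kafka_record(notice))
```

Keying by tweet id means repeated deliveries of the same tweet hash to the same partition, which matters once the topic has more than one partition.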

In [ ]: