Kafka Consumer for Twitter


In [1]:
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
from pprint import pformat

In [6]:
def stats_cb(stats_json_str):
    """Pretty-print a statistics report given as JSON text.

    Presumably intended as the consumer's ``stats_cb`` callback; confirm it
    is registered in the consumer configuration before relying on it.

    Args:
        stats_json_str: JSON-encoded statistics document.
    """
    parsed_stats = json.loads(stats_json_str)
    rendered = pformat(parsed_stats)
    print('\nKAFKA Stats: {}\n'.format(rendered))

broker = 'node1774.palmetto.clemson.edu:9092'
group = '1'
topics = ['test']

# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
# Topic-level properties such as 'auto.offset.reset' are set at the top level;
# the nested 'default.topic.config' dict is deprecated. 'earliest' is the
# current name for the legacy 'smallest' value: when the group has no
# committed offset for a partition, start from the beginning of it.
conf = {
    'bootstrap.servers': broker,
    'group.id': group,
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest',
}

In [8]:
# Create Consumer instance
c = Consumer(**conf)


def print_assignment(consumer, partitions):
    """Rebalance callback: print the partitions assigned to *consumer*."""
    print('Assignment:', partitions)


# Subscribe to topics; print_assignment is invoked on partition assignment.
c.subscribe(topics, on_assign=print_assignment)

# Read messages from Kafka and print their raw values to stdout until the
# user interrupts (KeyboardInterrupt) or a non-EOF Kafka error occurs.
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            # Nothing arrived within the poll timeout; keep waiting.
            continue
        err = msg.error()
        if not err:
            # Proper message. The payload is printed as raw bytes; decode it
            # (e.g. with json.loads) only where structured access is needed,
            # so a malformed payload cannot kill the consumer loop.
            print(msg.value())
        elif err.code() == KafkaError._PARTITION_EOF:
            # End-of-partition event: informational, not a failure.
            sys.stderr.write('%% %s [%d] reached end at offset %d\n'
                             % (msg.topic(), msg.partition(), msg.offset()))
        else:
            # Any other error is fatal for this loop.
            raise KafkaException(err)
except KeyboardInterrupt:
    sys.stderr.write('%% Aborted by user\n')
finally:
    # Close down consumer to commit final offsets, even when a
    # KafkaException propagates out of the loop.
    c.close()


Assignment: []
Assignment: []
Assignment: []
Assignment: []
Assignment: []
Assignment: []
%% Aborted by user

In [ ]:


In [ ]: