In [1]:
from kafka import KafkaConsumer
import uuid
import json

In [2]:
consumer = KafkaConsumer(bootstrap_servers='',  # Kafka broker host:port
                         # messages arrive as raw bytes; decode and parse the JSON payload
                         value_deserializer=lambda s: json.loads(s.decode('utf-8')),
                         # start from the oldest available messages when no offset is committed
                         auto_offset_reset='earliest',
                         # a fresh random group id, so this consumer re-reads the topic from the start
                         group_id=str(uuid.uuid4()))

In [3]:
consumer.subscribe(['tweets'])

In [4]:
limit = 1000
count = 0
data = []
# iterating the consumer blocks until messages arrive; collect the first `limit` values
for msg in consumer:
    data.append(msg.value)
    count += 1
    if count >= limit:
        break
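
An alternative to the iterator loop is to read in batches with poll(), which returns whatever records are currently fetched instead of yielding one message at a time; a minimal sketch, assuming the same consumer and limit:

# poll() returns a dict mapping TopicPartition -> [ConsumerRecord, ...]
data = []
while len(data) < limit:
    batch = consumer.poll(timeout_ms=1000, max_records=limit - len(data))
    for records in batch.values():
        data.extend(record.value for record in records)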

In [5]:
len(data)


Out[5]:
1000

In [6]:
data[0]


Out[6]:
{u'created': u'2017-02-04 02:18:54',
 u'description': u'Addicted to News, Music, Cooking, Gardens, Outdoors, Tennis, Hunting, Politics, Living and Laughing with Family & Friends...',
 u'followers': 527,
 u'friends_count': 119,
 u'hashtags': u'[]',
 u'id_str': u'827702943428272128',
 u'loc': u'USA',
 u'message': u'RT @Nowsolarwa: Steve Bannon hires two racist Breitbart writers for White House policy positions https://t.co/iwwKeQH5Tu via @PalmerReport',
 u'name': u'DrWAVeSportCd1',
 u'original_id': 2707322102,
 u'original_name': u'Now!Solar',
 u'retweet': u'Y',
 u'retweet_count': 0,
 u'text': u'RT @Nowsolarwa: Steve Bannon hires two racist Breitbart writers for White House policy positions https://t.co/iwwKeQH5Tu via @PalmerReport',
 u'topics': [u'cucks', u'breitbart'],
 u'user_created': u'2013-08-04 21:07:08'}
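
Note that the hashtags field arrives as the string form of a list (u'[]' here), while topics is an actual list; the string form is what ast.literal_eval unpacks a few cells below.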

In [15]:
import ast
from dateutil import parser

In [28]:
hashtags = []
for tweet in data:
    hashtag = tweet.get('hashtags')  # stored as the string form of a list, e.g. u"['breitbart']"
    created = tweet.get('created')
    # skip tweets with no hashtags; literal_eval turns the string back into a real list
    if hashtag is not None and hashtag != u'[]':
        hashtags.append((ast.literal_eval(hashtag), parser.parse(created)))

# flatten to one (hashtag, date) pair per tag, lower-cased for consistent grouping
hashtag_date = [(item.lower(), date) for sublist, date in hashtags for item in sublist]
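
As a quick sanity check of the flattened pairs, the most frequent hashtags can be tallied with collections.Counter; a minimal sketch:

from collections import Counter

# count how often each (lower-cased) hashtag appears across the collected tweets
top_tags = Counter(tag for tag, date in hashtag_date).most_common(10)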

In [10]:
# disconnect from the brokers and leave the consumer group
consumer.close()