In [1]:
"""
The connector for Kafka
__author__ = "Alex Xiao <http://www.alexxiao.me/>"
__date__ = "2017-04-22"
__version__ = "0.5"
"""
#!sudo pip3 install kafka-python
In [2]:
from kafka import KafkaConsumer
import queue
from threading import Thread
from kafka import KafkaProducer
import time
import json
#from multiprocessing import Process
from datetime import datetime
from exceptions import TimeoutException
In [3]:
# Module-wide counter: every connector instance increments it in its
# constructor and keeps the new value as its own connector_id.
CONNECTOR_ID = 0
# Milliseconds connector.receive waits for a reply while the receiver of the
# reply topic has not seen any message yet (used instead of the caller's timeout).
INIT_TIMEOUT = 5000
In [4]:
class receiver(Thread):
    '''
    Background thread that consumes a single Kafka topic.

    Every consumed message is decoded as JSON. A message whose
    'connector_id' equals this receiver's connector_id, or whose
    'original_request' flag is truthy, is stored in ``results`` under its
    'msg_id'. A message that cannot be processed that way (not JSON,
    missing keys, ...) is put on the queue ``Q`` instead.
    '''
    def __init__(self,topic,max_queue_size=2000):
        '''
        topic: the topic to consume
        max_queue_size: size above which the queue / results dict get trimmed
        '''
        Thread.__init__(self)
        self.max_queue_size=max_queue_size
        self.topic=topic
        self.consumer = KafkaConsumer(topic)
        self.Q=queue.Queue()
        # shortcuts to the underlying queue
        self.get=self.Q.get
        self.empty=self.Q.empty
        self.running=True
        self.DEBUG=True
        self.results=dict()
        self.last_msg=None
        self.listening=False
        # Defaults for the attributes the owning connector assigns from the
        # outside, so run() never meets a missing attribute if a message
        # arrives before they have been set.
        self.connector_id=None
        self.timeout=50

    def stop(self):
        '''
        Ask the thread to stop and close the consumer.
        '''
        self.running=False
        self.consumer.close()

    def debug(self,flag):
        '''
        Switch the debug printing on or off.
        '''
        self.DEBUG=flag

    def check_clean(self):
        '''
        Trim the queue and the results dict when they outgrow max_queue_size.
        Nothing is done until at least one message has been received.
        '''
        if self.last_msg is None:
            return
        #1. Check Q
        if self.Q.qsize()>self.max_queue_size:
            if self.DEBUG:
                print('cleaning up commands received')
            # integer division: range() does not accept a float
            for _ in range(self.max_queue_size//2):
                #throw away 1st half of the queue
                try:
                    self.Q.get_nowait()
                except queue.Empty:
                    # someone else drained the queue in the meantime
                    break
        #2. Check dict
        if len(self.results)>self.max_queue_size:
            if self.DEBUG:
                print('cleaning up results received')
            keys=list(self.results.keys())
            try:
                drop_list=sorted(keys)
            except TypeError:
                # keys of mixed types cannot be ordered: keep insertion order
                drop_list=keys
            # drop the first half by KEY (not by position index)
            for key in drop_list[:len(drop_list)//2]:
                self.results.pop(key,None)

    def _handle(self,msg):
        '''
        Process one consumed message: store it in results or queue it.
        '''
        if self.DEBUG:
            print('Topic:',self.topic,' original msg:',msg)
        self.last_msg=msg
        try:
            #convert to json
            obj=json.loads(msg.value.decode())
            if obj['connector_id']==self.connector_id or obj['original_request']:
                #anything else is discarded as not for this connector
                if self.DEBUG:
                    print('Topic:',self.topic,' received object:',obj)
                if obj['original_request']:
                    obj['original_request']=False
                obj['received_timestamp']=msg.timestamp
                self.results[obj['msg_id']]=obj
        except Exception:
            # best effort: whatever is not a well-formed message object is
            # kept as a raw message on the queue
            if self.DEBUG:
                print('Topic:',self.topic,' received message:',msg)
            self.Q.put(msg)
        #trigger clean up if holding too many messages
        if self.Q.qsize()>self.max_queue_size or len(self.results)>self.max_queue_size:
            self.check_clean()

    def run(self):
        '''
        Consume the topic until stop() is called.
        ``listening`` is True while the loop is active and False afterwards.
        '''
        if self.DEBUG:
            print('Start listening to topic:',self.topic)
        self.listening=True
        try:
            while self.running:
                # NOTE(review): presumably iterating the consumer ends once
                # stop() has closed it -- confirm against kafka-python
                for msg in self.consumer:
                    self._handle(msg)
                    if not self.running:
                        break
        finally:
            self.listening=False
        if self.DEBUG:
            print('Stopped listening to topic:',self.topic)
In [5]:
class sender():
    '''
    Thin wrapper around a KafkaProducer.

    send_binary: the producer's own send (payload handed over as given)
    flush: the producer's own flush
    '''
    def __init__(self,**kwargs):
        '''
        kwargs: handed unchanged to KafkaProducer (e.g. bootstrap_servers)
        '''
        # sender is a plain object, not a Thread subclass, so there is no
        # Thread state to initialise here.
        self.producer = KafkaProducer(**kwargs)
        self.send_binary=self.producer.send
        self.flush=self.producer.flush

    def send(self,topic,string,timeout=0):
        '''
        Encode a string and send it to a topic.
        topic: the topic the message is sent to
        string: message as str, encoded with str.encode() before sending
        timeout: [default to 0] if greater than 0 the call waits on the
                 send future (value passed unchanged to future.get) and
                 returns its result; otherwise returns None right away
        '''
        future = self.producer.send(topic,string.encode())
        if timeout>0:
            return future.get(timeout=timeout)
        return None
In [6]:
class connector():
    '''
    Request/reply helper on top of Kafka: one sender plus one receiver
    thread per topic that is being listened to.
    '''
    def __init__(self,**kwargs):
        '''
        kwargs: handed unchanged to the sender (e.g. bootstrap_servers)
        '''
        global CONNECTOR_ID
        CONNECTOR_ID+=1
        self.connector_id=CONNECTOR_ID
        self.sender = sender(**kwargs)
        # NOTE(review): despite the name this forwards to sender.send, which
        # encodes a str; the raw producer send is sender.send_binary. Left
        # unchanged so existing callers keep working -- confirm the intent.
        self.send_binary = self.sender.send
        self.receivers=dict()
        self.reqest_id=0
        self.results=dict()
        self.DEBUG=True

    def listen(self,topic):
        '''
        Listening to a certain topic
        Creates and starts a receiver for the topic on first use, then waits
        until that receiver reports that it is listening.
        '''
        if topic not in self.receivers:
            #no receiver has been set up yet
            rec=receiver(topic)
            # configure BEFORE start(): the receiver thread reads
            # connector_id as soon as the first message arrives
            rec.timeout=50
            rec.connector_id=self.connector_id
            self.receivers[topic]=rec
            rec.start()
        rec=self.receivers[topic]
        # do not wait on a receiver whose thread has already ended
        while not rec.listening and rec.is_alive():
            time.sleep(0.05)

    def get_request_id(self):
        '''
        Return the next request id (1, 2, 3, ... per connector)
        '''
        self.reqest_id+=1
        return self.reqest_id

    def receive(self,topic,txnid,timeout):
        '''
        To receive reply, is called by request function
        topic: the topic which the reply will be from
        txnid: the id of the transaction you are looking for
        timeout: in milliseconds; INIT_TIMEOUT is used instead while the
                 receiver of the topic has not received any message yet
        Raises TimeoutException when no reply shows up in time.
        '''
        rec=self.receivers[topic]
        if rec.last_msg is None:
            timeout_ts=time.time() * 1000 +INIT_TIMEOUT
        else:
            timeout_ts=time.time() * 1000 +timeout
        while txnid not in rec.results:
            if time.time() * 1000>timeout_ts:
                if self.DEBUG:
                    print('Timeout when waiting for response from: ',topic,' Id [',txnid,']')
                raise TimeoutException()
            # short nap so the wait does not spin a CPU core
            time.sleep(0.001)
        return rec.results.pop(txnid)

    def request(self,topic_to,topic_rec_from,msg,timeout=-1):
        '''
        To request a reply from certain topic
        topic_to: the topic accepts requests
        topic_rec_from: the topic which the reply will be from
        msg: The real request message
        timeout: in milliseconds; a negative value [the default] means
                 "use the timeout of the receiver of topic_rec_from"
        '''
        #send and receive
        self.listen(topic_rec_from)
        if timeout<0:
            timeout=self.receivers[topic_rec_from].timeout
            if self.DEBUG:
                print('No timeout set, use default from the topic: ',timeout)
        txnid=self.get_request_id()
        outmsg={
            'connector_id':self.connector_id,
            'msg_id':txnid,
            'msg':msg,
            'original_request':True,
        }
        self.send_obj(topic_to,outmsg)
        return self.receive(topic_rec_from,txnid,timeout)

    def get(self,topic):
        '''
        Get a non-standard message e.g. command from the queue
        This is a generator: nothing happens until it is iterated, and it
        then yields exactly one message taken from the receiver's queue.
        '''
        self.listen(topic)
        yield self.receivers[topic].get()

    def send_obj(self,topic,obj,timeout=0):
        '''
        Send an object as JSON text, see send
        '''
        return self.send(topic,json.dumps(obj),timeout)

    def send(self,topic,string,timeout=0):
        '''
        Send info via sender
        topic: the topic message is sent to
        string: message in string
        timeout: [default to 0], if greater than 0 will be block-send
        '''
        return self.sender.send(topic,string,timeout)

    def stop(self):
        '''
        Stop all the listening receivers and quit
        '''
        #sending stop signal
        for rec in list(self.receivers.values()):
            rec.stop()
            # presumably sent so that a receiver still waiting for messages
            # gets one and notices the stop flag -- TODO confirm
            self.send(rec.topic,'closing')
            rec.join()
        if self.DEBUG:
            print('Connector offline')
In [7]:
#conn=connector(bootstrap_servers='localhost:9092')
In [8]:
#conn.request('test2','test2','ab3cd')
#Sample of msg:
#Topic: test original msg: ConsumerRecord(topic='test', partition=0, offset=115, timestamp=1490236534335, timestamp_type=0, key=None, value=b'{"connector_id": 2, "msg": "ab3cd", "msg_id": 3}', checksum=169569946, serialized_key_size=-1, serialized_value_size=48)
In [9]:
#conn.stop()
In [10]:
class connector_server(Thread):
    '''
    Server thread: takes the messages its connector collected from in_topic
    and hands each one to core_logic. Override core_logic (and optionally
    init_logic / after_logic) in a subclass; the default echoes the message
    to out_topic.
    '''
    def __init__(self,in_topic,out_topic,**kwargs):
        '''
        in_topic: the topic requests are read from
        out_topic: the topic replies are sent to
        kwargs: handed unchanged to the connector (e.g. bootstrap_servers)
        '''
        Thread.__init__(self)
        self.conn=connector(**kwargs)
        self.in_topic=in_topic
        self.out_topic=out_topic
        self.conn.listen(self.in_topic)
        self.running=True

    def start(self):
        '''
        Mark the server as running and start the thread.
        '''
        self.running=True
        # without this call the overridden start() never launches run()
        Thread.start(self)

    def stop(self):
        '''
        Ask the loop in run() to end and stop the connector.
        '''
        self.running=False
        self.conn.stop()

    def init_logic(self):
        '''
        Hook called once at the beginning of run(); does nothing by default.
        '''
        pass

    def after_logic(self):
        '''
        Hook called once when the loop in run() has ended; does nothing by default.
        '''
        pass

    def reply(self,msg):
        '''
        Send msg as JSON to out_topic.
        '''
        self.conn.send_obj(self.out_topic,msg)

    def core_logic(self,in_msg):
        '''
        Handle one incoming message; default is echo.
        '''
        self.reply(in_msg)
        print('echoing:',in_msg)

    def run(self):
        '''
        Feed incoming messages to core_logic until stop() is called.
        '''
        self.init_logic()
        incoming=self.conn.receivers[self.in_topic].results
        while self.running:
            pending=list(incoming.keys())
            for msgkey in pending:
                # the entry may have been removed by the receiver's own
                # clean-up in the meantime
                in_msg=incoming.pop(msgkey,None)
                if in_msg is not None:
                    self.core_logic(in_msg)
            if not pending:
                # nothing to do: short nap instead of spinning a CPU core
                time.sleep(0.01)
        self.after_logic()
# Sample usage: create a connector_server reading 'test' and replying on
# 'test_reply', then send it one request from a separate connector.
# Both use the Kafka broker at localhost:9092.
ss=connector_server('test','test_reply',bootstrap_servers='localhost:9092')
ss.start()
conn=connector(bootstrap_servers='localhost:9092')
conn.request('test','test_reply','abc')
class server_echo(Thread):
    '''
    Echo server thread: every message its connector collected from topic
    'test1' gets 'Echo:' put in front of its 'msg' field and is sent to
    topic 'test_reply'.
    '''
    def __init__(self):
        Thread.__init__(self)
        self.conn=connector(bootstrap_servers='localhost:9092')
        self.conn.listen('test1')

    def run(self):
        '''
        Echo incoming messages forever.
        '''
        print('echo server is up')
        while True:
            results=self.conn.receivers['test1'].results
            pending=list(results.keys())
            for incoming in pending:
                print('echo server got a message [',incoming,']')
                obj=results.pop(incoming)
                print('[echo server]',obj)
                obj['msg']='Echo:'+obj['msg']
                self.conn.send_obj('test_reply',obj)
            if not pending:
                # nothing to do: short nap instead of spinning a CPU core
                time.sleep(0.01)
# Sample usage: start the echo server thread, then send one request on
# topic 'test2' (also read back from 'test2') through a separate connector.
# Both use the Kafka broker at localhost:9092.
ss=server_echo()
ss.start()
conn=connector(bootstrap_servers='localhost:9092')
conn.request('test2','test2','ab3cd')