In [1]:
"""
The connector for Kafka
__author__ = "Alex Xiao <http://www.alexxiao.me/>"
__date__ = "2017-04-22"
__version__ = "0.5"

"""
#!sudo pip3 install kafka-python

In [2]:
from kafka import KafkaConsumer
import queue
from threading import Thread
from kafka import KafkaProducer
import time
import json
#from multiprocessing import Process
from datetime import datetime
from exceptions import TimeoutException

In [3]:
# Module-wide counter: every connector instance increments it in its constructor
# and keeps the new value as its own connector_id.
CONNECTOR_ID=0
# Wait time in milliseconds that connector.receive applies while the topic's
# receiver has not yet seen any message (its last_msg is still None).
INIT_TIMEOUT=5000

In [4]:
class receiver(Thread):
    '''
        Background thread that consumes a single Kafka topic.

        Messages whose payload is JSON and that either carry this receiver's
        connector_id or are flagged as an original request are stored in
        ``results`` keyed by their ``msg_id``. Any message that cannot be
        handled that way (not JSON, missing keys, ...) is put on the queue
        ``Q`` as a raw consumer record; ``get`` / ``empty`` are shortcuts to
        that queue.
    '''
    def __init__(self,topic,max_queue_size=2000):
        '''
            topic: the topic to subscribe to
            max_queue_size: once the queue or the result dict holds more
                            entries than this, the older half is dropped
        '''
        Thread.__init__(self)
        self.max_queue_size=max_queue_size
        self.topic=topic
        self.consumer = KafkaConsumer(topic)
        self.Q=queue.Queue()
        self.get=self.Q.get
        self.empty=self.Q.empty
        self.running=True
        self.DEBUG=True
        self.results=dict()
        self.last_msg=None
        self.listening=False
        # Defaults so run() never hits a missing attribute; the owning
        # connector overwrites both when it sets the receiver up.
        self.connector_id=None
        self.timeout=50

    def stop(self):
        '''
            Ask the run loop to finish and close the consumer
        '''
        self.running=False
        self.consumer.close()

    def debug(self,flag):
        '''
            Switch the debug printing on or off
        '''
        self.DEBUG=flag

    def check_clean(self):
        '''
            Drop the older half of the queue and/or of the result dict when
            either holds more than max_queue_size entries.
            Does nothing until at least one message has been received.
        '''
        if self.last_msg is None:
            return
        #1. Check Q
        if self.Q.qsize()>self.max_queue_size:
            if self.DEBUG:
                print('cleaning up commands received')
            # integer division: range() does not accept a float
            for _ in range(self.max_queue_size//2):
                try:
                    # never block here: a reader may have emptied the queue
                    self.Q.get_nowait()
                except queue.Empty:
                    break
        #2. Check dict
        if len(self.results)>self.max_queue_size:
            if self.DEBUG:
                print('cleaning up results received')
            # snapshot the keys first, other threads pop from this dict
            drop_list=sorted(list(self.results.keys()))
            for key in drop_list[:len(drop_list)//2]:
                # pop by key (not by position); tolerate keys already taken
                self.results.pop(key,None)

    def run(self):
        '''
            Consume the topic until stop() is called, sorting every message
            into results or Q (see the class description).
        '''
        if self.DEBUG:
            print('Start listening to topic:',self.topic)
        self.listening=True
        try:
            while self.running:
                for msg in self.consumer:
                    if self.DEBUG:
                        print('Topic:',self.topic,' original msg:',msg)
                    try:
                        self.last_msg=msg
                        #convert to json
                        obj=json.loads(msg.value.decode())
                        if obj['connector_id']==self.connector_id or obj['original_request'] :
                            #discard if not for this connector
                            if self.DEBUG:
                                print('Topic:',self.topic,' received object:',obj)
                            if obj['original_request']:
                                obj['original_request']=False
                            obj['received_timestamp']=msg.timestamp
                            self.results[obj['msg_id']]=obj
                    except Exception:
                        # anything that is not a well-formed request/reply is
                        # kept as a raw message on the queue
                        if self.DEBUG:
                            print('Topic:',self.topic,' received message:',msg)
                        self.Q.put(msg)
                    #trigger clean up if the holding too many messsages
                    # (done per message: the consumer iterator normally never
                    # ends, so a check after the loop would not be reached)
                    if self.Q.qsize()>self.max_queue_size or len(self.results)>self.max_queue_size:
                        self.check_clean()
                    if not self.running:
                        break
        finally:
            self.listening=False
            if self.DEBUG:
                print('Stopped listening to topic:',self.topic)

In [5]:
class sender():
    '''
        Thin wrapper around a KafkaProducer.

        send_binary and flush are direct aliases of the producer's send and
        flush methods; send encodes text before handing it to the producer.
    '''
    def __init__(self,**kwargs):
        '''
            kwargs: passed unchanged to KafkaProducer
        '''
        # this class is not a Thread, so no Thread.__init__ call here
        self.producer = KafkaProducer(**kwargs)
        self.send_binary=self.producer.send
        self.flush=self.producer.flush

    def send(self,topic,string,timeout=0):
        '''
            Send a message to a topic
            topic: the topic message is sent to
            string: the message; str is encoded with str.encode(), bytes and
                    bytearray are sent as they are
            timeout: [default to 0] if greater than 0, wait on the send future
                     with future.get(timeout=timeout) and return its result;
                     otherwise return None without waiting
        '''
        if isinstance(string,(bytes,bytearray)):
            payload=string
        else:
            payload=string.encode()
        future = self.producer.send(topic,payload)
        if timeout>0:
            return future.get(timeout=timeout)
        return None

In [6]:
class connector():
    '''
        Request/reply helper on top of one sender and one receiver per topic.

        Every instance takes the next value of the module counter
        CONNECTOR_ID as its connector_id and numbers its own requests with
        get_request_id.
    '''
    def __init__(self,**kwargs):
        '''
            kwargs: passed unchanged to the sender
        '''
        global CONNECTOR_ID
        CONNECTOR_ID+=1
        self.connector_id=CONNECTOR_ID
        self.sender = sender(**kwargs)
        # NOTE(review): this aliases sender.send (the text-encoding path), not
        # sender.send_binary -- kept as is for callers; confirm the intent.
        self.send_binary = self.sender.send
        self.receivers=dict()
        self.reqest_id=0
        self.results=dict()
        self.DEBUG=True

    def listen(self,topic):
        '''
            Listening to a certain topic
            Creates and starts a receiver for the topic on first use and
            waits until its thread reports that it is listening.
        '''
        if topic not in self.receivers:
            #no receiver has been set up
            rec=receiver(topic)
            # configure before start(): the receiver thread reads connector_id
            # for every message it handles
            rec.timeout=50
            rec.connector_id=self.connector_id
            self.receivers[topic]=rec
            rec.start()
            # stop waiting if the thread has already ended
            while not rec.listening and rec.is_alive():
                time.sleep(0.01)

    def get_request_id(self):
        '''
            Return the next request id of this connector
        '''
        self.reqest_id+=1
        return self.reqest_id

    def receive(self,topic,txnid,timeout):
        '''
            To receive reply, is called by request function
            topic: the topic which the reply will be from
            txnid: the id of the tranaction you are looking for
            timeout: in milliseconds
            Raises TimeoutException when no reply shows up in time.
        '''
        rec=self.receivers[topic]
        wait_ms=timeout
        if rec.last_msg is None:
            # nothing received on this topic yet: allow at least INIT_TIMEOUT,
            # but never cut a longer timeout asked for by the caller
            wait_ms=max(timeout,INIT_TIMEOUT)
        deadline=time.monotonic()*1000+wait_ms
        while True:
            try:
                # single pop instead of check-then-pop: the receiver thread
                # may remove entries between the two steps
                return rec.results.pop(txnid)
            except KeyError:
                pass
            if time.monotonic()*1000>deadline:
                if self.DEBUG:
                    print('Timeout when waiting for response from: ',topic,' Id [',txnid,']')
                raise TimeoutException()
            # short pause so the wait does not spin a CPU core
            time.sleep(0.001)

    def request(self,topic_to,topic_rec_from,msg,timeout=-1):
        '''
            To request a reply from certain topic
            topic_to: the topic accepts requests
            topic_rec_from: the topic which the reply will be from
            msg: The real request message
            timeout: in milliseconds; a negative value [the default] means
                     use the timeout stored on the receiver of topic_rec_from
            Returns the reply object, raises TimeoutException if none arrives.
        '''
        #send and receive
        self.listen(topic_rec_from)
        if timeout<0:
            timeout=self.receivers[topic_rec_from].timeout
            if self.DEBUG:
                print('No timeout set, use default from the topic: ',timeout)
        txnid=self.get_request_id()
        outmsg={
            'connector_id':self.connector_id,
            'msg_id':txnid,
            'msg':msg,
            'original_request':True,
        }
        self.send_obj(topic_to,outmsg)
        return self.receive(topic_rec_from,txnid,timeout)

    def get(self,topic):
        '''
            Get a none-standard message e,g, command from the queue
            Because of the yield this returns a generator that produces
            exactly one message (blocking until one is available).
        '''
        self.listen(topic)
        yield self.receivers[topic].get()

    def send_obj(self,topic,obj,timeout=0):
        '''
            Send an object as JSON text via send
        '''
        return self.send(topic,json.dumps(obj),timeout)

    def send(self,topic,string,timeout=0):
        '''
            Send info via sender
            topic: the topic message is sent to
            string: message in string
            timeout: [default to 0], if greater than 0 will be block-send
        '''
        return self.sender.send(topic,string,timeout)

    def stop(self):
        '''
            Stop all the listening receivers and quit
        '''
        #sending stop signal
        for rec in self.receivers.values():
            rec.stop()
            # a message on the topic after the stop flag is set
            self.send(rec.topic,'closing')
            rec.join()
        if self.DEBUG:
            print('Connector offline')

In [7]:
#conn=connector(bootstrap_servers='localhost:9092')

In [8]:
#conn.request('test2','test2','ab3cd')
#Sample of msg:
#Topic: test  original msg: ConsumerRecord(topic='test', partition=0, offset=115, timestamp=1490236534335, timestamp_type=0, key=None, value=b'{"connector_id": 2, "msg": "ab3cd", "msg_id": 3}', checksum=169569946, serialized_key_size=-1, serialized_value_size=48)

In [9]:
#conn.stop()

In [10]:
class connector_server(Thread):
    '''
        Thread that answers requests: it takes every object the connector
        received on in_topic and hands it to core_logic, which by default
        echoes it to out_topic. Override init_logic / core_logic /
        after_logic to customise.
    '''
    def __init__(self,in_topic,out_topic,**kwargs):
        '''
            in_topic: the topic requests are read from
            out_topic: the topic replies are sent to
            kwargs: passed unchanged to the connector
        '''
        Thread.__init__(self)
        self.conn=connector(**kwargs)
        self.in_topic=in_topic
        self.out_topic=out_topic
        self.conn.listen(self.in_topic)
        self.running=True

    def start(self):
        '''
            Start serving
        '''
        self.running=True
        # ident stays None until the thread has been started; this keeps a
        # repeated start() call harmless
        if self.ident is None:
            Thread.start(self)

    def stop(self):
        '''
            Stop serving and stop the underlying connector
        '''
        self.running=False
        self.conn.stop()

    def init_logic(self):
        '''
            Hook called once at the beginning of run
        '''
        pass

    def after_logic(self):
        '''
            Hook called once after the serving loop has ended
        '''
        pass

    def reply(self,msg):
        '''
            Send an object to out_topic
        '''
        self.conn.send_obj(self.out_topic,msg)

    def core_logic(self,in_msg):
        '''
            Handle one incoming object; the default is echo
        '''
        #raise TypeError('The core logic needs to be overwritten')
        #default is echo
        self.reply(in_msg)
        print('echoing:',in_msg)

    def run(self):
        '''
            Serve until stop() is called
        '''
        self.init_logic()
        incoming=self.conn.receivers[self.in_topic].results
        while self.running:
            pending=list(incoming.keys())
            if not pending:
                # nothing to do: short pause instead of spinning a CPU core
                time.sleep(0.001)
                continue
            for msgkey in pending:
                # the receiver thread may have dropped the key meanwhile
                in_msg=incoming.pop(msgkey,None)
                if in_msg is not None:
                    self.core_logic(in_msg)
        self.after_logic()

```python
ss=connector_server('test','test_reply',bootstrap_servers='localhost:9092')
ss.start()
```

conn=connector(bootstrap_servers='localhost:9092')

conn.request('test','test_reply','abc')

```python
class server_echo(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.conn=connector(bootstrap_servers='localhost:9092')
        self.conn.listen('test1')

    def run(self):
        print('echo server is up')
        while True:
            for incoming in list(self.conn.receivers['test1'].results.keys()):
                print('echo server got a message [',incoming,']')
                obj=self.conn.receivers['test1'].results.pop(incoming)
                print('[echo server]',obj)
                obj['msg']='Echo:'+obj['msg']
                self.conn.send_obj('test_reply',obj)
```

```python
ss=server_echo()
ss.start()
```

conn=connector(bootstrap_servers='localhost:9092')

conn.request('test2','test2','ab3cd')