In [1]:
"""
The wrapper of sockets for both server & client

__author__ = "Alex Xiao <http://www.alexxiao.me/>"
__date__ = "2017-03-13"
__version__ = "0.5"

"""
import socket
import sys,time
from Log import log
from threading import Thread
from exceptions import ProcessEndException

In [2]:
#To show threads
#import threading
#threading.enumerate()

In [3]:
class socket_server(Thread):
    """
        Threaded TCP socket server.

        The server thread (see run) accepts connections on (addr, port) and
        hands every accepted connection to its own worker thread, which
        repeatedly calls the user supplied ``in_core_logic`` callable as
        ``in_core_logic(server, connection, client_address)``.

        Input:
            addr: address the listening socket is bound to
            port: port the listening socket is bound to
            in_core_logic: callable run for every connection; raise
                ProcessEndException from it to close that connection
            socket_name: name passed to log() for every message of this server
            backlog: value given to socket.listen(); None uses the default
            init_fun: optional callable invoked as init_fun(server) at the
                end of __init__, e.g. to adjust timeout / max_connections

        Attributes that may be tuned before start():
            timeout: per connection socket timeout in seconds, only applied
                when > 0
            available_check_interval: seconds the accept loop sleeps between
                two iterations
            max_connections: number of simultaneously alive worker threads
            thread_list: dict mapping str(client_address) -> worker Thread
    """
    def __init__(self,addr,port,in_core_logic,socket_name='Socket_server'
                 ,backlog=None,init_fun=None):
        Thread.__init__(self)
        self.address=addr
        self.port=port
        self.socket_name=socket_name
        self.backlog=backlog
        log('Socket Server initing @'+addr+':'+str(port),self.socket_name,'Info')
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bind the socket to the port
        self.server_address = (self.address, self.port)
        try:
            self.sock.bind(self.server_address)
        except Exception:
            # Do not leak the file descriptor when the address is unavailable
            self.sock.close()
            raise
        self.core_logic=in_core_logic
        self.running=True
        self.timeout=-1 #300
        self.available_check_interval=30
        self.thread_list=dict()
        self.max_connections=1
        if init_fun is not None:
            init_fun(self)

    def init_logic(self):
        '''Hook meant to be overridden by subclasses; does nothing by default.'''
        pass

    def each_client_logic(self,connection,client_address):
        '''Worker thread body serving one accepted connection.

        Calls self.core_logic(self, connection, client_address) again and
        again until one of the following happens:
            * the server is stopped (self.running is False)
            * the connection key is removed from self.thread_list
            * core_logic raises socket.timeout or ProcessEndException
            * core_logic raises ConnectionError (the peer is gone)
        Any other exception is printed, logged and the loop continues.
        The connection is always closed when the loop is left.
        '''
        th_name=str(client_address)
        log('Connection to '+th_name+' is set...',self.socket_name,'Info')
        alive=True
        try:
            # run the loop while the server is up and the thread has not been removed from thread list
            while alive and self.running and th_name in self.thread_list:
                try:
                    self.core_logic(self,connection,client_address)
                except socket.timeout:
                    log('Connection to '+th_name+' is timeout...',self.socket_name,'Error')
                    alive=False
                except ProcessEndException:
                    log('Connection to '+th_name+' is closing...',self.socket_name,'Info')
                    alive=False
                except ConnectionError as e:
                    # A reset/broken connection never recovers: retrying would
                    # only spin on the same error forever.
                    log(str(e),self.socket_name,'Error')
                    alive=False
                except Exception as e:
                    print(str(e))
                    log(str(e),self.socket_name,'Error')
        finally:
            connection.close()
            log('Connection to '+th_name+' has been closed',self.socket_name,'Info')

    def stop(self):
        '''Stop the server: clear the running flag, wait for every worker
        thread to finish, then close the listening socket.'''
        self.running=False
        log('Waiting for all connections to be closed...',self.socket_name,'Info')
        # Iterate over a snapshot of the Thread objects: check_clean_up() may
        # delete keys from thread_list concurrently in the server thread.
        for worker in list(self.thread_list.values()):
            if worker.is_alive():
                worker.join()
        log('All connections closed. server is stopped',self.socket_name,'Info')
        self.sock.close()

    def check_clean_up(self):
        '''Drop finished worker threads from thread_list.

        Output:
            number of connections that can still be accepted
            (max_connections minus the number of alive worker threads)
        '''
        connections_available_cnt=self.max_connections
        for th_name, worker in list(self.thread_list.items()):
            if worker.is_alive():
                connections_available_cnt-=1
            else:
                log('Closing connections '+th_name,self.socket_name,'Info')
                # pop() instead of del: the key may already be gone
                self.thread_list.pop(th_name, None)
        return connections_available_cnt

    def run(self):
        '''Server thread body: listen, then accept connections while running.

        Every iteration accepts at most one connection (only when a slot is
        available), starts its worker thread, cleans up finished workers and
        sleeps for available_check_interval seconds.
        '''
        # Listen for incoming connections
        if self.backlog is not None:
            self.sock.listen(self.backlog)
        else:
            self.sock.listen()
        connections_available_cnt=self.max_connections
        while self.running:
            if self.max_connections>0 and connections_available_cnt>0:
                # Wait for a connection
                try:
                    connection, client_address = self.sock.accept()
                except OSError:
                    if not self.running:
                        # the listening socket was closed by stop()
                        break
                    raise
                try:
                    log('Connected by client:'+str(client_address),self.socket_name,'Info')
                    if self.timeout>0:
                        connection.settimeout(self.timeout)
                    # Start a new thread for the connection
                    th=Thread(target = self.each_client_logic,args = (connection, client_address))
                    self.thread_list[str(client_address)]=th
                    th.start()
                except Exception as e:
                    log(str(e),self.socket_name,'Error')
                    # No worker thread owns the connection at this point, so
                    # it has to be closed here or it would leak.
                    connection.close()
            else:
                if self.max_connections>1:
                    log('Max connections reached, could not accept more',self.socket_name,'Warning')
            #run connections check
            connections_available_cnt=self.check_clean_up()

            #set main socket thread to sleep
            time.sleep(self.available_check_interval)

    def send_line(self,conn,string, line_end='\n'):
        '''Send string followed by line_end over conn, encoded with the
        default str.encode() codec (UTF-8).'''
        conn.sendall((string+line_end).encode())

    def read_line(self,conn,buffer_size=256, line_end='\n'):
        '''Generator yielding the lines received on conn, without line_end.

        Input:
            conn: connected socket (anything with a recv(size) method)
            buffer_size: maximum number of bytes requested per recv() call
            line_end: line delimiter, must not be empty
        The generator ends when recv() returns no data (peer closed the
        connection); data after the last delimiter is discarded.
        '''
        if not line_end:
            raise ValueError('line_end must not be empty')
        delimiter = line_end.encode()
        # Buffer raw bytes and decode complete lines only: decoding each
        # chunk on its own fails when a multi-byte character is split
        # across two recv() calls.
        pending = b''
        while True:
            chunk = conn.recv(buffer_size)
            if not chunk:
                return
            pending += chunk
            while delimiter in pending:
                raw_line, pending = pending.split(delimiter, 1)
                yield raw_line.decode()

In [4]:
class socket_client():
    """
        TCP socket client with line based send / receive helpers.

        Input:
            addr: address of the server to connect to
            port: port of the server to connect to
            socket_name: name passed to log() for the messages of this client

        The connection is opened in __init__. ``send`` and ``sendall`` are
        the bound methods of the underlying socket. The client can be used
        as a context manager, in which case the socket is closed on exit.
    """
    def __init__(self,addr,port,socket_name='Socket_client'):
        self.address=addr
        self.port=port
        self.socket_name=socket_name
        log('Socket client initing @'+addr+':'+str(port),socket_name,'Info')
        # Create a TCP/IP socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Connect the socket to the port where the server is listening
        self.server_address = (addr, port)
        try:
            self.sock.connect(self.server_address)
        except Exception:
            # Do not leak the file descriptor when the server is unreachable
            self.sock.close()
            raise
        self.sendall=self.sock.sendall
        self.send=self.sock.send

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def send_line(self,string,line_end='\n'):
        '''Send string followed by line_end, encoded with the default
        str.encode() codec (UTF-8).'''
        self.sendall((string+line_end).encode())

    def read_line(self,buffer_size=256, line_end='\n'):
        '''Generator yielding the lines received from the server, without
        line_end.

        Input:
            buffer_size: maximum number of bytes requested per recv() call
            line_end: line delimiter, must not be empty
        The generator ends when recv() returns no data (server closed the
        connection); data after the last delimiter is discarded.
        '''
        if not line_end:
            raise ValueError('line_end must not be empty')
        delimiter = line_end.encode()
        # Buffer raw bytes and decode complete lines only: decoding each
        # chunk on its own fails when a multi-byte character is split
        # across two recv() calls.
        pending = b''
        while True:
            chunk = self.sock.recv(buffer_size)
            if not chunk:
                return
            pending += chunk
            while delimiter in pending:
                raw_line, pending = pending.split(delimiter, 1)
                yield raw_line.decode()

    def close(self):
        '''Close the underlying socket.'''
        self.sock.close()

In [ ]:


In [5]:
#socket_port=16319
#
##@socket_serverd('localhost',socket_port)
#def server_logic(self,connection, client_address):
#    for line in self.read_line(connection):
#        print('rec',line)
#        if line=='Stop':
#            raise ProcessEndException()
#
#
#
#server=socket_server('localhost',socket_port,server_logic)
#server.start()
#
#
##d=socket_client('localhost',socket_port)
#
#c=socket_client('localhost',socket_port)
#
#c.send_line('Stsop')
#
#
#
#

In [19]:
#c.send_line('Stop')
#
#t=server.thread_list["('127.0.0.1', 34418)"]
#t.is_alive()


rec Stop

In [17]:
#connections_available_cnt=server.max_connections
#for th in list(server.thread_list):
#    if server.thread_list[th].is_alive():
#        connections_available_cnt-=1 
#    else:
#        log('Closing connections '+th,server.socket_name,'Info')
#        del server.thread_list[th]
#
#server.thread_list
#
#import DB
#ca=DB.get_im_memory_share()
#
#ca.select('logs',' 1=1 order by id desc limit 10')