In [1]:
"""
The DB tools for Sqlite 3

__author__ = "Alex Xiao <http://www.alexxiao.me/>"
__date__ = "2017-02-08"
__version__ = "0.5"

"""


Out[1]:
'\nThe DB tools for Sqlite 3\n\n__author__ = "Alex Xiao <http://www.alexxiao.me/>"\n__date__ = "2017-02-08"\n__version__ = "0.5"\n\n'

In [2]:
import sqlite3
import collections
#conn = sqlite3.connect('example.db')
# to create in memory database use
#conn = sqlite3.connect(':memory:')

In [3]:
sqlite3.enable_shared_cache(True)

In [4]:
Column = collections.namedtuple('Column', ['name','type','nullable'])
Column.__new__.__defaults__ = (False,)


class DB:
    def __init__(self,dbname,**args):
        self.conn=None
        self.DB_NAME=dbname
        self.DEBUG=False
        self.conn = sqlite3.connect(self.DB_NAME,**args)

    def connect(self,**args):
        #file::memory:?cache=shared to open a shared DB
        #global DB_NAME, conn
        if self.conn!=None:
            #Try to check/ disconnect
            #TBC
            if self.DEBUG:
                print('Trying to check beforce connect')
        self.conn = sqlite3.connect(self.DB_NAME,**args)
        
    def disconnect(self):
        self.conn.close()

    def run(self,cmd,variables=None):
        """
        Run the DB command
        Input Parameters
            cmd: the actual command
            variables: [Optional] variables if any
        Output:
            Result set: [list of named tuple]
            Row  count: Integer
            Result Type: String data/ cmd/ error
        """   
        c=None
        try:
            c = self.conn.cursor()
            if variables!=None:
                c.executemany(cmd,variables)
            else:
                c.execute(cmd)   
            rtn=c.fetchall()
            cnt=len(rtn)
            rtype='data'
            if 'select' not in cmd.lower():
                self.conn.commit()  
                cnt=c.rowcount
                rtype='cmd'
            else:
                header=''
                for colheader in c.description:
                    header+=colheader[0]+' '
                RES=collections.namedtuple('Data',header.strip())
                rtno=rtn
                rtn=[]
                for cur in range(0,cnt):
                    #convert to namedtuple
                    rtn.append(RES(*rtno[cur]))
            if self.DEBUG:
                print(cnt,'rows')
        except Exception as e:
            rtype='error'
            rtn=e
            cnt=-2
            
        if c!=None:
            c.close()
        return rtn,cnt,rtype

    def create_list_of_tuple(self,headers=[],data=[],tupletype='Data'):
        """
        The function to create list of named tuple from normal tuple
        Inputs:
            headers: list of header names
            data: list of tuples
            tupletype [Optional]: The type name of the tuple, default to Data
        Output:
            the named tuple
        """
        rtn=[]
        RES=collections.namedtuple(tupletype,headers)
        for cur in range(0,len(data)):
                #convert to namedtuple
                rtn.append(RES(*data[cur]))    
        return rtn
    
    def create_table(self,tablename,coldef,ignore_if_exist=False):
        """
        The default table creator, it will create table with id as primary which is auto incremental
        Input:
            tablename: the name of the table
            coldef: namedtuple of column definitions, e.g
                coldef=[Column(name='name',type='text',nullable=True),Column(name='type',type='CHARACTER(1)')]
            ignore_if_exist: [optional, default to False], if True, will check if the current tables is alreay there with same definition
        Output: N/A
        """
        if ignore_if_exist:
            rtn,cod,typ=self.select('sqlite_master',"name='"+tablename+"'")
            print(rtn,cod,typ)
            if typ=='data' and cod>0:  #len(rtn)>0:
                #table alreay exisis
                
                return rtn,cod,typ

        stmt="create table "+tablename+'(ID INTEGER  PRIMARY KEY AUTOINCREMENT'
        for col in coldef:
            stmt+=','
            stmt+=col.name+' '+col.type
            if not col.nullable:
                stmt+=' NOT NULL '
        stmt+=')'
        rtn=self.run(stmt)
        index_stmt='CREATE INDEX '+tablename+'_main_index ON '+tablename+'(id)'
        return self.run(index_stmt)

    def insert(self,tablename,headers=[], data=[]):
        """
        Insert into table

        Inputs:
            tablename: the target table
            headers: the list of headers e.g. headers=['name','type']
            data: list of data e.g. data = [('Alex Xiao','A'),('Michael Ngo','N')]
        Output:
            Number of rows instered
        """

        stmt='INSERT INTO '+tablename+' ('
        c=0
        for col in headers:
            if c>0:
                stmt+=','
            stmt+=col
            c+=1
        stmt+=') VALUES (?'
        for num in range(1,len(data[0])):
            stmt+=',?'
        stmt+=')'
        return self.run(stmt, data)

    def select(self,tablename, condition=None):
        """
        Run the select
        
        Input:
            tablename: the name of table ( could be joined)
            condition: conditions after where clause
        
        Output:
            Result set: [list of named tuple]
            Row  count: Integer
            Result Type: String data/ cmd/ error 
        
        """
        sql='select * from '+tablename
        if condition!=None:
            sql+=' where '+condition
        return self.run(sql)

In [5]:
#SHARE_PATH="file:in_mem_db?mode=memory&cache=shared" #this is in memory
SHARE_PATH='/tmp/shared_DB'
im_memory_share=DB(SHARE_PATH,uri=True)

In [6]:
def get_im_memory_share():
    return DB(SHARE_PATH,uri=True)