Package Installation

At the console, within the cs207project directory, you can use

pip install -e .

or equivalently

python setup.py install

At this point, four subpackages will be available to you:

  1. timeseries
  2. TimeseriesDB
  3. Similarity
  4. cs207rbtree

In [1]:
import timeseries, TimeseriesDB, Similarity
import cs207rbtree.RedBlackTree as Database

In [2]:
dir(Database)


Out[2]:
['Color',
 'DBDB',
 'RedBlackNode',
 'RedBlackNodeRef',
 'RedBlackTree',
 'Storage',
 'ValueRef',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__spec__',
 'connect',
 'os',
 'pickle',
 'portalocker',
 'struct']

Using the Red-Black Tree

Below is a function that will allow us to visualize our tree (copied from CS207 Fall 2016 lecture notes).

Next, we create a tree, add a few nodes to it, and retrieve their contents.


In [3]:
demoDB = Database.connect("/tmp/test1.dbdb")

In [4]:
demoDB.set("rahul",    81)
demoDB.set("pavlos",   20)
demoDB.set("sarah",    29)
demoDB.set("courtney", 11)
demoDB.set("andrew",   12)
demoDB.set("laura",    81)

In [5]:
demoDB.get("sarah")


Out[5]:
29

In [6]:
demoDB.get("laura")


Out[6]:
81

Multithreadedness


In [1]:
from cs207rbtree import RedBlackTree
from threading import Thread
from pytest import raises
import portalocker
import os

In [2]:
def thread_function(num):
    print("FIRST FUN")
    db = RedBlackTree.connect("/tmp/test6.dbdb")
    db.set("kobe", "baby"+str(num))
    print("1")
    db.set("rahul", "veryyoung"+str(num))
    print("2")
    db.set("pavlos", "stillyoung"+str(num))
    print("3")
    db.set("andy", "old"+str(num))
    print("4")
    db.set("lisa", "ancient"+str(num))   
    print("5")
    db.commit()
    print("6")

def thread_function2():
    db = RedBlackTree.connect("/tmp/test6.dbdb")
    for i in ["kobe","rahul","pavlos","andy","lisa"]:
        print("SECOND FUNC")
        with raises(KeyError):
            print("FAILED")
            print(db.get(i))
   
os.remove('/tmp/test6.dbdb')
t1 = Thread(target=thread_function, args=([1]))
t2 = Thread(target=thread_function2)#, args=(2)) 
t1.start()
t2.start()
print("DONE")


FIRST FUN
DONE
SECOND FUNC
FAILED
SECOND FUNC
FAILED
SECOND FUNC
FAILED
SECOND FUNC
FAILED
SECOND FUNC
FAILED
1
2
3
4
5
6

In [7]:
import time

def thread_function():
    print("THREAD 1")
    db = RedBlackTree.connect("/tmp/test6.dbdb")
    db.set("Laura", "Ware")
    time.sleep(200)
    print("THREAD ONE DONE SLEEPING")
    db.commit()
    print("COMMITED RESULTS")
    
def thread_function2():
    print("THREAD 2")
    db2 = RedBlackTree.connect("/tmp/test6.dbdb")
    with raises(KeyError):
        print(db2.get('Laura'))
    print("THERE")
    time.sleep(10)
    print("THREAD TWO DONE SLEEPING")
    print(db2.get('Laura'))
    

os.remove('/tmp/test6.dbdb')
#t1 = Thread(target=thread_function)
#t2 = Thread(target=thread_function2)
#t1.start()
#t2.start()

import multiprocessing
p = multiprocessing.Process(target=thread_function) 
p2 = multiprocessing.Process(target=thread_function2) 
p.start()
p2.start()
print("I AM DONE")


#db = RedBlackTree.connect("/tmp/test6.dbdb")
#db.set("Laura", "Ware")
#print("HERE")
#db2 = RedBlackTree.connect("/tmp/test6.dbdb")
#print(db2.get("Laura"))
#print("HERE")


I AM DONE
THREAD 1
THREAD 2

In [2]:
os.remove('/tmp/test6.dbdb')
db = RedBlackTree.connect("/tmp/test6.dbdb")
db.set("Laura", "Ware")
print("HERE")


HERE

In [ ]:
db2 = RedBlackTree.connect("/tmp/test6.dbdb")
#print("CONNECTED")
with raises(KeyError):
    print(db2.get("Laura"))
db.commit()
print(db2.get("Laura"))
print("HERE")

db.close()
db2.close()


ensuring superblock
LOCKED?: False
LOCKING

In [4]:
from portalocker.utils import Lock
from portalocker import *
alock = Lock("/tmp/test6.dbdb", timeout=5)
#with assertRaises(Exception): #LockException
    #print("HERE")
alock.acquire()
print("DONE")


---------------------------------------------------------------------------
BlockingIOError                           Traceback (most recent call last)
/Users/courtneycochrane/anaconda/lib/python3.5/site-packages/portalocker/portalocker.py in lock(file_, flags)
     66         try:
---> 67             fcntl.flock(file_.fileno(), flags)
     68         except IOError as exc_value:

BlockingIOError: [Errno 35] Resource temporarily unavailable

During handling of the above exception, another exception occurred:

LockException                             Traceback (most recent call last)
/Users/courtneycochrane/anaconda/lib/python3.5/site-packages/portalocker/utils.py in acquire(self, timeout, check_interval, fail_when_locked)
    125             # Try to lock
--> 126             fh = self._get_lock(fh)
    127         except exceptions.LockException as exception:

/Users/courtneycochrane/anaconda/lib/python3.5/site-packages/portalocker/utils.py in _get_lock(self, fh)
    174         returns LockException if it fails'''
--> 175         portalocker.lock(fh, self.flags)
    176         return fh

/Users/courtneycochrane/anaconda/lib/python3.5/site-packages/portalocker/portalocker.py in lock(file_, flags)
     70             # every IO error
---> 71             raise exceptions.LockException(exc_value)
     72 

LockException: [Errno 35] Resource temporarily unavailable

During handling of the above exception, another exception occurred:

AlreadyLocked                             Traceback (most recent call last)
<ipython-input-4-e1cab6859ce6> in <module>()
      4 #with assertRaises(Exception): #LockException
      5     #print("HERE")
----> 6 alock.acquire()
      7 print("DONE")

/Users/courtneycochrane/anaconda/lib/python3.5/site-packages/portalocker/utils.py in acquire(self, timeout, check_interval, fail_when_locked)
    138                     # If fail_when_locked is true, then stop trying
    139                     if fail_when_locked:
--> 140                         raise exceptions.AlreadyLocked(exception)
    141 
    142                     else:  # pragma: no cover

AlreadyLocked: [Errno 35] Resource temporarily unavailable

In [4]:
from TimeseriesDB.MessageFormatting import *
import importlib
import unittest
from pytest import raises
import numpy as np
from TimeseriesDB.tsdb_error import *
from TimeseriesDB import DatabaseServer
from TimeseriesDB.MessageFormatting import * #Deserializer
from Similarity.find_most_similar import find_most_similiar, sanity_check
from TimeseriesDB.simsearch_init import initialize_simsearch_parameters
from socketserver import BaseRequestHandler, ThreadingTCPServer, TCPServer
from timeseries.ArrayTimeSeries import ArrayTimeSeries as ts
import threading
from socket import socket, AF_INET, SOCK_STREAM
import sys
from scipy.stats import norm
import multiprocessing

In [5]:
def query_1():
    #function to compute simsearch
    print("QUERY1")
    s = socket(AF_INET, SOCK_STREAM)
    s.connect(('localhost', 20000))
    d2 = {'op':'simsearch_id','id':12,'n_closest':2,'courtesy':'please'}
    s2 = serialize(json.dumps(d2))        
    s.send(s2)
    msg = s.recv(8192)
    ds = Deserializer()
    ds.append(msg)
    ds.ready()
    response = ds.deserialize()
    print(response)
    s.close()
    
def query_2():
    #function to return timeseries from id
    print("QUERY2")
    s = socket(AF_INET, SOCK_STREAM)
    s.connect(('localhost', 20000))
    d2 = {'op':'TSfromID','id':12,'courtesy':'please'}
    s2 = serialize(json.dumps(d2))        
    s.send(s2)
    msg = s.recv(8192)
    ds = Deserializer()
    ds.append(msg)
    ds.ready()
    response = ds.deserialize()
    print(response)
    s.close()

In [6]:
TCPServer.allow_reuse_address = True
serv = TCPServer(('', 20000), DatabaseServer)
serv.data = initialize_simsearch_parameters()
serv.deserializer = Deserializer()        
serv_thread = threading.Thread(target=serv.serve_forever)
serv_thread.setDaemon(True)
serv_thread.start()     


p = multiprocessing.Process(target=query_1) 
p2 = multiprocessing.Process(target=query_2) 
p.start()
p2.start()    


serv.socket.close()
serv.server_close()
print("DONE")


---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-6-e59d6e3d42e1> in <module>()
      1 TCPServer.allow_reuse_address = True
----> 2 serv = TCPServer(('', 20000), DatabaseServer)
      3 serv.data = initialize_simsearch_parameters()
      4 serv.deserializer = Deserializer()
      5 serv_thread = threading.Thread(target=serv.serve_forever)

/Users/courtneycochrane/anaconda/envs/py35/lib/python3.5/socketserver.py in __init__(self, server_address, RequestHandlerClass, bind_and_activate)
    438         if bind_and_activate:
    439             try:
--> 440                 self.server_bind()
    441                 self.server_activate()
    442             except:

/Users/courtneycochrane/anaconda/envs/py35/lib/python3.5/socketserver.py in server_bind(self)
    452         if self.allow_reuse_address:
    453             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
--> 454         self.socket.bind(self.server_address)
    455         self.server_address = self.socket.getsockname()
    456 

OSError: [Errno 48] Address already in use

In [ ]: