In [1]:
from IPython.parallel import Client
import sys
import json
sys.path.append('/Users/hencrice/Dropbox/MyMacWorkspace/AsterixDB_semanticPredictor/predictors/')
from TwitterStream import getTwitterStream
from itertools import izip

On the server, the models folder should be put in the same place as the predictor. This is just for testing on my laptop. Load a few predictors and data transformer for testing:


In [2]:
import os
modelsFolderPath='/Volumes/Data/models/'
clfList=sorted([fName for fName in next(os.walk(modelsFolderPath))[2] if fName.endswith('.pkl') and fName[0]=='c'])
vectorizerList=sorted([fName for fName in next(os.walk(modelsFolderPath))[2] if fName.endswith('.pkl') and fName[0]=='v'])
clfList=clfList[:3]
vectorizerList=vectorizerList[:3]
print((clfList, vectorizerList))
clfList=[load(modelsFolderPath+clfName) for clfName in clfList]
vectorizerList=[load(modelsFolderPath+vectorizerName) for vectorizerName in vectorizerList]


(['clf_Arts.pkl', 'clf_Automotive.pkl', 'clf_Baby.pkl'], ['vectorizerTfIdf_Arts.pkl', 'vectorizerTfIdf_Automotive.pkl', 'vectorizerTfIdf_Baby.pkl'])

Gather some tweets for testing:


In [14]:
fakeData=[]
with getTwitterStream([]) as stream:
    for t in stream:
        tweet=json.loads(t)
        if 'created_at' in tweet and tweet['lang']=='en':
            fakeData.append(tweet['text'])
            if len(fakeData)%100==0:
                print(len(fakeData))
        if len(fakeData)>1000:
            break


100
200
300
400
500
600
700
800
900
1000

In [19]:
vectorizerList[0].transform(fakeData)


Out[19]:
<1001x25925 sparse matrix of type '<type 'numpy.float64'>'
	with 8808 stored elements in Compressed Sparse Row format>

In [20]:
for i in xrange(3):
    print(clfList[i].predict(vectorizerList[i].transform(fakeData)))


[2 1 5 ..., 5 1 2]
[1 1 5 ..., 5 1 1]
[1 1 2 ..., 2 2 1]

In [42]:
%%timeit -n10
serial_result = map(lambda vecAndClf: vecAndClf[1].predict(vecAndClf[0].transform(fakeData)), [vecAndClf for vecAndClf in izip(vectorizerList, clfList)])
serial_result # same as above, so treating functions (i.e. vectorizer and predictors) like data works


10 loops, best of 3: 151 ms per loop

In [45]:
serial_result=map(lambda vecAndClf: vecAndClf[1].predict(vecAndClf[0].transform(fakeData)), [vecAndClf for vecAndClf in izip(vectorizerList, clfList)])

Ipython cluster parallel version is too slow!


In [25]:
rc = Client()

In [ ]:
%%timeit -n10
dview = rc[:] # select all engines
dview.push({'fakeData':fakeData}) # because engines (i.e. slave processes don't know the variable stored in this python namespace, so has to push to them to make them aware)
#lview = rc.load_balanced_view()
#lview.block = True # http://ipython.org/ipython-doc/stable/api/generated/IPython.parallel.client.view.html#IPython.parallel.client.view.LoadBalancedView.map
#lview.map(lambda vecAndClf: vecAndClf[1].predict(vecAndClf[0].transform(fakeData)), [vecAndClf for vecAndClf in izip(vectorizerList, clfList)])
dview.map_sync(lambda vecAndClf: vecAndClf[1].predict(vecAndClf[0].transform(fakeData)), [vecAndClf for vecAndClf in izip(vectorizerList, clfList)])

Maybe we should also try out Cython


In [67]:
%load_ext cythonmagic

In [79]:
%%cython
import numpy as np
cimport numpy as np
from cython.parallel cimport *
def f(np.ndarray[double] x, double alpha):
    cdef double s = 0
    cdef double tmp
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp # turns into reduction + thread-private
        s += tmp * 10 # after the loop we emulate sequential loop execution(OpenMP lastprivate)
    return s


Error compiling Cython file:
------------------------------------------------------------
...
from cython.parallel cimport *
def f(np.ndarray[double] x, double alpha):
    cdef double s = 0
    cdef double tmp
    with nogil:
        for i in prange(x.shape[0]):
             ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:8:14: target may not be a Python object as we don't have the GIL

Error compiling Cython file:
------------------------------------------------------------
...
    cdef double tmp
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
                       ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:11:24: Coercion from Python not allowed without the GIL

Error compiling Cython file:
------------------------------------------------------------
...
    cdef double tmp
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
                       ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:11:24: Operation not allowed without gil

Error compiling Cython file:
------------------------------------------------------------
...
    cdef double tmp
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
                       ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:11:24: Converting to Python object not allowed without gil

Error compiling Cython file:
------------------------------------------------------------
...
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp # turns into reduction + thread-private
             ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:12:14: Coercion from Python not allowed without the GIL

Error compiling Cython file:
------------------------------------------------------------
...
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp # turns into reduction + thread-private
             ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:12:14: Operation not allowed without gil

Error compiling Cython file:
------------------------------------------------------------
...
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp # turns into reduction + thread-private
             ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:12:14: Converting to Python object not allowed without gil

Error compiling Cython file:
------------------------------------------------------------
...
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp # turns into reduction + thread-private
                     ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:12:22: Operation not allowed without gil

Error compiling Cython file:
------------------------------------------------------------
...
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp # turns into reduction + thread-private
                 ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:12:18: Indexing Python object not allowed without gil

Error compiling Cython file:
------------------------------------------------------------
...
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp # turns into reduction + thread-private
                                                                  ^
------------------------------------------------------------

/Users/hencrice/.ipython/cython/_cython_magic_f1336b02496b60bb2547a25e393af6b5.pyx:12:67: Converting to Python object not allowed without gil

In [51]:
serial_result


Out[51]:
[array([2, 1, 5, ..., 5, 1, 2], dtype=uint8),
 array([1, 1, 5, ..., 5, 1, 1], dtype=uint8),
 array([1, 1, 2, ..., 2, 2, 1], dtype=uint8)]

In [57]:
results[0]


Out[57]:
1