In [1]:
from IPython.parallel import Client
import sys
import json
sys.path.append('/Users/hencrice/Dropbox/MyMacWorkspace/AsterixDB_semanticPredictor/predictors/')
from TwitterStream import getTwitterStream
from itertools import izip
On the server, the models folder should be placed in the same directory as the predictor; the path below is only for testing on my laptop. Load a few predictors and data transformers for testing:
In [2]:
import os

# `load` was used below without ever being imported (NameError on a fresh
# kernel). The models are presumably scikit-learn estimators pickled with
# joblib, so prefer joblib's loader; fall back to stdlib pickle if joblib
# is unavailable. TODO(review): confirm how the .pkl files were written.
try:
    from joblib import load
except ImportError:
    import pickle

    def load(path):
        with open(path, 'rb') as fh:
            return pickle.load(fh)

# NOTE(review): hardcoded local path — point this at the server's models folder.
modelsFolderPath = '/Volumes/Data/models/'

# Walk the folder once (the original walked it twice) and pick out pickled
# classifiers (c*.pkl) and vectorizers (v*.pkl); sorting keeps the i-th
# classifier paired with the i-th vectorizer.
modelFileNames = next(os.walk(modelsFolderPath))[2]
clfList = sorted(f for f in modelFileNames if f.endswith('.pkl') and f.startswith('c'))
vectorizerList = sorted(f for f in modelFileNames if f.endswith('.pkl') and f.startswith('v'))

# Keep only the first three pairs for this smoke test.
clfList = clfList[:3]
vectorizerList = vectorizerList[:3]
print((clfList, vectorizerList))

# Replace the file-name lists with the deserialized estimators.
clfList = [load(modelsFolderPath + name) for name in clfList]
vectorizerList = [load(modelsFolderPath + name) for name in vectorizerList]
Gather some tweets for testing:
In [14]:
# Collect a little over 1000 English tweet texts from the live stream to use
# as test data for the predictors.
fakeData = []
with getTwitterStream([]) as stream:
    for raw in stream:
        tweet = json.loads(raw)
        # 'created_at' distinguishes real tweets from other stream messages
        # (delete/limit notices). Use .get('lang') instead of tweet['lang']:
        # the original raised KeyError on tweets that carry 'created_at' but
        # no 'lang' key.
        if 'created_at' in tweet and tweet.get('lang') == 'en':
            fakeData.append(tweet['text'])
            # Progress every 100 tweets; stop just past 1000.
            if len(fakeData) % 100 == 0:
                print(len(fakeData))
            if len(fakeData) > 1000:
                break
In [19]:
# Sanity check: the first vectorizer can transform the collected tweets
# (the cell's Out shows the resulting document-term matrix).
vectorizerList[0].transform(fakeData)
Out[19]:
In [20]:
# Run each (vectorizer, classifier) pair over the sample tweets and print the
# predictions. Iterating the pairs with izip replaces the hard-coded
# xrange(3): it stays correct if the number of loaded models changes (the
# lists were truncated to 3 above, so behavior here is unchanged).
for vec, clf in izip(vectorizerList, clfList):
    print(clf.predict(vec.transform(fakeData)))
In [42]:
%%timeit -n10
serial_result = map(lambda vecAndClf: vecAndClf[1].predict(vecAndClf[0].transform(fakeData)), [vecAndClf for vecAndClf in izip(vectorizerList, clfList)])
serial_result # same as above, so treating functions (i.e. vectorizer and predictors) like data works
In [45]:
# Non-timed run so `serial_result` persists in the notebook namespace (the
# %%timeit cell above discards its locals). A list comprehension with tuple
# unpacking replaces map + lambda over a redundant identity comprehension;
# in Python 2, map() returns a list, so the result is identical.
serial_result = [clf.predict(vec.transform(fakeData)) for vec, clf in izip(vectorizerList, clfList)]
In [25]:
# Connect to the running IPython.parallel cluster
# (requires `ipcluster start` to have been run beforehand).
rc = Client()
In [ ]:
%%timeit -n10
dview = rc[:] # select all engines
dview.push({'fakeData':fakeData}) # because engines (i.e. slave processes don't know the variable stored in this python namespace, so has to push to them to make them aware)
#lview = rc.load_balanced_view()
#lview.block = True # http://ipython.org/ipython-doc/stable/api/generated/IPython.parallel.client.view.html#IPython.parallel.client.view.LoadBalancedView.map
#lview.map(lambda vecAndClf: vecAndClf[1].predict(vecAndClf[0].transform(fakeData)), [vecAndClf for vecAndClf in izip(vectorizerList, clfList)])
dview.map_sync(lambda vecAndClf: vecAndClf[1].predict(vecAndClf[0].transform(fakeData)), [vecAndClf for vecAndClf in izip(vectorizerList, clfList)])
In [67]:
# Load the Cython cell-magic extension (provides %%cython).
# NOTE(review): `cythonmagic` was later moved out of IPython into the Cython
# package (`%load_ext Cython` on newer installs) — confirm against the
# installed versions.
%load_ext cythonmagic
In [79]:
%%cython
import numpy as np
cimport numpy as np
from cython.parallel cimport *

def f(np.ndarray[double] x, double alpha):
    """Demo of OpenMP-backed prange: accumulate x[i] * (alpha * i) in parallel,
    then reuse `tmp` once after the loop.

    NOTE(review): mirrors the example in Cython's parallelism docs — `s` is
    turned into a reduction variable and `tmp` is implicitly thread-private.
    """
    cdef double s = 0
    cdef double tmp
    with nogil:
        for i in prange(x.shape[0]):
            # alpha is only read, so shared
            # tmp assigned before being used -> safe and natural to make it implicitly thread-private
            tmp = alpha * i
            s += x[i] * tmp  # turns into reduction + thread-private
        s += tmp * 10  # after the loop we emulate sequential loop execution (OpenMP lastprivate)
    return s
In [51]:
# Display the sequential predictions computed earlier for comparison.
serial_result
Out[51]:
In [57]:
# NOTE(review): `results` is never defined anywhere in this notebook, so this
# cell fails under Restart & Run All — presumably it held the parallel map
# output (e.g. from dview.map_sync); assign it explicitly or delete this cell.
results[0]
Out[57]: