In [1]:
from IPython import parallel
clients = parallel.Client(profile='parallel')

In [2]:
print clients.ids
print "Total %i cores"%(len(clients.ids))


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Total 17 cores

In [3]:
%%px --local

import sys
sys.path.append("\\\\DAP-NAS\\work\\CNOP")
import cPickle as pickle
import numpy as np
import pandas as pd
from statsmodels.tsa.arima_process import arma_generate_sample
import matplotlib.pyplot as plt
import numpy as np
from CNOP import CNOP
import winsound
def threshold(x,thresholds=[],values=[-1,0,1]):
    for threshold,val in zip(thresholds,values):
        if x < threshold: 
            return val
    return values[-1]
import time
from itertools import repeat
import os
from datetime import datetime

In [4]:
%%px --local
x2000 = pd.read_csv("\\\\DAP-NAS\\work\\CNOP\\x2000.csv", delimiter=";")

In [5]:
%%px --local

N=100

def no_overlap(N, df, give_distortion=True):
    ########################################################
    ###### This code is for no-overlap case of CNOP#########
    ########################################################

    #PreSampling:
    df = df[:N]
    #df = x2000[:N]
    
    ### DGP generation
    regime = pd.DataFrame() 
    beta,   alpha   = [0.6], [0.95, 1.45]
    gammam, mum =     [0.9], [-1.22, 0.03]
    gammap, mup =     [0.8], [-0.03, 1.18]
    distortion = np.random.randn(3,N)
    regime["xbeta"]    = df[["X1"]].dot(beta)    + distortion[0]
    regime['z-gammam'] = df[["X2"]].dot(gammam)  + distortion[1]
    regime['z+gammap'] = df[["X3"]].dot(gammap)  + distortion[2]
    regime['regime'] = regime['xbeta'].apply(lambda x: threshold(x,thresholds=alpha))
    regime['Y-']=regime['z-gammam'].apply(lambda x: threshold(x, thresholds=mum,values=[-2,-1,0]))
    regime['Y+']=regime['z+gammap'].apply(lambda x: threshold(x, thresholds=mup,values=[0,1,2]))
    df['Y'] = 0
    df['Y'] += np.where(regime['regime']==-1,regime['Y-'],0)
    df['Y'] += np.where(regime['regime']==1,regime['Y+'],0)
    ###df is full data matrix

    #Model starts here:
    exog = df[["X1", "X2", "X3"]]
    endog = df[["Y"]]
    l = {0:df}
    pan = pd.Panel(l)
    y      = pan.ix[:,:,['Y']]
    x      = pan.ix[:,:,["X1"]]
    zminus = pan.ix[:,:,["X2"]]
    zplus  = pan.ix[:,:,["X3"]]
    exog = {'x':x,'zplus':zplus,'zminus':zminus}
    CNOP4 = CNOP(y,exog, model='CNOP',interest_step=1)

    #Counting execution time for fit ...
    exec_time = {"fit":0,"se":0}
    start_time = time.time()
    res = CNOP4.fit( maxiter=250, disp=0)
    exec_time["fit"] = time.time() - start_time

    # And for Standard Errors as well
    try:
        start_time = time.time()
        res["se"] = CNOP4.se(res.x)
        exec_time["se"] = time.time() - start_time
        res['status'] = "OK"
    except Exception, e:
        print e
        res['status'] = e

    res["exec_time"]=exec_time

    if give_distortion:
        return res, distortion
    else:
        return res

In [ ]:
N=100
res, distortion = no_overlap(N, df=x2000)
print "Done for N=%i" % (N)
print "Res.status:", res.success

beta,   alpha   = [0.6], [0.95, 1.45]
gammam, mum =     [0.9], [-1.22, 0.03]
gammap, mup =     [0.8], [-0.03, 1.18]
res_real = beta+alpha+gammam+mum+gammap+mup
%matplotlib inline
_ = plt.bar(np.arange(len(res.x-res_real)),
                  list(res.x-res_real))

winsound.PlaySound(u'C:\Windows\Media\Windows Print complete.wav', winsound.SND_FILENAME)
print "X & SE:", np.dstack((res["x"],res["se"]))

In [6]:
%%px --local

def runMC(n_obs, n_repl, fun, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\", prefix = "CNOPres", ext = ".p", **kwargs):
    print "Starting MC on %i cores, %i replications of \"%s\" with %i observations"\
               %(len(clients.ids), n_repl, fun.__name__, n_obs)
    print "If you again run next(), you will get wait_interactive() form and results would be backed up"
    print "Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!"
    view = clients.load_balanced_view()

    def doJob(i):
        n_obs,path,fun, kwargs=i
        dumb_str=time.strftime("%Y%m%d%H%M%S")+str(np.random.rand())

        filename = path + dumb_str

        try:
            local_res = fun(n_obs, **kwargs)
            pickle.dump(local_res, open( filename, "wb" ) )
            return local_res, filename
        except Exception, e:
            pickle.dump(e, open( filename, "wb" ) )
            return Exception, e, filename
    
    cur_time_str=time.strftime('%Y%m%d_%H%M%S')
    temp_folder_str = path+'temp/'+cur_time_str+"/"
    if not os.path.exists(temp_folder_str):
        os.makedirs(temp_folder_str)

    print temp_folder_str
    readme_f = file(temp_folder_str+"!README.txt", "w")
    readme_f.write("Doing MC on %i cores, %i replications of \"%s\" with %i observations"\
                   %(len(clients.ids), n_repl, fun.__name__, n_obs))
    readme_f.close()

    ar = view.map_async(doJob, [[n_obs, temp_folder_str, fun, kwargs]]*n_repl)
    yield ar

    ar.wait_interactive()
    results = ar.get()
    cur_time_str=time.strftime('%Y%m%d_%H%M%S')
    filename = path + prefix + cur_time_str + ext
    print "DONE! DB in %s"%(temp_folder_str)
    yield results

In [20]:
MCgenr100 = runMC(100, 10000, no_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult100 = next(MCgenr100)


Starting MC on 13 cores, 10000 replications of "no_overlap" with 100 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150327_204924/

In [21]:
results100=next(MCgenr100)
pickle.dump(results100, file("\\\\DAP-NAS\\work\\CNOP\\dumps\\res100NoOverlap", "w"))


10000/10000 tasks finished after 4049 s
done
DONE! DB in \\DAP-NAS\work\CNOP\dumps\temp/20150327_204924/

In [28]:
del results100

In [29]:
MCgenr250 = runMC(250, 10000, no_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult250 = next(MCgenr250)


Starting MC on 16 cores, 10000 replications of "no_overlap" with 250 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150327_215959/

In [30]:
results250=next(MCgenr250)
pickle.dump(results250, file("\\\\DAP-NAS\\work\\CNOP\\dumps\\res250NoOverlap", "w"))


10000/10000 tasks finished after 9260 s
done
DONE! DB in \\DAP-NAS\work\CNOP\dumps\temp/20150327_215959/

In [31]:
del results250

In [7]:
MCgenr500 = runMC(500, 4225, no_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult500 = next(MCgenr500)


Starting MC on 15 cores, 4225 replications of "no_overlap" with 500 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150328_094006/

In [ ]:
results500=next(MCgenr500)
pickle.dump(results500, file("\\\\DAP-NAS\\work\\CNOP\\dumps\\res500NoOverlap(half)", "w"))


   0/4225 tasks finished after   11 s

In [ ]:
del results500

In [11]:
MCgenr1000 = runMC(1000, 9811, no_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult1000 = next(MCgenr1000)


Starting MC on 17 cores, 9811 replications of "no_overlap" with 1000 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150328_193826/

In [12]:
results1000=next(MCgenr1000)
pickle.dump(results1000, file("\\\\DAP-NAS\\work\\CNOP\\dumps\\res1000NoOverlap", "w"))


9811/9811 tasks finished after 47594 s
done
DONE! DB in \\DAP-NAS\work\CNOP\dumps\temp/20150328_193826/

In [ ]:
del results1000

150 obs


In [7]:
MCgenr150 = runMC(150, 10000, no_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult150 = next(MCgenr150)


Starting MC on 17 cores, 10000 replications of "no_overlap" with 150 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150405_184016/

In [8]:
results150=next(MCgenr150)


10000/10000 tasks finished after 6592 s
done
DONE! DB in \\DAP-NAS\work\CNOP\dumps\temp/20150405_184016/

In [10]:
pickle.dump(results150, file("W:\\CNOP\\dumps\\MC 31.03-results\\1No+\\res150_CHECKED", "w"))

In [11]:
del results150

Тесты:


In [9]:
MCgenr1000 = runMC(1000, 100, no_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult1000 = next(MCgenr1000)


Starting MC on 8 cores, 100 replications of "no_overlap" with 1000 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150329_150059/

In [1]:
rere=[i for i in MCAsyncMapResult1000]


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-1-5c06d486f7c2> in <module>()
----> 1 rere=[i for i in MCAsyncMapResult1000]

NameError: name 'MCAsyncMapResult1000' is not defined

In [ ]:


In [10]:
results1000=next(MCgenr1000)


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-10-bd280df30743> in <module>()
----> 1 results1000=next(MCgenr1000)

<ipython-input-8-110071053e29> in runMC(n_obs, n_repl, fun, path, prefix, ext, **kwargs)
     33     yield ar
     34 
---> 35     ar.wait_interactive()
     36     results = ar.get()
     37     cur_time_str=time.strftime('%Y%m%d_%H%M%S')

C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\client\asyncresult.pyc in wait_interactive(self, interval, timeout)
    394             self.wait(interval)
    395             clear_output(wait=True)
--> 396             print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
    397             sys.stdout.flush()
    398         print()

C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\client\asyncresult.pyc in elapsed(self)
    345     def elapsed(self):
    346         """elapsed time since initial submission"""
--> 347         if self.ready():
    348             return self.wall_time
    349 

C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\client\asyncresult.pyc in ready(self)
    127         """Return whether the call has completed."""
    128         if not self._ready:
--> 129             self.wait(0)
    130         elif not self._outputs_ready:
    131             self._wait_for_outputs(0)

C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\client\asyncresult.pyc in wait(self, timeout)
    141             self._wait_for_outputs(timeout)
    142             return
--> 143         self._ready = self._client.wait(self.msg_ids, timeout)
    144         if self._ready:
    145             try:

C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\client\client.pyc in wait(self, jobs, timeout)
   1089             if timeout >= 0 and ( time.time()-tic ) > timeout:
   1090                 break
-> 1091             time.sleep(1e-3)
   1092             self.spin()
   1093         return len(theids.intersection(self.outstanding)) == 0

KeyboardInterrupt: 

In [11]:
MCAsyncMapResult1000.abort()

In [ ]: