In [1]:
# Connect to the IPython.parallel cluster started under the 'parallel'
# profile; `clients` is a handle over all registered engines.
from IPython import parallel
clients = parallel.Client(profile='parallel')

In [3]:
# List the engine ids and report how many cores the cluster exposes.
print clients.ids
print "Total %i cores"%(len(clients.ids))


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Total 17 cores

In [4]:
%%px --local

import sys
sys.path.append("\\\\DAP-NAS\\work\\CNOP")
import cPickle as pickle
import numpy as np
import pandas as pd
from statsmodels.tsa.arima_process import arma_generate_sample
import matplotlib.pyplot as plt
import numpy as np
from CNOP import CNOP
import winsound
def threshold(x, thresholds=None, values=(-1, 0, 1)):
    """Map a real value `x` to a discrete category via ordered cutoffs.

    Returns the first entry of `values` whose paired cutoff in `thresholds`
    exceeds `x`; if `x` is >= every cutoff (or `thresholds` is empty),
    returns the last entry of `values`.

    Fixes vs. original: the mutable default `thresholds=[]` is replaced by a
    None sentinel, and the loop variable no longer shadows the function name.

    Parameters:
        x (float): value to categorize.
        thresholds (sequence of float): ascending cutoffs; default empty.
        values (sequence): categories, one more than `thresholds` is typical.

    Returns:
        The matching element of `values`.
    """
    if thresholds is None:
        thresholds = []
    for cutoff, category in zip(thresholds, values):
        if x < cutoff:
            return category
    return values[-1]
import time
from itertools import repeat
import os
from datetime import datetime

In [5]:
%%px --local
x2000 = pd.read_csv("\\\\DAP-NAS\\work\\CNOP\\x2000.csv", delimiter=";")

In [6]:
%%px --local

N=100

def full_overlap(N, df, give_distortion=True):
    ########################################################
    ##### This code is for full-overlap case of CNOP########
    ########################################################

    #PreSampling:
    df = df[:N]
    #df = x2000[:N]
    
    ### DGP generation
    regime = pd.DataFrame() 
    beta,   alpha   = [0.6, 0.4, 0.8], [0.85, 1.55]
    gammam, mum =     [0.4, 0.3, 0.9], [-1.2, 0.07]
    gammap, mup =     [0.2, 0.8, 0.3], [1.28, 2.5]
    distortion = np.random.randn(3,N)
    regime["xbeta"]    = df.dot(beta) + distortion[0]
    regime['z-gammam'] = df.dot(gammam)  + distortion[1]
    regime['z+gammap'] = df.dot(gammap)  + distortion[2]
    regime['regime'] = regime['xbeta'].apply(lambda x: threshold(x,thresholds=alpha))
    regime['Y-']=regime['z-gammam'].apply(lambda x: threshold(x, thresholds=mum,values=[-2,-1,0]))
    regime['Y+']=regime['z+gammap'].apply(lambda x: threshold(x, thresholds=mup,values=[0,1,2]))
    df['Y'] = 0
    df['Y'] += np.where(regime['regime']==-1,regime['Y-'],0)
    df['Y'] += np.where(regime['regime']==1,regime['Y+'],0)
    ###df is full data matrix

    #Model starts here:
    exog = df[["X1", "X2", "X3"]]
    endog = df[["Y"]]
    l = {0:df}
    pan = pd.Panel(l)
    y      = pan.ix[:,:,['Y']]
    x      = pan.ix[:,:,["X1", "X2", "X3"]]
    zminus = pan.ix[:,:,["X1", "X2", "X3"]]
    zplus  = pan.ix[:,:,["X1", "X2", "X3"]]
    exog = {'x':x,'zplus':zplus,'zminus':zminus}
    CNOP4 = CNOP(y,exog, model='CNOP',interest_step=1)

    #Counting execution time for fit ...
    exec_time = {"fit":0,"se":0}
    start_time = time.time()
    res = CNOP4.fit( maxiter=250, disp=0)
    exec_time["fit"] = time.time() - start_time

    # And for Standard Errors as well
    try:
        start_time = time.time()
        res["se"] = CNOP4.se(res.x)
        exec_time["se"] = time.time() - start_time
        res['status'] = "OK"
    except Exception, e:
        print e
        res['status'] = e

    res["exec_time"]=exec_time

    if give_distortion:
        return res, distortion
    else:
        return res

In [8]:
# Single local sanity-check replication with N=100 observations.
N=100
res, distortion = full_overlap(N, df=x2000)
print "Done for N=%i" % (N)
print "Res.status:", res.success

# True DGP parameters concatenated in the same order as res.x, so
# res.x - res_real is the per-parameter estimation error.
beta,   alpha   = [0.6, 0.4, 0.8], [0.85, 1.55]
gammam, mum =     [0.4, 0.3, 0.9], [-1.2, 0.07]
gammap, mup =     [0.2, 0.8, 0.3], [1.28, 2.5]
res_real = beta+alpha+gammam+mum+gammap+mup
%matplotlib inline
# Bar chart of estimation errors; assigned to _ to suppress the repr.
_ = plt.bar(np.arange(len(res.x-res_real)),
                  list(res.x-res_real))

# Audible "done" chime.  NOTE(review): non-raw string with backslashes only
# works because \W, \M etc. are not escape sequences — a raw string
# (r'C:\Windows\...') would be safer.
winsound.PlaySound(u'C:\Windows\Media\Windows Print complete.wav', winsound.SND_FILENAME)
print "X & SE:", np.dstack((res["x"],res["se"]))


Done for N=100
Res.status: True
X & SE: [[[  2.81476767e-01   1.69101785e-01]
  [  4.10339962e-01   1.54701777e-01]
  [  1.10619866e+00   2.46646900e-01]
  [ -5.23392304e-01   2.99204718e-01]
  [  2.22209892e+00   4.77711028e-01]
  [  2.84088182e+00   1.86006294e+00]
  [ -2.97480086e-01   6.36473368e-01]
  [  2.35259017e+00   1.76802337e+00]
  [  7.87947780e-01   1.33009169e+00]
  [  7.99017787e+00   5.87944485e+00]
  [  4.40798118e+01   8.41972438e+04]
  [  1.28265188e+02              nan]
  [  9.00229536e+01   1.20072540e+05]
  [  2.23739824e+01   1.20914887e+05]
  [  2.42965786e+02   3.60215399e+05]]]

In [7]:
%%px --local

def runMC(n_obs, n_repl, fun, path = "W:/CNOP/dumps/", prefix = "CNOPres", ext = ".p", **kwargs):
    """Two-step generator driving a Monte-Carlo experiment on the cluster.

    First next(): submits `n_repl` replications of fun(n_obs, **kwargs) to a
    load-balanced view and yields the AsyncMapResult without blocking.
    Second next(): blocks in wait_interactive(), then yields the collected
    result list.  Each replication also pickles its own result to a per-run
    temp folder under `path`, so partial results survive engine crashes.

    Parameters:
        n_obs (int): observations per replication, forwarded to `fun`.
        n_repl (int): number of replications to submit.
        fun: simulation callable, e.g. full_overlap.
        path (str): root folder for per-task dumps.
        prefix, ext (str): naming pieces for the aggregate dump filename.
        **kwargs: forwarded to `fun` on the engines.

    NOTE(review): relies on the global `clients` handle; the user-facing
    messages contain typos ("Untill", "apready") left untouched here.
    """
    print "Starting MC on %i cores, %i replications of \"%s\" with %i observations"\
               %(len(clients.ids), n_repl, fun.__name__, n_obs)
    print "If you again run next(), you will get wait_interactive() form and results would be backed up"
    print "Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!"
    view = clients.load_balanced_view()

    # Runs on an engine: unpack the single list argument, run one
    # replication, and pickle the result (or the exception) to disk.
    def doJob(i):
        n_obs,path,fun, kwargs=i
        # Timestamp + random suffix: a quasi-unique dump filename so
        # concurrent engines don't collide.
        dumb_str=time.strftime("%Y%m%d%H%M%S")+str(np.random.rand())

        filename = path + dumb_str

        try:
            local_res = fun(n_obs, **kwargs)
            pickle.dump(local_res, open( filename, "wb" ) )
            return local_res, filename
        except Exception, e:
            pickle.dump(e, open( filename, "wb" ) )
            # NOTE(review): returns the Exception *class* as the first
            # element, giving failures a 3-tuple shape — confirm downstream
            # consumers expect this.
            return Exception, e, filename
    
    # Per-run temp folder, named by submission time.
    cur_time_str=time.strftime('%Y%m%d_%H%M%S')
    temp_folder_str = path+'temp/'+cur_time_str+"/"
    if not os.path.exists(temp_folder_str):
        os.makedirs(temp_folder_str)

    print temp_folder_str
    # Drop a README describing the run next to the dumps (Py2 `file`).
    readme_f = file(temp_folder_str+"!README.txt", "w")
    readme_f.write("Doing MC on %i cores, %i replications of \"%s\" with %i observations"\
                   %(len(clients.ids), n_repl, fun.__name__, n_obs))
    readme_f.close()

    # Submit all replications; yield the async handle to the caller first.
    ar = view.map_async(doJob, [[n_obs, temp_folder_str, fun, kwargs]]*n_repl)
    yield ar

    # Resumed by the second next(): block until done, then hand back results.
    ar.wait_interactive()
    results = ar.get()
    cur_time_str=time.strftime('%Y%m%d_%H%M%S')
    # NOTE(review): `filename` is computed but never used — the aggregate
    # pickle.dump is done manually in later cells instead.
    filename = path + prefix + cur_time_str + ext
    print "DONE! DB in %s"%(temp_folder_str)
    yield results

In [8]:
# Launch 10,000 replications with N=250; the first next() returns the
# AsyncMapResult immediately without blocking.
MCgenr = runMC(250, 10000, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)


Starting MC on 8 cores, 10000 replications of "full_overlap" with 250 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
W:/CNOP/dumps/temp/20150323_212714/

In [9]:
results=next(MCgenr)


10000/10000 tasks finished after 31304 s
done
DONE! DB in W:/CNOP/dumps/temp/20150323_212714/

In [10]:
res_250obs = results

In [27]:
fails = [x for x in results if len(x[0])<2]

In [28]:
fails


Out[28]:
[]

In [90]:
fails_obj = [pickle.load(file(x, "r")) for x in fails]

In [91]:
fails_obj


Out[91]:
[IndexError('index 14 is out of bounds for axis 0 with size 14'),
 IndexError('index 14 is out of bounds for axis 0 with size 14'),
 IndexError('index 14 is out of bounds for axis 0 with size 14'),
 IndexError('index 14 is out of bounds for axis 0 with size 14'),
 IndexError('index 14 is out of bounds for axis 0 with size 14'),
 IndexError('index 14 is out of bounds for axis 0 with size 14')]

In [29]:
res_100obs = [x[0][0] for x in results if len(x[0])>0 and x[0][0].success]

In [30]:
# Monte-Carlo summaries across replications: mean point estimate, empirical
# SD of the estimates (the "true" SE benchmark), and the mean of the
# reported analytic SEs, ignoring NaNs and results lacking an "se" entry.
xs100 =np.mean([i.x for i in res_100obs], axis = 0)
se100_true = np.std([i.x for i in res_100obs], axis=0)
ses100 =np.nanmean([i.se for i in res_100obs if "se" in i], axis = 0)

In [31]:
# Bar chart of mean per-parameter bias, plus aggregate bias/RMSE figures.
_ = plt.bar(np.arange(len(xs100-res_real)),
                  list(xs100-res_real), label="params")
plt.axis([0,18,-10,10])
print "SE (true):", se100_true
print "Bias:", np.mean(xs100-res_real)
print "RMSE:", np.sqrt(np.mean((xs100-res_real)**2))


SE (true): [ 0.383111    0.44997838  0.50774186  1.18040036  1.01104538  0.3427681
  0.30961436  0.7105371   0.73162982  2.23039764  0.47624193  0.58189487
  1.02723329  2.66345378  1.77246904]
Bias: 0.105209810719
RMSE: 0.535956128539

In [89]:
pickle.loads(fails[0])


---------------------------------------------------------------------------
UnpicklingError                           Traceback (most recent call last)
<ipython-input-89-db1e9629214b> in <module>()
----> 1 pickle.loads(fails[0])

UnpicklingError: invalid load key, 'W'.

In [11]:
MCgenr = runMC(100, 10000, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)


Starting MC on 8 cores, 10000 replications of "full_overlap" with 100 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
W:/CNOP/dumps/temp/20150324_060859/

In [12]:
res_100obs=next(MCgenr)


10000/10000 tasks finished after 11368 s
done
DONE! DB in W:/CNOP/dumps/temp/20150324_060859/

In [13]:
MCgenr = runMC(500, 10000, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)


Starting MC on 8 cores, 10000 replications of "full_overlap" with 500 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
W:/CNOP/dumps/temp/20150324_101209/

In [14]:
res_500obs=next(MCgenr)


10000/10000 tasks finished after 68469 s
done
DONE! DB in W:/CNOP/dumps/temp/20150324_101209/

In [10]:
MCgenr = runMC(1000, 4860, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)


Starting MC on 8 cores, 4860 replications of "full_overlap" with 1000 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
W:/CNOP/dumps/temp/20150326_120353/

In [11]:
res_1000obs=next(MCgenr)


4860/4860 tasks finished after 74888 s
done
DONE! DB in W:/CNOP/dumps/temp/20150326_120353/

In [43]:
pickle.dump(res_1000obs, file("W:/CNOP/dumps/temp/20150326_120353/!all_res", "w"))

125 obs


In [11]:
# Same experiment with N=125, dumping to the NAS share instead of W:.
# NOTE(review): runMC appends 'temp/' and '/' with forward slashes
# regardless of the Windows separators in `path`, hence the
# mixed-separator folder name seen in the output below.
MCgenr = runMC(125, 10000, full_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult = next(MCgenr)


Starting MC on 17 cores, 10000 replications of "full_overlap" with 125 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150405_141224/

In [12]:
res_1000obs=next(MCgenr)


10000/10000 tasks finished after 9951 s
done
[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine '75ecec11-1ca9-4482-abb3-361446436889' died while running task u'f4da4fdf-b817-4975-9bc7-1f37f3b41790'

[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine 'ef1e0c54-5f8e-4b86-a4bc-e4487e2c0fad' died while running task u'0b5826c2-8778-4ddc-b8a6-a64f61e2e739'

In [25]:
res_150obs=MCAsyncMapResult.get()


[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine '75ecec11-1ca9-4482-abb3-361446436889' died while running task u'f4da4fdf-b817-4975-9bc7-1f37f3b41790'

[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine 'ef1e0c54-5f8e-4b86-a4bc-e4487e2c0fad' died while running task u'0b5826c2-8778-4ddc-b8a6-a64f61e2e739'

In [30]:
i=MCAsyncMapResult.get_dict()


[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine '75ecec11-1ca9-4482-abb3-361446436889' died while running task u'f4da4fdf-b817-4975-9bc7-1f37f3b41790'

[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine 'ef1e0c54-5f8e-4b86-a4bc-e4487e2c0fad' died while running task u'0b5826c2-8778-4ddc-b8a6-a64f61e2e739'

In [35]:
rere=[i for i in MCAsyncMapResult]


[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine '75ecec11-1ca9-4482-abb3-361446436889' died while running task u'f4da4fdf-b817-4975-9bc7-1f37f3b41790'

[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine 'ef1e0c54-5f8e-4b86-a4bc-e4487e2c0fad' died while running task u'0b5826c2-8778-4ddc-b8a6-a64f61e2e739'

In [63]:
ll=[]
for i in range(len(MCAsyncMapResult)):
    try:
        ll.append(MCAsyncMapResult[i])
    except:
        print i


4091
4097

In [71]:
# Sanity check that every retrieved entry has length 1 (one result tuple).
# NOTE(review): the bare `print` emits only an empty line — the offending
# index (`lll`) was probably meant to be printed here.
for lll, i in enumerate(ll):
    if len(i) != 1:
        print

In [74]:
ll=[i[0] for i in ll]

In [75]:
pickle.dump(ll, file("W:\\CNOP\\dumps\\MC 31.03-results\\3Full\\res150_CHECKED", "w"))

In [24]:
help(MCAsyncMapResult.)


[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine '75ecec11-1ca9-4482-abb3-361446436889' died while running task u'f4da4fdf-b817-4975-9bc7-1f37f3b41790'

[Engine Exception]
Traceback (most recent call last):
  File "C:\Users\Andrew\Anaconda\lib\site-packages\IPython\parallel\controller\scheduler.py", line 357, in handle_stranded_tasks
    raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
EngineError: Engine 'ef1e0c54-5f8e-4b86-a4bc-e4487e2c0fad' died while running task u'0b5826c2-8778-4ddc-b8a6-a64f61e2e739'

In [ ]:


In [10]:
MCAsyncMapResult.abort()

In [ ]:
pickle.dump(res_1000obs, file("W:/CNOP/dumps/temp/20150326_120353/!all_res", "w"))

Tests:


In [24]:
# Smoke test: only 100 replications of N=100 against the NAS dump path.
MCgenr = runMC(100, 100, full_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult = next(MCgenr)


Starting MC on 13 cores, 100 replications of "full_overlap" with 100 observations
If you again run next(), you will get wait_interactive() form and results would be backed up
Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!
\\DAP-NAS\work\CNOP\dumps\temp/20150327_202812/

In [25]:
res_test=next(MCgenr)


 100/100 tasks finished after   70 s
done
DONE! DB in \\DAP-NAS\work\CNOP\dumps\temp/20150327_202812/