In [1]:
from IPython import parallel
clients = parallel.Client(profile='parallel')
In [3]:
print clients.ids
print "Total %i cores"%(len(clients.ids))
In [4]:
%%px --local
import sys
sys.path.append("\\\\DAP-NAS\\work\\CNOP")
import cPickle as pickle
import numpy as np
import pandas as pd
from statsmodels.tsa.arima_process import arma_generate_sample
import matplotlib.pyplot as plt
import numpy as np
from CNOP import CNOP
import winsound
def threshold(x,thresholds=[],values=[-1,0,1]):
for threshold,val in zip(thresholds,values):
if x < threshold:
return val
return values[-1]
import time
from itertools import repeat
import os
from datetime import datetime
In [5]:
%%px --local
x2000 = pd.read_csv("\\\\DAP-NAS\\work\\CNOP\\x2000.csv", delimiter=";")
In [6]:
%%px --local
N=100
def full_overlap(N, df, give_distortion=True):
########################################################
##### This code is for full-overlap case of CNOP########
########################################################
#PreSampling:
df = df[:N]
#df = x2000[:N]
### DGP generation
regime = pd.DataFrame()
beta, alpha = [0.6, 0.4, 0.8], [0.85, 1.55]
gammam, mum = [0.4, 0.3, 0.9], [-1.2, 0.07]
gammap, mup = [0.2, 0.8, 0.3], [1.28, 2.5]
distortion = np.random.randn(3,N)
regime["xbeta"] = df.dot(beta) + distortion[0]
regime['z-gammam'] = df.dot(gammam) + distortion[1]
regime['z+gammap'] = df.dot(gammap) + distortion[2]
regime['regime'] = regime['xbeta'].apply(lambda x: threshold(x,thresholds=alpha))
regime['Y-']=regime['z-gammam'].apply(lambda x: threshold(x, thresholds=mum,values=[-2,-1,0]))
regime['Y+']=regime['z+gammap'].apply(lambda x: threshold(x, thresholds=mup,values=[0,1,2]))
df['Y'] = 0
df['Y'] += np.where(regime['regime']==-1,regime['Y-'],0)
df['Y'] += np.where(regime['regime']==1,regime['Y+'],0)
###df is full data matrix
#Model starts here:
exog = df[["X1", "X2", "X3"]]
endog = df[["Y"]]
l = {0:df}
pan = pd.Panel(l)
y = pan.ix[:,:,['Y']]
x = pan.ix[:,:,["X1", "X2", "X3"]]
zminus = pan.ix[:,:,["X1", "X2", "X3"]]
zplus = pan.ix[:,:,["X1", "X2", "X3"]]
exog = {'x':x,'zplus':zplus,'zminus':zminus}
CNOP4 = CNOP(y,exog, model='CNOP',interest_step=1)
#Counting execution time for fit ...
exec_time = {"fit":0,"se":0}
start_time = time.time()
res = CNOP4.fit( maxiter=250, disp=0)
exec_time["fit"] = time.time() - start_time
# And for Standard Errors as well
try:
start_time = time.time()
res["se"] = CNOP4.se(res.x)
exec_time["se"] = time.time() - start_time
res['status'] = "OK"
except Exception, e:
print e
res['status'] = e
res["exec_time"]=exec_time
if give_distortion:
return res, distortion
else:
return res
In [8]:
N=100
res, distortion = full_overlap(N, df=x2000)
print "Done for N=%i" % (N)
print "Res.status:", res.success
beta, alpha = [0.6, 0.4, 0.8], [0.85, 1.55]
gammam, mum = [0.4, 0.3, 0.9], [-1.2, 0.07]
gammap, mup = [0.2, 0.8, 0.3], [1.28, 2.5]
res_real = beta+alpha+gammam+mum+gammap+mup
%matplotlib inline
_ = plt.bar(np.arange(len(res.x-res_real)),
list(res.x-res_real))
winsound.PlaySound(u'C:\Windows\Media\Windows Print complete.wav', winsound.SND_FILENAME)
print "X & SE:", np.dstack((res["x"],res["se"]))
In [7]:
%%px --local
def runMC(n_obs, n_repl, fun, path = "W:/CNOP/dumps/", prefix = "CNOPres", ext = ".p", **kwargs):
print "Starting MC on %i cores, %i replications of \"%s\" with %i observations"\
%(len(clients.ids), n_repl, fun.__name__, n_obs)
print "If you again run next(), you will get wait_interactive() form and results would be backed up"
print "Untill then you are free to use AsyncResult object that was apready yielded, but data will not be backed up!"
view = clients.load_balanced_view()
def doJob(i):
n_obs,path,fun, kwargs=i
dumb_str=time.strftime("%Y%m%d%H%M%S")+str(np.random.rand())
filename = path + dumb_str
try:
local_res = fun(n_obs, **kwargs)
pickle.dump(local_res, open( filename, "wb" ) )
return local_res, filename
except Exception, e:
pickle.dump(e, open( filename, "wb" ) )
return Exception, e, filename
cur_time_str=time.strftime('%Y%m%d_%H%M%S')
temp_folder_str = path+'temp/'+cur_time_str+"/"
if not os.path.exists(temp_folder_str):
os.makedirs(temp_folder_str)
print temp_folder_str
readme_f = file(temp_folder_str+"!README.txt", "w")
readme_f.write("Doing MC on %i cores, %i replications of \"%s\" with %i observations"\
%(len(clients.ids), n_repl, fun.__name__, n_obs))
readme_f.close()
ar = view.map_async(doJob, [[n_obs, temp_folder_str, fun, kwargs]]*n_repl)
yield ar
ar.wait_interactive()
results = ar.get()
cur_time_str=time.strftime('%Y%m%d_%H%M%S')
filename = path + prefix + cur_time_str + ext
print "DONE! DB in %s"%(temp_folder_str)
yield results
In [8]:
MCgenr = runMC(250, 10000, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)
In [9]:
results=next(MCgenr)
In [10]:
res_250obs = results
In [27]:
fails = [x for x in results if len(x[0])<2]
In [28]:
fails
Out[28]:
In [90]:
fails_obj = [pickle.load(file(x, "r")) for x in fails]
In [91]:
fails_obj
Out[91]:
In [29]:
res_100obs = [x[0][0] for x in results if len(x[0])>0 and x[0][0].success]
In [30]:
xs100 =np.mean([i.x for i in res_100obs], axis = 0)
se100_true = np.std([i.x for i in res_100obs], axis=0)
ses100 =np.nanmean([i.se for i in res_100obs if "se" in i], axis = 0)
In [31]:
_ = plt.bar(np.arange(len(xs100-res_real)),
list(xs100-res_real), label="params")
plt.axis([0,18,-10,10])
print "SE (true):", se100_true
print "Bias:", np.mean(xs100-res_real)
print "RMSE:", np.sqrt(np.mean((xs100-res_real)**2))
In [89]:
pickle.loads(fails[0])
In [11]:
MCgenr = runMC(100, 10000, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)
In [12]:
res_100obs=next(MCgenr)
In [13]:
MCgenr = runMC(500, 10000, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)
In [14]:
res_500obs=next(MCgenr)
In [10]:
MCgenr = runMC(1000, 4860, full_overlap, df=x2000)
MCAsyncMapResult = next(MCgenr)
In [11]:
res_1000obs=next(MCgenr)
In [43]:
pickle.dump(res_1000obs, file("W:/CNOP/dumps/temp/20150326_120353/!all_res", "w"))
In [11]:
MCgenr = runMC(125, 10000, full_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult = next(MCgenr)
In [12]:
res_1000obs=next(MCgenr)
In [25]:
res_150obs=MCAsyncMapResult.get()
In [30]:
i=MCAsyncMapResult.get_dict()
In [35]:
rere=[i for i in MCAsyncMapResult]
In [63]:
ll=[]
for i in range(len(MCAsyncMapResult)):
try:
ll.append(MCAsyncMapResult[i])
except:
print i
In [71]:
for lll, i in enumerate(ll):
if len(i) != 1:
print
In [74]:
ll=[i[0] for i in ll]
In [75]:
pickle.dump(ll, file("W:\\CNOP\\dumps\\MC 31.03-results\\3Full\\res150_CHECKED", "w"))
In [24]:
help(MCAsyncMapResult.)
In [ ]:
In [10]:
MCAsyncMapResult.abort()
In [ ]:
pickle.dump(res_1000obs, file("W:/CNOP/dumps/temp/20150326_120353/!all_res", "w"))
In [24]:
MCgenr = runMC(100, 100, full_overlap, df=x2000, path="\\\\DAP-NAS\\work\\CNOP\\dumps\\")
MCAsyncMapResult = next(MCgenr)
In [25]:
res_test=next(MCgenr)