In [1]:
from scipy.stats import gamma,rv_continuous,norm
from scipy.special import gamma as gm
from scipy.special import kv
from scipy.integrate import quad
import matplotlib as mpl
import matplotlib.pyplot as plt
import math
import numpy as np
In [2]:
class gen_gamma(rv_continuous):
def _pdf(self, x, v, k, b):
return abs(v)*x**(k*v-1)*np.exp(-(x**v)/(b**abs(v)))/(b**(k*abs(v))*gm(k))
def _argcheck(self, v, k,b):
return k>0 and b>0
In [3]:
class GVG:
def cdf(self,x,a,v,k,b):
def integrand(z,x,a,v,k,b):
gg = gen_gamma(a=0.)
return norm.cdf((x-a*z)/math.sqrt(z))*gg.pdf(z,v,k,b)
return quad(integrand,0.0001,np.inf,args=(x,a,v,k,b))[0]
def pdf(self,x,a,v,k,b):
return (self.cdf(x,a,v,k,b)-self.cdf(x-0.001,a,v,k,b))/0.001
In [4]:
class gvg1(rv_continuous):
def _cdf(self,x,a,v,k,b):
def integrand(z,x,a,v,k,b):
gg = gen_gamma(a=0.)
return norm.cdf((x-a*z)/math.sqrt(z))*gg.pdf(z,v,k,b)
return quad(integrand,0.0001,np.inf,args=(x,a,v,k,b))[0]
def _argcheck(self,a, v, k,b):
return k>0 and b>0
In [5]:
def gvg_rand(a,v, k, b,size=1000):
GG = gen_gamma(a=0.)
rand_gg=GG.rvs(v, k, b, size=size)
return norm.rvs(size=size) * np.sqrt(rand_gg) + a*rand_gg
In [6]:
#a- масштаб
plt.hist(gvg_rand(2,0.8,1,1,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(0,0.8,3,1,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(-1.2,2,1,3,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(1.2,2,1,3,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(1.2,2,0.2,0.8,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(0.2,2,7,0.8,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(2,1,0.5,0.8,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(4,1,0.3,0.8,1000),40,normed=True)
plt.show()
In [ ]:
plt.hist(gvg_rand(0,2,0.25,5,1000),40,normed=True)
plt.show()
In [ ]:
params_sets = [
[0,2,0.25,5],
[4,1,0.3,0.8],
[2,1,0.5,0.8],
[0.2,2,7,0.8],
[1.2,2,0.2,0.8],
[1.2,2,1,3],
[-1.2,2,1,3],
[0,0.8,3,1],
[2,0.8,1,1]]
steps_grid = [0,10,20]
grid_sizes = [5,10,20]
In [7]:
def experiment(params_sets,steps_grid,grid_sizes):
for i,params in enumerate(params_sets):
np.random.seed(0)
sample = gvg_rand(params[0],params[1],params[2],params[3],1000)
plt.hist(sample,40)
x=np.linspace(np.min(sample),np.max(sample),50)
y=[gvg.pdf(x,params[0],params[1],params[2],params[3]) for x in x]
plt.plot(x,y)
plt.savefig('pics/params'+str(i)+'.png')
for steps in steps_grid:
for grid_size in grid_sizes:
print('Params: ',params,', steps:',steps,', grid:',grid_size)
grid = np.linspace(1,40,grid_sizes)
p,a,new_grid,iter_num = fit_mixture(sample,grid,0.01,irreg_grid_step=2,recalc_steps=10)
res=fit_gvg(p,a,new_grid)
dist = pdf_dist([a,res[0],res[1],res[2]],[2,0.8,3,1])
In [52]:
class Mixture():
def __init__(self,p,a,u):
self.p=p
self.a=a
self.u=u
def cdf(self,x):
return (self.p*norm.cdf((x-self.a*self.u)/np.sqrt(self.u))).sum()
def pdf(self,x):
return (self.cdf(x)-self.cdf(x-0.001))/0.001
def grid_likehood(sample,u,p,a):
summ=0
u,p = np.array(u),np.array(p)
temp2 = np.sqrt(u)
temp1 = p/temp2
temp3= a*u
for x in sample:
summ += (temp1 * norm.pdf((x-temp3)/temp2)).sum()
return summ
In [56]:
def fit_mixture(sample,u,eps=0.05,verbose=1,irreg_grid_step=0,recalc_steps=10):
import math
p = np.full(len(u), 1/len(u))
mean = np.mean(sample)
a = mean/(u*p).sum()
f = np.zeros((len(u),len(sample)))
g = np.zeros((len(u),len(sample)))
new_likehood = grid_likehood(sample,u,p,a)
iteration = 0
bins=list(np.array(grid)-(grid[1]-grid[0])/2) + [grid[-1]+(grid[1]-grid[0])/2]
while True:
for i in range(0,len(u)):
f[i,:]= 1/math.sqrt(u[i]) * norm.pdf((sample-a*u[i])/math.sqrt(u[i]))
for j in range(0,len(sample)):
g[:,j] = p*f[:,j]/ (p*f[:,j]).sum()
p = g.sum(axis=1)/len(sample)
a = mean/(u*p).sum()
prev_likehood = new_likehood
new_likehood = grid_likehood(sample,u,p,a)
iteration += 1
if (verbose): print('\nIter: %d likehood: %f' %(iteration,new_likehood))
if ((new_likehood-prev_likehood)<eps) & (iteration>recalc_steps+1) : break
if (irreg_grid_step):
if ((iteration%irreg_grid_step==0) & (iteration<=recalc_steps)):
new_bins,new_grid = move_grid(bins,p)
p = piecewise(new_grid,u,p)+1e-6
p = p/p.sum()
bins,u=new_bins,new_grid
a = mean/(u*p).sum()
return p,a,u,iteration
In [10]:
def piecewise(x,grid,p):
bins=list(np.array(grid)-(grid[1]-grid[0])/2) + [grid[-1]+(grid[1]-grid[0])/2]
square = np.sum(np.array(p)*(grid[1]-grid[0]))
p=p/square
try:
res=np.zeros(x.shape[0])
except:
res=np.zeros(1)
x=[x]
for i,num in enumerate(x):
#closest point
try:
idx_bin=bins.index(min([i for i in bins if i>num], key=lambda k:abs(k-num)))
except:
res[i]= 0
continue
if (idx_bin==0):
res[i]= 0
continue
try:
idx_grid=grid.index(min([i for i in grid if i>num], key=lambda k:abs(k-num)))
except:
res[i] = (bins[-1]-num)*p[-1]/(bins[-1]-grid[-1])
continue
if (idx_grid==0):
res[i] = 2*(num - bins[0])*p[0]/(bins[1]-bins[0])
continue
s = (grid[idx_grid]-grid[idx_grid-1])*(p[idx_grid]+p[idx_grid-1])/2
h1 = num - grid[idx_grid-1]
h2 = grid[idx_grid] - num
res[i] = (2*s - h1*p[idx_grid-1]-h2*p[idx_grid])/(h1+h2)
return res
In [11]:
def move_grid(bins,p):
bins=np.array(bins)
new_bins=split_hist(np.array(bins),np.array(p)/(np.array(p)*(bins[1:]-bins[:-1])).sum(),20)
new_bins.append(bins[0])
new_bins.append(bins[-1])
new_bins=np.array(sorted(new_bins))
return new_bins,(new_bins[1:] + new_bins[:-1]) / 2
In [12]:
def split_hist(x,p,k):
c = np.cumsum(p*(x[1:]-x[:-1]))
res=[]
for i in range(k-1):
excess = c-(i+1)/k
ix = np.where(excess>0)[0].min()
res.append(x[ix+1]-excess[ix]/p[ix])
return res
grid=[ 0.01 , 2.5075, 5.005 , 7.5025, 10. ]
p=[ 0.25941888, 0.64792493, 0.02372754, 0.00882218, 0.06010648]
bins=list(np.array(grid)-(grid[1]-grid[0])/2) + [grid[-1]+(grid[1]-grid[0])/2]
square = np.sum(np.array(p)*(grid[1]-grid[0]))
p=p/square
split_hist(np.array(bins),np.array(p),5)
Out[12]:
In [38]:
sample = gvg_rand(0.2,2,7,0.8,200)
grid = np.linspace(1,40,20)
In [25]:
plt.hist(sample,50,normed=True)
plt.show()
In [66]:
p,a,new_grid,_ = fit_mixture(sample,grid,0.01,irreg_grid_step=2,recalc_steps=0)
In [62]:
mixture = Mixture(p,a,new_grid)
#mixture = Mixture(np.array([0.2,0.2,0.2,0.2,0.2]),a,grid)
x=np.linspace(-5,30,100)
y=[mixture.pdf(x) for x in x]
plt.hist(sample,30,normed=True)
#plt.hist(sample,60,normed=1)
plt.plot(x,y)
plt.show()
In [124]:
def fit_gvg(p,a,u,x0):
from scipy.stats import entropy
from scipy.optimize import minimize
GG = gen_gamma(shapes='v,k,b')
#fun = lambda x: entropy(GG.pdf(u,x[0],x[1],x[2]),p+1e-06)
fun = lambda x: -(np.array(p)*np.log(GG.pdf(u,x[0],x[1],x[2])+1e-7)).sum()
bnds = ((None, None), (0, None), (0, None))
res = minimize(fun, x0=x0, bounds=bnds)
return res
In [64]:
%%time
res=fit_gvg(p,a,new_grid)
print(res.message)
res=res.x
gvg=GVG()
x=np.linspace(-3,25,50)
y=[gvg.pdf(x,a,res[0],res[1],res[2]) for x in x]
plt.hist(sample,40,normed=1)
plt.plot(x,y)
plt.show()
In [ ]:
# Если сгенерировать выборку
def gen(p,a,grid):
sample_gen=[]
rate=10000
for i in range(0,len(p)):
sample0 = np.full(math.floor(p[i]*rate),grid[i]) #+ np.random.normal(grid[i],(grid[1]-grid[0])*1.5,size=math.floor(p[i]*rate))
sample_gen= sample_gen+[i for i in list(sample0) if i>0]
GG = gen_gamma(shapes='v,k,b',a=0.)
v,k,b,_,_ = GG.fit(sample_gen, floc=0, fscale=1)
x=np.linspace(-1,25,100)
plt.hist(sample_gen,40,normed=1)
plt.plot(x,GG.pdf(x,v,k,b))
plt.show()
gvg=GVG()
x=np.linspace(-3,25,100)
y=[gvg.pdf(x,a,v,k,b) for x in x]
plt.hist(sample,40,normed=1)
plt.plot(x,y,label=('fitted: a=%0.3f v=%0.3f k=%0.3f b=%0.3f' %(a,v,k,b) ))
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.03),prop={'size':10})
plt.show()
gen(p,a,grid)
gen(new_p,new_a,new_grid)
In [ ]:
def pdf_dist(params1,params2):
gvg_DIST=gvg1()
a1,v1,k1,b1 = params1
a2,v2,k2,b2 = params2
lower = min(gvg_DIST.ppf(0.0001,a1,v1,k1,b1),gvg_DIST.ppf(0.0001,a2,v2,k2,b2))
upper = max(gvg_DIST.ppf(0.9999,a1,v1,k1,b1),gvg_DIST.ppf(0.9999,a2,v2,k2,b2))
step=(upper-lower)/100
grid=np.arange(lower,upper,step)
dist=0
for dot in list(grid):
dist=dist+step*abs(gvg_DIST.pdf(dot,a1,v1,k1,b1)-gvg_DIST.pdf(dot,a2,v2,k2,b2))
return dist
In [ ]:
#print(pdf_dist([0.849,0.399,15.6,0.009],[2,0.8,3,1]))
print(pdf_dist([a,res[0],res[1],res[2]],[2,0.8,3,1]))
In [ ]:
print(pdf_dist([a,res[0],res[1],res[2]],[2,0.8,3,1]))
In [ ]:
gvg1=GVG()
gvg2=GVG()
abs(gvg1.pdf(10,0.849,0.399,15.6,0.009)-gvg2.pdf(10,2,0.8,3,1))
In [6]:
class GIG(rv_continuous):
def _pdf(self, x, v, m,l):
return (l**(v/2) * x**(v-1) * np.exp(-0.5 * (m/x + l*x)))/(2*m**(v/2)*kv(v,np.sqrt(m*l)))
def _argcheck(self, v, m,l):
return m>1e-10 and l > 1e-10
class GH:
def cdf(self,x,a,v,m,l):
def integrand(z,x,a,v,m,l):
gig = GIG(a=1e-10)
return norm.cdf((x-a*z)/math.sqrt(z))*gig.pdf(z,v,m,l)
return quad(integrand,0.0001,np.inf,args=(x,a,v,m,l),limit=1000)[0]
def pdf(self,x,a,v,m,l):
return (self.cdf(x,a,v,m,l)-self.cdf(x-0.001,a,v,m,l))/0.001
In [7]:
class gh1(rv_continuous):
def _cdf(self,x,a,v,m,l):
def integrand(z,x,a,v,m,l):
gig = GIG(a=1e-10)
return norm.cdf((x-a*z)/math.sqrt(z))*gig.pdf(z,v,m,l)
return quad(integrand,0.0001,np.inf,args=(x,a,v,m,l),limit=1000)[0]
def _argcheck(self,a,v,m,l):
return m>0 and l>0
In [8]:
def gh_rand(a,v, m,l,size=1000):
gig = GIG(a=1e-10)
rand_gig=gig.rvs(v, m,l, size=size)
return norm.rvs(size=size) * np.sqrt(rand_gig) + a*rand_gig
In [9]:
def fit_gh(p,a,u):
from scipy.stats import entropy
from scipy.optimize import minimize
#GG = gen_gamma(shapes='v,k,b')
gig = GIG()
#fun = lambda x: entropy(GG.pdf(u,x[0],x[1],x[2]),p+1e-06)
fun = lambda x: -(np.array(p)*np.log(gig.pdf(u,x[0],x[1],x[2])+1e-7)).sum()
x0 = [1, 1, 1]
bnds = ((None, None), (0, None), (0, None))
res = minimize(fun, x0=x0, bounds=bnds)
return res
In [10]:
def pdf_dist_gh(params1,params2):
gh_DIST=gh1()
a1,v1,k1,b1 = params1
a2,v2,k2,b2 = params2
lower = min(gh_DIST.ppf(0.0001,a1,v1,k1,b1),gh_DIST.ppf(0.0001,a2,v2,k2,b2))
upper = max(gh_DIST.ppf(0.9999,a1,v1,k1,b1),gh_DIST.ppf(0.9999,a2,v2,k2,b2))
step=(upper-lower)/100
grid=np.arange(lower,upper,step)
dist=0
for dot in list(grid):
dist=dist+step*abs(gh_DIST.pdf(dot,a1,v1,k1,b1)-gh_DIST.pdf(dot,a2,v2,k2,b2))
return dist
In [9]:
sample = gh_rand(1,1.5,1,1,2000)
grid = np.linspace(0.001,5,30)
In [28]:
params_sets = [
#[1,1.5,1,1],
#[-2,1.5,1,1],
#[0,1.5,1,1],
#[0,2,20,1],
#[2,-3,2,4],
#[2,-3,2,0.01],
[-2,-3,2,0.05],
[0,0,0.1,0.1],
[-0.5,0,0.1,0.1]]
steps_grid = [0,10,20]
In [22]:
gh_DIST = gh1()
for i,params in enumerate(params_sets):
np.random.seed(0)
sample = gh_rand(params[0],params[1],params[2],params[3],1000)
x=np.linspace(np.min(sample),np.max(sample),50)
plt.hist(sample,40 ,ec='black',color='blue',normed=True)
y=[gh_DIST.pdf(x,params[0],params[1],params[2],params[3]) for x in x]
plt.plot(x,y,label='a=%0.3f v=%0.3f k=%0.3f b=%0.3f' %(params[0],params[1],params[2],params[3]))
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.08),prop={'size':10})
plt.savefig('pics/gh_params'+str(i)+'.png')
plt.clf()
In [29]:
def experiment(params_sets,steps_grid,grid_sizes):
for i,params in enumerate(params_sets):
np.random.seed(0)
sample = gh_rand(params[0],params[1],params[2],params[3],1000)
for steps in steps_grid:
for grid_size in grid_sizes:
print('Params: ',params,', steps:',steps,', grid:',grid_size)
grid = np.linspace(1,40,grid_size)
p,a,new_grid,iter_num = fit_mixture(sample,grid,0.01,irreg_grid_step=2,recalc_steps=steps,verbose=0)
res=fit_gh(p,a,new_grid).x
dist = pdf_dist_gh([a,res[0],res[1],res[2]],[params[0],params[1],params[2],params[3]])
print(dist)
print()
experiment(params_sets,steps_grid,[20])
In [17]:
plt.hist(sample,40 ,ec='black',color='blue',normed=True)
x=np.linspace(np.min(sample),np.max(sample),50)
y=[gh_DIST.pdf(x,1,1.5,1,1) for x in x]
plt.plot(x,y,label='fitted: a=%0.3f v=%0.3f k=%0.3f b=%0.3f' %(1,2,3,4) )
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.08),prop={'size':10})
plt.show()
In [ ]:
p,a = fit_mixture(sample,grid,0.01)
In [ ]:
mixture = Mixture(p,a,grid)
x=np.linspace(-6,20,100)
y=[mixture.pdf(x) for x in x]
plt.hist(sample,40,normed=1)
plt.plot(x,y)
plt.show()
In [ ]:
sample_gen=[]
rate=10000
for i in range(0,len(p)):
sample0 = np.full(math.floor(p[i]*rate),grid[i])
sample_gen= sample_gen+[i for i in list(sample0) if i>0]
In [ ]:
gig = GIG(shapes='v,m,l',a=0.)
v,m,l,_,_ = gig.fit(sample_gen, floc=0, fscale=1)
In [ ]:
x=np.linspace(-1,12,100)
plt.hist(sample_gen,40,normed=1)
plt.plot(x,gig.pdf(x,v,m,l))
plt.show()
In [ ]:
gh=GH()
x=np.linspace(-6,20,100)
y=[gh.pdf(x,a,v,m,l) for x in x]
plt.hist(sample,40,normed=1)
plt.plot(x,y)
plt.show()
In [ ]:
for i,j in zip(p,grid):
print(j,' ',i)
In [ ]:
gig.pdf(0.001 ,v,m,l)
In [17]:
def fit_mixture(sample,u,eps=0.05,verbose=1,irreg_grid_step=0,recalc_steps=10):
p = np.full(len(u), 1/len(u))
mean = np.mean(sample)
a = mean/(u*p).sum()
f = np.zeros((len(u),len(sample)))
g = np.zeros((len(u),len(sample)))
new_likehood = grid_likehood(sample,u,p,a)
iteration = 0
bins=list(np.array(grid)-(grid[1]-grid[0])/2) + [grid[-1]+(grid[1]-grid[0])/2]
while True:
for i in range(0,len(u)):
for j in range(0,len(sample)):
f[i,j]= 1/math.sqrt(u[i]) * norm.pdf((sample[j]-a*u[i])/math.sqrt(u[i]))
for i in range(0,len(u)):
for j in range(0,len(sample)):
g[i,j] = p[i]*f[i,j]/ (p*f[:,j]).sum()
p = g.sum(axis=1)/len(sample)
a = mean/(u*p).sum()
prev_likehood = new_likehood
new_likehood = grid_likehood(sample,u,p,a)
iteration += 1
if (verbose): print('\nIter: %d likehood: %f' %(iteration,new_likehood))
if ((new_likehood-prev_likehood)<eps) & (iteration>recalc_steps+1) : break
print('p:')
print(p)
print('grid:')
print(u)
if (irreg_grid_step):
if ((iteration%irreg_grid_step==0) & (iteration<=recalc_steps)):
new_bins,new_grid = move_grid(bins,p)
#print('p after moving:')
#print(p)
print('new_bins:')
print(new_bins)
print('new_grid:')
print(new_grid)
new_p = piecewise(np.array(new_grid),u,p)+1e-6
new_p = new_p/new_p.sum()
print('new_p:')
print(new_p)
bins,u,p=new_bins,new_grid,new_p
a = mean/(u*p).sum()
plot_hist(np.array(bins),p)
return p,a,u,iteration
p,a,new_grid,_ = fit_mixture(sample,grid,0.01,irreg_grid_step=2,recalc_steps=10)
In [14]:
def plot_hist(bins,p):
plt.bar(bins[:-1], p, width=bins[1:] - bins[:-1])
plt.show()
In [ ]:
x = np.sort(np.random.rand(6))
y = np.random.rand(5)
plt.bar(x[:-1], y, width=x[1:] - x[:-1])
plt.show()
In [68]:
import pandas as pd
In [71]:
data = np.log(pd.read_csv('KOSPI.csv',sep=';')['<CLOSE>'].values)
data = data[:-1] - data[1:]
In [95]:
plt.hist(data,40)
plt.show()
In [117]:
grid = np.linspace(3e-8,3e-6,20)
p,a,new_grid,_ = fit_mixture(data,grid,0.1,irreg_grid_step=2,recalc_steps=0)
In [118]:
grid = np.linspace(3e-8,3e-6,20)
Out[118]:
In [125]:
window_size = 200
step = 2
test = data[-window_size:]
train = data[:-window_size]
result = []
for window_pos in range(len(train)-window_size):
if (window_pos%10==0):
print(window_pos)
print(res)
window = train[window_pos:window_pos+window_size]
p,a,new_grid,_ = fit_mixture(window,grid,0.1,irreg_grid_step=2,recalc_steps=0,verbose=0)
if (window_pos == 0):
x0=[1,1,1]
else:
x0=res
res=fit_gvg(p,a,new_grid,x0).x
result.append(res)