In [1]:
import json
import os
import shutil
import pandas as pd
import matplotlib.pyplot as plt
from math import copysign as sgn
%matplotlib inline
In [2]:
# data manipulation functions
def number_of_tests():
return len(os.listdir('tests'))
def clear_tests():
for i in os.listdir('tests'):
os.remove('tests/{0}'.format(i))
def load_all_tests(num):
test_name = 1
for root, dirs, files in os.walk('all_tests/test_{0}'.format(num)):
for file in files:
if file != '1.test':
shutil.copyfile('{0}/{1}'.format(root, file), 'tests/{0}.test'.format(test_name))
test_name += 1
def load_test(num):
jsObj = open("tests/{0}.test".format(num), "r")
testObj = json.load(jsObj)
jsObj.close()
return testObj
In [3]:
# parameters extracting
# -------- Functions for static experiments
def distance(x1, y1, x2, y2):
return ((x1-x2)**2 + (y1-y2)**2)**(1/2)
def velocity(arr, num):
try:
return distance(arr[num]['x'], arr[num]['y'], arr[num-1]['x'], arr[num-1]['y']) / (arr[num]['time'] - arr[num-1]['time'])
except:
return 0
def first_distance(obj):
return distance(obj['ballX'], obj['ballY'], obj['data'][0]['x'], obj['data'][0]['y'])
def searching_delay(obj):
garanted_find = min(20, len(obj['data'])-1)
barrier_velocity = 0.5 #px/msec
for i in range(garanted_find):
if velocity(obj['data'], i+1) > barrier_velocity:
return obj['data'][i+1]['time']
return obj['data'][garanted_find]['time']
def all_distance(obj):
result = 0
for i in range(1, len(obj['data'])):
result += distance(obj['data'][i]['x'], obj['data'][i]['y'], obj['data'][i-1]['x'], obj['data'][i-1]['y'])
return result
def all_time(obj):
return obj['data'][-1]['time']
def average_velocity(obj):
return all_distance(obj) / all_time(obj)
def max_deviation(obj):
maximum = 0
x1, y1 = (obj['data'][0]['x'], obj['data'][0]['y'])
x2, y2 = (obj['ballX'], obj['ballY'])
A = (y2-y1)/(x1*y2-x2*y1)
B = (x1-x2)/(x1*y2-x2*y1)
H = lambda v: abs(A*v[0]+B*v[1]-1)/( (A**2 + B**2)**(1/2) ) #x = v[0], y = v[1]
for i in obj['data']:
maximum = max(maximum, H([i['x'], i['y']]))
return maximum
def inertion(obj): # length, time
x0, y0, R = (obj['ballX'], obj['ballY'], obj['ballSize'])
max_len = 0
in_ball = lambda x, y: (x-x0)**2+(y-y0)**2 < max(R, 50)**2
first_intersect = -1
for i in range(len(obj['data'])):
if in_ball(obj['data'][i]['x'], obj['data'][i]['y']):
first_intersect = i
break
if first_intersect < 0:
return (0, 0)
point = first_intersect # первое пересечение с шариком
while point < len(obj['data']) and in_ball(obj['data'][point]['x'], obj['data'][point]['y']):
point += 1
if point == len(obj['data']):
return (0, 0)
second_intersect = point # второе пересечение - вылет из шарика из-за инерции
while point < len(obj['data']) and not in_ball(obj['data'][point]['x'], obj['data'][point]['y']):
max_len = max(max_len, distance(obj['data'][point]['x'], obj['data'][point]['y'], x0, y0))
point += 1
if point == len(obj['data']):
return (max_len, obj['data'][second_intersect]['time']-obj['data'][first_intersect]['time'])
third_intersect = point # третье пересечение - возвражение обратно в шарик (корректировка)
return (max_len, obj['data'][third_intersect]['time']-obj['data'][second_intersect]['time'])
def right_or_left(obj):
if obj['data'][0]['x'] > obj['ballX']:
return 1
else:
return -1
#----- Functions for dynamic experiments
def max_deviation_d(obj): # +, если прошел по верху, -, если прошел по низу
maximum = 0
minimum = 0
x1, y1 = (obj['data'][0]['x'], obj['data'][0]['y'])
x2, y2 = (lambda n: obj['ballData'][n]['x'], lambda n: obj['ballData'][n]['y'])
A = lambda n: (y2(n)-y1)/(x1*y2(n)-x2(n)*y1)
B = lambda n: (x1-x2(n))/(x1*y2(n)-x2(n)*y1)
H = lambda v, n: (A(n)*v[0]+B(n)*v[1]-1)/( (A(n)**2 + B(n)**2)**(1/2) ) #x = v[0], y = v[1]
for i in range(len(obj['data'])):
h = H([obj['data'][i]['x'], obj['data'][i]['y']], i)
maximum = max(maximum, h)
minimum = min(minimum, h)
if(abs(minimum) > maximum):
return minimum
else:
return maximum
def ball_velocity(obj):
try:
return distance(obj['ballData'][0]['x'], obj['ballData'][0]['y'], obj['ballData'][1]['x'], obj['ballData'][1]['y'])/(obj['ballData'][1]['time']-obj['ballData'][0]['time'])
except:
return 0
def correl(obj):
bx, by = (obj['ballData'][1]['x']-obj['ballData'][0]['x'], obj['ballData'][1]['y']-obj['ballData'][0]['y'])
x, y = (obj['data'][1]['x']-obj['data'][0]['x'], obj['data'][1]['y']-obj['data'][0]['y'])
return sgn(1, bx*x + by*y)
def scalar(obj, i, j):
return (obj['data'][i]['x']-obj['data'][i-1]['x'])*(obj['data'][j]['x']-obj['data'][j-1]['x']) + (obj['data'][i]['y']-obj['data'][i-1]['y'])*(obj['data'][j]['y']-obj['data'][j-1]['y'])
def scal_cos(obj, i, j):
try:
return scalar(obj, i, j)/(scalar(obj, i, i)*scalar(obj, j, j))**(1/2)
except:
return 0
def deviation_coef(obj, i, j):
return 1+( abs( scal_cos(obj, i, j) -1)/2 )**(1/2)
def delay_d(obj):
barrier_value = 0.3
for i in range(1, len(obj['data'])-1):
if velocity(obj['data'], i+1)*deviation_coef(obj, i+1, i) > barrier_value:
return obj['data'][i]['time']
return 0 # пользователь не подал признаков того, что он нашел, значит в предположении, он сразу его увидел
def inertion_d(obj): # length, time
x0, y0, R = (lambda x: obj['ballData'][x]['x'], lambda x: obj['ballData'][x]['y'], obj['ballSize'])
max_len = 0
in_ball = lambda x, y, p: (x-x0(p))**2+(y-y0(p))**2 < max(R, 50)**2
first_intersect = -1
for i in range(len(obj['data'])):
if in_ball(obj['data'][i]['x'], obj['data'][i]['y'], i):
first_intersect = i
break
if first_intersect < 0:
return (0, 0)
point = first_intersect # первое пересечение с шариком
while point < len(obj['data']) and in_ball(obj['data'][point]['x'], obj['data'][point]['y'], point):
point += 1
if point == len(obj['data']):
return (0, 0)
second_intersect = point # второе пересечение - вылет из шарика из-за инерции
while point < len(obj['data']) and not in_ball(obj['data'][point]['x'], obj['data'][point]['y'], point):
max_len = max(max_len, distance(obj['data'][point]['x'], obj['data'][point]['y'], x0(point), y0(point)))
point += 1
if point == len(obj['data']):
return (max_len, obj['data'][second_intersect]['time']-obj['data'][first_intersect]['time'])
third_intersect = point # третье пересечение - возвражение обратно в шарик (корректировка)
return (max_len, obj['data'][third_intersect]['time']-obj['data'][second_intersect]['time'])
def relative_coords(obj):
return (obj['ballX']-obj['data'][0]['x'], obj['ballY']-obj['data'][0]['y'])
#----- Agreration Data -----------
# Перед тем как использовать эту функцию, убедитесь, что в папку all_tests загруженны все ваши тесты
def get_test_data(number):
clear_tests()
load_all_tests(number)
raw_data = [load_test(x) for x in range(1, number_of_tests())]
functions = [
[lambda x: x['ballX'], lambda x: x['ballY'], lambda x: x['ballSize'], right_or_left, first_distance, all_distance, searching_delay, all_time, average_velocity, max_deviation, lambda x: inertion(x)[0], lambda x: inertion(x)[1], lambda x: x['shots']],
[lambda x: x['data'][0]['x'], lambda x: x['data'][0]['y'], lambda x: relative_coords(x)[0], lambda x: relative_coords(x)[1], lambda x: x['ballSize'], first_distance, ball_velocity, correl, all_distance, all_time, average_velocity, lambda x: abs(max_deviation_d(x)), lambda x: sgn(1, max_deviation_d(x)), delay_d, lambda x: inertion_d(x)[0], lambda x: inertion_d(x)[1]],
[lambda x: x['data'][0]['x'], lambda x: x['data'][0]['y'], lambda x: relative_coords(x)[0], lambda x: relative_coords(x)[1], lambda x: x['ballSize'], first_distance, ball_velocity, correl, all_distance, all_time, average_velocity, lambda x: abs(max_deviation_d(x)), lambda x: sgn(1, max_deviation_d(x)), delay_d, lambda x: inertion_d(x)[0], lambda x: inertion_d(x)[1]]
]
cols = [
['BallX', 'BallY', 'BallSize', 'RorL', 'FirstDist', 'AllDist', 'Delay', 'AllTime', 'AverVelocity', 'MaxDev', 'InerLen', 'InerTime', 'Shots'],
['Px', 'Py', 'RBallX', 'RBallY', 'BallSize', 'FirstDist', 'BallVelocity', 'Correlation', 'AllDist', 'AllTime', 'AverVelocity', 'AbsMaxDev', 'UpOrDown', 'Delay', 'InerLen', 'InerTime'],
['Px', 'Py', 'RBallX', 'RBallY', 'BallSize', 'FirstDist', 'BallVelocity', 'Correlation', 'AllDist', 'AllTime', 'AverVelocity', 'AbsMaxDev', 'UpOrDown', 'Delay', 'InerLen', 'InerTime']
]
return pd.DataFrame.from_records([[f(x) for f in functions[number-1]] for x in raw_data], columns=cols[number-1])
In [4]:
data1 = get_test_data(1)
data1 = data1[data1.AllTime < 5000]
from sklearn import linear_model
pparams = ['AllDist', 'Delay', 'AllTime', 'AverVelocity', 'MaxDev', 'InerLen', 'InerTime', 'Shots']
regs = {}
for p in pparams:
regs[p] = linear_model.LinearRegression()
regs[p].fit(data1[['BallSize', 'FirstDist']], data1[p])
def predict(size, dist):
for p in regs.keys():
print(p, ": ", regs[p].predict([size, dist])[0])
def print_regs():
for p in regs.keys():
print("{0:<12} = {1:<8.5} + {2:<8.3} * Size + {3:<9.3} * Distantion".format(p, regs[p].intercept_, regs[p].coef_[0], regs[p].coef_[1]))
print('Static model:')
print_regs()
In [5]:
from sklearn import svm
data2 = get_test_data(2)
data2corr = data2[data2.Correlation > 0]
data2ncorr = data2[data2.Correlation < 0]
data2iner = data2[data2.InerLen > 0]
inerPercentage = len(data2iner)/len(data2)
# --- FOR CORRELATE ----
pparamscorr = ['AllDist', 'Delay', 'AllTime', 'AverVelocity', 'AbsMaxDev']
regscorr = {}
for p in pparamscorr:
regscorr[p] = linear_model.LinearRegression()
regscorr[p].fit(data2corr[['BallSize', 'FirstDist', 'BallVelocity']], data2corr[p])
def print_regs_corr():
for p in regscorr.keys():
print("{0:<12} = {1:<9.5} + {2:<9.3} * Size + {3:<9.3} * Distantion + {4:<9.3} * Velocity".format(p, regscorr[p].intercept_, regscorr[p].coef_[0], regscorr[p].coef_[1], regscorr[p].coef_[2]))
devPos = svm.SVC(kernel='linear')
devPos.fit(data2corr[['RBallX', 'RBallY']], data2corr['UpOrDown'])
def print_dev_corr():
xx, yy = 0, 0
for i in range(len(devPos.dual_coef_[0])):
xx+=devPos.dual_coef_[0][i]*devPos.support_vectors_[i][0]
yy+=devPos.dual_coef_[0][i]*devPos.support_vectors_[i][1]
print("DevPos = sign( {0:<9.5} * RBallX + {1:<9.5} * RBallY + {2:<9.5} )".format(xx, yy, devPos.intercept_[0]))
# --- FOR NOT CORRELATE ----
pparamsncorr = ['AllDist', 'Delay', 'AllTime', 'AverVelocity', 'AbsMaxDev']
regsncorr = {}
for p in pparamsncorr:
regsncorr[p] = linear_model.LinearRegression()
regsncorr[p].fit(data2ncorr[['BallSize', 'FirstDist', 'BallVelocity']], data2ncorr[p])
def print_regs_ncorr():
for p in regsncorr.keys():
print("{0:<12} = {1:<9.5} + {2:<9.3} * Size + {3:<9.3} * Distantion + {4:<9.3} * Velocity".format(p, regsncorr[p].intercept_, regsncorr[p].coef_[0], regsncorr[p].coef_[1], regsncorr[p].coef_[2]))
devPos = svm.SVC(kernel='linear')
devPos.fit(data2ncorr[['RBallX', 'RBallY']], data2ncorr['UpOrDown'])
def print_dev_ncorr():
xx, yy = 0, 0
for i in range(len(devPos.dual_coef_[0])):
xx+=devPos.dual_coef_[0][i]*devPos.support_vectors_[i][0]
yy+=devPos.dual_coef_[0][i]*devPos.support_vectors_[i][1]
print("DevPos = sign( {0:<9.5} * RBallX + {1:<9.5} * RBallY + {2:<9.5} )".format(xx, yy, devPos.intercept_[0]))
pparamsiner = ['InerTime', 'InerLen']
regsiner = {}
for p in pparamsiner:
regsiner[p] = linear_model.LinearRegression()
regsiner[p].fit(data2iner[['BallSize', 'FirstDist', 'BallVelocity']], data2iner[p])
print("Dynamic model without green balls")
print("For correlate: ")
print_regs_corr()
# print_dev_corr()
print("")
print("For not correlate: ")
print_regs_ncorr()
# print_dev_ncorr()
print("")
print("Inertion percentage: ", inerPercentage)
for p in regsiner.keys():
print("{0:<12} = {1:<9.5} + {2:<9.3} * Size + {3:<9.3} * Distantion + {4:<9.3} * Velocity".format(p, regsiner[p].intercept_, regsiner[p].coef_[0], regsiner[p].coef_[1], regsiner[p].coef_[2]))
In [6]:
data2[:10]
Out[6]:
In [7]:
data3 = get_test_data(3)
data3corr = data3[data3.Correlation > 0]
data3ncorr = data3[data3.Correlation < 0]
data3iner = data3[data3.InerLen > 0]
inerPercentage3 = len(data3iner)/len(data3)
data3[:10]
# --- FOR CORRELATE ----
pparamscorr3 = ['AllDist', 'Delay', 'AllTime', 'AverVelocity', 'AbsMaxDev']
regscorr3 = {}
for p in pparamscorr3:
regscorr3[p] = linear_model.LinearRegression()
regscorr3[p].fit(data3corr[['BallSize', 'FirstDist', 'BallVelocity']], data3corr[p])
def print_regs_corr3():
for p in regscorr3.keys():
print("{0:<12} = {1:<9.5} + {2:<9.3} * Size + {3:<9.3} * Distantion + {4:<9.3} * Velocity".format(p, regscorr3[p].intercept_, regscorr3[p].coef_[0], regscorr3[p].coef_[1], regscorr3[p].coef_[2]))
# --- FOR NOT CORRELATE ----
pparamsncorr3 = ['AllDist', 'Delay', 'AllTime', 'AverVelocity', 'AbsMaxDev']
regsncorr3 = {}
for p in pparamsncorr3:
regsncorr3[p] = linear_model.LinearRegression()
regsncorr3[p].fit(data3ncorr[['BallSize', 'FirstDist', 'BallVelocity']], data3ncorr[p])
def print_regs_ncorr3():
for p in regsncorr3.keys():
print("{0:<12} = {1:<9.5} + {2:<9.3} * Size + {3:<9.3} * Distantion + {4:<9.3} * Velocity".format(p, regsncorr3[p].intercept_, regsncorr3[p].coef_[0], regsncorr3[p].coef_[1], regsncorr3[p].coef_[2]))
pparamsiner3 = ['InerTime', 'InerLen']
regsiner3 = {}
for p in pparamsiner3:
regsiner3[p] = linear_model.LinearRegression()
regsiner3[p].fit(data3iner[['BallSize', 'FirstDist', 'BallVelocity']], data3iner[p])
print("Dynamic model with green balls")
print("For correlate: ")
print_regs_corr3()
# print_dev_corr()
print("")
print("For not correlate: ")
print_regs_ncorr3()
# print_dev_ncorr()
print("")
print("Inertion percentage: ", inerPercentage3)
for p in regsiner3.keys():
print("{0:<12} = {1:<9.5} + {2:<9.3} * Size + {3:<9.3} * Distantion + {4:<9.3} * Velocity".format(p, regsiner3[p].intercept_, regsiner3[p].coef_[0], regsiner3[p].coef_[1], regsiner3[p].coef_[2]))
In [20]:
data3.plot.scatter(x='FirstDist', y='AllDist')
data3.plot.scatter(x='FirstDist', y='AverVelocity')
data3.plot.scatter(x='BallSize', y='Delay')
data3.plot.scatter(x='FirstDist', y='Delay')
data3.plot.scatter(x='BallSize', y='AllTime')
data3.plot.scatter(x='FirstDist', y='AllTime')
Out[20]:
In [15]:
cond = data3.UpOrDown > 0
dup = data3.RBallY > 0
ddown = data3.RBallY < 0
dright = data3.RBallX > 0
dleft = data3.RBallX < 0
upright = data3[dup][dright]
upleft = data3[dup][dleft]
downright = data3[ddown][dright]
downleft = data3[ddown][dleft]
In [16]:
subset_a = upright[cond].dropna()
subset_b = upright[~cond].dropna()
plt.scatter(subset_a.RBallX, subset_a.RBallY, s=60, c='b', label='Up')
plt.scatter(subset_b.RBallX, subset_b.RBallY, s=60, c='r', label='Down')
plt.legend()
Out[16]:
In [17]:
subset_a = upleft[cond].dropna()
subset_b = upleft[~cond].dropna()
plt.scatter(subset_a.RBallX, subset_a.RBallY, s=60, c='b', label='Up')
plt.scatter(subset_b.RBallX, subset_b.RBallY, s=60, c='r', label='Down')
plt.legend()
Out[17]:
In [18]:
subset_a = downleft[cond].dropna()
subset_b = downleft[~cond].dropna()
plt.scatter(subset_a.RBallX, subset_a.RBallY, s=60, c='b', label='Up')
plt.scatter(subset_b.RBallX, subset_b.RBallY, s=60, c='r', label='Down')
plt.legend()
Out[18]:
In [19]:
subset_a = downright[cond].dropna()
subset_b = downright[~cond].dropna()
plt.scatter(subset_a.RBallX, subset_a.RBallY, s=60, c='b', label='Up')
plt.scatter(subset_b.RBallX, subset_b.RBallY, s=60, c='r', label='Down')
plt.legend()
Out[19]:
In [ ]:
In [14]:
for i in pparams:
print(i, " : ", data1['RorL'].corr(data1[i], method='pearson'))
rorl = linear_model.LinearRegression()
p = "RorL"
rorl.fit(data1[['BallSize', 'FirstDist']], data1[p])
print("{0:<12} = {1:<8.5} + {2:<8.3} * Size + {3:<9.3} * Distantion".format(p, rorl.intercept_, rorl.coef_[0], rorl.coef_[1]))
print("So, we can see, that correlation beetween this parameters too small and we can ignore thay")
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]: