In [1]:
from scipy import stats
from scipy.stats import norm, chi2
from math import sqrt, floor
from random import *
#have two distributions, one with conversion rate 10% and the other one with 20%
def getConversionA():
conversion_rate = 0.1
return 1 if random() < conversion_rate else 0
def getConversionB():
conversion_rate = 0.11
return 1 if random() < conversion_rate else 0
def getConversionIndex(i=0):
conversion_rates = [0.1,0.15,0.2]
return 1 if random() < conversion_rates[i] else 0
def getPValueZtest(x_visitors, x_conversions, y_visitors, y_conversions):
if x_visitors < 5 or x_conversions < 5 or y_visitors < 5 or y_conversions < 5: return 1
mean_x = float(x_conversions)/x_visitors
mean_y = float(y_conversions)/y_visitors
sd_x = sqrt(mean_x*(1-mean_x))
sd_y = sqrt(mean_y*(1-mean_y))
se_x = sd_x/sqrt(x_visitors)
se_y = sd_y/sqrt(y_visitors)
#combined_se = sqrt(pow(se_x,2) + pow(se_y,2))
average_conversion_rate = float(x_conversions+y_conversions)/(x_visitors+y_visitors)
combined_se = sqrt(average_conversion_rate * (1-average_conversion_rate) * ((1/float(x_visitors)) + (1/float(y_visitors))))
if combined_se==0: return 1
z_score = (mean_y - mean_x)/combined_se
p_value = 2*(1-norm.cdf(abs(z_score)))
return p_value
def getPValueChiSquare(x_visitors, x_conversions, y_visitors, y_conversions):
if x_visitors < 5 or x_conversions < 5 or y_visitors < 5 or y_conversions < 5: return 1
average_conversion_rate = float(x_conversions+y_conversions)/(x_visitors+y_visitors)
x_expected = x_visitors*average_conversion_rate
y_expected = y_visitors*average_conversion_rate
x_not_expected = x_visitors - x_expected
y_not_expected = y_visitors - y_expected
chi_square = pow((x_conversions-x_expected),2)/x_expected + pow((y_conversions-y_expected),2)/y_expected + pow(x_visitors-x_conversions-x_not_expected,2)/x_not_expected + pow(y_visitors-y_conversions-y_not_expected,2)/y_not_expected
print chi_square
p_value = 1-chi2.cdf(chi_square,1)
return p_value
def getPValues(visitors,conversions):
p_val = []
p_val.append(getPValueZtest(visitors[0],conversions[0],visitors[1],conversions[1]))
p_val.append(getPValueZtest(visitors[0],conversions[0],visitors[2],conversions[2]))
p_val.append(getPValueZtest(visitors[1],conversions[1],visitors[2],conversions[2]))
return p_val
def normal_randomization():
total_runs = 10000
visitors = [1,1,1]
conversions = [0,0,0]
p_values= []
total_conversions = []
total_visitors = []
for i in range(total_runs):
chosen_variation = int(floor(random()*3))
visitors[chosen_variation]+=1
if getConversionIndex(chosen_variation): conversions[chosen_variation]+=1
p_value = getPValues(visitors,conversions)
#p_value = getPValueZtest(x_visitors,x_conversions,y_visitors,y_conversions)
p_values.append(p_value[:])
total_visitors.append(visitors[:])
total_conversions.append(conversions[:])
#total_conversions.append(x_conversions+y_conversions)
# print x_visitors+y_visitors,x_visitors,y_visitors,x_conversions,float(x_conversions)/x_visitors,y_conversions,float(y_conversions)/y_visitors,p_value
return p_values, total_visitors, total_conversions
def normal_bandit():
total_runs = 10000
visitors = [1,1,1]
conversions = [1,1,1]
p_values= [[1,1,1]]
total_conversions = [conversions[:]]
total_visitors = [visitors[:]]
for i in range(total_runs):
if random()<0.1: #exploration phase
chosen_variation = int(floor(random()*3))
visitors[chosen_variation]+=1
if getConversionIndex(chosen_variation): conversions[chosen_variation]+=1
else: #exploitation phase
conversion_rates = [float(conversions[0])/visitors[0], float(conversions[1])/visitors[1], float(conversions[2])/visitors[2]]
chosen_variation = conversion_rates.index(max(conversion_rates))
visitors[chosen_variation]+=1
if getConversionIndex(chosen_variation): conversions[chosen_variation]+=1
p_value = getPValues(visitors,conversions)
#p_value = getPValueZtest(x_visitors,x_conversions,y_visitors,y_conversions)
p_values.append(p_value[:])
total_visitors.append(visitors[:])
total_conversions.append(conversions[:])
#total_conversions.append(x_conversions+y_conversions)
# print x_visitors+y_visitors,x_visitors,y_visitors,x_conversions,float(x_conversions)/x_visitors,y_conversions,float(y_conversions)/y_visitors,p_value
return p_values, total_visitors, total_conversions
def findLessThan(p_vals,threshold,numbers=1):
number_found=0
for i in range(len(p_vals)):
if p_vals[i][0] < threshold or p_vals[i][1] < threshold or p_vals[i][2] < threshold:
number_found+=1
if number_found>=numbers:
return i
return -1
if __name__=="__main__":
for i in range(25):
p_vals, visitors,conversions = normal_bandit()
when = findLessThan(p_vals,0.05,10)
print str(when)+','+str(sum(conversions[when])/float(when))