In [1]:

# Experiment configuration constants, shared by all cells below.
CTRL_ID = 101 # Control experiment id
TREAT_ID = 102  # Treatment experiment id
LE_MEAN_ASP_CTRL = 1.0 # Low-end products average sale price (asp) (in Control)
LE_MEAN_ASP_TREAT = 0.9 # Low-end products average sale price (asp) (in Treatment)
HE_MEAN_ASP_CTRL = 2 # High-end products average sale price (asp) (in Control)
HE_MEAN_ASP_TREAT = 2.2 # High-end product average sale price (asp) (in Treatment)

NUM_LE = 10 # Number of low-end products
NUM_HE = 12 # Number of high-end products
MEAN_LE_BIDS_MADE_CTRL = 5 # Average number of bids made for low-end products (in Control)
MEAN_HE_BIDS_MADE_CTRL = 20 # Average number of bids made for high-end products (in Control)
MEAN_LE_BIDS_MADE_TREAT = 8 # Average number of bids made for low-end products (in Treatment)
MEAN_HE_BIDS_MADE_TREAT = 24 # Average number of bids made for high-end products (in Treatment)




In [2]:

import numpy as np
'''
Creating fake data.

We make two categories of products, low-end and high-end, with NUM_LE and
NUM_HE products respectively.
Each product gets a different average sale rate, sampled from a Beta
distribution: Beta(3, 15) -- mean of ~0.167 -- shared by control and treatment.

The average sale prices (asp) are normally distributed: N(1, 0.2), N(2, 0.2),
N(0.9, 0.2), N(2.2, 0.2) for low-end control, high-end control, low-end
treatment, and high-end treatment, respectively.
The total number of simulated bids per product is sampled from Poisson(lambda).
'''
np.random.seed(141592)

def calc_sim_data(exp_id, product_ids, bids_won, bids_made, mean_asp, size):
    """Simulate one experiment arm's per-bid rows.

    Produces one row per simulated bid with columns
    (exp_id, product_id, price, sale_indicator), where:
      * the number of bids per product is drawn from Poisson(bids_made),
      * price is drawn from N(mean_asp, 0.2),
      * sale_indicator is Bernoulli(bids_won[i]) for product i.

    Args:
        exp_id: experiment id stamped on every row.
        product_ids: array of `size` product ids.
        bids_won: array of per-product sale probabilities in [0, 1].
        bids_made: mean number of bids per product (Poisson rate).
        mean_asp: mean average sale price for this arm.
        size: number of products.

    Returns:
        numpy array of shape (total_simulated_bids, 4).
    """
    exp_ids = exp_id * np.ones(size)
    zeros = np.zeros(size)
    # One row per product: (exp_id, product_id, price placeholder, win prob).
    data = np.concatenate(([exp_ids], [product_ids], [zeros], [bids_won]),
                          axis=0).transpose()
    # BUG FIX(review): the original referenced an undefined name `bids` here.
    # Per the cell docstring, the bid count per product is Poisson(bids_made).
    bids_per_product = np.random.poisson(bids_made, size)
    full_data = np.repeat(data, bids_per_product, axis=0)
    for r in full_data:
        r[2] = np.random.normal(mean_asp, 0.2)  # sale price for this bid
        r[3] = np.random.binomial(1, r[3])      # 1 if this bid won a sale
    return full_data

# NOTE(review): these constants duplicate the first cell's definitions; they
# are kept so this cell can run standalone, but the original defined
# `EXP_ID = 102` while the calls below use `TREAT_ID` -- renamed for
# consistency.
CTRL_ID = 101
TREAT_ID = 102
LE_MEAN_ASP_CTRL = 1.0
LE_MEAN_ASP_TREAT = 0.9
HE_MEAN_ASP_CTRL = 2
HE_MEAN_ASP_TREAT = 2.2

NUM_LE = 10
NUM_HE = 12

# Distinct id ranges so low-end and high-end products never collide.
le_product_ids = np.arange(1000, 1000 + NUM_LE)
he_product_ids = np.arange(10000, 10000 + NUM_HE)

# Per-product sale probabilities, shared by control and treatment.
le_bids_won = np.random.beta(3, 15, NUM_LE)
he_bids_won = np.random.beta(3, 15, NUM_HE)

le_data_ctrl = calc_sim_data(CTRL_ID, le_product_ids, le_bids_won, MEAN_LE_BIDS_MADE_CTRL, LE_MEAN_ASP_CTRL, NUM_LE)
le_data_treat = calc_sim_data(TREAT_ID, le_product_ids, le_bids_won, MEAN_LE_BIDS_MADE_TREAT, LE_MEAN_ASP_TREAT, NUM_LE)
he_data_ctrl = calc_sim_data(CTRL_ID, he_product_ids, he_bids_won, MEAN_HE_BIDS_MADE_CTRL, HE_MEAN_ASP_CTRL, NUM_HE)
he_data_treat = calc_sim_data(TREAT_ID, he_product_ids, he_bids_won, MEAN_HE_BIDS_MADE_TREAT, HE_MEAN_ASP_TREAT, NUM_HE)

# Stack all four arms into one (rows, 4) table.
all_data = np.concatenate((le_data_ctrl, le_data_treat,
                           he_data_ctrl, he_data_treat), axis=0)
np.set_printoptions(suppress=True)
# BUG FIX(review): the original savetxt call was syntactically incomplete
# (dangling comma, unclosed paren). A commented header line ('# ...') is
# written so downstream readers can filter it out.
np.savetxt('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE), all_data,
           fmt='%i,%i,%5.2f,%i',
           header='exp_id,product_id,price,sale_count')




In [3]:

import os
import sys

# Locate the local Spark installation; SPARK_HOME must be set in the
# environment (if it is None, the os.path.join calls below will fail).
spark_home = os.environ.get('SPARK_HOME', None)
print(spark_home)
# Put pyspark and its bundled py4j bridge on the import path. Both py4j zip
# paths are inserted so the cell works against either Spark version.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')) # for Spark 1.4
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.1-src.zip')) # for Spark 2.0

# Bootstrap an interactive Spark session; defines `sc` and `spark` globals
# used by later cells. NOTE(review): execfile is Python 2 only -- this
# notebook runs on Python 2.7 (see the banner output below).
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))




Welcome to
____              __
/ __/__  ___ _____/ /__
_\ \/ _ \/ _ / __/  '_/
/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
/_/

Using Python version 2.7.10 (default, Oct 23 2015 19:19:21)
SparkSession available as 'spark'.




In [4]:

# Calculate the numerator:
# sum_i price_{ctrl_id,i} * sale_count_{treat_id,i} * w_i
# and the denominator:
# sum_i price_{treat_id,i} * sale_count_{ctrl_id,i} * w_i
# where w_i = 1/(sale_count_{ctrl_id,i} + sale_count_{treat_id,i})
def get_ctrl_treat(product_data):
    """Split a grouped product entry into its (control, treatment) rows.

    `product_data` is (product_id, iterable of (exp_id, price, sale_count));
    exactly one row per experiment arm is expected.
    """
    rows = list(product_data[1])
    assert len(rows) == 2
    first_id = int(rows[0][0])
    second_id = int(rows[1][0])
    if first_id == CTRL_ID and second_id == TREAT_ID:
        return rows[0], rows[1]
    elif first_id == TREAT_ID and second_id == CTRL_ID:
        return rows[1], rows[0]
    else:
        # Unexpected experiment id pairing.
        assert False

def calc_numerator(product_data):
    """Per-product Mantel-Haenszel numerator term.

    Returns price_treat * sales_ctrl / (sales_ctrl + sales_treat),
    or 0 when the product had no sales in either arm.
    """
    price_idx, sale_count_idx = 1, 2
    ctrl, treat = get_ctrl_treat(product_data)
    total_sales = ctrl[sale_count_idx] + treat[sale_count_idx]
    if total_sales > 0:
        return (treat[price_idx] * ctrl[sale_count_idx]) / total_sales
    return 0

def calc_denominator(product_data):
    """Per-product Mantel-Haenszel denominator term.

    Returns price_ctrl * sales_treat / (sales_ctrl + sales_treat),
    or 0 when the product had no sales in either arm.
    """
    price_idx, sale_count_idx = 1, 2
    ctrl, treat = get_ctrl_treat(product_data)
    total_sales = ctrl[sale_count_idx] + treat[sale_count_idx]
    if total_sales > 0:
        return (ctrl[price_idx] * treat[sale_count_idx]) / total_sales
    return 0

# Column indices in the simulated CSV: exp id, product id, price, sale count.
(exp, prod, price, sale_count) = (0, 1, 2, 3)

def convert_line(l):
    """Parse one CSV row (a list of strings) into [int, int, float, int]."""
    converters = (int, int, float, int)
    fields = (l[exp], l[prod], l[price], l[sale_count])
    return [convert(field) for convert, field in zip(converters, fields)]




In [5]:

# We want to calculate MH(v_{t,i},n_{t,i},v_{c,i},n_{c,i}), where t and c are
# treatment and control, and v and n in our case are sale-price totals and
# sale counts.
input_rdd = sc.textFile('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE))
header = input_rdd.first() # The first line is the commented header; drop it.
parsed_input_rdd = input_rdd.filter(lambda x: x != header).map(lambda x: convert_line(x.split(',')))
# Key by (exp_id, product_id); value is (revenue for this bid, sale_count).
transformed = parsed_input_rdd.map(lambda x: ((x[exp], x[prod]), (x[sale_count]*x[price], x[sale_count])))
(sp, clks) = (0, 1) # sale price and sale_count
(ep, spc) = (0, 1) # exp_id&product_id, sp&sale_count
(exp2, prod2) = (0, 1) # exp_id, product_id
# For each product cross exp_id, sum the sale prices and sale_count.
grouped_result = transformed.reduceByKey(lambda x, y: (x[sp]+y[sp], x[clks]+y[clks]))
# Re-key by product id so each product groups its control and treatment rows.
grouped_by_product = grouped_result.map(lambda x: ((x[ep][prod2]), (x[ep][exp2], x[spc][sp], x[spc][clks]))).groupByKey()

# BUG FIX(review): numerator_sum and denominator_sum were used below but never
# computed in the original cell; aggregate the per-product MH terms here.
numerator_sum = grouped_by_product.map(calc_numerator).sum()
denominator_sum = grouped_by_product.map(calc_denominator).sum()

effect = numerator_sum / denominator_sum
print(numerator_sum, denominator_sum, effect)




(53.59940817352582, 47.42074413739855, 1.1302945398373545)




In [6]:

import apache_beam as beam

def t_sum(values):
    """Element-wise sum of an iterable of (price_sum, sale_count) pairs.

    Returns a 2-tuple: (sum of first elements, sum of second elements).
    """
    acc0, acc1 = 0, 0
    for pair in values:
        acc0 += pair[0]
        acc1 += pair[1]
    return (acc0, acc1)

# Create a pipeline executing on a direct runner (local, non-cloud).
# DirectPipelineRunner is the default runner; it is set here to show how one
# would change it to run on the Dataflow Service.
pipeline_options = beam.utils.options.PipelineOptions(['--runner=DirectPipelineRunner'])
p = beam.Pipeline(options=pipeline_options)

# BUG FIX(review): the original applied beam.Filter directly to the pipeline
# with no source, so nothing was ever read. Read the simulated CSV first.
parsed_input_rdd = (p
    | 'read' >> beam.io.Read(beam.io.TextFileSource(
        'sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE)))
    | 'filter header' >> beam.Filter(lambda x: x[0] != '#')
    | 'split line' >> beam.Map(lambda x: convert_line(x.split(','))))
# Key by (exp_id, product_id); value is (revenue for this bid, sale_count).
transformed = (parsed_input_rdd
    | 'reshape' >> beam.Map((lambda x: ((x[exp], x[prod]), (x[price]*x[sale_count], x[sale_count])))))

(sp, clks) = (0, 1) # sale price and sale_count
(ep, spc) = (0, 1) # exp_id&product_id, sp&sale_count
(exp2, prod2) = (0, 1) # exp_id, product_id

# For each product cross exp_id, sum the sale prices and sale_count.
grouped_result = (transformed
    | 'combine per product/id' >> beam.CombinePerKey(t_sum))
# Re-key by product id so each product groups its control and treatment rows.
grouped_by_product = (grouped_result
    | 'keyByExpProduct' >> beam.Map(lambda x: ((x[ep][prod2]), (x[ep][exp2], x[spc][sp], x[spc][clks])))
    | 'group' >> beam.GroupByKey())

# Sum the per-product MH numerator terms and persist the scalar result.
numerator_sum = (grouped_by_product
    | 'MapForNum' >> beam.Map(lambda x: calc_numerator(x))
    | 'CombineNum' >> beam.CombineGlobally(sum))
numerator_sum | 'save numerator' >> beam.io.Write(beam.io.TextFileSink('./numerator_sum'))

# Same for the denominator terms.
denominator_sum = (grouped_by_product
    | 'MapForDenom' >> beam.Map(lambda x: calc_denominator(x))
    | 'CombineDenom' >> beam.CombineGlobally(sum))
denominator_sum | 'save denominator' >> beam.io.Write(beam.io.TextFileSink('./denominator_sum'))
p.run()




WARNING:oauth2client.contrib.multistore_file:The oauth2client.contrib.multistore_file module has been deprecated and will be removed in the next release of oauth2client. Please migrate to multiprocess_file_storage.

Out[6]:

`