In [1]:
import sys, glob, os
SPARK_HOME = "/wd/software/spark-2.3.1-bin-hadoop2.7"
#SPARK_HOME=os.environ['SPARK_HOME']
sys.path.append(SPARK_HOME + "/python")
sys.path.append(glob.glob(SPARK_HOME + "/python/lib/py4j*.zip")[0])
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.storagelevel import StorageLevel
spark = (SparkSession
.builder
.config("spark.master", "local[*]")
.config("spark.driver.memory", "56G")
.config("spark.sql.shuffle.partitions", 32)
.config("spark.local.dir", "/spark-scratch")
.config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
.enableHiveSupport()
.getOrCreate())
sc = spark.sparkContext
print(sc.uiWebUrl)
In [2]:
from datetime import datetime
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import numpy as np
import pandas as pd
from sklearn import metrics
pd.options.display.max_columns = 1000
pd.options.display.max_rows = 10
fast_mode = True
%matplotlib inline
from IPython.core.magic import register_line_magic
@register_line_magic
def show(line, n = 5):
    return eval(line).limit(n).toPandas()
@register_line_magic
def sql(line, n = 10):
    return spark.sql(line)
In [3]:
base_path = "/data/kaggle/outbrain_ctr/parquet/"
def cache_df(df, name, storage_level = StorageLevel.MEMORY_ONLY):
    df.createOrReplaceTempView(name)
    spark.catalog.cacheTable(name)
def load(name, rebase_timestamp = False, cache = True):
    df = spark.read.load(base_path + name)
    if rebase_timestamp and "timestamp" in df.columns:
        df = df.withColumn("timestamp"
            , F.expr("cast(from_unixtime(cast((timestamp + 1465876799998)/1000 as int)) as timestamp)"))
    if cache:
        cache_df(df, name)
    df = df.alias(name)
    print("Number of partitions for df %s: %d" % (name, df.rdd.getNumPartitions()))
    return df
In [4]:
!ls -1 /data/kaggle/outbrain_ctr/parquet/
In [5]:
clicks_train = load("clicks_train")
clicks_train.show()
In [6]:
clicks_test = load("clicks_test")
clicks_test.show()
In [7]:
clicks_train.count(), clicks_test.count()
Out[7]:
Distinct count of ad_id in training and test dataset
In [8]:
%time clicks_train.select("ad_id").distinct().count(), clicks_test.select("ad_id").distinct().count()
Out[8]:
Common ad_id in training and test datasets
In [9]:
%time clicks_train.select("ad_id").intersect(clicks_test.select("ad_id")).count()
Out[9]:
In [10]:
1- 316035/381385
Out[10]:
About 17% of the ad_id values in the test dataset do not appear in the training dataset.
Calculate the CTR on the training dataset. Note that we cannot calculate the CTR on the test dataset, since the clicked column is not provided there; in fact, the task is to predict the probability of a click.
In [11]:
ctrs = clicks_train.groupBy("ad_id")\
.agg(F.expr("sum(clicked)/count(*)").alias("ctr"), F.count("*").alias("view_count"))
ctrs.show()
In [12]:
%time ctrs.select("ctr").describe().show()
Median CTR
In [13]:
%time ctrs.selectExpr("percentile(ctr, 0.5)").show()
In [14]:
ctrs.filter("ad_id = 182320").show()
Find the 99th, 95th, and 90th percentile values of the ad view counts.
In [15]:
view_counts = ctrs.select("view_count").toPandas()
np.percentile(view_counts["view_count"], [99, 95, 90])
Out[15]:
To build confidence in the CTR, filter out ads with fewer than 100 views (approximately the 99th percentile value).
In [16]:
%time ctrs.filter("view_count>100").select("ctr").toPandas()["ctr"].plot.hist(bins = 50, density = True)
plt.xlabel("CTR")
plt.ylabel("Frequency (normalized)")
Out[16]:
Consider the CTR as a baseline for click prediction. Using the CTR as the baseline, calculate the MAP (mean average precision).
In [17]:
#y_pred = clicks_train.join(ctrs.select("ad_id", "ctr"), on = ["ad_id"], how="left").select("ctr").toPandas()["ctr"]
#y_true = clicks_train.select("clicked").toPandas()["clicked"]
#%time metrics.average_precision_score(y_true, y_pred)
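The commented-out cell above computes scikit-learn's average precision, which is not quite the competition metric (MAP@12 over the ads of each display_id). The following is a minimal sketch, not part of the original analysis, that approximates MAP@12 for the CTR baseline on the training data; it assumes each display_id has exactly one clicked ad (verified later in this notebook), so average precision per display reduces to the reciprocal rank of the clicked ad.
train_ranked_by_ctr = (clicks_train
    .join(ctrs.select("ad_id", "ctr"), on = ["ad_id"], how = "left")
    .na.fill({"ctr": 0.0})
    .withColumn("rank", F.expr("row_number() over (partition by display_id order by ctr desc)")))
# With one relevant ad per display, AP@12 is 1/rank if the clicked ad ranks within the top 12, else 0
(train_ranked_by_ctr
    .filter("clicked = 1")
    .selectExpr("avg(if(rank <= 12, 1.0/rank, 0.0)) as approx_map_at_12")
    .show())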
In [18]:
clicks_test_baseline = clicks_test.join(ctrs.select("ad_id", "ctr"), on = ["ad_id"], how="left")
clicks_test_baseline.show()
In [19]:
clicks_test_baseline.groupBy("display_id").count()
Out[19]:
In [20]:
clicks_test_baseline.groupBy("display_id").agg(F.sum("ctr").alias("ap")).selectExpr("avg(ap)").show()
How many ads are there for each display_id?
In [21]:
%time clicks_train.groupBy("display_id").count().select("count").distinct().show()
Does each display_id in the training dataset have at least one click?
In [22]:
%time clicks_train.groupBy("display_id").agg(F.sum("clicked").alias("clicks")).filter("clicks=0").count()
Out[22]:
So, each display_id has at least one click. Does any display_id in the training dataset have more than one click?
In [23]:
%time clicks_train.groupBy("display_id").agg(F.sum("clicked").alias("clicks")).filter("clicks>1").count()
Out[23]:
So, each display_id in the clicks dataset has exactly one click.
In [24]:
if fast_mode:
    print("Loading page_view sample dataset")
    page_views = load("page_views_sample", rebase_timestamp=True, cache=True)
    #page_views = page_views.sample(False, 0.01, 1)
    #cache_df(page_views, "page_views")
else:
    print("Loading full page_view dataset")
    page_views = load("page_views", rebase_timestamp=True, cache=False)
page_views.printSchema()
In [25]:
page_views.show()
The page_views table is nearly 100 GB as a decompressed CSV file. How many records are there?
In [26]:
page_views.count()
Out[26]:
Does each record in page_views have a timestamp?
In [27]:
%time page_views.filter("isnull(timestamp)").count()
Out[27]:
In [28]:
stats = page_views.selectExpr("count(distinct(uuid)) as users"
, "count(distinct(document_id)) as documents"
, "count(distinct(geo_location)) as locations")
%time stats.show()
Some users are more frequent visitors than others. Find the number of users for each view count.
In [29]:
page_views_by_user = page_views.groupBy("uuid").count().groupBy("count").count()\
.toDF("view_count", "num_users").toPandas().sort_values("view_count")
page_views_by_user
Out[29]:
In [30]:
page_views_by_user.iloc[:20, :].plot.bar("view_count", "num_users")
Out[30]:
Number of unique users
In [31]:
users_distinct_count = page_views_by_user.num_users.sum()
Average page views per user
In [32]:
(page_views_by_user.num_users * page_views_by_user.view_count).sum()/users_distinct_count
Out[32]:
Cumulative percentage of users
In [33]:
page_views_by_user["cum_percentual"] = page_views_by_user.num_users.cumsum()/users_distinct_count
page_views_by_user
Out[33]:
In [34]:
page_views_by_user.iloc[:20, :].plot.line(x = "view_count", y = "cum_percentual")
Out[34]:
In [35]:
page_views_by_user.tail(10)
Out[35]:
Page views by platform
In [36]:
page_views_by_platform = page_views.groupBy("platform")\
.count().toPandas().set_index("platform").sort_index()
page_views_by_platform
Out[36]:
In [37]:
page_views_by_platform.plot.pie(y = "count", labels = ["Desktop", "Mobile", "Tablet"]
, figsize = (8, 8), autopct = "%.2f", fontsize = 15)
plt.title("Page views by platform")
Out[37]:
Page views by traffic source
In [38]:
page_views_by_traffic_source = page_views.groupBy("traffic_source").count().toPandas()
page_views_by_traffic_source = page_views_by_traffic_source.set_index("traffic_source").sort_index()
page_views_by_traffic_source
Out[38]:
In [39]:
page_views_by_traffic_source.plot.pie(y = "count", labels = ["Internal", "Search", "Social"]
, figsize = (8, 8), autopct = "%.2f", fontsize = 15)
plt.title("Page views by traffic source")
Out[39]:
In [40]:
events = load("events")
events.show()
In [41]:
events = events.withColumn("timestamp", F.expr("from_unixtime(cast((timestamp + 1465876799998)/1000 as int))"))
events.show()
In [42]:
events = load("events", rebase_timestamp=True)
In [43]:
events.count()
Out[43]:
Find the distinct counts of users, documents, and locations.
In [44]:
%time events.selectExpr("count(distinct uuid)", "count(distinct document_id)", "count(distinct geo_location)").first()
Out[44]:
Is the display_id unique in the events dataset?
In [45]:
%time events.groupBy("display_id").count().filter("count>1").count()
Out[45]:
Are all the display_id values in clicks_train present in events?
In [46]:
%time clicks_train.select("display_id").distinct().join(events, on = ["display_id"], how = "left_anti").count()
Out[46]:
So, the display_id of each record in clicks_train is present in the events dataset.
Check the same for the clicks_test dataset.
In [47]:
%time clicks_test.select("display_id").distinct().join(events, on = ["display_id"], how = "left_anti").count()
Out[47]:
Average events by user
In [48]:
events.selectExpr("count(*)/count(distinct uuid) avg_event_by_user").first()
Out[48]:
Does the timestamp exist for each record in events?
In [49]:
events.filter("isnull(timestamp)").count()
Out[49]:
Do the timestamps in clicks_train and clicks_test overlap?
In [50]:
%time clicks_train.join(events, on = ["display_id"]).select("timestamp").describe().show()
In [51]:
%time clicks_test.join(events, on = ["display_id"]).select("timestamp").describe().show()
Clearly, the date ranges of training and test data overlap.
Number of page views without a matching event
How many events have matching page views by uuid and document_id?
How many events have no matching page views?
How many page view records have no matching events?
A given user might visit the same page more than once. Show sample events for which multiple page_views records exist.
Show the distribution of the number of distinct users who view the same document multiple times.
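The next cells build a combined view of page_views and events to answer most of these questions. The last question (distribution of distinct users who view the same document multiple times) is not answered directly below, so here is a minimal sketch for it, assuming page_views is already loaded:
repeat_viewers_per_doc = (page_views
    .groupBy("document_id", "uuid").count()
    .filter("count > 1")
    .groupBy("document_id")
    .agg(F.countDistinct("uuid").alias("repeat_viewers")))
# Summary statistics of the number of repeat viewers per document
repeat_viewers_per_doc.select("repeat_viewers").describe().show()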
In [52]:
def join_views_and_events(columns):
    df1 = page_views.select(*columns).withColumn("page_views", F.lit(1)).withColumn("events", F.lit(0))
    df2 = events.select(*columns).withColumn("page_views", F.lit(0)).withColumn("events", F.lit(1))
    df3 = df1.union(df2)
    df4 = df3.groupBy(columns).agg(
        F.sum("page_views").alias("page_views_count"),
        F.sum("events").alias("events_count"))
    return df4
In [53]:
# Cache output to disk. The dataframe is too large to hold in the memory of the current machine
views_and_event = join_views_and_events(["uuid", "document_id"]).persist(StorageLevel.DISK_ONLY)
views_and_event.show()
How many user-document combinations have page views but no event (i.e., no click on ads)?
In [54]:
%time views_and_event.filter("page_views_count > 0 and events_count = 0").count()
Out[54]:
How many user-document combinations have an event but no matching record in page_views?
In [55]:
%time views_and_event.filter("page_views_count = 0 and events_count > 0").count()
Out[55]:
Considering that events represent the page views that received clicks, what fraction of page views got clicks?
In [56]:
events.count()/page_views.count()
Out[56]:
Let's take a sample uuid and document_id to see whether the event records have a matching page_views record.
In [57]:
%time page_views.filter("uuid = 'a34004004c3e50' and document_id = 140264").show()
In [58]:
%time events.filter("uuid = 'a34004004c3e50' and document_id = 140264").show()
Hypothesis: the page_views records come from web server logs, while events come from user-tracking tools such as Omniture or Google Analytics. A user may open the page and, after some time, choose to view an ad.
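One rough way to probe this hypothesis (a sketch, not in the original notebook) is to compare timestamps for matching (uuid, document_id) pairs; if events come from in-page tracking, an event should typically occur at or after the corresponding page view.
view_event_lag = (page_views
    .selectExpr("uuid", "document_id", "timestamp as view_ts")
    .join(events.selectExpr("uuid", "document_id", "timestamp as event_ts"), on = ["uuid", "document_id"])
    .withColumn("lag_seconds", F.expr("unix_timestamp(event_ts) - unix_timestamp(view_ts)")))
view_event_lag.select("lag_seconds").describe().show()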
A user may view a page more than once. Find how many user-document pairs have more than one view.
In [59]:
repeated_page_views = page_views.groupBy(["uuid", "document_id"]).count()\
.filter("count > 1").orderBy(F.desc("count"))
repeated_page_views.show()
In [60]:
repeated_page_views.count()
Out[60]:
Look at a sample uuid that has repeatedly visited a page to observe patterns in the traffic source, location, and timestamp.
In [61]:
if repeated_page_views.count()>0:
    sample_record = repeated_page_views.sample(True, 0.1).take(1)[0]
    page_views.filter(F.col("uuid") == sample_record.uuid)\
        .filter(F.col("document_id") == sample_record.document_id).show()
In [62]:
promoted_contents = load("promoted_content")
promoted_contents.show()
In [63]:
promoted_contents.count()
Out[63]:
promoted_content stores the metadata of the ads. Double-check that ad_id is unique in this dataset.
In [64]:
%time promoted_contents.groupBy("ad_id").count().filter("count>1").count()
Out[64]:
How many unique campaigns, documents and advertisers are there?
In [65]:
%time promoted_contents\
.selectExpr("count(distinct document_id)", "count(distinct campaign_id)", "count(distinct advertiser_id)").first()
Out[65]:
Does every ad_id in the clicks datasets have metadata in promoted_contents?
In [66]:
%time (clicks_train.select("ad_id").union(clicks_test.select("ad_id"))\
.join(promoted_contents, on = ["ad_id"], how = "leftanti").count())
Out[66]:
So, all the ad_id values in the clicks datasets exist in promoted_contents.
Find the average CTR by campaign.
In [67]:
avg_ctr_by_campaign = promoted_contents.join(ctrs, on = "ad_id").groupBy("campaign_id")\
.agg(F.avg("ctr").alias("avg_ctr"))
avg_ctr_by_campaign.show()
Find avg ctr by advertiser.
In [68]:
avg_ctr_by_advertiser = promoted_contents.join(ctrs, on = "ad_id").groupBy("advertiser_id")\
.agg(F.avg("ctr").alias("avg_ctr"))
avg_ctr_by_advertiser.show()
Find avg ctr by document.
In [69]:
avg_ctr_by_document = promoted_contents.join(ctrs, on = "ad_id").groupBy("document_id")\
.agg(F.avg("ctr").alias("avg_ctr"))
avg_ctr_by_document.show()
In [70]:
documents_meta = load("documents_meta")
documents_meta.show(5, False)
In [71]:
documents_meta.count()
Out[71]:
Verify whether document_id is unique in documents_meta.
In [72]:
documents_meta.groupby("document_id").count().filter("count>1").count()
Out[72]:
How many source_ids are there?
In [73]:
documents_meta.select("source_id").distinct().count()
Out[73]:
How many publisher_ids are there?
In [74]:
documents_meta.select("publisher_id").distinct().count()
Out[74]:
In [75]:
documents_categories = load("documents_categories").drop_duplicates(["document_id", "category_id"])
documents_categories.printSchema()
documents_categories.show(5, False)
In [76]:
documents_categories.count()
Out[76]:
In [77]:
documents_categories.select("category_id").distinct().count()
Out[77]:
category_id is not indexed, so use StringIndexer to index it. One-hot encoding is another option, but I am not doing that here because I want to use confidence_level as a weight.
In [78]:
from pyspark.ml.feature import StringIndexer
#StringIndexer requires a string column
documents_categories = documents_categories.withColumn("category_id", F.expr("cast(category_id as string)"))
if "id" in documents_categories.columns:
    documents_categories = documents_categories.drop("id")
category_indexer = StringIndexer(inputCol="category_id", outputCol="id")
documents_categories = category_indexer.fit(documents_categories).transform(documents_categories)
documents_categories.show()
Group the data by document_id and pack the other information into an array field.
In [79]:
documents_categories_n = documents_categories\
.withColumn("pair", F.struct("id", "confidence_level"))\
.groupBy("document_id")\
.agg(F.collect_list("pair").alias("categories"))
documents_categories_n.printSchema()
In [80]:
documents_categories_n.count()
Out[80]:
In [81]:
documents_categories_n.show(3, False)
In [82]:
documents_entities = load("documents_entities").drop_duplicates(["document_id", "entity_id"])
documents_entities.show(5, False)
Find stats around the number of entities per document.
In [83]:
documents_entities.groupBy("document_id").count().select("count").describe().show()
Find count of unique entity_ids.
In [84]:
documents_entities.select("entity_id").distinct().count()
Out[84]:
Apply StringIndexer to index entity_id.
In [85]:
if "id" in documents_entities.columns:
documents_entities = documents_entities.drop("id")
documents_entities = StringIndexer(inputCol="entity_id", outputCol="id")\
.fit(documents_entities)\
.transform(documents_entities)
documents_entities.show()
In [86]:
documents_entities_n = documents_entities\
.withColumn("pair", F.struct("id", "confidence_level"))\
.groupBy("document_id")\
.agg(F.collect_list("pair").alias("entities"))
documents_entities_n.printSchema()
In [87]:
documents_topics = load("documents_topics").drop_duplicates(["document_id","topic_id"])
documents_topics.show(5, False)
In [88]:
print(sorted([v.topic_id for v in documents_topics.select("topic_id").distinct().collect()]))
topic_id seems to be already indexed, and there are 300 topics.
In [89]:
documents_topics.select("topic_id").distinct().count()
Out[89]:
In [90]:
documents_topics_n = documents_topics\
.toDF("document_id", "id", "confidence_level")\
.withColumn("pair", F.struct("id", "confidence_level"))\
.groupBy("document_id")\
.agg(F.collect_list("pair").alias("topics"))
documents_topics_n.printSchema()
In [91]:
documents_topics_n.count()
Out[91]:
Join all 4 datasets - categories, entities, topics, and meta - on document_id using full outer joins; nulls after the join indicate which source file lacked a given document_id.
In [92]:
docs = documents_meta.join(documents_categories_n, on = "document_id", how = "full")
docs = docs.join(documents_topics_n, on = "document_id", how = "full")
docs = docs.join(documents_entities_n, on = "document_id", how = "full")
docs.persist(StorageLevel.DISK_ONLY).count()
docs.printSchema()
In [93]:
%show docs
Out[93]:
Find the count of null values for each type of information - meta, categories, entities, and topics.
In [94]:
docs.selectExpr("sum(if(isnull(topics), 1, 0)) null_topics"
, "sum(if(isnull(categories), 1, 0)) null_categories"
, "sum(if(isnull(entities), 1, 0)) null_entities"
, "sum(if(isnull(publisher_id), 1, 0)) null_meta").first()
Out[94]:
How many document topics are there?
In [95]:
docs.select(F.explode("topics")).select("col.id").distinct().count()
Out[95]:
How many document categories are there?
In [96]:
docs.select(F.explode("categories")).select("col.id").distinct().count()
Out[96]:
How many document entities are there?
In [97]:
docs.select(F.explode("entities")).select("col.id").distinct().count()
Out[97]:
In [98]:
from pyspark.ml.linalg import SparseVector, VectorUDT
def to_vector(values, n):
    if values is not None:
        values = sorted(values, key=lambda v: v.id)
        indices = [v.id for v in values]
        values = [v.confidence_level for v in values]
        return SparseVector(n, indices, values)
    return SparseVector(n, [], [])
spark.udf.register("to_vector", to_vector, VectorUDT())
docs_vectorized = docs\
.withColumn("topics_vector", F.expr("to_vector(topics, 300)"))\
.withColumn("categories_vector", F.expr("to_vector(categories, 97)"))
#.withColumn("entities_vector", F.expr("to_vector(entities, 1326009)"))
docs_vectorized.printSchema()
docs_vectorized.cache().count()
Out[98]:
In [99]:
docs_vectorized.select("topics_vector").first()
Out[99]:
In [100]:
docs_vectorized.select("categories_vector").count()
Out[100]:
In [101]:
from pyspark.ml.feature import IDF, Tokenizer
if "topics_idf" in docs.columns:
docs = docs.drop("topics_idf")
if "entities_idf" in docs.columns:
docs = docs.drop("entities_idf")
if "categories_idf" in docs.columns:
docs = docs.drop("categories_idf")
topics_idf = IDF(inputCol="topics_vector", outputCol="topics_idf")
entities_idf = IDF(inputCol="entities_vector", outputCol="entities_idf")
categories_idf = IDF(inputCol="categories_vector", outputCol="categories_idf")
df1 = docs_vectorized
df2 = topics_idf.fit(df1).transform(df1).cache()
df3 = categories_idf.fit(df2).transform(df2).cache()
#df4 = entities_idf.fit(df3).transform(df3).cache()
docs_idf = df3
docs_idf.printSchema()
In [102]:
docs_idf.select("document_id", "topics_idf", "categories_idf").first()
Out[102]:
Column | Description |
---|---|
user_has_already_viewed_doc | For each content recommended to the user, verify whether the user had previously visited that page. |
user_views_count | Do eager readers behave differently from other users? Let’s add this feature and let machine learning models guess that. |
user_views_categories, user_views_topics, user_views_entities | User profile vectors based on the categories, topics, and entities of documents the user has previously viewed (weighted by confidence and TF-IDF), to model users' preferences in a Content-Based Filtering approach (a sketch follows this table). |
user_avg_views_of_distinct_docs | Ratio of #user_distinct_docs_views to #user_views, indicating how often users read previously visited pages again. |
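Of the features above, the user profile vectors (user_views_categories, user_views_topics, user_views_entities) are not built elsewhere in this notebook. As a rough sketch (hypothetical names, assuming page_views and docs_idf are available), a simple user topic profile can be formed by averaging the topics_idf vectors of the documents a user has viewed; the table additionally suggests weighting by confidence.
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np
def mean_vectors(vectors):
    # Hypothetical helper: element-wise mean of a list of ml vectors (returns a dense vector, or None if empty)
    arrays = [v.toArray() for v in vectors if v is not None]
    return Vectors.dense(np.mean(arrays, axis = 0)) if arrays else None
mean_vectors_udf = F.udf(mean_vectors, VectorUDT())
user_topic_profiles = (page_views
    .select("uuid", "document_id")
    .join(docs_idf.select("document_id", "topics_idf"), on = "document_id")
    .groupBy("uuid")
    .agg(F.collect_list("topics_idf").alias("doc_vectors"))
    .withColumn("user_views_topics", mean_vectors_udf("doc_vectors"))
    .drop("doc_vectors"))
user_topic_profiles.show(3)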
In [103]:
user_has_already_viewed_doc = (page_views
.withColumn("user_has_already_viewed_doc"
, F.expr("((ROW_NUMBER() OVER (PARTITION BY uuid, document_id ORDER BY timestamp))) > 1"))
.select("uuid", "document_id", "timestamp", "user_has_already_viewed_doc")
)
%show user_has_already_viewed_doc.filter("uuid = '6c4a7527da27d7' and document_id = 38922")
Out[103]:
In [104]:
user_views_count = (page_views
.withColumn("user_views_count",
F.expr("COUNT(1) OVER (PARTITION BY uuid ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.select("uuid", "timestamp", "user_views_count"))
%show user_views_count.filter("uuid = '6c4a7527da27d7' and document_id = 38922")
Out[104]:
In [105]:
#page_views = page_views.withColumn("user_avg_views_of_distinct_docs", F.expr("COUNT(distinct document_id) " +
# "OVER (PARTITION BY uuid ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
#
#%show page_views.filter("uuid = '6c4a7527da27d7' and document_id = 38922")
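The commented-out cell above attempts a distinct count over a window; Spark SQL does not support DISTINCT inside window functions (at least in this version), which is presumably why it is disabled. A minimal groupBy-based sketch of the same user_avg_views_of_distinct_docs ratio (hypothetical, not used downstream):
user_avg_views_of_distinct_docs = (page_views
    .groupBy("uuid")
    .agg(F.expr("count(distinct document_id)/count(*)").alias("user_avg_views_of_distinct_docs")))
user_avg_views_of_distinct_docs.show(5)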
Column | Description |
---|---|
doc_ad_days_since_published, doc_event_days_since_published | Days elapsed since the ad document was published in a given user visit. The general assumption is that new content is more relevant to users. But if you are reading an old post, you might be interested in other old posts. |
doc_avg_views_by_distinct_users_cf | Average page views of the ad document by distinct users. Is this a webpage people usually return to? (A sketch follows this table.) |
ad_views_count, doc_views_count | How popular is a document or ad? |
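doc_avg_views_by_distinct_users_cf is described above but not computed elsewhere in this notebook; a minimal sketch (hypothetical, assuming page_views is loaded):
doc_avg_views_by_distinct_users = (page_views
    .groupBy("document_id")
    .agg(F.expr("count(*)/count(distinct uuid)").alias("doc_avg_views_by_distinct_users_cf")))
doc_avg_views_by_distinct_users.show(5)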
In [106]:
doc_event_days_since_published = (events
.join(documents_meta, on = "document_id")
.selectExpr("display_id"
, "document_id"
, "timestamp"
, "publish_time"
, "datediff(timestamp, publish_time) age")
)
doc_event_days_since_published.show()
In [107]:
page_view_count_by_document_id = page_views.groupBy("document_id")\
.count().withColumn("page_view_count_by_document_id", F.expr("log(count)"))\
.select("document_id", "page_view_count_by_document_id")
page_view_count_by_document_id.show()
In [108]:
page_view_count_by_uuid = (page_views
.groupBy("uuid")
.count()
.withColumn("page_view_count_by_uuid", F.expr("log(count)"))
.select("uuid", "page_view_count_by_uuid"))
page_view_count_by_uuid.show()
Column | Description |
---|---|
event_local_hour (binned), event_weekend | Event timestamps were in UTC-4, so I processed event geolocation to get timezones and adjust for users' local time. They were binned in periods like morning, afternoon, midday, evening, night. A flag indicating whether it was a weekend was also included. The assumption here is that time influences the kind of content users will read. A simplified sketch follows this table. |
event_country, event_country_state | The field event_geolocation was parsed to extract the user’s country and state in a page visit. |
ad_id, doc_event_id, doc_ad_id, ad_advertiser, … | All of the original categorical fields were One-Hot Encoded to be used by the models, generating about 126,000 features. |
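The country/state split and an hour-based session bin are derived in the next cells; the event_weekend flag and named day-part bins are not. The sketch below (hypothetical column names) derives them directly from the raw timestamp, which is not timezone-adjusted; a faithful implementation would first map geo_location to a timezone as described in the table.
event_time_features = (events
    .withColumn("event_weekend", F.expr("dayofweek(timestamp) in (1, 7)"))
    .withColumn("event_hour", F.hour("timestamp"))
    .withColumn("event_day_part", F.expr(
        "case when event_hour between 6 and 11 then 'morning' " +
        "when event_hour between 12 and 17 then 'afternoon' " +
        "when event_hour between 18 and 22 then 'evening' " +
        "else 'night' end"))
    .select("display_id", "timestamp", "event_weekend", "event_day_part"))
event_time_features.show(5)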
In [109]:
events.selectExpr("uuid", "geo_location"
, "split(geo_location, '>')[0] country"
, "split(geo_location, '>')[1] state"
).show()
In [110]:
events.selectExpr("split(geo_location, '>')[0] country").distinct().toPandas()
Out[110]:
In [111]:
(events
.selectExpr("display_id", "timestamp", "hour(timestamp) hour")
.withColumn("day_session", F.expr("hour % 8"))).show()
Top countries by number of events
In [112]:
events.selectExpr("split(geo_location, '>')[0] country")\
.groupBy("country").count().orderBy(F.desc("count")).show()
In [113]:
events.cache()
clicks_train.cache()
documents_meta.cache()
promoted_contents.cache()
Out[113]:
In [114]:
avg_ctrs_by_ad_id = clicks_train.groupBy("ad_id").agg(F.avg("clicked").alias("avg_ctr_by_ad_id"))
%show avg_ctrs_by_ad_id
Out[114]:
In [115]:
avg_ctrs_by_campaign_id = (clicks_train
.join(promoted_contents, on = "ad_id")
.groupBy("campaign_id")
.agg(F.avg("clicked").alias("avg_ctr_by_campaign_id")))
%show avg_ctrs_by_campaign_id
Out[115]:
In [116]:
avg_ctrs_by_advertiser_id = (clicks_train
.join(promoted_contents, on = "ad_id")
.groupBy("advertiser_id")
.agg(F.avg("clicked").alias("avg_ctr_by_advertiser_id"))
.cache()
)
%show avg_ctrs_by_advertiser_id
Out[116]:
In [117]:
avg_ctrs_by_document_id = (clicks_train
.join(promoted_contents, on = "ad_id")
.groupBy("document_id")
.agg(F.avg("clicked").alias("avg_ctr_by_document_id"))
.cache()
)
%show avg_ctrs_by_document_id
Out[117]:
In [118]:
avg_ctrs_by_time = (events
.join(documents_meta, on = "document_id", how = "left")
.join(clicks_train, on = "display_id", how = "left")
.join(promoted_contents, on = "ad_id", how = "left")
.withColumn("total_clicks_by_ad_id"
, F.expr("SUM(clicked) OVER (PARTITION BY ad_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.withColumn("total_events_by_ad_id"
, F.expr("COUNT(*) OVER (PARTITION BY ad_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.withColumn("total_clicks_by_advertiser_id"
, F.expr("SUM(clicked) OVER (PARTITION BY advertiser_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.withColumn("total_events_by_advertiser_id"
, F.expr("COUNT(*) OVER (PARTITION BY advertiser_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.withColumn("total_clicks_by_campaign_id"
, F.expr("SUM(clicked) OVER (PARTITION BY campaign_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.withColumn("total_events_by_campaign_id"
, F.expr("COUNT(*) OVER (PARTITION BY campaign_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.withColumn("total_clicks_by_publisher_id"
, F.expr("SUM(clicked) OVER (PARTITION BY publisher_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.withColumn("total_events_by_publisher_id"
, F.expr("COUNT(*) OVER (PARTITION BY publisher_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING)"))
.selectExpr("display_id"
, "timestamp"
, "(total_clicks_by_advertiser_id/total_events_by_advertiser_id) avg_ctr_by_advertiser_id"
, "(total_clicks_by_campaign_id/total_events_by_campaign_id) avg_ctr_by_campaign_id"
, "(total_clicks_by_ad_id/total_events_by_ad_id) avg_ctr_by_ad_id"
, "(total_clicks_by_publisher_id/total_events_by_publisher_id) avg_ctr_by_publisher_id")
)
%show avg_ctrs_by_time
Out[118]:
Column | Description |
---|---|
user_doc_ad_sim_categories, user_doc_ad_sim_topics, user_doc_ad_sim_entities | Cosine similarity between user profile and ad document profile vectors (TF-IDF). |
doc_event_doc_ad_sim_categories, doc_event_doc_ad_sim_topics, doc_event_doc_ad_sim_entities | Cosine similarity between event document (landing page context) and ad document profile vectors (TF-IDF). |
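Neither similarity feature is computed elsewhere in this notebook. Below is a minimal sketch (hypothetical helper and column names) of doc_event_doc_ad_sim_topics, the cosine similarity between the topics_idf vector of the event (landing) document and that of the ad document; the user-side similarities would be analogous, with a user profile vector in place of the event document vector.
from pyspark.sql.types import DoubleType
def cosine_similarity(v1, v2):
    # Hypothetical helper: cosine similarity of two ml vectors; 0.0 if either is missing or has zero norm
    if v1 is None or v2 is None:
        return 0.0
    n1, n2 = v1.norm(2), v2.norm(2)
    return float(v1.dot(v2)/(n1*n2)) if n1 > 0 and n2 > 0 else 0.0
cosine_similarity_udf = F.udf(cosine_similarity, DoubleType())
event_doc_vectors = docs_idf.selectExpr("document_id as event_document_id", "topics_idf as event_topics_idf")
ad_doc_vectors = docs_idf.selectExpr("document_id as ad_document_id", "topics_idf as ad_topics_idf")
doc_event_doc_ad_sim = (clicks_train
    .join(events.selectExpr("display_id", "document_id as event_document_id"), on = "display_id")
    .join(promoted_contents.selectExpr("ad_id", "document_id as ad_document_id"), on = "ad_id")
    .join(event_doc_vectors, on = "event_document_id")
    .join(ad_doc_vectors, on = "ad_document_id")
    .withColumn("doc_event_doc_ad_sim_topics", cosine_similarity_udf("event_topics_idf", "ad_topics_idf")))
doc_event_doc_ad_sim.select("display_id", "ad_id", "doc_event_doc_ad_sim_topics").show(5)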
In [119]:
docs.printSchema()
In [120]:
docs_idf.drop("document_id").printSchema()
In [121]:
clicks = (clicks_train.withColumn("is_train", F.lit(1))
.union(clicks_test
.withColumn("clicked", F.lit(0))
.withColumn("is_train", F.lit(0))))
df = (clicks
.join(events.alias("events"), on = ["display_id"], how = "left")
.join(docs_idf.alias("docs_idf"), on = ["document_id"], how = "left")
.join(promoted_contents.drop("document_id"), on = ["ad_id"], how = "left")
.join(page_view_count_by_uuid, on = ["uuid"], how = "left")
.join(page_view_count_by_document_id, on = ["document_id"], how = "left")
.withColumn("clicks_by_ad_id", F.expr("sum(clicked) over (partition by ad_id)"))
.withColumn("events_by_ad_id", F.expr("count(*) over (partition by ad_id)"))
.withColumn("avg_ctr_by_ad_id", F.expr("clicks_by_ad_id/events_by_ad_id"))
.withColumn("clicks_by_campaign_id", F.expr("sum(clicked) over (partition by campaign_id)"))
.withColumn("events_by_campaign_id", F.expr("count(*) over (partition by campaign_id)"))
.withColumn("avg_ctr_by_campaign_id", F.expr("clicks_by_campaign_id/events_by_campaign_id"))
.withColumn("clicks_by_document_id", F.expr("sum(clicked) over (partition by events.document_id)"))
.withColumn("events_by_document_id", F.expr("count(*) over (partition by events.document_id)"))
.withColumn("avg_ctr_by_document_id", F.expr("clicks_by_campaign_id/events_by_document_id"))
.withColumn("clicks_by_advertiser_id", F.expr("sum(clicked) over (partition by advertiser_id)"))
.withColumn("events_by_advertiser_id", F.expr("count(*) over (partition by advertiser_id)"))
.withColumn("avg_ctr_by_advertiser_id", F.expr("clicks_by_campaign_id/events_by_advertiser_id"))
.withColumn("country", F.expr("split(geo_location, '>')[0]"))
.withColumn("state", F.expr("split(geo_location, '>')[1]"))
.withColumn("doc_age", F.expr("datediff(timestamp, publish_time)"))
.withColumn("session", F.expr("cast((hour(timestamp) % 8) as string)"))
.withColumn("source_id", F.expr("cast(source_id as string)"))
.withColumn("publisher_id", F.expr("cast(publisher_id as string)"))
)
df.printSchema()
In [123]:
df.write.mode("overwrite").save(base_path + "merged_enriched")
df.printSchema()
In [124]:
df = load("merged_enriched", cache = False)
df.count()
Out[124]:
In [125]:
features = [
'platform'
, 'source_id'
, 'publisher_id'
, 'topics_idf'
, 'categories_idf'
, 'avg_ctr_by_ad_id'
, 'avg_ctr_by_campaign_id'
, 'avg_ctr_by_document_id'
, 'avg_ctr_by_advertiser_id'
, "country"
, "state"
, "doc_age"
, "session"
, "ad_id"
, "display_id"
, "is_train"
, "clicked"
]
In [126]:
df.selectExpr(*features).printSchema()
In [127]:
%show df
Out[127]:
In [128]:
df1 = df
In [129]:
def show_null_counts(df):
    null_testers = ["sum(if(isnull(%s), 1, 0)) %s" % (f, f) for f in df.columns]
    null_counts = df.selectExpr(*null_testers).toPandas().T
    null_counts.columns = ["Count"]
    null_counts["pct"] = null_counts.Count/df.count()
    null_counts["dtype"] = [t[1] for t in df.dtypes]
    print(null_counts.to_string())
show_null_counts(df.selectExpr(*features))
In [130]:
df_trunc = df.selectExpr(*features)
distinct_counts = df_trunc.selectExpr(*["approx_count_distinct(%s)"
% f for f in df.selectExpr(*features).columns]).toPandas()
print(distinct_counts.T.to_string())
In [131]:
fill_na_values = {"platform": "<null>"
, "source_id": "<null>"
, "publisher_id": "<null>"
, "avg_ctr_by_ad_id": 0.0
, "avg_ctr_by_campaign_id": 0.0
, "avg_ctr_by_document_id": 0.0
, "avg_ctr_by_advertiser_id": 0.0
, "country": "null"
, "state": "null"
, "doc_age": -1}
df_null_removed = df.selectExpr(*features).na.fill(fill_na_values)
show_null_counts(df_null_removed)
In [132]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
In [133]:
categorical_columns = [col for col, dtype in df_null_removed.dtypes if dtype == "string"]
df_string_indexed = df_null_removed
for col in categorical_columns:
    indexer = StringIndexer(inputCol=col, outputCol="%s_index" % col)
    df_string_indexed = indexer.fit(df_string_indexed).transform(df_string_indexed)
one_hot_estimator = OneHotEncoderEstimator(
    inputCols = [col + "_index" for col in categorical_columns],
    outputCols = [col + "_vec" for col in categorical_columns]
)
df_ohe = one_hot_estimator.fit(df_string_indexed).transform(df_string_indexed)
In [134]:
df_ohe.dtypes
Out[134]:
In [135]:
to_be_vectorized = [('topics_idf', 'vector'),
('categories_idf', 'vector'),
('avg_ctr_by_ad_id', 'double'),
('avg_ctr_by_campaign_id', 'double'),
('avg_ctr_by_document_id', 'double'),
('avg_ctr_by_advertiser_id', 'double'),
('doc_age', 'int'),
('country_vec', 'vector'),
('session_vec', 'vector'),
('source_id_vec', 'vector'),
('state_vec', 'vector'),
('publisher_id_vec', 'vector'),
('platform_vec', 'vector')]
vector_assembler = VectorAssembler(inputCols = [c for c, _ in to_be_vectorized], outputCol="features")
In [136]:
df_vectorized = vector_assembler.transform(df_ohe)
df_vectorized.dtypes
Out[136]:
In [137]:
df_train, df_test = df_vectorized.filter("is_train = 1").select("display_id", "ad_id", "clicked", "features")\
.randomSplit(weights=[0.7, 0.3], seed = 1)
cache_df(df_train, "df_train")
df_train.printSchema()
In [138]:
df_train.count()
Out[138]:
In [139]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.1, elasticNetParam=0.8, featuresCol="features", labelCol="clicked")
lrModel = lr.fit(df_train)
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
trainingSummary = lrModel.summary
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
In [140]:
# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
print("label %d: %s" % (i, rate))
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
print("label %d: %s" % (i, rate))
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
print("label %d: %s" % (i, prec))
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
print("label %d: %s" % (i, rec))
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
print("label %d: %s" % (i, f))
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
% (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
In [142]:
lrModel.write().overwrite().save(base_path + "lrModel")
In [143]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [ ]:
# Make predictions.
predictions = lrModel.transform(df_test)
# Select example rows to display.
predictions.select("prediction", "clicked", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="clicked", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % (accuracy))
In [ ]: