Read raw JSON files into a DataFrame
In [ ]:
journal = sqlContext.read.json("../vpsdata/journal*")
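A quick optional check before going further: the field names used below (changeDate, changeType, itemId) come from the schema Spark infers from the JSON, so printing it confirms they are present.
In [ ]:
# Inspect the inferred schema and the number of raw journal entries
journal.printSchema()
journal.count()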
Remove duplicates, cast the change date to a timestamp and save as Parquet files
In [ ]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
# Cast changeDate from its raw string form to a timestamp, drop exact
# duplicate journal entries and write the result as Parquet.
journal.withColumn('changeDate', col('changeDate').cast(TimestampType())) \
    .distinct().repartition(50).write.parquet("journal")
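Reading the Parquet output back is a cheap way to confirm the cast and deduplication took effect (an optional sketch; it reuses the path written above).
In [ ]:
checked = sqlContext.read.parquet("journal")
checked.printSchema()  # changeDate should now be a timestamp
checked.count()        # should be <= the raw count because of distinct()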
Read raw item JSON files into a DataFrame and capture the schema
In [ ]:
items = sqlContext.read.json("../vpsdata/items*")
# Keep the inferred schema so it can be reapplied after the RDD-level reduction below
schema = items.schema
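The steps below rely on nested fields such as itemInfo.itId, itemInfo.itHitCount, itemCats.item and itemAttribs.item, so it is worth verifying that the inferred schema actually contains them.
In [ ]:
# Confirm the nested structure the later code depends on
items.printSchema()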
Reduce items to the latest snapshot per item (highest hit count) and save as Parquet
In [ ]:
# Keep a single record per item id: when an item appears more than once,
# retain the snapshot with the higher hit count (assumed to be the latest),
# then rebuild a DataFrame with the original schema and write it as Parquet.
reduced_items_rdd = items.rdd.map(lambda i: (i.itemInfo.itId, i)) \
    .reduceByKey(lambda i1, i2: i1 if i1.itemInfo.itHitCount > i2.itemInfo.itHitCount else i2) \
    .map(lambda a: a[1])
sqlContext.createDataFrame(reduced_items_rdd, schema).repartition(30).write.parquet("items")
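A sanity check on the reduction (optional sketch): after the reduceByKey there should be at most one row per item id, so grouping the written items by itemInfo.itId should produce no group larger than one.
In [ ]:
reduced = sqlContext.read.parquet("items")
# An empty result means every item id survived with exactly one snapshot
reduced.groupBy(col('itemInfo.itId')).count().where(col('count') > 1).show()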
Join journal buy-now events with items
In [ ]:
journal = sqlContext.read.parquet("journal")
# Keep only buy-now events
buynows = journal.where(col('changeType') == 'now')
items = sqlContext.read.parquet("items")
# Inner join: attach the item snapshot to each buy-now event
transactions = buynows.join(items, buynows.itemId == items.itemInfo.itId)
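Because this is an inner join, buy-now events whose item is missing from the reduced items set are silently dropped; comparing the two counts makes that visible (optional sketch).
In [ ]:
# Events lost in the join = buynows.count() - transactions.count()
print(buynows.count(), transactions.count())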
Remap the nested structure and save as Parquet
In [ ]:
from pyspark.sql.types import MapType, ArrayType, StringType

# Convert the array of attribute structs into a name -> values map;
# return None when the attributes field is missing or malformed.
def remap_attributes_funct(attributes):
    try:
        return {a.attribName: a.attribValues.item for a in attributes}
    except Exception:
        return None

# Collapse the category path into a plain array of category names
make_cat_array = udf(lambda raw: [cat.catName for cat in raw], ArrayType(StringType()))
remap_attributes = udf(remap_attributes_funct, MapType(keyType=StringType(), valueType=ArrayType(StringType())))
transactions \
    .where(col('itemCats.item').isNotNull()) \
    .select(
        col('changeDate').alias('date'),
        col('itemId'),
        col('itemInfo.itName').alias('name'),
        col('itemInfo.itBuyNowPrice').cast('decimal(10,2)').alias('price'),
        col('itemInfo.itSellerLogin').alias('seller'),
        # the last element of the category path is the leaf category
        col('itemCats.item')[size('itemCats.item') - 1].catId.alias('categoryId'),
        make_cat_array(col('itemCats.item')).alias('categories'),
        remap_attributes(col('itemAttribs.item')).alias('attributes')
    ).write \
    .parquet("transactions")