Journal

Read the raw JSON files into a DataFrame


In [ ]:
# Load every raw journal_* JSON file into a single DataFrame.
json_reader = sqlContext.read
journal = json_reader.json("../vpsdata/journal*")

Remove duplicates, cast the change date to a timestamp, and save as Parquet files


In [ ]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType

# Cast the change-date string to a proper timestamp, drop exact duplicate
# rows, and persist the result as 50 Parquet partitions.
journal_with_ts = journal.withColumn('changeDate', col('changeDate').cast(TimestampType()))
journal_dedup = journal_with_ts.distinct()
journal_dedup.repartition(50).write.parquet("journal")

Items

Read the raw JSON files into a DataFrame


In [ ]:
# Load the raw item JSON files and keep the inferred schema so the reduced
# RDD can be turned back into a DataFrame later with the same structure.
items_reader = sqlContext.read
items = items_reader.json("../vpsdata/items*")
schema = items.schema

Reduce items by keeping, for each item id, the record with the highest hit count, and save as Parquet


In [ ]:
def _key_by_item_id(item):
    # Pair each record with its item id so reduceByKey can group them.
    return (item.itemInfo.itId, item)

def _higher_hit_count(first, second):
    # Keep the record with the strictly higher hit count; ties keep `second`,
    # exactly like the original strict `>` comparison.
    return first if first.itemInfo.itHitCount > second.itemInfo.itHitCount else second

reduced_items_rdd = items.map(_key_by_item_id) \
    .reduceByKey(_higher_hit_count) \
    .map(lambda pair: pair[1])

sqlContext.createDataFrame(reduced_items_rdd, schema).repartition(30).write.parquet("items")

Transactions

Join journal buy-now events with items


In [ ]:
# Reload the deduplicated journal and keep only buy-now events.
journal = sqlContext.read.parquet("journal")
buynows = journal.filter(col('changeType') == 'now')

# Reload the reduced item records.
items = sqlContext.read.parquet("items")

# Attach the item record to every buy-now event via the item id.
transactions = buynows.join(items, items.itemInfo.itId == buynows.itemId)

Remap structure and save


In [ ]:
from pyspark.sql.types import MapType, ArrayType, StringType

def remap_attributes_funct(attributes):
    """Convert a list of attribute rows into a {name: values} dict.

    Each element of `attributes` is expected to expose `.attribName` and
    `.attribValues.item` (the raw JSON structure of the item attributes).
    Returns None for a missing (null) attribute list or for rows that do
    not match the expected shape, mirroring the original best-effort
    behavior — but without a bare `except:` that would also swallow
    KeyboardInterrupt/SystemExit.
    """
    if attributes is None:
        return None
    try:
        return {a.attribName: a.attribValues.item for a in attributes}
    except (TypeError, AttributeError):
        # Malformed row or non-iterable input: keep the column null.
        return None

def _category_names(raw):
    # Extract the name of every category in the raw category path.
    return [cat.catName for cat in raw]

# UDF producing an array of category names from the raw category list.
make_cat_array = udf(_category_names, ArrayType(StringType()))
# UDF producing a {attribute name: values} map from the raw attribute list.
remap_attributes = udf(remap_attributes_funct, MapType(StringType(), ArrayType(StringType())))

# Keep only transactions with a non-null category path, project the columns
# of interest, and persist the result as Parquet.
item_cats = col('itemCats.item')

with_cats = transactions.where(item_cats.isNotNull())

projected = with_cats.select(
    col('changeDate').alias('date'),
    col('itemId'),
    col('itemInfo.itName').alias('name'),
    col('itemInfo.itBuyNowPrice').cast('decimal(10,2)').alias('price'),
    col('itemInfo.itSellerLogin').alias('seller'),
    # The last element of the category path is the leaf (most specific) category.
    item_cats[size(item_cats) - 1].catId.alias('categoryId'),
    make_cat_array(item_cats).alias('categories'),
    remap_attributes(col('itemAttribs.item')).alias('attributes')
)

projected.write.parquet("transactions")