In [1]:
import numpy as np
import pandas as pd
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import ByteType, IntegerType

In [2]:
# Acquire the SparkSession explicitly instead of relying on a `spark` global
# injected by the pyspark shell. getOrCreate() returns the already-active
# session when one exists, so a pyspark-launched kernel behaves exactly as
# before, while a plain Jupyter kernel can now "Restart & Run All".
spark = SparkSession.builder.getOrCreate()

# Load the raw Yelp (round 9) reviews, keeping only the columns needed
# downstream; stars is narrowed to ByteType because the reviews are later
# saved in that compact form.
review_df = (
    spark.read.json(
        path='../data/yelp_dataset_challenge_round9/yelp_academic_dataset_review.json'
    )
    .select(
        'user_id',
        'business_id',
        col('stars').cast(ByteType()).alias('stars'),
    )
)

In [3]:
# Confirm the three selected columns and that stars was cast to ByteType.
review_df.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: byte (nullable = true)


In [4]:
# Ids of the businesses that are restaurants; used below to filter reviews.
restaurant_ids_df = (
    spark.read
    .json(path='../data/restaurants')
    .select(col('business_id'))
)

In [5]:
# Keep only reviews whose business_id appears in the restaurant id list
# (inner equi-join on the shared business_id column).
restaurant_review_df = review_df.join(restaurant_ids_df, ['business_id'], 'inner')

In [6]:
# Inspect the joined schema: the join key business_id now comes first.
restaurant_review_df.printSchema()


root
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: byte (nullable = true)


In [7]:
# Fit one StringIndexer per id column on the restaurant reviews. Each fitted
# model maps a string id to a numeric index written to the `*_idx` column.
# The user indexer is fitted first, then the business indexer.
user_idx_mdl, business_idx_mdl = [
    StringIndexer(inputCol=source_col, outputCol=index_col).fit(restaurant_review_df)
    for source_col, index_col in (
        ('user_id', 'user_idx'),
        ('business_id', 'business_idx'),
    )
]

In [8]:
# Replace the string ids with the fitted integer indices so the reviews can be
# saved as business_id(IntegerType), user_id(IntegerType), stars(ByteType).
# Previously observed maxima: business_id = 48484, user_id = 721778.
# NOTE(review): the string-id -> index mapping lives only inside the fitted
# StringIndexer models (e.g. `user_idx_mdl.labels`) and is not persisted in the
# cells shown, so the saved integer ids cannot be mapped back to Yelp ids later
# unless the models are saved separately -- confirm whether that is needed.
user_indexed_df = user_idx_mdl.transform(restaurant_review_df)
fully_indexed_df = business_idx_mdl.transform(user_indexed_df)

restaurant_review_df2 = fully_indexed_df.select(
    col('business_idx').cast(IntegerType()).alias('business_id'),
    col('user_idx').cast(IntegerType()).alias('user_id'),
    'stars',
)

In [9]:
# Verify both ids are IntegerType and stars remains ByteType before writing.
restaurant_review_df2.printSchema()


root
 |-- business_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- stars: byte (nullable = true)


In [14]:
# Persist the integer-indexed reviews as gzip-compressed parquet, replacing any
# previous output at the same location.
(
    restaurant_review_df2.write
    .mode('overwrite')
    .option('compression', 'gzip')
    .parquet('../data/reviews')
)

In [11]:
# Number of all reviews
# review_df.count()
# 4153150

In [12]:
# Number of restaurant reviews
# restaurant_review_df.count()
# 2577298

In [13]:
# Number of unique users that reviewed at least one restaurant
# restaurant_review_df.groupBy('user_id').count().count()
# 721779

In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]: