In [1]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, ByteType
from pyspark.ml.feature import StringIndexer
In [2]:
# Load the raw review JSON, keeping only the columns we need
# and shrinking stars (1-5) to a single byte
review_df = (
spark.read.json(
path='../data/yelp_dataset_challenge_round9/yelp_academic_dataset_review.json'
)
.select(
'user_id',
'business_id',
col('stars').cast(ByteType()).alias('stars')
)
)
In [3]:
review_df.printSchema()
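Given the select above, the ids stay as the inferred strings and the rating comes out as a byte, so the schema should print roughly:
root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: byte (nullable = true)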
In [4]:
# business_id values of the restaurants extracted earlier
restaurant_ids_df = spark.read.json(
path='../data/restaurants'
).select('business_id')
In [5]:
# Keep only the reviews that belong to restaurants
restaurant_review_df = review_df.join(
other=restaurant_ids_df,
on='business_id',
how='inner'
)
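Since restaurant_ids_df is tiny next to the full review set, hinting a broadcast join can avoid shuffling the large side; a minimal sketch that yields the same result:
from pyspark.sql.functions import broadcast

restaurant_review_df = review_df.join(
    other=broadcast(restaurant_ids_df),  # ship the small id list to every executor
    on='business_id',
    how='inner'
)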
In [6]:
restaurant_review_df.printSchema()
In [7]:
# Map the string ids onto dense numeric indices
user_idx_mdl = (
StringIndexer(inputCol='user_id', outputCol='user_idx')
.fit(restaurant_review_df)
)
business_idx_mdl = (
StringIndexer(inputCol='business_id', outputCol='business_idx')
.fit(restaurant_review_df)
)
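StringIndexer assigns indices by descending label frequency, so index 0.0 goes to the most-reviewed business and the most active user. The fitted vocabularies are exposed as plain Python lists, which makes a quick spot check easy:
# the label at position i is the string id behind index i
user_idx_mdl.labels[:5]
business_idx_mdl.labels[:5]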
In [8]:
# Re-encode the reviews as business_id (IntegerType), user_id (IntegerType),
# stars (ByteType); the index maxima (business_id = 48484,
# user_id = 721778) fit comfortably in a 32-bit int
restaurant_review_df2 = (
business_idx_mdl.transform(
user_idx_mdl.transform(
restaurant_review_df
)
)
.select(
col('business_idx').cast(IntegerType()).alias('business_id'),
col('user_idx').cast(IntegerType()).alias('user_id'),
'stars'
)
)
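If the original string ids are needed later (e.g. to display recommended businesses), the mapping can be reversed with IndexToString and the labels learned above; a sketch, assuming the integer business_id column of restaurant_review_df2 (the decoder and output column names here are hypothetical):
from pyspark.ml.feature import IndexToString

business_decoder = IndexToString(
    inputCol='business_id',        # the integer index column created above
    outputCol='business_id_str',   # hypothetical output column name
    labels=business_idx_mdl.labels
)
restaurant_review_decoded = business_decoder.transform(restaurant_review_df2)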
In [9]:
restaurant_review_df2.printSchema()
In [14]:
restaurant_review_df2.write.parquet(
path='../data/reviews',
mode='overwrite',
compression='gzip'
)
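A quick sanity check that the Parquet files round-trip with the intended types:
spark.read.parquet('../data/reviews').printSchema()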
In [11]:
# Total number of reviews in the dataset
# review_df.count()
# 4153150
In [12]:
# Number of restaurant reviews
# restaurant_review_df.count()
# 2577298
In [13]:
# Number of unique users that reviewed at least one restaurant
# restaurant_review_df.groupBy('user_id').count().count()
# 721779
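That user count is consistent with the indexing above: StringIndexer is zero-based, so 721779 distinct users gives a maximum user_idx of 721778. distinct() is the more direct way to get the same figure:
# restaurant_review_df.select('user_id').distinct().count()
# 721779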