The larger data sets cannot be loaded into the memory of a local machine, so an alternative is to import them into a PostgreSQL table and query them from there. With the right indices the queries become fast enough. After the import we can extract some basic statistics with SQL and also export smaller portions of the data that can be handled by Spark or pandas on a local machine.
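To illustrate the last step, here is a minimal sketch of pulling a manageable slice of one of the tables created below into pandas. The table name video_games is only a placeholder, and the connection settings mirror the ones used later in this notebook:

import pandas as pd
import psycopg2 as pg

# Read a limited, server-side-filtered slice instead of the whole table.
connection = pg.connect(user='mariosk', database='amazon_reviews')
sample = pd.read_sql('select asin, overall, unixReviewTime from video_games limit 100000', connection)
connection.close()
sample.head()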
In [1]:
import timeit
def stopwatch(function):
    # Run the given zero-argument callable, print the wall-clock time, and return its result.
    start_time = timeit.default_timer()
    result = function()
    print('Elapsed time: %i sec' % int(timeit.default_timer() - start_time))
    return result
Unfortunately PostgreSQL does not support importing record (newline-delimited) JSON files directly, so we first need to convert the data sets to CSV. We use the command-line tool json2csv for this.
WARNING: The following commands will run for a while, especially the json2csv conversion. You can expect approximately 1 minute per GB of unzipped data.
In [196]:
start_time = timeit.default_timer()
!ls ./data/large-datasets/*.gz | grep -Po '.*(?=.gz)' | xargs -I {} gunzip {}.gz
print('Elapsed time: %i sec' % int(timeit.default_timer() - start_time))
In [2]:
start_time = timeit.default_timer()
!ls ./data/large-datasets/*.json | xargs sed -i 's/|/?/g;s/\u0000/?/g'
print('Elapsed time: %i sec' % int(timeit.default_timer() - start_time))
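The sed call above replaces two things that would otherwise break the import: literal '|' characters, which would collide with the '|' CSV delimiter chosen below, and NUL characters, which PostgreSQL cannot store in text columns. For reference, a rough pure-Python equivalent (it loads each file into memory, so for multi-GB inputs the streaming sed version above is preferable):

from pathlib import Path

# Replace the future CSV delimiter and NULs (both the JSON escape sequence and raw bytes) with '?'.
for path in Path('./data/large-datasets').glob('*.json'):
    text = path.read_text(encoding='utf-8', errors='replace')
    text = text.replace('|', '?').replace('\\u0000', '?').replace('\x00', '?')
    path.write_text(text, encoding='utf-8')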
In [3]:
start_time = timeit.default_timer()
!ls ./data/large-datasets/*.json | grep -Po '.*(?=.json)' | xargs -I {} json2csv -p -d '|' -k asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime -i {}.json -o {}.csv
!rm ./data/large-datasets/*.json
print('Elapsed time: %i sec' % int(timeit.default_timer() - start_time))
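If json2csv is not installed, roughly the same conversion can be sketched with the standard library; this streams one record per line and writes the same columns with the same '|' delimiter, but it is not guaranteed to reproduce the tool's exact output format:

import csv
import json
from pathlib import Path

COLUMNS = ['asin', 'helpful', 'overall', 'reviewText', 'reviewTime',
           'reviewerID', 'reviewerName', 'summary', 'unixReviewTime']

def records_to_csv(json_path, csv_path):
    # Each input line is one JSON record; emit it as one '|'-delimited CSV row.
    with open(json_path, encoding='utf-8') as source, \
         open(csv_path, 'w', newline='', encoding='utf-8') as target:
        writer = csv.DictWriter(target, fieldnames=COLUMNS, delimiter='|', extrasaction='ignore')
        writer.writeheader()
        for line in source:
            writer.writerow(json.loads(line))

for path in Path('./data/large-datasets').glob('*.json'):
    records_to_csv(path, path.with_suffix('.csv'))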
In [4]:
import psycopg2 as pg
import pandas as pd
db_conf = {
    'user': 'mariosk',
    'database': 'amazon_reviews'
}
connection_factory = lambda: pg.connect(user=db_conf['user'], database=db_conf['database'])
def transaction(*statements):
    # Execute the given statements in one transaction; if any of them fails the commit
    # is skipped and the connection is closed, discarding the changes.
    connection = None
    try:
        connection = connection_factory()
        cursor = connection.cursor()
        for statement in statements:
            cursor.execute(statement)
        connection.commit()
        cursor.close()
    except pg.DatabaseError as error:
        print(error)
    finally:
        if connection is not None:
            connection.close()
def query(statement):
    # Run a read-only statement and return the result as a pandas DataFrame.
    connection = None
    try:
        connection = connection_factory()
        cursor = connection.cursor()
        cursor.execute(statement)
        header = [ description[0] for description in cursor.description ]
        rows = cursor.fetchall()
        cursor.close()
        return pd.DataFrame.from_records(rows, columns=header)
    except (Exception, pg.DatabaseError) as error:
        print(error)
        return None
    finally:
        if connection is not None:
            connection.close()
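Both helpers open a fresh connection per call, which keeps them simple at the cost of some connection overhead. A quick smoke test of the two (the table name is throwaway and purely illustrative):

# transaction() is for DDL/DML, query() returns a pandas DataFrame.
transaction('create table if not exists smoke_test (id int);',
            'drop table smoke_test;')
query('select 1 as ok')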
In [5]:
import os
import re

table_names = [ re.search('reviews_(.*)_5.csv', filename).group(1)
                for filename
                in sorted(os.listdir('./data/large-datasets'))
                if not filename.endswith('json') ]
In [6]:
def create_table(table_name):
    # One table per data set, with indices on the columns used for filtering and grouping.
    transaction(
        'create table %s (asin text, helpful text, overall double precision, reviewText text, reviewTime text, reviewerID text, reviewerName text, summary text, unixReviewTime int);' % table_name,
        'create index {0}_asin ON {0} (asin);'.format(table_name),
        'create index {0}_overall ON {0} (overall);'.format(table_name),
        'create index {0}_reviewerID ON {0} (reviewerID);'.format(table_name),
        'create index {0}_unixReviewTime ON {0} (unixReviewTime);'.format(table_name))

for table_name in table_names:
    create_table(table_name)
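As an optional sanity check, the system view pg_indexes can confirm that the tables and indices exist (the identifier names come back lower-cased because they are unquoted above):

# List every index created in the default schema.
query("""
    select tablename, indexname
    from pg_indexes
    where schemaname = 'public'
    order by tablename, indexname
""")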
In [7]:
start_time = timeit.default_timer()
!ls ./data/large-datasets | grep -Po '(?<=reviews_).*(?=_5.csv)' | xargs -I {} psql -U mariosk -d amazon_reviews -c "\copy {} from './data/large-datasets/reviews_{}_5.csv' with (format csv, delimiter '|', header true);"
print('Elapsed time: %i sec' % int(timeit.default_timer() - start_time))
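Another optional sanity check after the import is to confirm that every table actually received rows:

# One row count per table; reuses the query() helper and table_names from above.
pd.concat([
    query('select count(*) as row_count from {0}'.format(table_name))
        .rename(index={0: table_name})
    for table_name in table_names
])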
In [67]:
def average_reviews_per_product(table_name):
    return (query('''
        with distinct_products as (select count(distinct asin) as products from {0}),
             reviews_count as (select cast(count(*) as double precision) as reviews from {0})
        select reviews / products as reviews_per_product
        from distinct_products cross join reviews_count
        '''.format(table_name))
        .rename(index={0: table_name.replace('_', ' ')}))
In [68]:
def average_reviews_per_reviewer(table_name):
    return (query('''
        with distinct_reviewers as (select count(distinct reviewerID) as reviewers from {0}),
             reviews_count as (select cast(count(*) as double precision) as reviews from {0})
        select reviews / reviewers as reviews_per_reviewer
        from distinct_reviewers cross join reviews_count
        '''.format(table_name))
        .rename(index={ 0: table_name.replace('_', ' ')}))
In [69]:
def percentages_per_rating(table_name):
    return (query('''
        with rating_counts as (select overall, count(overall) as rating_count from {0} group by overall),
             reviews_count as (select cast(count(*) as double precision) as reviews from {0})
        select cast(overall as int) as dataset_name, rating_count / reviews as row
        from rating_counts cross join reviews_count
        '''.format(table_name))
        .set_index('dataset_name')
        .sort_index()
        .transpose()
        .rename(index={'row': table_name.replace('_', ' ')}))
In [75]:
def number_of_reviews(table_name):
    return (query('''
        select count(*) as number_of_reviews from {0}
        '''.format(table_name))
        .rename(index={ 0: table_name.replace('_', ' ') }))
In [76]:
def all_metrics(table_name):
    # Print the table name as a progress indicator, then collect all metrics side by side.
    print(table_name)
    return pd.concat(
        [ f(table_name)
          for f
          in [ percentages_per_rating, number_of_reviews, average_reviews_per_product, average_reviews_per_reviewer ]],
        axis=1)
In [78]:
metrics = stopwatch(lambda: pd.concat([ all_metrics(table) for table in table_names ]))
In [79]:
metrics.index.name = 'dataset_name'
metrics.to_csv('./metadata/large-datasets-evaluation-metrics.csv')
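The exported CSV can later be reloaded without touching the database, for example in another notebook on a local machine:

# 'dataset_name' was set as the index name above before writing the file.
pd.read_csv('./metadata/large-datasets-evaluation-metrics.csv', index_col='dataset_name')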
In [80]:
metrics
Out[80]: