In [2]:
array = range(1,1000)
In [3]:
p =sc.parallelize(array)
In [4]:
q = p.map(lambda x: x*x)
In [5]:
q.take(5)
Out[5]:
This file was uploaded to the server through the Jupyter Notebook interface and then copied into HDFS by running
hadoop fs -put Demographic_Statistics_By_Zip_Code.csv ../data/
from a terminal opened in Jupyter Notebook.
In [48]:
demo = sc.textFile('../data/Demographic_Statistics_By_Zip_Code.csv').cache()
demo.take(2)
Out[48]:
In [49]:
cols = demo.take(1)
print cols[0].split(',')
In [50]:
headers = [item.replace(' ', '_') for item in cols[0].split(',')]
In [51]:
raw_rows = demo.filter(lambda line: not line.startswith('JURISDICTION NAME,')).map(lambda row: row.split(','))
raw_rows.take(3)
Out[51]:
In [52]:
head_b = sc.broadcast(headers)
from pyspark.sql import Row
In [53]:
# This is a very naive type-inference helper and should not be used in prod:
# e.g. int() drops leading zeros, and float() accepts strings like 'inf'.
def detect_data(data):
    """Coerce a raw CSV field to int, then float, falling back to str.

    Tries each numeric conversion in turn and returns the first that
    succeeds; non-numeric text is returned unchanged.
    """
    for convert in (int, float):
        try:
            return convert(data)
        except ValueError:
            continue
    return data
def raw_to_row(row):
    """Turn a list of raw string fields into a pyspark Row.

    Each field is type-inferred via detect_data, then zipped against the
    broadcast header names to form the Row's keyword arguments.
    """
    typed_fields = [detect_data(field) for field in row]
    return Row(**dict(zip(head_b.value, typed_fields)))

schema_rows = raw_rows.map(raw_to_row).toDF()
schema_rows.take(3)
Out[53]:
In [54]:
# Expose the DataFrame to Spark SQL under the name 'demographics' and pin
# it in memory for the repeated queries below.
sqlCtx.registerDataFrameAsTable(schema_rows, 'demographics')
sqlCtx.cacheTable('demographics')
In [55]:
# Highest female share across all zip codes (presumably a fraction in
# [0, 1], given the 0.90 comparison below — confirm against the data).
sqlCtx.sql('select MAX(PERCENT_FEMALE) from demographics').collect()
Out[55]:
In [56]:
# Zip codes where the female share exceeds 0.90.
sqlCtx.sql('select JURISDICTION_NAME from demographics where PERCENT_FEMALE > 0.90').collect()
Out[56]:
In [57]:
# Sanity check: the registered table should appear here.
sqlCtx.tableNames()
Out[57]:
In [45]:
# NOTE(review): this rebinds `demo` (previously the raw textFile RDD from
# In [48]) to the cached SQL table's DataFrame — a distinct name would be
# clearer. Execution counts (45/47 after 57) also show out-of-order runs;
# verify with Restart & Run All.
demo = sqlCtx.table('demographics')
In [47]:
# Persist the table to managed storage in Parquet format.
demo.write.format('parquet').saveAsTable('demographics_parquet')
In [ ]: