In [2]:
array = range(1,1000)

In [3]:
p  =sc.parallelize(array)

In [4]:
q = p.map(lambda x: x*x)

In [5]:
q.take(5)


Out[5]:
[1, 4, 9, 16, 25]

This file was uploaded to the server using Jupyter notebook and then added to HDFS using

hadoop fs -put Demographic_Statistics_By_Zip_Code.csv ../data/

from the command window in jupyter notebooks


In [48]:
demo = sc.textFile('../data/Demographic_Statistics_By_Zip_Code.csv').cache()
demo.take(2)


Out[48]:
[u'JURISDICTION NAME,COUNT PARTICIPANTS,COUNT FEMALE,PERCENT FEMALE,COUNT MALE,PERCENT MALE,COUNT GENDER UNKNOWN,PERCENT GENDER UNKNOWN,COUNT GENDER TOTAL,PERCENT GENDER TOTAL,COUNT PACIFIC ISLANDER,PERCENT PACIFIC ISLANDER,COUNT HISPANIC LATINO,PERCENT HISPANIC LATINO,COUNT AMERICAN INDIAN,PERCENT AMERICAN INDIAN,COUNT ASIAN NON HISPANIC,PERCENT ASIAN NON HISPANIC,COUNT WHITE NON HISPANIC,PERCENT WHITE NON HISPANIC,COUNT BLACK NON HISPANIC,PERCENT BLACK NON HISPANIC,COUNT OTHER ETHNICITY,PERCENT OTHER ETHNICITY,COUNT ETHNICITY UNKNOWN,PERCENT ETHNICITY UNKNOWN,COUNT ETHNICITY TOTAL,PERCENT ETHNICITY TOTAL,COUNT PERMANENT RESIDENT ALIEN,PERCENT PERMANENT RESIDENT ALIEN,COUNT US CITIZEN,PERCENT US CITIZEN,COUNT OTHER CITIZEN STATUS,PERCENT OTHER CITIZEN STATUS,COUNT CITIZEN STATUS UNKNOWN,PERCENT CITIZEN STATUS UNKNOWN,COUNT CITIZEN STATUS TOTAL,PERCENT CITIZEN STATUS TOTAL,COUNT RECEIVES PUBLIC ASSISTANCE,PERCENT RECEIVES PUBLIC ASSISTANCE,COUNT NRECEIVES PUBLIC ASSISTANCE,PERCENT NRECEIVES PUBLIC ASSISTANCE,COUNT PUBLIC ASSISTANCE UNKNOWN,PERCENT PUBLIC ASSISTANCE UNKNOWN,COUNT PUBLIC ASSISTANCE TOTAL,PERCENT PUBLIC ASSISTANCE TOTAL',
 u'10001,44,22,0.5,22,0.5,0,0,44,100,0,0,16,0.36,0,0,3,0.07,1,0.02,21,0.48,3,0.07,0,0,44,100,2,0.05,42,0.95,0,0,0,0,44,100,20,0.45,24,0.55,0,0,44,100']

In [49]:
cols = demo.take(1)
print cols[0].split(',')


[u'JURISDICTION NAME', u'COUNT PARTICIPANTS', u'COUNT FEMALE', u'PERCENT FEMALE', u'COUNT MALE', u'PERCENT MALE', u'COUNT GENDER UNKNOWN', u'PERCENT GENDER UNKNOWN', u'COUNT GENDER TOTAL', u'PERCENT GENDER TOTAL', u'COUNT PACIFIC ISLANDER', u'PERCENT PACIFIC ISLANDER', u'COUNT HISPANIC LATINO', u'PERCENT HISPANIC LATINO', u'COUNT AMERICAN INDIAN', u'PERCENT AMERICAN INDIAN', u'COUNT ASIAN NON HISPANIC', u'PERCENT ASIAN NON HISPANIC', u'COUNT WHITE NON HISPANIC', u'PERCENT WHITE NON HISPANIC', u'COUNT BLACK NON HISPANIC', u'PERCENT BLACK NON HISPANIC', u'COUNT OTHER ETHNICITY', u'PERCENT OTHER ETHNICITY', u'COUNT ETHNICITY UNKNOWN', u'PERCENT ETHNICITY UNKNOWN', u'COUNT ETHNICITY TOTAL', u'PERCENT ETHNICITY TOTAL', u'COUNT PERMANENT RESIDENT ALIEN', u'PERCENT PERMANENT RESIDENT ALIEN', u'COUNT US CITIZEN', u'PERCENT US CITIZEN', u'COUNT OTHER CITIZEN STATUS', u'PERCENT OTHER CITIZEN STATUS', u'COUNT CITIZEN STATUS UNKNOWN', u'PERCENT CITIZEN STATUS UNKNOWN', u'COUNT CITIZEN STATUS TOTAL', u'PERCENT CITIZEN STATUS TOTAL', u'COUNT RECEIVES PUBLIC ASSISTANCE', u'PERCENT RECEIVES PUBLIC ASSISTANCE', u'COUNT NRECEIVES PUBLIC ASSISTANCE', u'PERCENT NRECEIVES PUBLIC ASSISTANCE', u'COUNT PUBLIC ASSISTANCE UNKNOWN', u'PERCENT PUBLIC ASSISTANCE UNKNOWN', u'COUNT PUBLIC ASSISTANCE TOTAL', u'PERCENT PUBLIC ASSISTANCE TOTAL']

In [50]:
headers = [item.replace(' ', '_') for item in cols[0].split(',')]

In [51]:
raw_rows = demo.filter(lambda line: not line.startswith('JURISDICTION NAME,')).map(lambda row: row.split(','))
raw_rows.take(3)


Out[51]:
[[u'10001',
  u'44',
  u'22',
  u'0.5',
  u'22',
  u'0.5',
  u'0',
  u'0',
  u'44',
  u'100',
  u'0',
  u'0',
  u'16',
  u'0.36',
  u'0',
  u'0',
  u'3',
  u'0.07',
  u'1',
  u'0.02',
  u'21',
  u'0.48',
  u'3',
  u'0.07',
  u'0',
  u'0',
  u'44',
  u'100',
  u'2',
  u'0.05',
  u'42',
  u'0.95',
  u'0',
  u'0',
  u'0',
  u'0',
  u'44',
  u'100',
  u'20',
  u'0.45',
  u'24',
  u'0.55',
  u'0',
  u'0',
  u'44',
  u'100'],
 [u'10002',
  u'35',
  u'19',
  u'0.54',
  u'16',
  u'0.46',
  u'0',
  u'0',
  u'35',
  u'100',
  u'0',
  u'0',
  u'1',
  u'0.03',
  u'0',
  u'0',
  u'28',
  u'0.8',
  u'6',
  u'0.17',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'35',
  u'100',
  u'2',
  u'0.06',
  u'33',
  u'0.94',
  u'0',
  u'0',
  u'0',
  u'0',
  u'35',
  u'100',
  u'2',
  u'0.06',
  u'33',
  u'0.94',
  u'0',
  u'0',
  u'35',
  u'100'],
 [u'10003',
  u'1',
  u'1',
  u'1',
  u'0',
  u'0',
  u'0',
  u'0',
  u'1',
  u'100',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'1',
  u'1',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'1',
  u'100',
  u'0',
  u'0',
  u'1',
  u'1',
  u'0',
  u'0',
  u'0',
  u'0',
  u'1',
  u'100',
  u'0',
  u'0',
  u'1',
  u'1',
  u'0',
  u'0',
  u'1',
  u'100']]

In [52]:
head_b = sc.broadcast(headers)
from pyspark.sql import Row

In [53]:
#this is a very naive implementation and should not be used in prod
def detect_data(data):
    try:
        return int(data)
    except ValueError:
        pass
    try:
        return float(data)
    except ValueError:
        return data

def raw_to_row(row):
    clean_row = [detect_data(value) for value in row]
    values_dict = dict(zip(head_b.value, clean_row))
    return Row(**values_dict)

schema_rows = raw_rows.map(raw_to_row).toDF()
schema_rows.take(3)


Out[53]:
[Row(COUNT_AMERICAN_INDIAN=0, COUNT_ASIAN_NON_HISPANIC=3, COUNT_BLACK_NON_HISPANIC=21, COUNT_CITIZEN_STATUS_TOTAL=44, COUNT_CITIZEN_STATUS_UNKNOWN=0, COUNT_ETHNICITY_TOTAL=44, COUNT_ETHNICITY_UNKNOWN=0, COUNT_FEMALE=22, COUNT_GENDER_TOTAL=44, COUNT_GENDER_UNKNOWN=0, COUNT_HISPANIC_LATINO=16, COUNT_MALE=22, COUNT_NRECEIVES_PUBLIC_ASSISTANCE=24, COUNT_OTHER_CITIZEN_STATUS=0, COUNT_OTHER_ETHNICITY=3, COUNT_PACIFIC_ISLANDER=0, COUNT_PARTICIPANTS=44, COUNT_PERMANENT_RESIDENT_ALIEN=2, COUNT_PUBLIC_ASSISTANCE_TOTAL=44, COUNT_PUBLIC_ASSISTANCE_UNKNOWN=0, COUNT_RECEIVES_PUBLIC_ASSISTANCE=20, COUNT_US_CITIZEN=42, COUNT_WHITE_NON_HISPANIC=1, JURISDICTION_NAME=10001, PERCENT_AMERICAN_INDIAN=0, PERCENT_ASIAN_NON_HISPANIC=0.07, PERCENT_BLACK_NON_HISPANIC=0.48, PERCENT_CITIZEN_STATUS_TOTAL=100, PERCENT_CITIZEN_STATUS_UNKNOWN=0, PERCENT_ETHNICITY_TOTAL=100, PERCENT_ETHNICITY_UNKNOWN=0, PERCENT_FEMALE=0.5, PERCENT_GENDER_TOTAL=100, PERCENT_GENDER_UNKNOWN=0, PERCENT_HISPANIC_LATINO=0.36, PERCENT_MALE=0.5, PERCENT_NRECEIVES_PUBLIC_ASSISTANCE=0.55, PERCENT_OTHER_CITIZEN_STATUS=0, PERCENT_OTHER_ETHNICITY=0.07, PERCENT_PACIFIC_ISLANDER=0, PERCENT_PERMANENT_RESIDENT_ALIEN=0.05, PERCENT_PUBLIC_ASSISTANCE_TOTAL=100, PERCENT_PUBLIC_ASSISTANCE_UNKNOWN=0, PERCENT_RECEIVES_PUBLIC_ASSISTANCE=0.45, PERCENT_US_CITIZEN=0.95, PERCENT_WHITE_NON_HISPANIC=0.02),
 Row(COUNT_AMERICAN_INDIAN=0, COUNT_ASIAN_NON_HISPANIC=28, COUNT_BLACK_NON_HISPANIC=0, COUNT_CITIZEN_STATUS_TOTAL=35, COUNT_CITIZEN_STATUS_UNKNOWN=0, COUNT_ETHNICITY_TOTAL=35, COUNT_ETHNICITY_UNKNOWN=0, COUNT_FEMALE=19, COUNT_GENDER_TOTAL=35, COUNT_GENDER_UNKNOWN=0, COUNT_HISPANIC_LATINO=1, COUNT_MALE=16, COUNT_NRECEIVES_PUBLIC_ASSISTANCE=33, COUNT_OTHER_CITIZEN_STATUS=0, COUNT_OTHER_ETHNICITY=0, COUNT_PACIFIC_ISLANDER=0, COUNT_PARTICIPANTS=35, COUNT_PERMANENT_RESIDENT_ALIEN=2, COUNT_PUBLIC_ASSISTANCE_TOTAL=35, COUNT_PUBLIC_ASSISTANCE_UNKNOWN=0, COUNT_RECEIVES_PUBLIC_ASSISTANCE=2, COUNT_US_CITIZEN=33, COUNT_WHITE_NON_HISPANIC=6, JURISDICTION_NAME=10002, PERCENT_AMERICAN_INDIAN=0, PERCENT_ASIAN_NON_HISPANIC=0.8, PERCENT_BLACK_NON_HISPANIC=None, PERCENT_CITIZEN_STATUS_TOTAL=100, PERCENT_CITIZEN_STATUS_UNKNOWN=0, PERCENT_ETHNICITY_TOTAL=100, PERCENT_ETHNICITY_UNKNOWN=0, PERCENT_FEMALE=0.54, PERCENT_GENDER_TOTAL=100, PERCENT_GENDER_UNKNOWN=0, PERCENT_HISPANIC_LATINO=0.03, PERCENT_MALE=0.46, PERCENT_NRECEIVES_PUBLIC_ASSISTANCE=0.94, PERCENT_OTHER_CITIZEN_STATUS=0, PERCENT_OTHER_ETHNICITY=None, PERCENT_PACIFIC_ISLANDER=0, PERCENT_PERMANENT_RESIDENT_ALIEN=0.06, PERCENT_PUBLIC_ASSISTANCE_TOTAL=100, PERCENT_PUBLIC_ASSISTANCE_UNKNOWN=0, PERCENT_RECEIVES_PUBLIC_ASSISTANCE=0.06, PERCENT_US_CITIZEN=0.94, PERCENT_WHITE_NON_HISPANIC=0.17),
 Row(COUNT_AMERICAN_INDIAN=0, COUNT_ASIAN_NON_HISPANIC=1, COUNT_BLACK_NON_HISPANIC=0, COUNT_CITIZEN_STATUS_TOTAL=1, COUNT_CITIZEN_STATUS_UNKNOWN=0, COUNT_ETHNICITY_TOTAL=1, COUNT_ETHNICITY_UNKNOWN=0, COUNT_FEMALE=1, COUNT_GENDER_TOTAL=1, COUNT_GENDER_UNKNOWN=0, COUNT_HISPANIC_LATINO=0, COUNT_MALE=0, COUNT_NRECEIVES_PUBLIC_ASSISTANCE=1, COUNT_OTHER_CITIZEN_STATUS=0, COUNT_OTHER_ETHNICITY=0, COUNT_PACIFIC_ISLANDER=0, COUNT_PARTICIPANTS=1, COUNT_PERMANENT_RESIDENT_ALIEN=0, COUNT_PUBLIC_ASSISTANCE_TOTAL=1, COUNT_PUBLIC_ASSISTANCE_UNKNOWN=0, COUNT_RECEIVES_PUBLIC_ASSISTANCE=0, COUNT_US_CITIZEN=1, COUNT_WHITE_NON_HISPANIC=0, JURISDICTION_NAME=10003, PERCENT_AMERICAN_INDIAN=0, PERCENT_ASIAN_NON_HISPANIC=None, PERCENT_BLACK_NON_HISPANIC=None, PERCENT_CITIZEN_STATUS_TOTAL=100, PERCENT_CITIZEN_STATUS_UNKNOWN=0, PERCENT_ETHNICITY_TOTAL=100, PERCENT_ETHNICITY_UNKNOWN=0, PERCENT_FEMALE=None, PERCENT_GENDER_TOTAL=100, PERCENT_GENDER_UNKNOWN=0, PERCENT_HISPANIC_LATINO=None, PERCENT_MALE=None, PERCENT_NRECEIVES_PUBLIC_ASSISTANCE=None, PERCENT_OTHER_CITIZEN_STATUS=0, PERCENT_OTHER_ETHNICITY=None, PERCENT_PACIFIC_ISLANDER=0, PERCENT_PERMANENT_RESIDENT_ALIEN=None, PERCENT_PUBLIC_ASSISTANCE_TOTAL=100, PERCENT_PUBLIC_ASSISTANCE_UNKNOWN=0, PERCENT_RECEIVES_PUBLIC_ASSISTANCE=None, PERCENT_US_CITIZEN=None, PERCENT_WHITE_NON_HISPANIC=None)]

In [54]:
sqlCtx.registerDataFrameAsTable(schema_rows, 'demographics')
sqlCtx.cacheTable('demographics')

In [55]:
sqlCtx.sql('select MAX(PERCENT_FEMALE) from demographics').collect()


Out[55]:
[Row(_c0=0.97)]

In [56]:
sqlCtx.sql('select JURISDICTION_NAME from demographics where PERCENT_FEMALE > 0.90').collect()


Out[56]:
[Row(JURISDICTION_NAME=11211),
 Row(JURISDICTION_NAME=12737),
 Row(JURISDICTION_NAME=12750),
 Row(JURISDICTION_NAME=12764)]

In [57]:
sqlCtx.tableNames()


Out[57]:
[u'demographics', u'business', u'demographics_hive', u'demographics_parquet']

In [45]:
demo = sqlCtx.table('demographics')

In [47]:
demo.write.format('parquet').saveAsTable('demographics_parquet')

In [ ]: