In [252]:
sc
Out[252]:
In [4]:
import csv
from dateutil import parser
from datetime import datetime
from pyspark.sql.types import *
sfcrime_raw_data = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/train.csv'
sfcrime_schema_file = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/train_schema.txt'
sfc_rdd = sc.textFile(sfcrime_raw_data)
print 'Line count of train file: %d' % sfc_rdd.count()
# Read and process the schema file
schemadict = {}
i = 0
schema = sc.textFile(sfcrime_schema_file)
line_cnt = schema.count()
print 'Lines in schema file: %d' % line_cnt
for l in schema.collect():
    col = l.split(',')
    schemadict.update({i: (col[0], col[1])})
    i += 1
print 'Length of the schemadict: %d' % len(schemadict.keys())
# Establish the Struct for the Schema
keys = schemadict.keys()
keys.sort()
schema = StructType()
for k in keys:
    if schemadict[k][1] == 'int':
        schema.add(StructField(schemadict[k][0], IntegerType(), True))
    elif schemadict[k][1] == 'str':
        schema.add(StructField(schemadict[k][0], StringType(), True))
    elif schemadict[k][1] == 'float':
        schema.add(StructField(schemadict[k][0], FloatType(), True))
    elif schemadict[k][1] == 'datetime':
        schema.add(StructField(schemadict[k][0], TimestampType(), True))
    else:
        print 'Unsupported or incorrect data type'
# Skip the first header line with this trick ...
sfc_nh = sfc_rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
def toRowSep(line):
    """Parses one row using csv reader"""
    for r in csv.reader([line], delimiter=','):
        return r
sfc_split = sfc_nh.map(toRowSep)
lens = sfc_split.map(lambda r: len(r))
print 'Max len: %d' % lens.max()
print 'Min len: %d' % lens.min()
sfc_split.cache()
Out[4]:
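As an aside, the same CSV could be loaded in one step with the external spark-csv package instead of hand-rolling the header skip and type conversion. This is only a sketch for comparison, and it assumes the com.databricks:spark-csv package has been added to the Spark classpath (e.g. via --packages), which the rest of the notebook does not require.

# Sketch only: assumes the spark-csv package is available on the classpath.
sfc_alt = (sqlContext.read
           .format('com.databricks.spark.csv')
           .options(header='true', inferSchema='true')  # header row and types handled for us
           .load(sfcrime_raw_data))
print 'Rows loaded via spark-csv: %d' % sfc_alt.count()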
In [5]:
def convert_types(row, schema):
    """Convert each field of a parsed row to the Python type named in the schema dict."""
    d = row
    for col, data in enumerate(row):
        if col < len(schema.keys()):
            typed = schema[col]
            if data is None:
                d[col] = None
            elif typed[1] == 'str':   # the schema file uses 'str' for string columns
                d[col] = data
            elif typed[1] == 'int':
                d[col] = int(round(float(data)))
            elif typed[1] == 'float':
                d[col] = float(data)
            elif typed[1] == 'datetime':
                # d[col] = parser.parse(data)  # dateutil alternative
                d[col] = datetime.strptime(data, '%Y-%m-%d %H:%M:%S')
    return d

def toTypedRow(row):
    return convert_types(row, schemadict)
# Now, convert the types of all the rdd elements
sfc_typed = sfc_split.map(toTypedRow)
sfc_typed.take(1)
sfc_train = sqlContext.createDataFrame(sfc_typed, schema)
print sfc_train.count()
In [ ]:
sfc_train = sfc_train.cache()
sfc_train.printSchema()
In [8]:
sfc_train.show(5)
In [19]:
parqFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/train.pqt'
sfc_train.write.parquet(parqFileName)
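Note that re-running this cell fails once the Parquet directory already exists. If that matters, the standard DataFrameWriter save mode can be set first; a minimal sketch:

# Overwrite any existing output instead of raising an error on re-run.
sfc_train.write.mode('overwrite').parquet(parqFileName)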
In [285]:
sc
Out[285]:
In [253]:
parqFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/train.pqt'
sfc_train = sqlContext.read.parquet(parqFileName)
print sfc_train.count()
sfc_train.printSchema()
sfc_train = sfc_train.cache()
In [254]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid", color_codes=True)
In [255]:
sfc_train = sfc_train.repartition(8)
print sfc_train.count()
sfc_train = sfc_train.cache()
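A quick way to confirm the repartition took effect is to check the underlying RDD's partition count; a minimal sketch:

# Confirm how many partitions the cached DataFrame now has.
print 'Partitions: %d' % sfc_train.rdd.getNumPartitions()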
In [256]:
sfc_train.describe().show()
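describe() only summarizes the numeric columns and its result is tiny, so it can be pulled into pandas for easier reading. A sketch, assuming the two float columns keep their original Kaggle names X and Y:

# Small summary table; safe to collect to the driver.
desc_pdf = sfc_train.describe('X', 'Y').toPandas()
print desc_pdf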
In [295]:
[name for name,type in sfc_train.dtypes if type == 'float' ]
Out[295]:
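The same dtypes list generalizes to a quick map of every column grouped by its Spark SQL type; a driver-side sketch:

# Group column names by their Spark SQL type string.
from collections import defaultdict
cols_by_type = defaultdict(list)
for name, dtype in sfc_train.dtypes:
    cols_by_type[dtype].append(name)
print dict(cols_by_type)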
In [284]:
from pyspark.sql.functions import trunc, min, max, avg
print 'Distinct date values: %d' % sfc_train.select("Dates").distinct().count()
print 'Earliest date: %s ' % sfc_train.agg({"Dates": "min"}).collect()[0][0]
print 'Latest date: %s' % sfc_train.agg({"Dates": "max"}).collect()[0][0]
sfc_mod = sfc_train.select('*', trunc(sfc_train.Dates,'month').alias('yr-mo'))
sfc_mod = sfc_mod.cache()
sfc_month_grp = sfc_mod.groupBy('yr-mo').count()
sfc_month_grp = sfc_month_grp.cache()
sfc_month_grp.orderBy('count', ascending=True).show(10)
earliest_mo = sfc_month_grp.agg(min('yr-mo')).collect()[0][0]
latest_mo = sfc_month_grp.agg(max('yr-mo')).collect()[0][0]
print 'Average obs per month: %d' % sfc_month_grp.agg(avg('count')).collect()[0][0]
print 'Earliest month: {0}\tLatest month: {1}'.format(earliest_mo, latest_mo)
print 'Time span: {0}'.format(str(latest_mo-earliest_mo))
print 'Nulls: %d' % sfc_train.filter("Dates is Null").count()
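The per-month counts are small enough to collect, so they can be plotted directly with pandas/matplotlib (already imported above); a sketch:

# Sketch: observations per month over the full time span.
month_pdf = sfc_month_grp.orderBy('yr-mo').toPandas()
month_pdf.plot(x='yr-mo', y='count', figsize=(12, 4), legend=False)
plt.ylabel('observations')
plt.show()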
In [283]:
import pandas as pd
cols = [name for name, type in sfc_train.dtypes if type == 'string']
for c in cols:
    pdf = sfc_train.groupBy(c).count().orderBy('count', ascending=False).toPandas().head(15)
    y_pos = np.arange(start=len(pdf[c]), stop=0, step=-1)
    plt.barh(y_pos, pdf['count'], align='center', alpha=0.7)
    plt.yticks(y_pos, pdf[c])
    plt.xlabel('count')
    plt.title('Top 15 {0} Counts'.format(c))
    plt.show()
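Since seaborn is already loaded, the same top-15 counts can also be drawn with sns.barplot, which handles the tick labels itself. A sketch for a single column, assuming the string column is named Category as in the original Kaggle file:

# Sketch: seaborn version of the top-15 bar chart for one column.
pdf = sfc_train.groupBy('Category').count().orderBy('count', ascending=False).toPandas().head(15)
sns.barplot(x='count', y='Category', data=pdf)
plt.title('Top 15 Category Counts')
plt.show()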
In [296]:
from pyspark.sql.functions import *
cols = [name for name,type in sfc_train.dtypes if type == 'string' ] # get a list of all the string columns
summ_cols = ['Column','Null_cnt','Non-null_cnt','Pct_null','Unique_cnt','Max_len','Min_len','Max_words','Min_words']
summ_df = pd.DataFrame(columns=summ_cols)
total_rows = sfc_train.count()
for col in cols:
    print '---- Summarizing: {0} ----'.format(col)
    words = sfc_train.select(split(col, " ").alias("l")).map(lambda l: len(l[0]))
    words = words.cache()
    nulls = sfc_train.filter(col + " is Null").count()
    sum_val = [col,
               nulls,                                           # count of nulls
               sfc_train.filter(col + " is not Null").count(),  # count of non-nulls
               nulls / float(total_rows),                       # pct of nulls
               sfc_train.select(col).distinct().count(),        # count of unique values
               sfc_train.select(max(length(col))).first()[0],   # max length
               sfc_train.select(min(length(col))).first()[0],   # min length
               words.max(),                                     # max number of words
               words.min()]                                     # min number of words
    summ_df.loc[len(summ_df)] = sum_val
print summ_df.head(len(summ_df))
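The loop above makes several full passes over the data per column. If that becomes slow, the null counts for all string columns can be computed in a single aggregation; a sketch using only standard pyspark.sql.functions:

# Sketch: one pass that counts nulls for every string column at once.
from pyspark.sql.functions import col as col_, sum as sum_
null_counts = sfc_train.agg(
    *[sum_(col_(c).isNull().cast('int')).alias(c) for c in cols]).first().asDict()
print null_counts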
In [45]:
sns.set(style="white")
# Load the example planets dataset
planets = sns.load_dataset("planets")
# Make a range of years to show categories with no observations
years = np.arange(2000, 2015)
# Draw a count plot to show the number of planets discovered each year
g = sns.factorplot(x="year", data=planets, kind="count",
palette="BuPu", size=6, aspect=1.5, order=years)
g.set_xticklabels(step=2)
Out[45]:
In [49]:
category.columns
Out[49]:
In [51]:
print len(planets)
In [ ]:
# example 1 (uses seaborn's bundled "titanic" dataset)
titanic = sns.load_dataset("titanic")
g = sns.factorplot(x="age", y="embark_town",
hue="sex", row="class",
data=titanic[titanic.embark_town.notnull()],
orient="h", size=2, aspect=3.5, palette="Set3",
kind="violin", split=True, cut=0, bw=.2)
# example 2
g = sns.factorplot(x="who",
y="survived", col="class",
data=titanic, saturation=.5,
kind="bar", ci=None, aspect=.6)
(g.set_axis_labels("", "Survival Rate")
.set_xticklabels(["Men", "Women", "Children"])
.set_titles("{col_name} {col_var}")
.set(ylim=(0, 1))
.despine(left=True))
In [56]:
from bokeh.charts import Bar, output_notebook, show
from bokeh.charts.attributes import cat, color
from bokeh.charts.operations import blend
from bokeh.charts.utils import df_from_json
from bokeh.sampledata.olympics2014 import data
# utilize utility to make it easy to get json/dict data converted to a dataframe
df = df_from_json(data)
# filter by countries with at least one medal and sort by total medals
df = df[df['total'] > 0]
df = df.sort("total", ascending=False)
bar = Bar(df,
values=blend('bronze', 'silver', 'gold', name='medals', labels_name='medal'),
label=cat(columns='abbr', sort=False),
stack=cat(columns='medal', sort=False),
color=color(columns='medal', palette=['SaddleBrown', 'Silver', 'Goldenrod'],
sort=False),
legend='top_right',
title="Medals per Country, Sorted by Total Medals",
tooltips=[('medal', '@medal'), ('country', '@abbr')])
# output_file("stacked_bar.html", title="stacked_bar.py example")
output_notebook()
show(bar)
In [141]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
sns.set(style="darkgrid")
tips = sns.load_dataset("tips")
g = sns.FacetGrid(tips, row="sex", col="time", margin_titles=True)
bins = np.linspace(0, 60, 13)
g.map(plt.hist, "total_bill", color="steelblue", bins=bins, lw=0)
Out[141]:
In [297]:
sc.stop()