In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
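A side note: on Spark 2.x and later, SparkSession wraps SQLContext as the single entry point. A minimal sketch, assuming a 2.x+ install (the app name is arbitrary):

from pyspark.sql import SparkSession

# spark.read and spark.sql would then replace the sqlContext calls below
spark = SparkSession.builder.appName("sf_parking").getOrCreate()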
In [2]:
parking = sqlContext.read.json("../data/sf_parking/sf_parking_clean.json")
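read.json expects one JSON object per line (JSON Lines). If the file were instead a single top-level JSON array, Spark 2.2+ can handle it via the multiLine option; a hedged sketch:

# Only needed for multi-line / array-style JSON (Spark 2.2+)
parking = sqlContext.read.option("multiLine", "true").json("../data/sf_parking/sf_parking_clean.json")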
In [3]:
parking.printSchema()
In [4]:
parking.first()
Out[4]:
In [5]:
parking.registerTempTable("parking")
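registerTempTable was deprecated in Spark 2.0; on newer versions the equivalent call would be:

# Spark 2.0+ replacement for registerTempTable
parking.createOrReplaceTempView("parking")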
In [6]:
parking.show()
In [7]:
aggr_by_type = sqlContext.sql("SELECT primetype, secondtype, count(1) AS count, round(avg(regcap), 0) AS avg_spaces " +
                              "FROM parking " +
                              "GROUP BY primetype, secondtype " +
                              "HAVING trim(primetype) != '' " +
                              "ORDER BY count DESC")
In [8]:
aggr_by_type.show()
In [9]:
from pyspark.sql import functions as F
aggr_by_type = parking.select("primetype", "secondtype", "regcap") \
    .where("trim(primetype) != ''") \
    .groupBy("primetype", "secondtype") \
    .agg(
        F.count("*").alias("count"),
        F.round(F.avg("regcap"), 0).alias("avg_spaces")
    ).sort("count", ascending=False)
In [10]:
aggr_by_type.show()
In [11]:
parking.describe("regcap", "valetcap", "mccap").show()
In [12]:
parking.stat.crosstab("owner", "primetype").show()
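The stat namespace has a few other quick helpers besides crosstab. For example, freqItems surfaces values that occur frequently in each column; a small sketch (the support threshold here is arbitrary):

# Values appearing in at least ~10% of rows, per column
parking.stat.freqItems(["owner", "primetype"], support=0.1).show(truncate=False)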
In [13]:
parking = parking.withColumnRenamed('regcap', 'regcap_old')
parking = parking.withColumn('regcap', parking['regcap_old'].cast('int'))
parking = parking.drop('regcap_old')
parking.printSchema()
In [14]:
def convert_column(df, col, new_type):
    # Rename the original column, cast the copy to the new type, then drop the old column
    old_col = '%s_old' % col
    df = df.withColumnRenamed(col, old_col)
    df = df.withColumn(col, df[old_col].cast(new_type))
    df = df.drop(old_col)
    return df
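Since withColumn overwrites a column when given an existing name, the rename/cast/drop dance can also be collapsed into a single call; a minimal alternative sketch:

def convert_column_inplace(df, col, new_type):
    # Casting onto the same column name replaces the column, so no temporary *_old copy is needed
    return df.withColumn(col, df[col].cast(new_type))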
In [15]:
parking = convert_column(parking, 'valetcap', 'int')
parking = convert_column(parking, 'mccap', 'int')
parking.printSchema()
In [16]:
import requests
def to_neighborhood(location):
    """
    Uses Google's Geocoding API to perform a reverse lookup on latitude and longitude
    https://developers.google.com/maps/documentation/geocoding/intro#reverse-example
    """
    name = 'N/A'
    lat = location.latitude
    long = location.longitude
    r = requests.get('https://maps.googleapis.com/maps/api/geocode/json?latlng=%s,%s' % (lat, long))
    if r.status_code == 200:
        content = r.json()
        places = content['results']  # results is a list of matching places
        neighborhoods = [p['formatted_address'] for p in places if 'neighborhood' in p['types']]
        if neighborhoods:
            # Addresses are formatted as "Japantown, San Francisco, CA",
            # so split on the comma and return just the neighborhood name
            name = neighborhoods[0].split(',')[0]
    return name
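Keep in mind that the Geocoding API is rate limited (and nowadays requires an API key), and the UDF defined next issues one HTTP request per row. A cheap mitigation is to memoize lookups so identical coordinates only hit the API once; a rough sketch (the cache dict is purely illustrative):

_geocode_cache = {}

def to_neighborhood_cached(location):
    # Reuse a previously fetched neighborhood for identical coordinates
    key = (location.latitude, location.longitude)
    if key not in _geocode_cache:
        _geocode_cache[key] = to_neighborhood(location)
    return _geocode_cache[key]

Because UDFs run on the executors, the cache lives per worker process, but it still avoids duplicate calls within a partition.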
In [17]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
location_to_neighborhood = udf(to_neighborhood, StringType())
sfmta_parking = parking.filter(parking.owner == 'SFMTA') \
    .select("location_1", "primetype", "landusetyp", "garorlot", "regcap", "valetcap", "mccap") \
    .withColumn("location_1", location_to_neighborhood("location_1")) \
    .sort("regcap", ascending=False)
sfmta_parking.show()
In [24]:
# Create a pandas DataFrame from the Spark DataFrame
sfmta_pandas = sfmta_parking.filter(sfmta_parking.location_1 != 'N/A').toPandas()
In [29]:
# Display the 20 neighborhoods with the highest average number of public parking spaces owned by SFMTA
sfmta_pandas.groupby(['location_1'])['regcap'].mean().nlargest(20)
Out[29]:
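With the result in pandas, a quick chart is one line away (assuming matplotlib is installed and %matplotlib inline has been run):

# Horizontal bar chart of the top 20 neighborhoods by average SFMTA-owned spaces
sfmta_pandas.groupby(['location_1'])['regcap'].mean().nlargest(20).plot(kind='barh', figsize=(8, 6))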