In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
parking = sqlContext.read.json("../data/sf_parking/sf_parking_clean.json")

In [3]:
parking.printSchema()


root
 |-- address: string (nullable = true)
 |-- garorlot: string (nullable = true)
 |-- landusetyp: string (nullable = true)
 |-- location_1: struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- needs_recoding: boolean (nullable = true)
 |-- mccap: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- primetype: string (nullable = true)
 |-- regcap: string (nullable = true)
 |-- secondtype: string (nullable = true)
 |-- valetcap: string (nullable = true)


In [4]:
parking.first()


Out[4]:
Row(address=u'2110 Market St', garorlot=u'L', landusetyp=u'restaurant', location_1=Row(latitude=u'37.767378', longitude=u'-122.429344', needs_recoding=False), mccap=u'0', owner=u'Private', primetype=u'PPA', regcap=u'13', secondtype=u' ', valetcap=u'0')

In [5]:
parking.registerTempTable("parking")

In [6]:
parking.show()


+--------------------+--------+----------+--------------------+-----+----------+---------+------+----------+--------+
|             address|garorlot|landusetyp|          location_1|mccap|     owner|primetype|regcap|secondtype|valetcap|
+--------------------+--------+----------+--------------------+-----+----------+---------+------+----------+--------+
|      2110 Market St|       L|restaurant|[37.767378,-122.4...|    0|   Private|      PPA|    13|          |       0|
|         993 Potrero|       L|          |[37.757272,-122.4...|    0|     SFMTA|      PPA|    34|          |       0|
|601 Terry A Franc...|       L|          |[37.770135,-122.3...|    0|Port of SF|      PPA|    72|          |       0|
|   11 SOUTH VAN NESS|       G|          |[37.77415,-122.41...|    0|   Private|      PHO|   130|       CPO|       0|
|   101 CALIFORNIA ST|       G|          |[37.793243,-122.3...|    0|   Private|      PPA|   250|          |       0|
|        2000 POST ST|       G|          |[37.785078,-122.4...|    0|   Private|      PPA|   304|          |       0|
|   600 CALIFORNIA ST|       G|          |[37.792779,-122.4...|    0|   Private|      PPA|   197|          |       0|
|       35 GILBERT ST|        |          |[37.774337,-122.4...|    0|   Private|      PPA|    80|          |       0|
|        2355 POST ST|       L|          |[37.78397,-122.44...|    0|   Private|      PPA|    50|          |       0|
|      801 STANYAN ST|       L|          |[37.767202,-122.4...|    0|       RPD|      PPA|   324|          |       0|
|    2300 STOCKTON ST|       L|          |[37.807107,-122.4...|    0|   Private|      PPA|   200|          |       0|
|         1 MARKET ST|       G|          |[37.794191,-122.3...|    0|   Private|      PPA|   160|          |       0|
| 100 The Embarcadero|       L|          |[37.794444,-122.3...|    0|Port of SF|      PPA|   100|          |       0|
|  1101 California St|       G|          |[37.791355,-122.4...|    0|   Private|      PPA|   500|          |       0|
|      1390 Market St|       G|          |[37.777332,-122.4...|   10|   Private|      PPA|   401|       PHO|       0|
|   150 CALIFORNIA ST|       G|          |[37.793588,-122.3...|    0|   Private|      PPA|    35|          |       0|
|     153 TOWNSEND ST|       G|          |[37.779602,-122.3...|    0|   Private|      PPA|   371|          |       0|
|  1700 California St|       G|          |[37.790504,-122.4...|    0|   Private|      PPA|   155|          |       0|
|        185 BERRY ST|       G|          |[37.775972,-122.3...|    0|   Private|      PPA|   268|          |       0|
|        199 BEALE ST|       L|          |[37.790147,-122.3...|    0|   Private|      PPA|    81|          |       0|
+--------------------+--------+----------+--------------------+-----+----------+---------+------+----------+--------+
only showing top 20 rows


In [7]:
aggr_by_type = sqlContext.sql("SELECT primetype, secondtype, count(1) AS count, round(avg(regcap), 0) AS avg_spaces " +
                              "FROM parking " +
                              "GROUP BY primetype, secondtype " +
                              "HAVING trim(primetype) != '' " +
                              "ORDER BY count DESC")

In [8]:
aggr_by_type.show()


+---------+----------+-----+----------+
|primetype|secondtype|count|avg_spaces|
+---------+----------+-----+----------+
|      PPA|          |  462|     210.0|
|      PHO|          |  300|      69.0|
|      CPO|          |  163|      53.0|
|      CGO|          |   49|     135.0|
|      PPA|       PHO|   19|     178.0|
|      PPA|       CPO|    2|     263.0|
|      PHO|       CPO|    1|     130.0|
|      PPA|       RPO|    1|      87.0|
|      CPO|       PPA|    1|      12.0|
+---------+----------+-----+----------+


In [9]:
from pyspark.sql import functions as F

aggr_by_type = parking.select("primetype", "secondtype", "regcap") \
                     .where("trim(primetype) != ''") \
                     .groupBy("primetype", "secondtype") \
                     .agg(
                        F.count("*").alias("count"),
                        F.round(F.avg("regcap"), 0).alias("avg_spaces")
                      ).sort("count", ascending=False)

In [10]:
aggr_by_type.show()


+---------+----------+-----+----------+
|primetype|secondtype|count|avg_spaces|
+---------+----------+-----+----------+
|      PPA|          |  462|     210.0|
|      PHO|          |  300|      69.0|
|      CPO|          |  163|      53.0|
|      CGO|          |   49|     135.0|
|      PPA|       PHO|   19|     178.0|
|      PPA|       CPO|    2|     263.0|
|      PHO|       CPO|    1|     130.0|
|      PPA|       RPO|    1|      87.0|
|      CPO|       PPA|    1|      12.0|
+---------+----------+-----+----------+


In [11]:
parking.describe("regcap", "valetcap", "mccap").show()


+-------+------------------+------------------+------------------+
|summary|            regcap|          valetcap|             mccap|
+-------+------------------+------------------+------------------+
|  count|              1000|              1000|              1000|
|   mean|           137.294|             3.297|             0.184|
| stddev|361.05120902655824|22.624824279398823|1.9015151221485882|
|    min|                 0|                 0|                 0|
|    max|               998|                96|                 8|
+-------+------------------+------------------+------------------+


In [12]:
parking.stat.crosstab("owner", "primetype").show()


+-------------------+---+---+---+---+---+
|    owner_primetype|PPA|PHO|CPO|CGO|   |
+-------------------+---+---+---+---+---+
|         Port of SF|  7|  7|  0|  4|  0|
|               SFPD|  0|  3|  0|  6|  0|
|              SFMTA| 42| 14|  0|  0|  0|
|GG Bridge Authority|  2|  0|  0|  0|  0|
|               SFSU|  2|  6|  0|  0|  0|
|               SFRA|  2|  0|  0|  0|  0|
|                LHH|  0|  5|  0|  0|  0|
|                DMV|  0|  0|  1|  0|  0|
|           Caltrans|  0|  0|  0|  1|  0|
|           Presidio|  5|  1|  1|  2|  0|
|              SFPUC|  0|  0|  0|  5|  0|
|       City College|  0|  7|  0|  0|  0|
|                 UC|  1|  0|  0|  0|  0|
|                RPD|  3|  1|  0|  4|  0|
|              SFUSD|  0|  5|  0|  0|  0|
|        State of CA|  0|  1|  0|  0|  0|
|               USPS|  0|  0|  0|  4|  0|
|            Customs|  0|  0|  0|  1|  0|
|              GGNRA|  1|  0|  1|  0|  0|
|               UCSF| 13|  8|  0|  0|  0|
+-------------------+---+---+---+---+---+
only showing top 20 rows


In [13]:
parking = parking.withColumnRenamed('regcap', 'regcap_old')
parking = parking.withColumn('regcap', parking['regcap_old'].cast('int'))
parking = parking.drop('regcap_old')
parking.printSchema()


root
 |-- address: string (nullable = true)
 |-- garorlot: string (nullable = true)
 |-- landusetyp: string (nullable = true)
 |-- location_1: struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- needs_recoding: boolean (nullable = true)
 |-- mccap: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- primetype: string (nullable = true)
 |-- secondtype: string (nullable = true)
 |-- valetcap: string (nullable = true)
 |-- regcap: integer (nullable = true)


In [14]:
def convert_column(df, col, new_type):
    old_col = '%s_old' % col
    df = df.withColumnRenamed(col, old_col)
    df = df.withColumn(col, df[old_col].cast(new_type))
    df = df.drop(old_col)
    return df

In [15]:
parking = convert_column(parking, 'valetcap', 'int')
parking = convert_column(parking, 'mccap', 'int')
parking.printSchema()


root
 |-- address: string (nullable = true)
 |-- garorlot: string (nullable = true)
 |-- landusetyp: string (nullable = true)
 |-- location_1: struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- needs_recoding: boolean (nullable = true)
 |-- owner: string (nullable = true)
 |-- primetype: string (nullable = true)
 |-- secondtype: string (nullable = true)
 |-- regcap: integer (nullable = true)
 |-- valetcap: integer (nullable = true)
 |-- mccap: integer (nullable = true)


In [16]:
import requests

def to_neighborhood(location):
    """
    Uses Google's Geocoding API to perform a reverse-lookup on latitude and longitude
    https://developers.google.com/maps/documentation/geocoding/intro#reverse-example
    """
    name = 'N/A'
    lat = location.latitude
    long = location.longitude
    
    r = requests.get('https://maps.googleapis.com/maps/api/geocode/json?latlng=%s,%s' % (lat, long))
    if r.status_code == 200:
        content = r.json()
        places = content['results']  # results is a list of matching places
        neighborhoods = [p['formatted_address'] for p in places if 'neighborhood' in p['types']]
        if neighborhoods:
            # Addresses are formatted as Japantown, San Francisco, CA
            # so split on comma and just return neighborhood name
            name = neighborhoods[0].split(',')[0]
    
    return name

In [17]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
    
location_to_neighborhood=udf(to_neighborhood, StringType())

sfmta_parking = parking.filter(parking.owner == 'SFMTA') \
                       .select("location_1", "primetype", "landusetyp", "garorlot", "regcap", "valetcap", "mccap") \
                       .withColumn("location_1", location_to_neighborhood("location_1")) \
                       .sort("regcap", ascending=False)
        
sfmta_parking.show()


+------------------+---------+----------+--------+------+--------+-----+
|        location_1|primetype|landusetyp|garorlot|regcap|valetcap|mccap|
+------------------+---------+----------+--------+------+--------+-----+
|   South of Market|      PPA|          |       G|  2585|       0|   47|
|               N/A|      PPA|          |       G|  1865|       0|    0|
|Financial District|      PPA|          |       G|  1095|       0|    0|
|      Union Square|      PPA|          |       G|   985|       0|    0|
|        Tenderloin|      PPA|          |       G|   925|       0|    0|
|  Mission District|      PPA|          |       G|   850|       0|    0|
|      Civic Center|      PPA|          |       G|   843|       0|    0|
|  Mission District|      PPA|          |       G|   807|       0|    0|
|       South Beach|      PPA|          |       G|   752|       0|    0|
|         Japantown|      PPA|          |       G|   747|       0|    0|
|         Chinatown|      PPA|          |       G|   700|       0|    0|
| Fillmore District|      PPA|          |       G|   618|       0|    0|
|         Chinatown|      PPA|          |       G|   600|       0|    0|
|  Mission District|      PPA|          |       G|   350|       0|    0|
|  Mission District|      PHO|          |       G|   227|       0|    0|
|               N/A|      PPA|          |       G|   205|       0|    0|
|         Japantown|      PPA|          |       G|   177|       0|    0|
|      Russian Hill|      PPA|          |       G|   163|       0|    0|
|      Russian Hill|      PPA|          |       G|   162|       0|    0|
|Central Waterfront|      PHO|          |       L|   150|       0|    0|
+------------------+---------+----------+--------+------+--------+-----+
only showing top 20 rows


In [24]:
# Create a Pandas dataframe from the Spark DataFrame
sfmta_pandas = sfmta_parking.filter(sfmta_parking.location_1 != 'N/A').toPandas()

In [29]:
# Display the 20 neighborhoods with the highest average # of public parking owned by SFMTA
sfmta_pandas.groupby(['location_1'])['regcap'].mean().nlargest(20)


Out[29]:
location_1
South of Market       1322.000000
Financial District    1095.000000
Union Square           985.000000
Tenderloin             925.000000
Civic Center           843.000000
South Beach            752.000000
Chinatown              650.000000
Fillmore District      618.000000
Japantown              462.000000
Mission District       284.777778
Cow Hollow             205.000000
Russian Hill           162.500000
Polk Gulch             129.000000
Marina District        116.000000
Mission Bay            105.000000
Fisherman's Wharf      100.000000
Western Addition       100.000000
Dogpatch                55.000000
Islais Creek            51.000000
Outer Mission           40.000000
Name: regcap, dtype: float64

In [ ]: