In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Calculate Distances") \
    .getOrCreate()

In [2]:
# File-system layout for inputs, outputs, and cached Bing responses
PATH_RAWDATA = '../rawdata/'
PATH_PROCESSEDDATA = '../processeddata/'
DIST_FN = 'distances.parquet'
DRIV_DIST_FN = 'driv_dist.parquet'
PATH_BING = '../rawdata/bing_results/'
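
The routing step below caches one JSON response per pair under PATH_BING, and the results end up under PATH_PROCESSEDDATA; a small sketch (assuming the directories may not exist yet) to create them up front:

import os
for p in (PATH_PROCESSEDDATA, PATH_BING):
    if not os.path.isdir(p):
        os.makedirs(p)  # create the directory tree on first run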

In [3]:
# Load the precomputed pairwise distances and keep only the pairs
# within 60 miles, which bounds the number of routing requests
dfDist = spark.read.parquet(PATH_PROCESSEDDATA + DIST_FN)
dfDist.printSchema()
dfNearby = dfDist.where(dfDist.dist < 60)
print(dfNearby.count())


root
 |-- z_id: integer (nullable = true)
 |-- z_pop: integer (nullable = true)
 |-- z_house_unit: integer (nullable = true)
 |-- z_land: double (nullable = true)
 |-- z_water: double (nullable = true)
 |-- z_lat_d: double (nullable = true)
 |-- z_long_d: double (nullable = true)
 |-- z_lat_r: double (nullable = true)
 |-- z_long_r: double (nullable = true)
 |-- m_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- UATYPE: string (nullable = true)
 |-- m_pop: integer (nullable = true)
 |-- m_house_unit: integer (nullable = true)
 |-- m_land: double (nullable = true)
 |-- m_water: double (nullable = true)
 |-- m_lat_d: double (nullable = true)
 |-- m_long_d: double (nullable = true)
 |-- m_lat_r: double (nullable = true)
 |-- m_long_r: double (nullable = true)
 |-- dist: double (nullable = true)

111727
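
The schema stores each coordinate pair both in degrees (*_d) and radians (*_r), which suggests `dist` is a precomputed straight-line (great-circle) distance in miles; the filter keeps the 111,727 pairs within 60 miles. As a hedged sketch of how such a column could be derived, not necessarily how it was, here is the haversine formula over the radian columns (`with_haversine`, `dist_check`, and the 3959 mi mean Earth radius are assumptions, not from the original pipeline):

from pyspark.sql import functions as F

R_MILES = 3959.0  # mean Earth radius in miles (assumption)

def with_haversine(df):
    # Haversine great-circle distance from the radian coordinate columns
    dlat = df.m_lat_r - df.z_lat_r
    dlon = df.m_long_r - df.z_long_r
    a = (F.sin(dlat / 2) ** 2
         + F.cos(df.z_lat_r) * F.cos(df.m_lat_r) * F.sin(dlon / 2) ** 2)
    return df.withColumn('dist_check', 2 * R_MILES * F.asin(F.sqrt(a)))

# e.g. with_haversine(dfDist).select('dist', 'dist_check').show(3)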

In [4]:
# https://realpython.com/blog/python/api-integration-in-python/
# http://docs.python-requests.org/en/master/user/quickstart/
import requests
import json
import time
from pyspark.sql import Row
import os


def request_distance(wp0, wp1):
    """Request a driving route between two 'lat,long' waypoints from the
    Bing Maps Routes API and return the raw response."""
    values = dict()
    values['wp.0'] = wp0
    values['wp.1'] = wp1
    values['distanceUnit'] = 'mi'
    with open('../credentials/bing_map_key', 'r') as f:
        values['key'] = f.readline().strip()  # drop the trailing newline

    r = requests.get('https://dev.virtualearth.net/REST/V1/Routes/Driving', params=values)
    r.raise_for_status()

    return r
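
# Example call with hypothetical coordinates, left commented out so this
# cell doesn't fire a network request when run; the driving distance sits
# at resourceSets[0].resources[0].travelDistance in the response JSON,
# which is exactly the path get_distance() extracts below:
#
#   resp = request_distance('42.3601,-71.0589', '40.7128,-74.0060')
#   miles = resp.json()['resourceSets'][0]['resources'][0]['travelDistance']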



def get_distance(row):
    """Return the row plus a Bing driving distance, caching each API
    response as a JSON file so reruns don't repeat completed requests."""
    path = PATH_BING
    fn_prefix = path + str(row['z_id']) + '_' + str(row['m_id'])
    fn = fn_prefix + '.json'

    # Record the input row alongside the cached response
    with open(fn_prefix + '_meta.json', 'w') as f:
        json.dump(row.asDict(), f)

    if os.path.isfile(fn):
        # Cache hit: reuse the stored response instead of re-requesting
        with open(fn, 'r') as f:
            resp_json = json.load(f)
            ddist = float(resp_json["resourceSets"][0]["resources"][0]["travelDistance"])

    else:
        z_coords = str(row['z_lat_d']) + ',' + str(row['z_long_d'])
        m_coords = str(row['m_lat_d']) + ',' + str(row['m_long_d'])

        try:
            resp = request_distance(z_coords, m_coords)
            resp_json = resp.json()
            ddist = float(resp_json["resourceSets"][0]["resources"][0]["travelDistance"])
            time.sleep(.400)  # throttle to stay under the API rate limit
            with open(fn, 'w') as f:
                json.dump(resp_json, f)
        except Exception as e:
            print('Error occurred:', str(row['z_id']) + '_' + str(row['m_id']), e)
            ddist = -1  # sentinel for a failed request

    record = row.asDict()
    record['id'] = str(row['z_id']) + '_' + str(row['m_id'])
    record['ddist'] = ddist
    print(record['id'], record['dist'], record['ddist'])

    # Return a Row (not a dict) so createDataFrame can infer the schema
    # without the dict-inference deprecation warning
    return Row(**record)


r = dfNearby.rdd.map(get_distance)
dfR = spark.createDataFrame(r)
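Note on scale: at 0.4 s of sleep per request, the 111,727 nearby pairs need roughly 111,727 × 0.4 ≈ 44,691 s of throttling alone, about 12.4 hours, so the per-pair JSON cache above is what makes an interrupted run resumable: pairs with a cached response are read from disk instead of re-requested.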

In [6]:
# write.save defaults to Parquet; coalesce(1) produces a single part file
dfR.coalesce(1).write.save(PATH_PROCESSEDDATA + DRIV_DIST_FN, mode='overwrite')
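
To sanity-check the run, the Parquet output can be read back; `ddist == -1` marks pairs whose request failed (the error handler above leaves them uncached, so a rerun retries them). A quick sketch:

dfCheck = spark.read.parquet(PATH_PROCESSEDDATA + DRIV_DIST_FN)
print(dfCheck.where(dfCheck.ddist == -1).count())  # failed requests to retry
dfCheck.select('id', 'dist', 'ddist').show(5)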