In [1]:
from pyspark.sql import SparkSession
# Obtain the Spark session used by the rest of the notebook, registered under
# the application name "Calculate Distances".
spark = (
    SparkSession.builder
    .appName("Calculate Distances")
    .getOrCreate()
)
In [2]:
import string
# Data directories, relative to the notebook's working directory.
PATH_RAWDATA = '../rawdata/'
PATH_PROCESSEDDATA = '../processeddata/'

# Bing Maps route responses are stored as JSON files below the raw-data directory.
PATH_BING = PATH_RAWDATA + 'bing_results/'

# Dataset names inside PATH_PROCESSEDDATA: the distances read as input and the
# driving distances written as output.
DIST_FN = 'distances.parquet'
DRIV_DIST_FN = 'driv_dist.parquet'
In [3]:
# Load the distances dataset and show its columns.
dfDist = spark.read.parquet(PATH_PROCESSEDDATA + DIST_FN)
dfDist.printSchema()

# Keep only the rows whose `dist` value is below 60 and report how many remain;
# these are the rows a driving distance is requested for further down.
dfNearby = dfDist.filter(dfDist['dist'] < 60)
print(dfNearby.count())
In [4]:
# https://realpython.com/blog/python/api-integration-in-python/
# http://docs.python-requests.org/en/master/user/quickstart/
import requests
import json
import time
from pyspark.sql import Row
import os
def request_distance(wp0, wp1, timeout=30):
    """Request a driving route between two waypoints from the Bing Maps REST API.

    The API key is read from the first line of ``../credentials/bing_map_key``
    on every call.

    Args:
        wp0: Start waypoint, sent as the ``wp.0`` query parameter.
        wp1: End waypoint, sent as the ``wp.1`` query parameter.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The ``requests.Response`` of the route query (distances are requested
        in miles via ``distanceUnit=mi``).

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection failures or timeouts.
        IOError: If the key file cannot be opened.
    """
    # readline() keeps the trailing line break; strip it so that it is not sent
    # as part of the key. The `with` block closes the file even on errors.
    with open('../credentials/bing_map_key', 'r') as key_file:
        api_key = key_file.readline().strip()

    params = {
        'wp.0': wp0,
        'wp.1': wp1,
        'distanceUnit': 'mi',
        'key': api_key,
    }
    response = requests.get(
        'https://dev.virtualearth.net/REST/V1/Routes/Driving',
        params=params,
        timeout=timeout,
    )
    response.raise_for_status()
    return response
def _travel_distance(resp_json):
    """Return the ``travelDistance`` of the first resource in a decoded route response."""
    return float(resp_json["resourceSets"][0]["resources"][0]["travelDistance"])


def get_distance(row):
    """Add the driving distance to one row, using JSON files as a cache.

    For a row identified by ``<z_id>_<m_id>`` the function:

    1. writes the row to ``<PATH_BING><id>_meta.json``;
    2. if ``<PATH_BING><id>.json`` exists, reads the distance from it;
    3. otherwise requests the route via ``request_distance``, stores the
       response in that file and pauses briefly before returning.

    Args:
        row: A row providing ``z_id``, ``m_id``, ``z_lat_d``, ``z_long_d``,
            ``m_lat_d``, ``m_long_d`` and ``dist`` by key, plus ``asDict()``.

    Returns:
        dict: ``row.asDict()`` extended with ``id`` (``"<z_id>_<m_id>"``) and
        ``ddist`` (the driving distance, or ``-1`` if the lookup failed).
    """
    pair_id = str(row['z_id']) + '_' + str(row['m_id'])
    fn_prefix = PATH_BING + pair_id
    fn = fn_prefix + '.json'

    # Keep a copy of the input row next to the response file.
    # NOTE(review): if `row` is a pyspark Row (a tuple subclass) this is
    # presumably written as a bare JSON array without field names - confirm
    # that this is the intended format.
    with open(fn_prefix + '_meta.json', 'w') as f:
        json.dump(row, f)

    if os.path.isfile(fn):
        # Cache hit: reuse the stored response instead of calling the service.
        with open(fn, 'r') as f:
            ddist = _travel_distance(json.load(f))
    else:
        z_coords = str(row['z_lat_d']) + ',' + str(row['z_long_d'])
        m_coords = str(row['m_lat_d']) + ',' + str(row['m_long_d'])
        try:
            resp_json = request_distance(z_coords, m_coords).json()
            ddist = _travel_distance(resp_json)
            # Pause after each successful request to throttle calls to the service.
            time.sleep(.400)
            with open(fn, 'w') as f:
                json.dump(resp_json, f)
        except Exception as exc:
            # Best effort: a failed lookup is reported and marked with -1 so
            # the remaining rows are still processed. Catching Exception (not
            # a bare except) lets KeyboardInterrupt/SystemExit stop the run.
            print('Error occured:', pair_id, repr(exc))
            ddist = -1

    record = row.asDict()
    record['id'] = pair_id
    record['ddist'] = ddist
    print(record['id'], record['dist'], record['ddist'])
    return record
# Run get_distance over every nearby row and turn the resulting records
# (dicts carrying the extra `id` and `ddist` fields) into a DataFrame.
r = dfNearby.rdd.map(get_distance)
dfR = spark.createDataFrame(data=r)
In [6]:
# Collapse the result to a single partition and store it under the processed
# data directory, replacing any output from an earlier run.
output_path = PATH_PROCESSEDDATA + DRIV_DIST_FN
dfR.coalesce(1).write.mode('overwrite').save(output_path)