In [1]:
from pyspark.sql import SparkSession

# Session object used by the later cells to read the parquet data.
# NOTE(review): "spark.some.config.option" looks like the placeholder key from
# the Spark getting-started example — confirm whether it is still needed.
spark = (
    SparkSession.builder
    .appName("Calculate Distances")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [27]:
# Inspired by Boto3 documentation
import boto3
# Print every bucket name, each followed by the keys of all objects in it.
s3 = boto3.resource('s3')
for bucket in s3.buckets.all():
    print(bucket.name)
    for key in (obj.key for obj in bucket.objects.all()):
        print(key)


big-data-carbon-tax
12878_33598.json
ACS_14_5YR_DP03.zip
Brucie Springsteen .htm
DEC_10_DP_DPDP1.zip
Gaz_ua.zip
Gaz_zcta_national.zip
bing_results/601_30115_meta.json
bing_results/601_3034_meta.json
bing_results/601_35866_meta.json
bing_results/601_43453_meta.json
bing_results/601_55738_meta.json
bing_results/601_631_meta.json
bing_results/601_70642_meta.json
bing_results/601_78985_meta.json
bing_results/601_79093_meta.json
bing_results/601_97561_meta.json
rawdata/bing_results/12878_33598.json

In [3]:
# Load the distances parquet file and print its schema.
distances_path = "../processeddata/distances.parquet"
dfDist = spark.read.parquet(distances_path)
dfDist.printSchema()


root
 |-- z_id: integer (nullable = true)
 |-- z_pop: integer (nullable = true)
 |-- z_house_unit: integer (nullable = true)
 |-- z_land: double (nullable = true)
 |-- z_water: double (nullable = true)
 |-- z_lat_d: double (nullable = true)
 |-- z_long_d: double (nullable = true)
 |-- z_lat_r: double (nullable = true)
 |-- z_long_r: double (nullable = true)
 |-- m_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- UATYPE: string (nullable = true)
 |-- m_pop: integer (nullable = true)
 |-- m_house_unit: integer (nullable = true)
 |-- m_land: double (nullable = true)
 |-- m_water: double (nullable = true)
 |-- m_lat_d: double (nullable = true)
 |-- m_long_d: double (nullable = true)
 |-- m_lat_r: double (nullable = true)
 |-- m_long_r: double (nullable = true)
 |-- dist: double (nullable = true)


In [30]:
# Keep the rows whose `dist` is below 60, ordered by (z_id, m_id), and
# print how many rows remain.
is_nearby = dfDist['dist'] < 60
dfNearby = dfDist.where(is_nearby).orderBy('z_id', 'm_id')
print(dfNearby.count())


111727

In [35]:
# https://realpython.com/blog/python/api-integration-in-python/
# http://docs.python-requests.org/en/master/user/quickstart/

import os
import boto3

def upload_file(row, bucket="big-data-carbon-tax",
                local_path='../rawdata/bing_results/',
                remote_path='bing_results/',
                s3_client=None):
    """Upload one row's result file (and its metadata file) to S3.

    The file stem is ``<z_id>_<m_id>``. If ``<local_path><stem>.json`` does
    not exist, the row is skipped and no S3 client is created. Otherwise the
    data file is uploaded to ``<remote_path><stem>.json`` and, when it exists
    locally, ``<stem>_meta.json`` is uploaded next to it.

    Args:
        row: Object indexable by ``'z_id'`` and ``'m_id'``.
        bucket: Name of the destination S3 bucket.
        local_path: Prefix (including trailing separator) of the local files.
        remote_path: Key prefix (including trailing slash) inside the bucket.
        s3_client: Object exposing ``upload_fileobj(fileobj, bucket, key)``.
            When ``None``, ``boto3.client('s3')`` is created on demand.

    Returns:
        None.
    """
    fn_root = '{}_{}'.format(row['z_id'], row['m_id'])
    fn_local = local_path + fn_root + '.json'
    fn_local_meta = local_path + fn_root + '_meta.json'

    # Check the filesystem first so rows with nothing to upload never pay
    # for creating an S3 client.
    if not os.path.isfile(fn_local):
        return

    if s3_client is None:
        s3_client = boto3.client('s3')

    with open(fn_local, "rb") as f:
        s3_client.upload_fileobj(f, bucket, remote_path + fn_root + '.json')

    # The metadata file can be absent even when the data file exists; report
    # that instead of raising FileNotFoundError after a partial upload.
    if os.path.isfile(fn_local_meta):
        with open(fn_local_meta, "rb") as f:
            s3_client.upload_fileobj(f, bucket, remote_path + fn_root + '_meta.json')
    else:
        print('Missing metadata', fn_local_meta)

    print('Sent', fn_local)

# Apply upload_file to every row of dfNearby.
# NOTE(review): DataFrame.foreach is documented to return None, so `r` is not
# expected to hold a useful value — confirm before relying on it.
# NOTE(review): upload_file opens relative local paths; presumably this only
# finds files when the workers share the driver's working directory — verify.
r = dfNearby.foreach(upload_file)

In [34]:
import boto3
# Upload the driv_dist parquet archive to the big-data-carbon-tax bucket.
s3 = boto3.client('s3')
archive_path = '../processeddata/driv_dist.parquet.tar.gz'
archive_key = 'bing_results/driv_dist.parquet.tar.gz'
with open(archive_path, "rb") as archive:
    s3.upload_fileobj(archive, "big-data-carbon-tax", archive_key)

In [1]:
import boto3
# Upload the ACS_15_5YR_DP03b CSV to the big-data-carbon-tax bucket.
s3 = boto3.client('s3')
dp03_path = '../rawdata/ACS_15_5YR_DP03b.csv'
dp03_key = 'bing_results/ACS_15_5YR_DP03b.csv'
with open(dp03_path, "rb") as csv_file:
    s3.upload_fileobj(csv_file, "big-data-carbon-tax", dp03_key)

In [2]:
import boto3
# Upload the ACS_15_5YR_S1903b CSV to the big-data-carbon-tax bucket.
s3 = boto3.client('s3')
s1903_path = '../rawdata/ACS_15_5YR_S1903b.csv'
s1903_key = 'bing_results/ACS_15_5YR_S1903b.csv'
with open(s1903_path, "rb") as csv_file:
    s3.upload_fileobj(csv_file, "big-data-carbon-tax", s1903_key)