This notebook streams data over WebHDFS directly into a dataframe. See also: https://github.com/snowch/biginsight-examples/blob/master/misc/WebHDFS_Example_local_storage.ipynb
Credentials - keep these secret!
In [ ]:
# Cluster number, e.g. 10000
cluster = ''
# Cluster username
username = ''
# Cluster password
password = ''
# file path in HDFS
filepath = 'yourpath/yourfile.csv'
Add your custom code to read_csv_lines for processing your datafile - an example sketch follows the cell below.
In [ ]:
import pandas as pd

def read_csv_lines(lines, is_first_chunk = False):
    ''' returns a pandas dataframe '''
    if is_first_chunk:
        # you will want to set the header here if your datafile has a header record
        return pd.read_csv(lines, sep='|', header=None)
    else:
        return pd.read_csv(lines, sep='|', header=None)
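For example, if your datafile has a header record on its first line, a sketch like the following (the column names here are purely illustrative) would skip the header in the first chunk and apply your own column names to every chunk:
In [ ]:
import pandas as pd

# illustrative column names - replace these with your datafile's columns
cols = ['col1', 'col2', 'col3']

def read_csv_lines(lines, is_first_chunk = False):
    ''' returns a pandas dataframe '''
    if is_first_chunk:
        # header=0 consumes the header record; names= overrides it with our own
        return pd.read_csv(lines, sep='|', header=0, names=cols)
    else:
        # subsequent chunks have no header record
        return pd.read_csv(lines, sep='|', header=None, names=cols)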
In [ ]:
host = 'ehaasp-{0}-mastermanager.bi.services.bluemix.net'.format(cluster)
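Because the connection code below disables SSL verification, requests may emit InsecureRequestWarning messages while streaming. If these are noisy you can optionally suppress them first (this assumes the standalone urllib3 package, which requests depends on):
In [ ]:
# optional: silence the InsecureRequestWarning triggered by verify=False
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)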
Code to connect to BigInsights on Cloud via WebHDFS - don't change this
In [ ]:
import requests
import numpy as np
import sys
import datetime

if sys.version_info[0] < 3:
    from StringIO import StringIO
else:
    from io import StringIO

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

print('SCRIPT START: {0}'.format(datetime.datetime.now()))

chunk_size = 10000000 # read in 10 MB chunks

url = "https://{0}:8443/gateway/default/webhdfs/v1/{1}?op=OPEN".format(host, filepath)

# note that SSL verification has been disabled
r = requests.get(url,
                 auth=(username, password),
                 verify=False,
                 allow_redirects=True,
                 stream=True)

df = None
chunk_num = 1
remainder = ''
for chunk in r.iter_content(chunk_size):
    if chunk: # filter out keep-alive new chunks

        # Show progress by printing a dot - useful when chunk size is quite small
        # sys.stdout.write('.')
        # sys.stdout.flush()

        # iter_content yields bytes - decode so the text can be split into lines
        txt = remainder + chunk.decode('utf-8')
        if '\n' in txt:
            # hold back the trailing partial line until the next chunk arrives
            [lines, remainder] = txt.rsplit('\n', 1)
        else:
            # no complete line yet - carry the whole chunk over
            remainder = txt
            continue

        if chunk_num == 1:
            pdf = read_csv_lines(StringIO(lines), True)
            df = sqlContext.createDataFrame(pdf)
        else:
            pdf = read_csv_lines(StringIO(lines), False)
            df2 = sqlContext.createDataFrame(pdf)
            df = df.sql_ctx.createDataFrame(
                df._sc.union([df.rdd, df2.rdd]), df.schema
            )

        print('Imported chunk: {0} record count: {1} df count: {2}'.format(chunk_num, len(pdf), df.count()))
        chunk_num = chunk_num + 1

# parse any trailing partial record left over after the last chunk
if remainder and df is not None:
    pdf = read_csv_lines(StringIO(remainder), False)
    df2 = sqlContext.createDataFrame(pdf)
    df = df.sql_ctx.createDataFrame(
        df._sc.union([df.rdd, df2.rdd]), df.schema
    )

print('\nTotal record import count: {0}'.format(df.count()))
print('SCRIPT END: {0}'.format(datetime.datetime.now()))

df.cache()
Add your code here to work with the imported dataframe, df
In [ ]:
df.show()
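For example, you could register the dataframe as a temporary table and query it with SQL (the table name mydata is just a placeholder):
In [ ]:
# register the dataframe so it can be queried with SQL
df.registerTempTable('mydata')
sqlContext.sql('SELECT COUNT(*) FROM mydata').show()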
In [ ]: