In [25]:
!pip install --user xlrd
In [26]:
# Setup constants if any
In [27]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from dateutil.parser import parse
from ingest.Connectors import Connectors
In [28]:
# The code was removed by DSX for sharing.
In [29]:
# The code was removed by DSX for sharing.
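The two cells above were stripped by DSX because they contain credentials; they are where the Object Storage credentials dict (`credentials`), the dashDB credentials dict (`dashCredentials`), and the `getFileFromObjectStorage` helper used below were defined. A minimal sketch of such a helper, following the standard DSX Object Storage (Swift/Keystone V3) snippet; the credential key names here are assumptions, not the original code:
In [ ]:
# Hypothetical reconstruction of the helper defined in the removed cell above.
# Assumes a 'credentials' dict (from the removed cell) with Keystone V3 fields:
# auth_url, project_id, user_id, password, and region; these key names are guesses.
def getFileFromObjectStorage(container, filename):
    """Authenticate against Keystone V3 and return the object as a BytesIO."""
    url = credentials['auth_url'] + '/v3/auth/tokens'
    data = {'auth': {'identity': {'methods': ['password'],
                                  'password': {'user': {'id': credentials['user_id'],
                                                        'password': credentials['password']}}},
                     'scope': {'project': {'id': credentials['project_id']}}}}
    resp = requests.post(url=url, data=json.dumps(data),
                         headers={'Content-Type': 'application/json'})
    resp.raise_for_status()
    token = resp.headers['X-Subject-Token']
    # Look up the public Swift endpoint for the configured region in the service catalog
    swiftUrl = None
    for entry in resp.json()['token']['catalog']:
        if entry['type'] == 'object-store':
            for endpoint in entry['endpoints']:
                if endpoint['interface'] == 'public' and endpoint['region'] == credentials['region']:
                    swiftUrl = endpoint['url']
    obj = requests.get(swiftUrl + '/' + container + '/' + filename,
                       headers={'X-Auth-Token': token})
    obj.raise_for_status()
    return BytesIO(obj.content)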
In [30]:
bbhCashDF = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'BBH - Cash.xlsx'), header=[0])
# Drop rows and columns whose values are all NaN (axis 0 = rows, axis 1 = columns)
bbhCashFilteredDF = bbhCashDF.dropna(axis=0, how='all').dropna(axis=1, how='all')
# Rename the spreadsheet columns to the target table's column names
bbhCashRenamedDF = bbhCashFilteredDF.rename(index=str, columns={
    "Head Account Number": "ACCT_NUM",
    "Actual Available Balance": "ACT_AVAIL_BAL",
    "Actual Variance Amount": "ACT_VAR_AMT",
    "Bank of Deposit": "DEPOSIT_BANK",
    "Currency Account Name": "FUND_NAME",
    "Currency Code": "CURR_CODE",
    "Opening Available + CMS Sweep Return": "OPEN_AVAIL_CMS_SWEEP_RETURN",
    "Opening Available Balance": "OPEN_AVAIL_BAL",
    "Prior Day NAV": "PRIOR_DAY_NAV",
    "Projected Closing Available Balance": "PROJ_CLOSE_AVAIL_BAL",
    "Sub Account Number": "SUB_ACCT_NUM",
    "Value Date": "AS_OF_DATE"})
# Convert the DatetimeIndex values to plain dates
bbhCashRenamedDF['AS_OF_DATE'] = pd.DatetimeIndex(bbhCashRenamedDF['AS_OF_DATE']).date
# Cast the balance columns to float so they match the DoubleType fields in the Spark schema below
numericCols = ['ACT_AVAIL_BAL', 'ACT_VAR_AMT', 'OPEN_AVAIL_CMS_SWEEP_RETURN',
               'OPEN_AVAIL_BAL', 'PRIOR_DAY_NAV', 'PROJ_CLOSE_AVAIL_BAL']
bbhCashRenamedDF[numericCols] = bbhCashRenamedDF[numericCols].astype(float)
print bbhCashRenamedDF.dtypes
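The Spark schema in the next cell marks FUND_NAME, ACCT_NUM, and AS_OF_DATE as non-nullable, so a quick null check on the cleaned frame can catch bad rows before `createDataFrame` rejects them:
In [ ]:
# Sanity check: the Spark schema below declares these columns non-nullable,
# so any NaN left here would surface as an error in createDataFrame
requiredCols = ['FUND_NAME', 'ACCT_NUM', 'AS_OF_DATE']
print bbhCashRenamedDF[requiredCols].isnull().sum()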
In [31]:
spark = SparkSession.builder.getOrCreate()

def build_schema():
    """Build and return a schema to use for the sample data."""
    schema = StructType([
        StructField("ACT_AVAIL_BAL", DoubleType(), True),
        StructField("ACT_VAR_AMT", DoubleType(), True),
        StructField("DEPOSIT_BANK", StringType(), True),
        StructField("FUND_NAME", StringType(), False),
        StructField("CURR_CODE", StringType(), True),
        StructField("ACCT_NUM", IntegerType(), False),
        StructField("OPEN_AVAIL_CMS_SWEEP_RETURN", DoubleType(), True),
        StructField("OPEN_AVAIL_BAL", DoubleType(), True),
        StructField("PRIOR_DAY_NAV", DoubleType(), True),
        StructField("PROJ_CLOSE_AVAIL_BAL", DoubleType(), True),
        StructField("SUB_ACCT_NUM", IntegerType(), True),
        StructField("AS_OF_DATE", DateType(), False)
    ])
    return schema

schema = build_schema()
# createDataFrame applies the schema to a pandas frame positionally, so order the columns to match
bbhCashSparkDF = spark.createDataFrame(bbhCashRenamedDF[[f.name for f in schema.fields]], schema=schema)
bbhCashSparkDF.printSchema()
bbhCashSparkDF.show()
In [32]:
# Connection to dashDB for reading the fund-mapping reference table
dashDBloadOptions = {
    Connectors.DASHDB.HOST: dashCredentials["host"],
    Connectors.DASHDB.DATABASE: dashCredentials["db"],
    Connectors.DASHDB.USERNAME: dashCredentials["username"],
    Connectors.DASHDB.PASSWORD: dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME: dashCredentials["REF_FUND_MAPPING_TABLE"]
}

# sqlContext is pre-defined by the DSX Spark notebook environment
refFundMappingDF = sqlContext.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundMappingDF.printSchema()
refFundMappingDF.show(1)
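The `com.ibm.spark.discover` format comes from the DSX `ingest` package. Where that package is unavailable, the same table can be read through Spark's built-in JDBC source; a sketch assuming dashDB's default DB2 port 50000:
In [ ]:
# Alternative sketch using Spark's generic JDBC source instead of the DSX
# ingest connector. Assumes dashDB (DB2) listens on the default port 50000.
jdbcUrl = "jdbc:db2://" + dashCredentials["host"] + ":50000/" + dashCredentials["db"]
refFundMappingJdbcDF = sqlContext.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", dashCredentials["REF_FUND_MAPPING_TABLE"]) \
    .option("user", dashCredentials["username"]) \
    .option("password", dashCredentials["password"]) \
    .load()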
In [34]:
# Join the cash data with the fund-mapping reference table to pick up the Aladdin fund ID
bbhCashJoinSparkDF = bbhCashSparkDF.join(refFundMappingDF,
                                         bbhCashSparkDF.FUND_NAME == refFundMappingDF.FUND_NAME,
                                         "inner") \
    .select(bbhCashSparkDF.ACCT_NUM,
            bbhCashSparkDF.FUND_NAME,
            refFundMappingDF.ALADDIN_ID.alias("FUND_ID"),
            bbhCashSparkDF.ACT_AVAIL_BAL,
            bbhCashSparkDF.ACT_VAR_AMT,
            bbhCashSparkDF.DEPOSIT_BANK,
            bbhCashSparkDF.CURR_CODE,
            bbhCashSparkDF.OPEN_AVAIL_CMS_SWEEP_RETURN,
            bbhCashSparkDF.OPEN_AVAIL_BAL,
            bbhCashSparkDF.PRIOR_DAY_NAV,
            bbhCashSparkDF.PROJ_CLOSE_AVAIL_BAL,
            bbhCashSparkDF.SUB_ACCT_NUM,
            bbhCashSparkDF.AS_OF_DATE)
bbhCashJoinSparkDF.show(1)
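Because the join is inner, cash rows whose FUND_NAME has no entry in the reference table are silently dropped. A `left_anti` join surfaces any unmatched fund names before the load:
In [ ]:
# Cash rows whose FUND_NAME has no match in the reference table; these
# would be silently dropped by the inner join above
unmatchedDF = bbhCashSparkDF.join(refFundMappingDF,
                                  bbhCashSparkDF.FUND_NAME == refFundMappingDF.FUND_NAME,
                                  "left_anti")
unmatchedDF.select("FUND_NAME").distinct().show()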
In [35]:
# Connection to dashDB for writing the data; 'merge' mode upserts into the target table
dashdbsaveoption = {
    Connectors.DASHDB.HOST: dashCredentials["host"],
    Connectors.DASHDB.DATABASE: dashCredentials["db"],
    Connectors.DASHDB.USERNAME: dashCredentials["username"],
    Connectors.DASHDB.PASSWORD: dashCredentials["password"],
    Connectors.DASHDB.TARGET_TABLE_NAME: dashCredentials["tableName"],
    Connectors.DASHDB.TARGET_WRITE_MODE: 'merge'
}

bbhCashJoinSparkDF.printSchema()
# save() returns None, so there is no DataFrame to capture here
bbhCashJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()
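To confirm the merge landed, the target table can be read back through the same connector; a sketch reusing the credential keys above:
In [ ]:
# Optional verification: read the target table back through the same
# connector and print its row count as a rough check on the load
verifyOptions = {
    Connectors.DASHDB.HOST: dashCredentials["host"],
    Connectors.DASHDB.DATABASE: dashCredentials["db"],
    Connectors.DASHDB.USERNAME: dashCredentials["username"],
    Connectors.DASHDB.PASSWORD: dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME: dashCredentials["tableName"]
}
verifyDF = sqlContext.read.format("com.ibm.spark.discover").options(**verifyOptions).load()
print verifyDF.count()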
In [ ]: