In [1]:
!pip install --user xlrd
In [2]:
# Setup constants
OBJECT_STORAGE_FOLDER = 'MizuhoPOC'
INPUT_FILE_NAME = 'BBH - Custody.xlsx'
TABLE_NAME = 'BBH_CUSTODY'
REF_FUND_MAPPING_TABLE = 'REF_FUND_MAPPING'
In [3]:
# ALL IMPORTS SHOULD GO HERE
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd  # Excel engine used by pandas.read_excel on this stack
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from dateutil.parser import parse
from ingest.Connectors import Connectors
In [4]:
# The code was removed by DSX for sharing.
In [5]:
# The code was removed by DSX for sharing.
In [ ]:
# The code was removed by DSX for sharing.
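The three removed cells above held the service credentials (the Object Storage credentials dict and dashCredentials for Dash DB) plus the getFileFromObjectStorage helper that the next cell calls. As a rough sketch only, here is the shape that DSX-generated helper typically had, assuming Swift-based Bluemix Object Storage with Keystone v3 auth; the credentials dict and its keys are illustrative, not the removed code:
In [ ]:
# Hypothetical reconstruction of the removed helper; the credentials keys are assumptions
def getFileFromObjectStorage(container, filename):
    url = 'https://identity.open.softlayer.com/v3/auth/tokens'
    data = {'auth': {'identity': {'methods': ['password'],
            'password': {'user': {'name': credentials['username'],
                                  'domain': {'id': credentials['domain_id']},
                                  'password': credentials['password']}}}}}
    resp = requests.post(url=url, data=json.dumps(data),
                         headers={'Content-Type': 'application/json'})
    # The object-store endpoint for our region is listed in the token's service catalog
    file_url = None
    for entry in resp.json()['token']['catalog']:
        if entry['type'] == 'object-store':
            for endpoint in entry['endpoints']:
                if endpoint['interface'] == 'public' and endpoint['region'] == credentials['region']:
                    file_url = endpoint['url'] + '/' + container + '/' + filename
    token = resp.headers['x-subject-token']
    resp2 = requests.get(url=file_url, headers={'X-Auth-Token': token, 'accept': 'application/json'})
    return BytesIO(resp2.content)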
In [20]:
bbhCustodyDF = pd.read_excel(getFileFromObjectStorage(OBJECT_STORAGE_FOLDER, INPUT_FILE_NAME), header=[0])

# Replace all NaN values with empty strings
bbhCustodyFilteredDF = bbhCustodyDF.fillna('')

# Rename the Excel headers to the target DB2 column names
bbhCustodyRenamedDF = bbhCustodyFilteredDF.rename(index=str, columns={
    "Account Number": "ACCT_NUM", "Account Name": "FUND_NAME",
    "Custody Security ID": "CUSTODY_SEC_ID", "SEDOL": "SEDOL", "ISIN": "ISIN",
    "Security Description": "SEC_DESC", "Capitalization Description": "CAP_DESC",
    "Maturity Date": "MATURITY_DATE", "Location": "LOC",
    "Location Account Name": "LOC_ACCT_NAME", "Registration Description": "REG_DESC",
    "Paydown Factor": "PAYDOWN_FACTOR", "Current Face Value": "CURR_FACE_VAL",
    "Custody Position": "CUSTODY_POS", "Total Available Position": "TOT_AVAIL_POS",
    "Position Date": "POS_DATE", "Price Date": "PRICE_DATE", "Price": "PRICE",
    "Pricing Currency": "PRICING_CURR", "Local Price": "LOCAL_PRICE",
    "Local Currency": "LOCAL_CURR", "FX Rate": "FX_RATE",
    "Market Value": "MARKET_VAL", "Local Market Value": "LOCAL_MARKET_VAL"})

# Convert the DatetimeIndex objects to plain dates (Spark's DateType expects datetime.date)
bbhCustodyRenamedDF['MATURITY_DATE'] = pd.DatetimeIndex(bbhCustodyRenamedDF['MATURITY_DATE']).date
bbhCustodyRenamedDF['POS_DATE'] = pd.DatetimeIndex(bbhCustodyRenamedDF['POS_DATE']).date
bbhCustodyRenamedDF['PRICE_DATE'] = pd.DatetimeIndex(bbhCustodyRenamedDF['PRICE_DATE']).date

bbhCustodyRenamedDF[['CUSTODY_POS', 'TOT_AVAIL_POS', 'LOCAL_MARKET_VAL']] = \
    bbhCustodyRenamedDF[['CUSTODY_POS', 'TOT_AVAIL_POS', 'LOCAL_MARKET_VAL']].astype(float)

asOfDate = pd.to_datetime('today').strftime('%Y-%m-%d')
print("\nasOfDate = " + asOfDate)

# bbhCustodyRenamedDF.head(20)
# print(bbhCustodyRenamedDF.dtypes)
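One caveat in the cell above: fillna('') runs before the float casts, so if the position or market-value columns ever arrive with blanks, astype(float) will raise a ValueError. A more defensive variant (a sketch, not what this notebook ran) coerces unparseable cells back to NaN:
In [ ]:
# Hypothetical defensive cast: turn non-numeric cells into NaN instead of raising
for col_name in ['CUSTODY_POS', 'TOT_AVAIL_POS', 'LOCAL_MARKET_VAL']:
    bbhCustodyRenamedDF[col_name] = pd.to_numeric(bbhCustodyRenamedDF[col_name], errors='coerce')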
In [21]:
spark = SparkSession.builder.getOrCreate()
def build_schema():
    """Build and return the schema for the BBH custody data."""
    schema = StructType(
        [
            StructField("ACCT_NUM", IntegerType(), False),
            StructField("FUND_NAME", StringType(), False),
            StructField("CUSTODY_SEC_ID", IntegerType(), True),
            StructField("SEDOL", StringType(), True),
            StructField("CUSIP", StringType(), True),
            StructField("ISIN", StringType(), True),
            StructField("SEC_DESC", StringType(), True),
            StructField("CAP_DESC", StringType(), True),
            StructField("MATURITY_DATE", DateType(), True),
            StructField("LOC", StringType(), True),
            StructField("LOC_ACCT_NAME", StringType(), True),
            StructField("REG_DESC", StringType(), True),
            StructField("PAYDOWN_FACTOR", StringType(), True),
            StructField("CURR_FACE_VAL", StringType(), True),
            StructField("CUSTODY_POS", DoubleType(), True),
            StructField("TOT_AVAIL_POS", DoubleType(), True),
            StructField("POS_DATE", DateType(), True),
            StructField("PRICE_DATE", DateType(), True),
            StructField("PRICE", DoubleType(), True),
            StructField("PRICING_CURR", StringType(), True),
            StructField("LOCAL_PRICE", DoubleType(), True),
            StructField("LOCAL_CURR", StringType(), True),
            StructField("FX_RATE", DoubleType(), True),
            StructField("MARKET_VAL", DoubleType(), True),
            StructField("LOCAL_MARKET_VAL", DoubleType(), True)
        ]
    )
    return schema

bbhCustodySparkDF = spark.createDataFrame(bbhCustodyRenamedDF, schema=build_schema())\
    .withColumn("AS_OF_DATE", lit(asOfDate).cast("date"))
bbhCustodySparkDF.printSchema()
bbhCustodySparkDF.show()
In [25]:
# Connection to Dash DB for reading the fund-mapping reference table
dashDBloadOptions = {
    Connectors.DASHDB.HOST              : dashCredentials["host"],
    Connectors.DASHDB.DATABASE          : dashCredentials["db"],
    Connectors.DASHDB.USERNAME          : dashCredentials["username"],
    Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME : REF_FUND_MAPPING_TABLE
}
refFundMappingDF = spark.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundMappingDF.printSchema()
refFundMappingDF.show(1)
In [23]:
bbhCustodyJoinSparkDF = bbhCustodySparkDF.join(refFundMappingDF,
        bbhCustodySparkDF.FUND_NAME == refFundMappingDF.FUND_NAME, "inner")\
    .select(bbhCustodySparkDF.ACCT_NUM, bbhCustodySparkDF.FUND_NAME,
            refFundMappingDF.ALADDIN_ID.alias("FUND_ID"),
            bbhCustodySparkDF.CUSTODY_SEC_ID, bbhCustodySparkDF.SEDOL,
            bbhCustodySparkDF.CUSIP, bbhCustodySparkDF.ISIN,
            bbhCustodySparkDF.SEC_DESC, bbhCustodySparkDF.CAP_DESC,
            bbhCustodySparkDF.MATURITY_DATE, bbhCustodySparkDF.LOC,
            bbhCustodySparkDF.LOC_ACCT_NAME, bbhCustodySparkDF.REG_DESC,
            bbhCustodySparkDF.PAYDOWN_FACTOR, bbhCustodySparkDF.CURR_FACE_VAL,
            bbhCustodySparkDF.CUSTODY_POS, bbhCustodySparkDF.TOT_AVAIL_POS,
            bbhCustodySparkDF.POS_DATE, bbhCustodySparkDF.PRICE_DATE,
            bbhCustodySparkDF.PRICE, bbhCustodySparkDF.PRICING_CURR,
            bbhCustodySparkDF.LOCAL_PRICE, bbhCustodySparkDF.FX_RATE,
            bbhCustodySparkDF.LOCAL_CURR, bbhCustodySparkDF.MARKET_VAL,
            bbhCustodySparkDF.LOCAL_MARKET_VAL, bbhCustodySparkDF.AS_OF_DATE)
bbhCustodyJoinSparkDF.show(1)
In [27]:
# Connection to Dash DB for writing the data
dashDBSaveOptions = {
    Connectors.DASHDB.HOST              : dashCredentials["host"],
    Connectors.DASHDB.DATABASE          : dashCredentials["db"],
    Connectors.DASHDB.USERNAME          : dashCredentials["username"],
    Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
    Connectors.DASHDB.TARGET_TABLE_NAME : TABLE_NAME,
    Connectors.DASHDB.TARGET_WRITE_MODE : 'merge'
}

bbhCustodyJoinSparkDF.printSchema()
# DataFrameWriter.save() returns None, so its result is not assigned
bbhCustodyJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashDBSaveOptions).save()
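As a quick sanity check on the merge, the target table can be read back through the same connector. This mirrors the load options used for REF_FUND_MAPPING above and is only a suggested follow-up, not part of the original run:
In [ ]:
# Hypothetical verification: read the merged table back and count the rows
verifyOptions = {
    Connectors.DASHDB.HOST              : dashCredentials["host"],
    Connectors.DASHDB.DATABASE          : dashCredentials["db"],
    Connectors.DASHDB.USERNAME          : dashCredentials["username"],
    Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME : TABLE_NAME
}
verifyDF = spark.read.format("com.ibm.spark.discover").options(**verifyOptions).load()
print("Rows in " + TABLE_NAME + ": " + str(verifyDF.count()))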
In [ ]: