In [1]:
!pip install --user xlrd
In [129]:
# Set up constants
# Fund IDs present in the ALADDIN-SEC workbook
FUNDS_ID_LIST = ['I-CJF','I-MG1','I-SQGFSH2','I-SQGFSH2O']
In [2]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from dateutil.parser import parse
from ingest.Connectors import Connectors
In [3]:
# The code was removed by DSX for sharing.
# (This hidden cell defines the getFileFromObjectStorage helper used below.)
In [4]:
# The code was removed by DSX for sharing.
# (This hidden cell defines the dashCredentials dictionary used below.)
In [30]:
aladdinSecDF1 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-SEC.xlsx'),
                              index_col=[0], header=[0]).iloc[0:8]
# Drop rows and columns whose values are all NaN (axis=0 drops rows, axis=1 drops columns)
aladdinSecDFFiltered1 = aladdinSecDF1.dropna(axis=0, how='all').dropna(axis=1, how='all')
# print(aladdinSecDF1)
asOfDate = pd.to_datetime(aladdinSecDFFiltered1.loc['As Of Date:', 'Unnamed: 1']).strftime('%Y-%m-%d')
print("\nasOfDate = " + asOfDate)
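For reference, a minimal sketch of what the `As Of Date:` lookup and normalization above does, using a made-up two-row header block and date value ('Unnamed: 1' is the name read_excel assigns to an unlabeled second column):

toy = pd.DataFrame({'Unnamed: 1': ['10/31/2017', 'ALADDIN']},
                   index=['As Of Date:', 'Source:'])
print(pd.to_datetime(toy.loc['As Of Date:', 'Unnamed: 1']).strftime('%Y-%m-%d'))
# prints: 2017-10-31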
In [207]:
# skiprows=9 skips the header block parsed above; the first two columns form the row MultiIndex
aladdinSecDF2 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-SEC.xlsx'),
                              header=[0], skiprows=9, index_col=[0,1])
# Drop rows and columns whose values are all NaN, then blank out the remaining NaNs
aladdinSecDF2FilterNullRowsCols = aladdinSecDF2.dropna(axis=0, how='all').dropna(axis=1, how='all').fillna('')
print(aladdinSecDF2FilterNullRowsCols)
# aladdinSecDF2FilterNullRowsCols.index.names
# aladdinSecDF2FilterNullRowsCols.index.get_level_values("Portfolio")
# aladdinSecDF2FilterNullRowsCols.columns
# Drop the first 2 rows of each fund's block: those are aggregate rows we do not need to store
dfNewArr = []
for fund_id in FUNDS_ID_LIST:
    df = aladdinSecDF2FilterNullRowsCols.loc[fund_id].iloc[2:]
    df['fund_id'] = fund_id
    dfNewArr.append(df)
# Concatenate all the funds together
dfNew = pd.concat(dfNewArr)
# Flatten the frame by moving the remaining index levels back into columns
dfNew = dfNew.reset_index()
# Rename column to match database columns
aladdinSecDF2Renamed = \
dfNew.rename(index=str,
columns={"fund_id": "FUND_ID",
"Currency": "CURRENCY" ,
"CUSIP(Aladdin ID)": "CUSIP",
"Sec Type": "SEC_TYPE",
"Ticker/Coupon/Maturity": "TICKER_COUPON_MATURITY",
"Sec Desc": "SEC_DESC",
"ISIN": "ISIN",
"Orig. Face": "ORIG_FACE",
"Settled": "SETTLED",
"Notional Market Value": "NOTIONAL_MKT_VAL",
"Base Curr Market Val w/Acc Int": "BASE_CURR_MKT_VAL_ACC_INT",
"Base Curr Accr Int": "BASE_CURR_MKT_INT",
"Maturity": "MATURITY_DATE",
"Issue Date": "ISSUE_DATE",
"Base Curr FX Rate": "BASE_CURR_FX_RATE",
"Market Price": "MKT_PRICE",
"Coupon": "COUPON",
"S&P Rating": "SNP_RATING"
})
# Convert the numeric columns to float.
# TODO: should everything stay a string, given that the source file data is inconsistent?
numericCols = ['ORIG_FACE', 'SETTLED', 'NOTIONAL_MKT_VAL', 'BASE_CURR_MKT_VAL_ACC_INT',
               'BASE_CURR_MKT_INT', 'BASE_CURR_FX_RATE', 'MKT_PRICE']
aladdinSecDF2Renamed[numericCols] = aladdinSecDF2Renamed[numericCols].astype(float)
# Keep the date columns as strings for now (the commented attempts below tried real date parsing)
aladdinSecDF2Renamed[['MATURITY_DATE', 'ISSUE_DATE']] = aladdinSecDF2Renamed[['MATURITY_DATE', 'ISSUE_DATE']].astype(str)
# aladdinSecDF2Renamed['MATURITY_DATE'] = pd.DatetimeIndex(aladdinSecDF2Renamed['MATURITY_DATE'], ambiguous='NaT').date
#aladdinSecDF2Renamed['ISSUE_DATE'] = pd.DatetimeIndex(aladdinSecDF2Renamed['ISSUE_DATE']).date
#aladdinSecDF2Renamed = aladdinSecDF2Renamed.fillna('')
#aladdinSecDF2Renamed.dtypes
# aladdinSecDF2Renamed.head(20)
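A toy illustration of the per-fund slicing in the cell above (the second index level name and the security/aggregate row labels are made up): `.loc[fund_id]` selects one fund's block from the MultiIndex, `.iloc[2:]` drops its two aggregate rows, and `reset_index()` flattens what remains:

idx = pd.MultiIndex.from_tuples(
    [('I-CJF', 'TOTAL'), ('I-CJF', 'SUBTOTAL'), ('I-CJF', 'SEC-A'), ('I-CJF', 'SEC-B')],
    names=['Portfolio', 'Security'])
toy = pd.DataFrame({'val': [100.0, 100.0, 60.0, 40.0]}, index=idx)
print(toy.loc['I-CJF'].iloc[2:].reset_index())
# only the SEC-A and SEC-B rows survive, with 'Security' back as a column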
In [203]:
spark = SparkSession.builder.getOrCreate()
def build_schema():
    """Build and return the explicit schema for the security-level data."""
    schema = StructType(
        [
            StructField("CURRENCY", StringType(), True),
            StructField("CUSIP", StringType(), False),
            StructField("SEC_TYPE", StringType(), True),
            StructField("TICKER_COUPON_MATURITY", StringType(), True),
            StructField("SEC_DESC", StringType(), True),
            StructField("ISIN", StringType(), True),
            StructField("ORIG_FACE", DoubleType(), True),
            StructField("SETTLED", DoubleType(), True),
            StructField("NOTIONAL_MKT_VAL", DoubleType(), True),
            StructField("BASE_CURR_MKT_VAL_ACC_INT", DoubleType(), True),
            StructField("BASE_CURR_MKT_INT", DoubleType(), True),
            StructField("MATURITY_DATE", StringType(), True),
            StructField("ISSUE_DATE", StringType(), True),
            StructField("BASE_CURR_FX_RATE", DoubleType(), True),
            StructField("MKT_PRICE", DoubleType(), True),
            StructField("COUPON", StringType(), True),
            StructField("SNP_RATING", StringType(), True),
            StructField("FUND_ID", StringType(), False),
        ]
    )
    return schema
aladdinSecDF2SparkDF = spark.createDataFrame(aladdinSecDF2Renamed, schema=build_schema()) \
.withColumn("AS_OF_DATE", lit(asOfDate).cast("date"))
aladdinSecDF2SparkDF.printSchema()
aladdinSecDF2SparkDF.head(1)
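The explicit schema pins each column's type instead of letting Spark infer it from the pandas dtypes; a self-contained sketch of the same pattern on a toy column subset with made-up values:

toy_pdf = pd.DataFrame([['ABC123XY9', 101.25]], columns=['CUSIP', 'MKT_PRICE'])
toy_schema = StructType([StructField('CUSIP', StringType(), False),
                         StructField('MKT_PRICE', DoubleType(), True)])
spark.createDataFrame(toy_pdf, schema=toy_schema) \
    .withColumn('AS_OF_DATE', lit('2017-10-31').cast('date')) \
    .printSchema()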
In [204]:
# Connection to Dash DB for reading the reference fund table
dashDBloadOptions = {
Connectors.DASHDB.HOST : dashCredentials["host"],
Connectors.DASHDB.DATABASE : dashCredentials["db"],
Connectors.DASHDB.USERNAME : dashCredentials["username"],
Connectors.DASHDB.PASSWORD : dashCredentials["password"],
Connectors.DASHDB.SOURCE_TABLE_NAME : dashCredentials["REF_FUND_TABLE"],
}
refFundDF = spark.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundDF.printSchema()
refFundDF.show(1)
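The com.ibm.spark.discover format comes with the DSX ingest Connectors; outside DSX, a roughly equivalent read could go through Spark's generic JDBC source. A sketch only; the jdbc:db2:// URL shape and the port are assumptions about the dashDB instance:

jdbcUrl = 'jdbc:db2://' + dashCredentials['host'] + ':50000/' + dashCredentials['db']
refFundDF2 = spark.read.format('jdbc') \
    .option('url', jdbcUrl) \
    .option('dbtable', dashCredentials['REF_FUND_TABLE']) \
    .option('user', dashCredentials['username']) \
    .option('password', dashCredentials['password']) \
    .load()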
In [205]:
aladdinSecJoinSparkDF = aladdinSecDF2SparkDF.join(refFundDF,
aladdinSecDF2SparkDF.FUND_ID == refFundDF.ID, "inner")\
.select(
refFundDF.ID.alias("FUND_ID"),
aladdinSecDF2SparkDF.CURRENCY,
aladdinSecDF2SparkDF.CUSIP,
aladdinSecDF2SparkDF.SEC_TYPE,
aladdinSecDF2SparkDF.TICKER_COUPON_MATURITY,
aladdinSecDF2SparkDF.SEC_DESC,
aladdinSecDF2SparkDF.ISIN,
aladdinSecDF2SparkDF.ORIG_FACE,
aladdinSecDF2SparkDF.SETTLED,
aladdinSecDF2SparkDF.NOTIONAL_MKT_VAL,
aladdinSecDF2SparkDF.BASE_CURR_MKT_VAL_ACC_INT,
aladdinSecDF2SparkDF.BASE_CURR_MKT_INT,
aladdinSecDF2SparkDF.MATURITY_DATE,
aladdinSecDF2SparkDF.ISSUE_DATE,
aladdinSecDF2SparkDF.BASE_CURR_FX_RATE,
aladdinSecDF2SparkDF.MKT_PRICE,
aladdinSecDF2SparkDF.COUPON,
aladdinSecDF2SparkDF.SNP_RATING,
aladdinSecDF2SparkDF.AS_OF_DATE,
)
aladdinSecJoinSparkDF.show(1)
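Note the inner join silently drops any security rows whose FUND_ID has no match in the reference table; a toy check of that behavior (I-UNKNOWN is a made-up id):

left = spark.createDataFrame([('I-CJF', 1.0), ('I-UNKNOWN', 2.0)], ['FUND_ID', 'VAL'])
right = spark.createDataFrame([('I-CJF',)], ['ID'])
left.join(right, left.FUND_ID == right.ID, 'inner') \
    .select(right.ID.alias('FUND_ID'), left.VAL).show()
# only the I-CJF row survives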
In [206]:
# Connection to Dash DB for writing the data
dashdbsaveoption = {
Connectors.DASHDB.HOST : dashCredentials["host"],
Connectors.DASHDB.DATABASE : dashCredentials["db"],
Connectors.DASHDB.USERNAME : dashCredentials["username"],
Connectors.DASHDB.PASSWORD : dashCredentials["password"],
Connectors.DASHDB.TARGET_TABLE_NAME : dashCredentials["tableName"],
Connectors.DASHDB.TARGET_WRITE_MODE : 'merge'
}
aladdinSecJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()
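The connector's 'merge' write mode presumably upserts on the target table's key columns; Spark's generic JDBC writer has no merge mode, so a non-DSX sketch could only append or overwrite (same URL assumptions as the read sketch above):

aladdinSecJoinSparkDF.write.format('jdbc') \
    .option('url', jdbcUrl) \
    .option('dbtable', dashCredentials['tableName']) \
    .option('user', dashCredentials['username']) \
    .option('password', dashCredentials['password']) \
    .mode('append') \
    .save()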
In [ ]: