In [58]:
!pip install --user xlrd
In [59]:
# Setup constants if any
In [60]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from dateutil.parser import parse
from ingest.Connectors import Connectors
In [61]:
# The code was removed by DSX for sharing.
In [62]:
# The code was removed by DSX for sharing.
In [63]:
aladdinVarmarDF1 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-VARMAR.xlsx'),index_col=[0], header=[0]).iloc[0:7]
# Drop rows and columns that are entirely NaN (axis=0 drops rows, axis=1 drops columns)
aladdinVarmarDFFiltered1 = aladdinVarmarDF1.dropna(axis=0, how='all').dropna(axis=1, how='all')
print aladdinVarmarDF1
#asOfDate = pd.to_datetime(aladdinVarmarDFFiltered1.loc['As Of Date:', 'Unnamed: 1']).strftime('%m/%d/%Y')
asOfDate = pd.to_datetime(aladdinVarmarDFFiltered1.loc['As Of Date:', 'Unnamed: 1']).strftime('%Y-%m-%d')
print "\nasOfDate = " + asOfDate
In [64]:
aladdinVarmarDF2 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-VARMAR.xlsx'), header=[0], skiprows=8)
#index_col=[0,1]
# Drop rows and columns that are entirely NaN (axis=0 drops rows, axis=1 drops columns)
aladdinVarmarDF2FilterNullRowsCols = aladdinVarmarDF2.dropna(axis=0, how='all').dropna(axis=1, how='all')
aladdinVarmarDF2FilterNullRowsCols = aladdinVarmarDF2FilterNullRowsCols.rename(index=str, columns={
    "Portfolio": "FUND_ID",
    "CUSIP(Aladdin ID)": "CUSIP",
    "Sec Desc": "SEC_DESC",
    "Currency": "CURRENCY",
    "Current Face": "CURRENT_FACE",
    "Settled": "SETTLED",
    "Unsettled": "UNSETTLED",
    "Base Curr FX Rate": "FX_RATE"})
## Pattern matching: if CUSIP (the Aladdin ID) is a value wrapped in parentheses, the row is an aggregate row, so don't store it in the database
patternDelete = r'^\(.*\)$'
aggregateRowMask = aladdinVarmarDF2FilterNullRowsCols['CUSIP'].str.contains(patternDelete, na=False)
aladdinVarmarDF2FilterByPatternDelete = aladdinVarmarDF2FilterNullRowsCols[~aggregateRowMask]
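# Quick illustration of the aggregate-row filter (hypothetical CUSIP values, not
# from the actual workbook):
#   pd.Series(['ABC123XY9', '(I-CJF)', 'XYZ987WV1']).str.contains(r'^\(.*\)$', na=False)
#   -> False, True, False; the '~' inverts the mask, so only the parenthesised
#   aggregate rows are dropped and the real position rows are kept.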
#mhcbDFFilterByICJF = mhcbDFFilterNullRowsCols[mhcbDFFilterNullRowsCols.PORTFOLIO == 'I-CJF']
# aladdinVarmarDF2FilterByPatternDelete.head(20)
aladdinVarmarDF2FilterByPatternDelete.dtypes
# for idx in mhcbDFFilterNullRowsCols.index.map(lambda x: x[:-1]):
# print idx
# df_select=mhcbDFFilterNullRowsCols.ix[idx]
# print df_select
Out[64]:
In [65]:
spark = SparkSession.builder.getOrCreate()
def build_schema():
    """Build and return a schema to use for the sample data."""
    schema = StructType(
        [
            StructField("FUND_ID", StringType(), False),
            StructField("CUSIP", StringType(), True),
            StructField("SEC_DESC", StringType(), True),
            StructField("CURRENCY", StringType(), True),
            StructField("CURRENT_FACE", DoubleType(), True),
            StructField("SETTLED", DoubleType(), True),
            StructField("UNSETTLED", DoubleType(), True),
            StructField("FX_RATE", DoubleType(), True),
        ]
    )
    return schema
aladdinVarmarDF2SparkDF = spark.createDataFrame(aladdinVarmarDF2FilterByPatternDelete, schema=build_schema()) \
.withColumn("AS_OF_DATE", lit(asOfDate).cast("date"))
aladdinVarmarDF2SparkDF.printSchema()
aladdinVarmarDF2SparkDF.head(10)
Out[65]:
In [66]:
dashDBloadOptions = {
    Connectors.DASHDB.HOST : dashCredentials["host"],
    Connectors.DASHDB.DATABASE : dashCredentials["db"],
    Connectors.DASHDB.USERNAME : dashCredentials["username"],
    Connectors.DASHDB.PASSWORD : dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME : dashCredentials["REF_FUND_TABLE"],
}
refFundDF = sqlContext.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundDF.printSchema()
refFundDF.show(1)
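# REF_FUND is read in full here; the assumption is that it is a small fund
# reference (dimension) table, so pulling it entirely into Spark before the join
# is cheap.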
In [67]:
aladdinVarmarJoinSparkDF = aladdinVarmarDF2SparkDF.join(refFundDF,
        aladdinVarmarDF2SparkDF.FUND_ID == refFundDF.ID, "inner") \
    .select(
        refFundDF.ID.alias("FUND_ID"),
        aladdinVarmarDF2SparkDF.CUSIP,
        aladdinVarmarDF2SparkDF.SEC_DESC,
        aladdinVarmarDF2SparkDF.CURRENCY,
        aladdinVarmarDF2SparkDF.CURRENT_FACE,
        aladdinVarmarDF2SparkDF.SETTLED,
        aladdinVarmarDF2SparkDF.UNSETTLED,
        aladdinVarmarDF2SparkDF.FX_RATE,
        aladdinVarmarDF2SparkDF.AS_OF_DATE
    )
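# Note: an inner join keeps only positions whose FUND_ID has a matching ID in
# REF_FUND; rows for funds that are missing from the reference table are dropped.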
aladdinVarmarJoinSparkDF.show(1)
In [68]:
# Connection to Dash DB for writing the data
dashdbsaveoption = {
    Connectors.DASHDB.HOST : dashCredentials["host"],
    Connectors.DASHDB.DATABASE : dashCredentials["db"],
    Connectors.DASHDB.USERNAME : dashCredentials["username"],
    Connectors.DASHDB.PASSWORD : dashCredentials["password"],
    Connectors.DASHDB.TARGET_TABLE_NAME : dashCredentials["tableName"],
    Connectors.DASHDB.TARGET_WRITE_MODE : 'merge'
}
aladdinVarmarJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()
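# A note on the write above (assumption about the DSX/dashDB connector): the
# 'merge' TARGET_WRITE_MODE is expected to upsert rows into the target table on
# its key columns rather than append duplicates; .save() returns nothing, so
# there is no DataFrame to capture from this step.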