In [1]:
!pip install --user xlrd
In [9]:
# Setup constants if any
# FUNDS ID
FUNDS_ID_LIST = ['I-CJF','I-RLLAF','I-WUSIGX','I-AQUA','I-SQGFS','I-SQGFSH2','I-SQGFSH2O', 'I-SQGFSO', 'I-WEUBEAR', 'I-WEUBULL', 'I-WGEBEAR', 'I-WGEBULL', 'I-WGEF', 'I-WUEBEAR', 'I-WUEBULL', 'I-WUSBEAR', 'I-WUSBULL']
In [2]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from dateutil.parser import parse
from ingest.Connectors import Connectors
In [3]:
# The code was removed by DSX for sharing.
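The hidden cell presumably defines the getFileFromObjectStorage() helper used below to fetch the workbook; the otherwise-unused BytesIO and requests imports above hint at its shape. A minimal sketch under that assumption: the credentials dict, URL layout, and auth token here are illustrative placeholders, not the removed code.
def getFileFromObjectStorage(container, filename):
    """Download an object and return it wrapped in BytesIO for pd.read_excel (sketch)."""
    url = credentials['url'] + '/' + container + '/' + filename  # hypothetical credentials dict
    resp = requests.get(url, headers={'X-Auth-Token': credentials['token']})  # hypothetical pre-fetched token
    resp.raise_for_status()
    return BytesIO(resp.content)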
In [22]:
# The code was removed by DSX for sharing.
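The second hidden cell presumably defines the dashCredentials dict consumed by the dashDB read and write cells further down. Its expected shape, inferred from the keys referenced later (all values are placeholders):
dashCredentials = {
    'host': '<dashdb-host>',
    'db': '<database>',
    'username': '<username>',
    'password': '<password>',
    'REF_FUND_TABLE': '<schema>.<ref-fund-table>',  # source table for the reference-fund read
    'tableName': '<schema>.<target-table>'          # target table for the final merge
}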
In [6]:
aladdinFuturePosDF1 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-FUT-POS.xlsx'),index_col=[0], header=[0]).iloc[0:7]
# Drop rows and columns whose values are all NaN (axis 0 = rows, axis 1 = columns)
aladdinFuturePosDFFiltered1 = aladdinFuturePosDF1.dropna(axis=[0,1], how='all')
print aladdinFuturePosDFFiltered1
asOfDate = pd.to_datetime(aladdinFuturePosDFFiltered1.loc['As Of Date:', 'Unnamed: 1']).strftime('%Y-%m-%d')
print "\nasOfDate = " + asOfDate
In [14]:
aladdinFuturePosDF2 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-FUT-POS.xlsx'), header=[0], skipinitialspace=True, skiprows=8, index_col=[0])
#index_col=[0,1]
# Drop rows and columns whose values are all NaN (axis 0 = rows, axis 1 = columns)
aladdinFuturePosDF2FilterNullRowsCols = aladdinFuturePosDF2.dropna(axis=[0,1], how='all')
dfNewArr = []
for fund_id in FUNDS_ID_LIST:
    # Select each fund's rows, skipping the first row of the block
    df = aladdinFuturePosDF2FilterNullRowsCols.loc[fund_id].iloc[1:]
    dfNewArr.append(df)
# Concatenate all the funds together
dfNew = pd.concat(dfNewArr)
# Flatten by moving the fund index back into a regular column
dfNew = dfNew.reset_index()
# Rename columns to match the database columns
aladdinFuturePosDF2Renamed = dfNew.rename(
    index=str,
    columns={"Portfolio": "FUND_ID",
             "Reuter": "REUTER",
             "CUSIP(Aladdin ID)": "CUSIP",
             "Sec Desc": "SEC_DESC",
             "Orig. Face": "ORIG_FACE",
             "Current Face": "CURR_FACE",
             "Currency": "CURRENCY",
             "Unsettled": "UNSETTLED",
             "Market Price": "MKT_PRICE",
             "Counterparty Ticker": "COUNTER_PRTY_TIC"})
# Convert the numeric columns to float. TODO - should everything stay a string, since the CSV file data is inconsistent?
aladdinFuturePosDF2Renamed[['ORIG_FACE', 'CURR_FACE', 'UNSETTLED', 'MKT_PRICE']] = \
    aladdinFuturePosDF2Renamed[['ORIG_FACE', 'CURR_FACE', 'UNSETTLED', 'MKT_PRICE']].astype(float)
#aladdinFuturePosDF2Renamed.head(146)
aladdinFuturePosDF2Renamed.dtypes
Out[14]:
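The TODO above flags inconsistent source data; a more defensive conversion (a sketch, not what the cell above does) would coerce unparseable values to NaN instead of raising:
for col in ['ORIG_FACE', 'CURR_FACE', 'UNSETTLED', 'MKT_PRICE']:
    # errors='coerce' turns bad entries into NaN rather than raising ValueError
    aladdinFuturePosDF2Renamed[col] = pd.to_numeric(aladdinFuturePosDF2Renamed[col], errors='coerce')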
In [15]:
spark = SparkSession.builder.getOrCreate()
def build_schema():
    """Build and return a schema to use for the sample data."""
    schema = StructType(
        [
            StructField("FUND_ID", StringType(), False),
            StructField("REUTER", StringType(), False),
            StructField("CUSIP", StringType(), False),
            StructField("SEC_DESC", StringType(), True),
            StructField("ORIG_FACE", DoubleType(), True),
            StructField("CURR_FACE", DoubleType(), True),
            StructField("CURRENCY", StringType(), False),
            StructField("UNSETTLED", DoubleType(), True),
            StructField("MKT_PRICE", DoubleType(), True),
            StructField("COUNTER_PRTY_TIC", StringType(), True)
        ]
    )
    return schema
aladdinFuturePosDF2SparkDF = spark.createDataFrame(aladdinFuturePosDF2Renamed, schema=build_schema()) \
.withColumn("AS_OF_DATE", lit(asOfDate).cast("date"))
aladdinFuturePosDF2SparkDF.printSchema()
aladdinFuturePosDF2SparkDF.head(1)
Out[15]:
In [16]:
dashDBloadOptions = {
Connectors.DASHDB.HOST : dashCredentials["host"],
Connectors.DASHDB.DATABASE : dashCredentials["db"],
Connectors.DASHDB.USERNAME : dashCredentials["username"],
Connectors.DASHDB.PASSWORD : dashCredentials["password"],
Connectors.DASHDB.SOURCE_TABLE_NAME : dashCredentials["REF_FUND_TABLE"],
}
refFundDF = sqlContext.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundDF.printSchema()
refFundDF.show(1)
In [21]:
aladdinFuturePosJoinSparkDF = aladdinFuturePosDF2SparkDF.join(refFundDF,
aladdinFuturePosDF2SparkDF.FUND_ID == refFundDF.ID, "inner")\
.select(
refFundDF.ID.alias("FUND_ID"),
aladdinFuturePosDF2SparkDF.REUTER,
aladdinFuturePosDF2SparkDF.CUSIP,
aladdinFuturePosDF2SparkDF.SEC_DESC,
aladdinFuturePosDF2SparkDF.ORIG_FACE,
aladdinFuturePosDF2SparkDF.CURR_FACE,
aladdinFuturePosDF2SparkDF.CURRENCY,
aladdinFuturePosDF2SparkDF.UNSETTLED,
aladdinFuturePosDF2SparkDF.MKT_PRICE,
aladdinFuturePosDF2SparkDF.COUNTER_PRTY_TIC,
aladdinFuturePosDF2SparkDF.AS_OF_DATE
)
aladdinFuturePosJoinSparkDF.show(10)
In [23]:
# Connection to Dash DB for writing the data
dashdbsaveoption = {
Connectors.DASHDB.HOST : dashCredentials["host"],
Connectors.DASHDB.DATABASE : dashCredentials["db"],
Connectors.DASHDB.USERNAME : dashCredentials["username"],
Connectors.DASHDB.PASSWORD : dashCredentials["password"],
Connectors.DASHDB.TARGET_TABLE_NAME : dashCredentials["tableName"],
Connectors.DASHDB.TARGET_WRITE_MODE : 'merge'
}
# DataFrameWriter.save() returns None, so there is nothing useful to assign here
aladdinFuturePosJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()
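As an optional sanity check, the target table can be read back with the same connector (a sketch reusing the credentials dict above; it assumes the merged table name is the one in dashCredentials["tableName"]):
verifyOptions = {
    Connectors.DASHDB.HOST : dashCredentials["host"],
    Connectors.DASHDB.DATABASE : dashCredentials["db"],
    Connectors.DASHDB.USERNAME : dashCredentials["username"],
    Connectors.DASHDB.PASSWORD : dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME : dashCredentials["tableName"]
}
verifyDF = sqlContext.read.format("com.ibm.spark.discover").options(**verifyOptions).load()
print "Rows in target table: %d" % verifyDF.count()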