In [58]:
!pip install --user xlrd


Requirement already satisfied: xlrd in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s1df-1767d8774d3251-73caa6cfaa60/.local/lib/python2.7/site-packages

In [59]:
# Setup constants if any

In [60]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime
from dateutil.parser import parse

from ingest.Connectors import Connectors

In [61]:
# The code was removed by DSX for sharing.

In [62]:
# The code was removed by DSX for sharing.

In [63]:
# getFileFromObjectStorage and dashCredentials are defined in the cells removed for sharing above
aladdinVarmarDF1 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-VARMAR.xlsx'), index_col=[0], header=[0]).iloc[0:7]
# Drop rows, then columns, in which every value is NaN (axis 0 is rows, axis 1 is columns)
aladdinVarmarDFFiltered1 = aladdinVarmarDF1.dropna(axis=0, how='all').dropna(axis=1, how='all')
print aladdinVarmarDF1

# Parse the report's 'As Of Date:' header cell and normalize it to ISO 8601 (YYYY-MM-DD)
asOfDate = pd.to_datetime(aladdinVarmarDFFiltered1.loc['As Of Date:', 'Unnamed: 1']).strftime('%Y-%m-%d')

print "\nasOfDate = " + asOfDate


                       Unnamed: 1 Unnamed: 2 Unnamed: 3 Unnamed: 4 Unnamed: 5  \
View Positions                                                                  
Fund/Group:             I-MAI-ALL        NaN        NaN        NaN        NaN   
Security Group/Type:    CASH/CASH        NaN        NaN        NaN        NaN   
As Of Date:           31-JUL-2017        NaN        NaN        NaN        NaN   
Price Group:                  NAV        NaN        NaN        NaN        NaN   
Risk Group:                  RISK        NaN        NaN        NaN        NaN   
Risk Group:                   NaN        NaN        NaN        NaN        NaN   
99 matches found              NaN        NaN        NaN        NaN        NaN   

                     Unnamed: 6 Unnamed: 7 Unnamed: 8  
View Positions                                         
Fund/Group:                 NaN        NaN        NaN  
Security Group/Type:        NaN        NaN        NaN  
As Of Date:                 NaN        NaN        NaN  
Price Group:                NaN        NaN        NaN  
Risk Group:                 NaN        NaN        NaN  
Risk Group:                 NaN        NaN        NaN  
99 matches found            NaN        NaN        NaN  

asOfDate = 2017-07-31
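A quick standalone sketch of the date normalization above, with the header value hard-coded for illustration: pd.to_datetime parses the report's '31-JUL-2017' spelling via dateutil, and strftime renders it as ISO 8601, which the later cast("date") in Spark accepts.

import pandas as pd
print pd.to_datetime('31-JUL-2017').strftime('%Y-%m-%d')   # -> 2017-07-31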

In [64]:
aladdinVarmarDF2 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-VARMAR.xlsx'), header=[0], skiprows=8)

# Drop rows, then columns, in which every value is NaN (axis 0 is rows, axis 1 is columns)
aladdinVarmarDF2FilterNullRowsCols = aladdinVarmarDF2.dropna(axis=0, how='all').dropna(axis=1, how='all')
aladdinVarmarDF2FilterNullRowsCols = aladdinVarmarDF2FilterNullRowsCols.rename(index=str, columns={
    "Portfolio": "FUND_ID",
    "CUSIP(Aladdin ID)": "CUSIP",
    "Sec Desc": "SEC_DESC",
    "Currency": "CURRENCY",
    "Current Face": "CURRENT_FACE",
    "Settled": "SETTLED",
    "Unsettled": "UNSETTLED",
    "Base Curr FX Rate": "FX_RATE",
})

# Pattern matching: if the CUSIP (Aladdin ID) ends in '(any value between parentheses)',
# it is an aggregate row, so don't store it in the database; na=False keeps rows with a missing CUSIP
patternDelete = r'\(.*\)$'
aggregateRowMask = aladdinVarmarDF2FilterNullRowsCols['CUSIP'].str.contains(patternDelete, na=False)

aladdinVarmarDF2FilterByPatternDelete = aladdinVarmarDF2FilterNullRowsCols[~aggregateRowMask]

aladdinVarmarDF2FilterByPatternDelete.dtypes

Out[64]:
FUND_ID          object
CUSIP            object
SEC_DESC         object
CURRENCY         object
CURRENT_FACE    float64
SETTLED         float64
UNSETTLED       float64
FX_RATE         float64
dtype: object
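To sanity-check the aggregate-row filter in the cell above, here is a minimal sketch on made-up CUSIP values (the real file's aggregate labels may differ): rows whose CUSIP ends in a parenthesized value are masked out, everything else is kept.

import pandas as pd
demo = pd.Series(['MARNEAUD7', '(USD Total)', 'MARNECAD5', 'Subtotal (I-AQUA)'])
mask = demo.str.contains(r'\(.*\)$', na=False)   # True for the two aggregate labels
print demo[~mask]                                # keeps only MARNEAUD7 and MARNECAD5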

In [65]:
spark = SparkSession.builder.getOrCreate()  

def build_schema():
    """Build and return a schema to use for the sample data."""
    schema = StructType(
        [            
            StructField("FUND_ID",  StringType(), False),
            StructField("CUSIP", StringType(), True),
            StructField("SEC_DESC", StringType(), True),
            StructField("CURRENCY", StringType(), True),            
            StructField("CURRENT_FACE", DoubleType(), True),
            StructField("SETTLED", DoubleType(), True),
            StructField("UNSETTLED", DoubleType(), True),
            StructField("FX_RATE", DoubleType(), True),                        
        ]
    )
    return schema


# Enforce the explicit schema and stamp every row with the report's as-of date
aladdinVarmarDF2SparkDF = spark.createDataFrame(aladdinVarmarDF2FilterByPatternDelete, schema=build_schema()) \
                                .withColumn("AS_OF_DATE", lit(asOfDate).cast("date"))


aladdinVarmarDF2SparkDF.printSchema()
aladdinVarmarDF2SparkDF.head(10)


root
 |-- FUND_ID: string (nullable = false)
 |-- CUSIP: string (nullable = true)
 |-- SEC_DESC: string (nullable = true)
 |-- CURRENCY: string (nullable = true)
 |-- CURRENT_FACE: double (nullable = true)
 |-- SETTLED: double (nullable = true)
 |-- UNSETTLED: double (nullable = true)
 |-- FX_RATE: double (nullable = true)
 |-- AS_OF_DATE: date (nullable = true)

Out[65]:
[Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNEAUD7', SEC_DESC=u'FUTURES AUD MARGIN BALANCE NEWEDGE', CURRENCY=u'AUD', CURRENT_FACE=-15541.65, SETTLED=-15541.65, UNSETTLED=0.0, FX_RATE=0.7984, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNECAD5', SEC_DESC=u'FUTURES CAD MARGIN BALANCE NEWEDGE', CURRENCY=u'CAD', CURRENT_FACE=-34575.85, SETTLED=-34575.85, UNSETTLED=0.0, FX_RATE=0.7996, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNEEUR8', SEC_DESC=u'FUTURES EUR MARGIN BALANCE NEWEDGE', CURRENCY=u'EUR', CURRENT_FACE=-42216.08, SETTLED=-42216.08, UNSETTLED=0.0, FX_RATE=1.179, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNEGBP8', SEC_DESC=u'FUTURES GBP MARGIN BALANCE NEWEDGE', CURRENCY=u'GBP', CURRENT_FACE=4709.37, SETTLED=4709.37, UNSETTLED=0.0, FX_RATE=1.3183, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNEHKD3', SEC_DESC=u'FUTURES HKD MARGIN BALANCE NEWEDGE', CURRENCY=u'HKD', CURRENT_FACE=-97645.86, SETTLED=-97645.86, UNSETTLED=0.0, FX_RATE=0.128, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNEJPY8', SEC_DESC=u'FUTURES JPY MARGIN BALANCE NEWEDGE', CURRENCY=u'JPY', CURRENT_FACE=-8652049.0, SETTLED=-8652049.0, UNSETTLED=0.0, FX_RATE=0.0091, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNESEK0', SEC_DESC=u'FUTURES SEK MARGIN BALANCE NEWEDGE', CURRENCY=u'SEK', CURRENT_FACE=-319333.1, SETTLED=-319333.1, UNSETTLED=0.0, FX_RATE=0.1236, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNESGD4', SEC_DESC=u'FUTURES SGD MARGIN BALANCE NEWEDGE', CURRENCY=u'SGD', CURRENT_FACE=-2850.45, SETTLED=-2850.45, UNSETTLED=0.0, FX_RATE=0.7368, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-AQUA', CUSIP=u'MARNEUSD6', SEC_DESC=u'FUTURES USD MARGIN BALANCE NEWEDGE', CURRENCY=u'USD', CURRENT_FACE=-1122598.72, SETTLED=-1122598.72, UNSETTLED=0.0, FX_RATE=1.0, AS_OF_DATE=datetime.date(2017, 7, 31)),
 Row(FUND_ID=u'I-CGF', CUSIP=u'MARBAUSD6', SEC_DESC=u'FUTURES USD MARGIN BALANCE BARCLAY', CURRENCY=u'USD', CURRENT_FACE=0.0, SETTLED=0.0, UNSETTLED=0.0, FX_RATE=1.0, AS_OF_DATE=datetime.date(2017, 7, 31))]
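The constant AS_OF_DATE column comes from lit(...).cast("date"); a toy-row sketch (reusing the spark session above, with a made-up fund ID) shows the same pattern in isolation:

demoDF = spark.createDataFrame([(u'I-AQUA',)], ['FUND_ID']) \
              .withColumn('AS_OF_DATE', lit('2017-07-31').cast('date'))
demoDF.printSchema()   # AS_OF_DATE shows up as date (nullable = true)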

In [66]:
# Connection to Dash DB for reading the fund reference table
dashDBloadOptions = {
                    Connectors.DASHDB.HOST              : dashCredentials["host"],
                    Connectors.DASHDB.DATABASE          : dashCredentials["db"],
                    Connectors.DASHDB.USERNAME          : dashCredentials["username"],
                    Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
                    Connectors.DASHDB.SOURCE_TABLE_NAME : dashCredentials["REF_FUND_TABLE"],
}

# sqlContext is pre-defined in the DSX notebook runtime
refFundDF = sqlContext.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundDF.printSchema()
refFundDF.show(1)


root
 |-- ID: string (nullable = false)

+-----+
|   ID|
+-----+
|I-CJF|
+-----+
only showing top 1 row


In [67]:
# Keep only positions whose FUND_ID exists in the fund reference table
aladdinVarmarJoinSparkDF = aladdinVarmarDF2SparkDF.join(refFundDF, 
                                               aladdinVarmarDF2SparkDF.FUND_ID == refFundDF.ID, "inner")\
                                        .select(
                                                refFundDF.ID.alias("FUND_ID"),                                                
                                                aladdinVarmarDF2SparkDF.CUSIP,
                                                aladdinVarmarDF2SparkDF.SEC_DESC,
                                                aladdinVarmarDF2SparkDF.CURRENCY,
                                                aladdinVarmarDF2SparkDF.CURRENT_FACE,
                                                aladdinVarmarDF2SparkDF.SETTLED,
                                                aladdinVarmarDF2SparkDF.UNSETTLED,
                                                aladdinVarmarDF2SparkDF.FX_RATE,
                                                aladdinVarmarDF2SparkDF.AS_OF_DATE
                                               )

aladdinVarmarJoinSparkDF.show(1)


+-------+---------+--------------------+--------+------------+-------+---------+-------+----------+
|FUND_ID|    CUSIP|            SEC_DESC|CURRENCY|CURRENT_FACE|SETTLED|UNSETTLED|FX_RATE|AS_OF_DATE|
+-------+---------+--------------------+--------+------------+-------+---------+-------+----------+
| I-WGEF|MARNEAUD7|FUTURES AUD MARGI...|     AUD|       126.9|  126.9|      0.0| 0.7984|2017-07-31|
+-------+---------+--------------------+--------+------------+-------+---------+-------+----------+
only showing top 1 row
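Because the inner join silently drops positions whose FUND_ID is absent from the reference table, a left_anti join (a sketch reusing the two DataFrames above) can surface what was excluded before writing:

droppedFundsDF = aladdinVarmarDF2SparkDF.join(refFundDF,
                                              aladdinVarmarDF2SparkDF.FUND_ID == refFundDF.ID,
                                              "left_anti")
droppedFundsDF.select("FUND_ID").distinct().show()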


In [68]:
# Connection to Dash DB for writing the data
dashdbsaveoption = {
                     Connectors.DASHDB.HOST              : dashCredentials["host"],
                     Connectors.DASHDB.DATABASE          : dashCredentials["db"],
                     Connectors.DASHDB.USERNAME          : dashCredentials["username"],
                     Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
                     Connectors.DASHDB.TARGET_TABLE_NAME : dashCredentials["tableName"],
                     Connectors.DASHDB.TARGET_WRITE_MODE : 'merge' 
}

# DataFrameWriter.save() returns None, so there is nothing meaningful to assign here
aladdinVarmarJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()
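As an optional sanity check, and assuming the connector can read back the table it just wrote (mirroring the load options of cell 66, with SOURCE_TABLE_NAME pointed at the same tableName), a row count confirms the merge landed:

verifyOptions = {
                     Connectors.DASHDB.HOST              : dashCredentials["host"],
                     Connectors.DASHDB.DATABASE          : dashCredentials["db"],
                     Connectors.DASHDB.USERNAME          : dashCredentials["username"],
                     Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
                     Connectors.DASHDB.SOURCE_TABLE_NAME : dashCredentials["tableName"],
}
print sqlContext.read.format("com.ibm.spark.discover").options(**verifyOptions).load().count()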