In [1]:
# One-off install of xlrd, the Excel reader backend that pd.read_excel needs
# for the .xlsx workbook loaded below (user-local install on the DSX kernel).
!pip install --user xlrd


Requirement already satisfied: xlrd in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s1df-1767d8774d3251-73caa6cfaa60/.local/lib/python2.7/site-packages

In [9]:
# Setup constants if any
# FUNDS ID: the fund identifiers whose position rows are extracted from the
# Aladdin workbook further below (one section per fund in the sheet).
FUNDS_ID_LIST = [
    'I-CJF', 'I-RLLAF', 'I-WUSIGX', 'I-AQUA',
    'I-SQGFS', 'I-SQGFSH2', 'I-SQGFSH2O', 'I-SQGFSO',
    'I-WEUBEAR', 'I-WEUBULL',
    'I-WGEBEAR', 'I-WGEBULL', 'I-WGEF',
    'I-WUEBEAR', 'I-WUEBULL',
    'I-WUSBEAR', 'I-WUSBULL',
]

In [2]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd 

from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime
from dateutil.parser import parse

from ingest.Connectors import Connectors

In [3]:
# The code was removed by DSX for sharing.

In [22]:
# The code was removed by DSX for sharing.

In [6]:
# Read only the report header block (first 7 rows) of the Aladdin futures
# position workbook.  NOTE(review): getFileFromObjectStorage is defined in a
# cell removed by DSX for sharing -- presumably it returns a file-like object
# for the named object-storage item; confirm before running standalone.
aladdinFuturePosDF1 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-FUT-POS.xlsx'),index_col=[0], header=[0]).iloc[0:7]
# Drop rows & columns with all 'NaN' values, axis 0 is for row
# NOTE(review): passing a list for `axis` works on the old pandas in this
# Python 2 environment but was removed in later pandas versions.
aladdinFuturePosDFFiltered1 = aladdinFuturePosDF1.dropna(axis=[0,1], how='all')
print aladdinFuturePosDFFiltered1

# Pull the report's as-of date out of the header block ('As Of Date:' row,
# first unnamed column) and normalise it to 'YYYY-MM-DD'; used later as the
# AS_OF_DATE column when loading into Dash DB.
asOfDate = pd.to_datetime(aladdinFuturePosDFFiltered1.loc['As Of Date:', 'Unnamed: 1']).strftime('%Y-%m-%d')

print "\nasOfDate = " + asOfDate


                                                             Unnamed: 1
View Positions                                                         
Fund/Group:                                                   I-MAI-ALL
Security Group/Type:  FUTURE/*,FUTURE/CFD,FUTURE/COM,FUTURE/CUR,FUTU...
As Of Date:                                                 31-JUL-2017
Price Group:                                                        NAV
Risk Group:                                                        RISK

asOfDate = 2017-07-31

In [14]:
# Re-read the same workbook, this time skipping the 8-row report header so
# only the position rows remain, indexed by the first (fund id) column.
aladdinFuturePosDF2 = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'ALADDIN-FUT-POS.xlsx'), header=[0], skipinitialspace=True, skiprows=8, index_col=[0])

#index_col=[0,1]
# Drop rows & columns with all 'NaN' values, axis 0 is for row
# NOTE(review): list-valued `axis` for dropna is accepted by the old pandas
# used here but was removed in later pandas versions.
aladdinFuturePosDF2FilterNullRowsCols = aladdinFuturePosDF2.dropna(axis=[0,1], how='all')

# Pull each fund's position rows out of the combined sheet: .loc[fund_id]
# selects that fund's section, .iloc[1:] skips the section's first row
# (loop variable renamed from `id`, which shadowed the builtin).
dfNewArr = []
for fund_id in FUNDS_ID_LIST:
    dfNewArr.append(aladdinFuturePosDF2FilterNullRowsCols.loc[fund_id].iloc[1:])

# Concat all the funds together
dfNew = pd.concat(dfNewArr)

# Flatten the list by removing all the index (the fund id becomes a column)
dfNew = dfNew.reset_index()

# Rename columns to match the database columns.
# NOTE: the original statement ended in a stray line-continuation backslash
# ("})\" followed by a blank line) -- harmless as written, but a latent
# SyntaxError if that blank line were ever removed; dropped here.
aladdinFuturePosDF2Renamed = dfNew.rename(
    index=str,
    columns={"Portfolio": "FUND_ID",
             "Reuter": "REUTER",
             "CUSIP(Aladdin ID)": "CUSIP",
             "Sec Desc": "SEC_DESC",
             "Orig. Face": "ORIG_FACE",
             "Current Face": "CURR_FACE",
             "Currency": "CURRENCY",
             "Unsettled": "UNSETTLED",
             "Market Price": "MKT_PRICE",
             "Counterparty Ticker": "COUNTER_PRTY_TIC"})

# Convert to float. TODO - Should everything be String as CSV files data is inconsistent
numericCols = ['ORIG_FACE', 'CURR_FACE', 'UNSETTLED', 'MKT_PRICE']
aladdinFuturePosDF2Renamed[numericCols] = aladdinFuturePosDF2Renamed[numericCols].astype(float)

#aladdinFuturePosDF2Renamed.head(146)
aladdinFuturePosDF2Renamed.dtypes


Out[14]:
FUND_ID              object
REUTER               object
CUSIP                object
SEC_DESC             object
ORIG_FACE           float64
CURR_FACE           float64
CURRENCY             object
UNSETTLED           float64
MKT_PRICE           float64
COUNTER_PRTY_TIC     object
dtype: object

In [15]:
# Get (or reuse) the active Spark session.  NOTE(review): SparkSession is not
# imported in any visible cell -- presumably injected by the DSX runtime or a
# removed cell; confirm before running this notebook standalone.
spark = SparkSession.builder.getOrCreate()  

def build_schema():
    """Build and return the explicit Spark schema for the futures-position data.

    Field order must match the column order of the pandas DataFrame handed to
    spark.createDataFrame; nullable=False marks the identifier columns.
    """
    field_specs = [
        ("FUND_ID", StringType(), False),
        ("REUTER", StringType(), False),
        ("CUSIP", StringType(), False),
        ("SEC_DESC", StringType(), True),
        ("ORIG_FACE", DoubleType(), True),
        ("CURR_FACE", DoubleType(), True),
        ("CURRENCY", StringType(), False),
        ("UNSETTLED", DoubleType(), True),
        ("MKT_PRICE", DoubleType(), True),
        ("COUNTER_PRTY_TIC", StringType(), True),
    ]
    return StructType(
        [StructField(name, dtype, nullable) for name, dtype, nullable in field_specs]
    )


# Convert the cleaned pandas frame to a Spark DataFrame under the explicit
# schema, and stamp every row with the report's as-of date parsed earlier
# from the workbook header (lit(...).cast("date") turns the 'YYYY-MM-DD'
# string into a DateType column).
aladdinFuturePosDF2SparkDF = spark.createDataFrame(aladdinFuturePosDF2Renamed, schema=build_schema()) \
                                .withColumn("AS_OF_DATE", lit(asOfDate).cast("date"))


aladdinFuturePosDF2SparkDF.printSchema()
aladdinFuturePosDF2SparkDF.head(1)


root
 |-- FUND_ID: string (nullable = false)
 |-- REUTER: string (nullable = false)
 |-- CUSIP: string (nullable = false)
 |-- SEC_DESC: string (nullable = true)
 |-- ORIG_FACE: double (nullable = true)
 |-- CURR_FACE: double (nullable = true)
 |-- CURRENCY: string (nullable = false)
 |-- UNSETTLED: double (nullable = true)
 |-- MKT_PRICE: double (nullable = true)
 |-- COUNTER_PRTY_TIC: string (nullable = true)
 |-- AS_OF_DATE: date (nullable = true)

Out[15]:
[Row(FUND_ID=u'I-CJF', REUTER=u'ADU7', CUSIP=u'ADU720175', SEC_DESC=u'AUD/USD FUTURE (CME) SEP 17', ORIG_FACE=37.0, CURR_FACE=37.0, CURRENCY=u'USD', UNSETTLED=-12.0, MKT_PRICE=0.7994, COUNTER_PRTY_TIC=u'MS-I', AS_OF_DATE=datetime.date(2017, 7, 31))]

In [16]:
# Connection options for reading the reference fund table from Dash DB.
# NOTE(review): dashCredentials and sqlContext come from cells removed by DSX
# for sharing -- confirm they are defined before running standalone.
dashDBloadOptions = { 
                    Connectors.DASHDB.HOST              : dashCredentials["host"],
                    Connectors.DASHDB.DATABASE          : dashCredentials["db"],
                    Connectors.DASHDB.USERNAME          : dashCredentials["username"],
                    Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
                    Connectors.DASHDB.SOURCE_TABLE_NAME : dashCredentials["REF_FUND_TABLE"],
}

# Load the reference fund IDs (single ID column) used to validate positions.
refFundDF = sqlContext.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundDF.printSchema()
refFundDF.show(1)


root
 |-- ID: string (nullable = false)

+-----+
|   ID|
+-----+
|I-CJF|
+-----+
only showing top 1 row


In [21]:
# Inner-join the positions against the reference fund table so only rows for
# known funds survive, then project the target-table columns in order: the
# reference table's ID (aliased back to FUND_ID) followed by the position
# columns, taken from the position side of the join.
positionCols = ["REUTER", "CUSIP", "SEC_DESC", "ORIG_FACE", "CURR_FACE",
                "CURRENCY", "UNSETTLED", "MKT_PRICE", "COUNTER_PRTY_TIC",
                "AS_OF_DATE"]

aladdinFuturePosJoinSparkDF = (
    aladdinFuturePosDF2SparkDF
    .join(refFundDF, aladdinFuturePosDF2SparkDF.FUND_ID == refFundDF.ID, "inner")
    .select([refFundDF.ID.alias("FUND_ID")] +
            [aladdinFuturePosDF2SparkDF[c] for c in positionCols])
)

aladdinFuturePosJoinSparkDF.show(10)


+-------+------+---------+--------------------+---------+---------+--------+---------+---------+----------------+----------+
|FUND_ID|REUTER|    CUSIP|            SEC_DESC|ORIG_FACE|CURR_FACE|CURRENCY|UNSETTLED|MKT_PRICE|COUNTER_PRTY_TIC|AS_OF_DATE|
+-------+------+---------+--------------------+---------+---------+--------+---------+---------+----------------+----------+
| I-WGEF| AEXQ7|EOQ720172|AMSTERDAM INDEX A...|      0.0|      0.0|     EUR|      5.0|    521.9|          NEWE-I|2017-07-31|
| I-WGEF|  DMU7|FAU720176|S&P MID 400 EMINI...|      3.0|      3.0|     USD|      0.0|   1759.8|          NEWE-I|2017-07-31|
| I-WGEF|  ESU7|MEU720170| S&P500 EMINI SEP 17|    -12.0|    -12.0|     USD|      4.0|   2468.0|          NEWE-I|2017-07-31|
| I-WGEF| FCEQ7|CFQ720173|CAC40 10 EURO AUG 17|      8.0|      8.0|     EUR|     -1.0|   5090.0|          NEWE-I|2017-07-31|
| I-WGEF| FDXU7|GXU720175|    DAX INDEX SEP 17|      1.0|      1.0|     EUR|     -1.0|  12109.5|          NEWE-I|2017-07-31|
| I-WGEF| FFIU7|ZU7201771|FTSE 100 INDEX SE...|     12.0|     12.0|     GBP|      8.0|   7310.0|          NEWE-I|2017-07-31|
| I-WGEF| FVSU7|FVSU72017|VSTOXX MINI FUTUR...|    -33.0|    -33.0|     EUR|     -5.0|    15.75|          NEWE-I|2017-07-31|
| I-WGEF| HSIQ7|HIQ720172|HANG SENG INDEX A...|     -1.0|     -1.0|     HKD|     -2.0|  27228.0|          NEWE-I|2017-07-31|
| I-WGEF| IFSU7|STU720171|FTSE/MIB INDEX SE...|     -1.0|     -1.0|     EUR|      2.0|  21480.0|          NEWE-I|2017-07-31|
| I-WGEF| JNIU7|NKU720175|NIKKEI 225 (OSE) ...|      5.0|      5.0|     JPY|     -1.0|  19940.0|          NEWE-I|2017-07-31|
+-------+------+---------+--------------------+---------+---------+--------+---------+---------+----------------+----------+
only showing top 10 rows


In [23]:
# Connection to Dash DB for writing the data.  TARGET_WRITE_MODE 'merge'
# upserts into the target table rather than blindly appending.
dashdbsaveoption = {
                     Connectors.DASHDB.HOST              : dashCredentials["host"],
                     Connectors.DASHDB.DATABASE          : dashCredentials["db"],
                     Connectors.DASHDB.USERNAME          : dashCredentials["username"],
                     Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
                     Connectors.DASHDB.TARGET_TABLE_NAME : dashCredentials["tableName"],
                     Connectors.DASHDB.TARGET_WRITE_MODE : 'merge' 
}

# Write the joined positions to Dash DB.  DataFrameWriter.save() returns
# None, so its result is no longer bound to a variable (the original bound
# it to aladdinFuturePosDashDBDF, which was always None and misleadingly
# named as if it were a DataFrame).
aladdinFuturePosJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()

In [ ]: