In [1]:
!pip install --user xlrd


Requirement already satisfied: xlrd in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s1df-1767d8774d3251-73caa6cfaa60/.local/lib/python2.7/site-packages

In [2]:
# Notebook-wide constants.
# Folder and file name passed to getFileFromObjectStorage() when the custody
# workbook is loaded.
OBJECT_STORAGE_FOLDER = 'MizuhoPOC'
INPUT_FILE_NAME = 'BBH - Custody.xlsx'
# Table names. NOTE(review): neither constant is referenced in the visible
# cells; presumably they are used by the cells removed for sharing -- confirm.
TABLE_NAME = 'BBH_CUSTODY'
REF_FUND_MAPPING_TABLE = 'REF_FUND_MAPPING'

In [3]:
# ALL IMPORTS SHOULD GO HERE

import pandas as pd
from io import BytesIO
import requests
import json
import xlrd 

from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime
from dateutil.parser import parse

from ingest.Connectors import Connectors

In [4]:
# The code was removed by DSX for sharing.

In [5]:
# The code was removed by DSX for sharing.

In [ ]:
# The code was removed by DSX for sharing.

In [20]:
# Load the BBH custody workbook from object storage; sheet row 0 is the header.
bbhCustodyDF = pd.read_excel(
    getFileFromObjectStorage(OBJECT_STORAGE_FOLDER, INPUT_FILE_NAME),
    header=[0]
)

# Blank cells are replaced with empty strings. Nothing is dropped here: rows and
# columns that are entirely blank are kept.
bbhCustodyFilteredDF = bbhCustodyDF.fillna('')

# Spreadsheet header -> column name expected by the Spark schema.
# Headers that are not listed (e.g. CUSIP) keep their original name.
COLUMN_RENAMES = {
    "Account Number": "ACCT_NUM",
    "Account Name": "FUND_NAME",
    "Custody Security ID": "CUSTODY_SEC_ID",
    "SEDOL": "SEDOL",
    "ISIN": "ISIN",
    "Security Description": "SEC_DESC",
    "Capitalization Description": "CAP_DESC",
    "Maturity Date": "MATURITY_DATE",
    "Location": "LOC",
    "Location Account Name": "LOC_ACCT_NAME",
    "Registration Description": "REG_DESC",
    "Paydown Factor": "PAYDOWN_FACTOR",
    "Current Face Value": "CURR_FACE_VAL",
    "Custody Position": "CUSTODY_POS",
    "Total Available Position": "TOT_AVAIL_POS",
    "Position Date": "POS_DATE",
    "Price Date": "PRICE_DATE",
    "Price": "PRICE",
    "Pricing Currency": "PRICING_CURR",
    "Local Price": "LOCAL_PRICE",
    "Local Currency": "LOCAL_CURR",
    "FX Rate": "FX_RATE",
    "Market Value": "MARKET_VAL",
    "Local Market Value": "LOCAL_MARKET_VAL",
}

# rename() returns a new frame; index=str also converts the row labels to strings.
bbhCustodyRenamedDF = bbhCustodyFilteredDF.rename(index=str, columns=COLUMN_RENAMES)

# Reduce the timestamp columns to plain dates.
for dateColumn in ('MATURITY_DATE', 'POS_DATE', 'PRICE_DATE'):
    bbhCustodyRenamedDF[dateColumn] = pd.DatetimeIndex(bbhCustodyRenamedDF[dateColumn]).date

# Every column that build_schema() declares as DoubleType is cast to float, so
# whole-number cells do not arrive as ints. The '' placeholders written by
# fillna('') above are turned back into NaN first, because float('') raises
# ValueError.
DOUBLE_COLUMNS = [
    'CUSTODY_POS', 'TOT_AVAIL_POS', 'PRICE', 'LOCAL_PRICE',
    'FX_RATE', 'MARKET_VAL', 'LOCAL_MARKET_VAL',
]
bbhCustodyRenamedDF[DOUBLE_COLUMNS] = (
    bbhCustodyRenamedDF[DOUBLE_COLUMNS]
    .replace('', float('nan'))
    .astype(float)
)

# The load is stamped with today's date, formatted as YYYY-MM-DD.
asOfDate = pd.to_datetime('today').strftime('%Y-%m-%d')

# Parenthesized single argument: prints identically under Python 2 and Python 3.
print("\nasOfDate = " + asOfDate)


asOfDate = 2017-09-27

In [21]:
# Obtain the active Spark session, creating one only if none exists yet.
spark = SparkSession.builder.getOrCreate()  

def build_schema():
    """Return the Spark StructType for the renamed BBH custody frame.

    The fields are emitted in the order listed below. Only ACCT_NUM and
    FUND_NAME are declared non-nullable; every other field is nullable.
    """
    # (column name, Spark type class, nullable)
    columnSpecs = [
        ("ACCT_NUM", IntegerType, False),
        ("FUND_NAME", StringType, False),
        ("CUSTODY_SEC_ID", IntegerType, True),
        ("SEDOL", StringType, True),
        ("CUSIP", StringType, True),
        ("ISIN", StringType, True),
        ("SEC_DESC", StringType, True),
        ("CAP_DESC", StringType, True),
        ("MATURITY_DATE", DateType, True),
        ("LOC", StringType, True),
        ("LOC_ACCT_NAME", StringType, True),
        ("REG_DESC", StringType, True),
        ("PAYDOWN_FACTOR", StringType, True),
        ("CURR_FACE_VAL", StringType, True),
        ("CUSTODY_POS", DoubleType, True),
        ("TOT_AVAIL_POS", DoubleType, True),
        ("POS_DATE", DateType, True),
        ("PRICE_DATE", DateType, True),
        ("PRICE", DoubleType, True),
        ("PRICING_CURR", StringType, True),
        ("LOCAL_PRICE", DoubleType, True),
        ("LOCAL_CURR", StringType, True),
        ("FX_RATE", DoubleType, True),
        ("MARKET_VAL", DoubleType, True),
        ("LOCAL_MARKET_VAL", DoubleType, True),
    ]
    fields = [
        StructField(fieldName, sparkType(), isNullable)
        for fieldName, sparkType, isNullable in columnSpecs
    ]
    return StructType(fields)


# Convert the pandas frame to a Spark DataFrame under the explicit schema and
# add the load date to every row as a DATE column named AS_OF_DATE.
bbhCustodySparkDF = (
    spark.createDataFrame(bbhCustodyRenamedDF, schema=build_schema())
    .withColumn("AS_OF_DATE", lit(asOfDate).cast("date"))
)

# Show the resulting column types, then the default preview of rows.
bbhCustodySparkDF.printSchema()

bbhCustodySparkDF.show()


root
 |-- ACCT_NUM: integer (nullable = false)
 |-- FUND_NAME: string (nullable = false)
 |-- CUSTODY_SEC_ID: integer (nullable = true)
 |-- SEDOL: string (nullable = true)
 |-- CUSIP: string (nullable = true)
 |-- ISIN: string (nullable = true)
 |-- SEC_DESC: string (nullable = true)
 |-- CAP_DESC: string (nullable = true)
 |-- MATURITY_DATE: date (nullable = true)
 |-- LOC: string (nullable = true)
 |-- LOC_ACCT_NAME: string (nullable = true)
 |-- REG_DESC: string (nullable = true)
 |-- PAYDOWN_FACTOR: string (nullable = true)
 |-- CURR_FACE_VAL: string (nullable = true)
 |-- CUSTODY_POS: double (nullable = true)
 |-- TOT_AVAIL_POS: double (nullable = true)
 |-- POS_DATE: date (nullable = true)
 |-- PRICE_DATE: date (nullable = true)
 |-- PRICE: double (nullable = true)
 |-- PRICING_CURR: string (nullable = true)
 |-- LOCAL_PRICE: double (nullable = true)
 |-- LOCAL_CURR: string (nullable = true)
 |-- FX_RATE: double (nullable = true)
 |-- MARKET_VAL: double (nullable = true)
 |-- LOCAL_MARKET_VAL: double (nullable = true)
 |-- AS_OF_DATE: date (nullable = true)

+--------+------------------+--------------+-------+-----+------------+--------------------+--------+-------------+---+-------------+--------------------+--------------+-------------+-----------+-------------+----------+----------+-----------------+------------+-----------+----------+-------+------------------+----------------+----------+
|ACCT_NUM|         FUND_NAME|CUSTODY_SEC_ID|  SEDOL|CUSIP|        ISIN|            SEC_DESC|CAP_DESC|MATURITY_DATE|LOC|LOC_ACCT_NAME|            REG_DESC|PAYDOWN_FACTOR|CURR_FACE_VAL|CUSTODY_POS|TOT_AVAIL_POS|  POS_DATE|PRICE_DATE|            PRICE|PRICING_CURR|LOCAL_PRICE|LOCAL_CURR|FX_RATE|        MARKET_VAL|LOCAL_MARKET_VAL|AS_OF_DATE|
+--------+------------------+--------------+-------+-----+------------+--------------------+--------+-------------+---+-------------+--------------------+--------------+-------------+-----------+-------------+----------+----------+-----------------+------------+-----------+----------+-------+------------------+----------------+----------+
| 2203719|CRYSTAL JAPAN FUND|        564326|B2RHK74|     |JP1102921853|JAPAN (10 Y 1.7% ...|    DEBT|   2018-03-20| JP|         SMBC|Sec reg in other ...|              |             |      4.0E9|        4.0E9|2017-07-20|2017-07-19|0.905998209489704|         USD|      101.2|       JPY|  111.7|3.62399283795882E7|         4.048E9|2017-09-27|
+--------+------------------+--------------+-------+-----+------------+--------------------+--------+-------------+---+-------------+--------------------+--------------+-------------+-----------+-------------+----------+----------+-----------------+------------+-----------+----------+-------+------------------+----------------+----------+


In [25]:
# Connector options for reading the fund reference table from dashDB.
# Every value is looked up in dashCredentials, which is not defined in the
# visible cells (presumably in one of the cells removed for sharing).
dashDBloadOptions = {
    Connectors.DASHDB.HOST: dashCredentials["host"],
    Connectors.DASHDB.DATABASE: dashCredentials["db"],
    Connectors.DASHDB.USERNAME: dashCredentials["username"],
    Connectors.DASHDB.PASSWORD: dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME: dashCredentials["REF_FUND_MAPPING_TABLE"],
}

# Load the reference table through the IBM connector.
refFundMappingDF = (
    sqlContext.read
    .format("com.ibm.spark.discover")
    .options(**dashDBloadOptions)
    .load()
)

# Show the column types and a single sample row.
refFundMappingDF.printSchema()
refFundMappingDF.show(1)


root
 |-- ALADDIN_ID: string (nullable = true)
 |-- FUND_NAME: string (nullable = true)
 |-- FUTURES_ACCT_ID: string (nullable = true)
 |-- BTIG_ID: string (nullable = true)
 |-- FUND_ID: string (nullable = true)
 |-- NEWEDGE_ID: string (nullable = true)
 |-- BASE: string (nullable = true)
 |-- BARC_ID: string (nullable = true)

+----------+------------------+---------------+-------+---------+----------+----+-------+
|ALADDIN_ID|         FUND_NAME|FUTURES_ACCT_ID|BTIG_ID|  FUND_ID|NEWEDGE_ID|BASE|BARC_ID|
+----------+------------------+---------------+-------+---------+----------+----+-------+
|     I-CJF|CRYSTAL JAPAN FUND|          C6500|       |I-ASIAPAC|     72590|  JY|  4902C|
+----------+------------------+---------------+-------+---------+----------+----+-------+
only showing top 1 row


In [23]:
# Custody rows are matched to the fund reference table on FUND_NAME.
fundNameMatches = bbhCustodySparkDF.FUND_NAME == refFundMappingDF.FUND_NAME

# Custody columns placed before and after the derived FUND_ID column, in
# output order.
custodyLeadingColumns = ["ACCT_NUM", "FUND_NAME"]
custodyTrailingColumns = [
    "CUSTODY_SEC_ID", "SEDOL",
    "CUSIP", "ISIN",
    "SEC_DESC", "CAP_DESC",
    "MATURITY_DATE", "LOC",
    "LOC_ACCT_NAME", "REG_DESC",
    "PAYDOWN_FACTOR", "CURR_FACE_VAL",
    "CUSTODY_POS", "TOT_AVAIL_POS",
    "POS_DATE", "PRICE_DATE",
    "PRICE", "PRICING_CURR",
    "LOCAL_PRICE", "FX_RATE",
    "LOCAL_CURR", "MARKET_VAL",
    "LOCAL_MARKET_VAL", "AS_OF_DATE",
]

# The reference table's ALADDIN_ID is exposed as FUND_ID; every other column is
# taken from the custody frame.
joinOutputColumns = (
    [bbhCustodySparkDF[custodyColumn] for custodyColumn in custodyLeadingColumns]
    + [refFundMappingDF.ALADDIN_ID.alias("FUND_ID")]
    + [bbhCustodySparkDF[custodyColumn] for custodyColumn in custodyTrailingColumns]
)

# Inner join: custody rows without a matching fund name are not kept.
bbhCustodyJoinSparkDF = (
    bbhCustodySparkDF
    .join(refFundMappingDF, fundNameMatches, "inner")
    .select(*joinOutputColumns)
)

bbhCustodyJoinSparkDF.show(1)


+--------+------------------+-------+--------------+-------+-----+------------+--------------------+--------+-------------+---+-------------+--------------------+--------------+-------------+-----------+-------------+----------+----------+-----------------+------------+-----------+-------+----------+------------------+----------------+----------+
|ACCT_NUM|         FUND_NAME|FUND_ID|CUSTODY_SEC_ID|  SEDOL|CUSIP|        ISIN|            SEC_DESC|CAP_DESC|MATURITY_DATE|LOC|LOC_ACCT_NAME|            REG_DESC|PAYDOWN_FACTOR|CURR_FACE_VAL|CUSTODY_POS|TOT_AVAIL_POS|  POS_DATE|PRICE_DATE|            PRICE|PRICING_CURR|LOCAL_PRICE|FX_RATE|LOCAL_CURR|        MARKET_VAL|LOCAL_MARKET_VAL|AS_OF_DATE|
+--------+------------------+-------+--------------+-------+-----+------------+--------------------+--------+-------------+---+-------------+--------------------+--------------+-------------+-----------+-------------+----------+----------+-----------------+------------+-----------+-------+----------+------------------+----------------+----------+
| 2203719|CRYSTAL JAPAN FUND|  I-CJF|        564326|B2RHK74|     |JP1102921853|JAPAN (10 Y 1.7% ...|    DEBT|   2018-03-20| JP|         SMBC|Sec reg in other ...|              |             |      4.0E9|        4.0E9|2017-07-20|2017-07-19|0.905998209489704|         USD|      101.2|  111.7|       JPY|3.62399283795882E7|         4.048E9|2017-09-27|
+--------+------------------+-------+--------------+-------+-----+------------+--------------------+--------+-------------+---+-------------+--------------------+--------------+-------------+-----------+-------------+----------+----------+-----------------+------------+-----------+-------+----------+------------------+----------------+----------+


In [27]:
# dashDB connection and target settings used to persist the joined frame.
# The target table name comes from dashCredentials["tableName"] and the
# connector write mode is 'merge'.
dashdbsaveoption = {
    Connectors.DASHDB.HOST: dashCredentials["host"],
    Connectors.DASHDB.DATABASE: dashCredentials["db"],
    Connectors.DASHDB.USERNAME: dashCredentials["username"],
    Connectors.DASHDB.PASSWORD: dashCredentials["password"],
    Connectors.DASHDB.TARGET_TABLE_NAME: dashCredentials["tableName"],
    Connectors.DASHDB.TARGET_WRITE_MODE: 'merge',
}

# Print the schema of the frame about to be written.
bbhCustodyJoinSparkDF.printSchema()

# NOTE(review): DataFrameWriter.save() is documented as returning no value, so
# saveDashDBDF presumably ends up as None; the name is kept for compatibility.
saveDashDBDF = (
    bbhCustodyJoinSparkDF.write
    .format("com.ibm.spark.discover")
    .options(**dashdbsaveoption)
    .save()
)


root
 |-- ACCT_NUM: integer (nullable = false)
 |-- FUND_NAME: string (nullable = false)
 |-- FUND_ID: string (nullable = true)
 |-- CUSTODY_SEC_ID: integer (nullable = true)
 |-- SEDOL: string (nullable = true)
 |-- CUSIP: string (nullable = true)
 |-- ISIN: string (nullable = true)
 |-- SEC_DESC: string (nullable = true)
 |-- CAP_DESC: string (nullable = true)
 |-- MATURITY_DATE: date (nullable = true)
 |-- LOC: string (nullable = true)
 |-- LOC_ACCT_NAME: string (nullable = true)
 |-- REG_DESC: string (nullable = true)
 |-- PAYDOWN_FACTOR: string (nullable = true)
 |-- CURR_FACE_VAL: string (nullable = true)
 |-- CUSTODY_POS: double (nullable = true)
 |-- TOT_AVAIL_POS: double (nullable = true)
 |-- POS_DATE: date (nullable = true)
 |-- PRICE_DATE: date (nullable = true)
 |-- PRICE: double (nullable = true)
 |-- PRICING_CURR: string (nullable = true)
 |-- LOCAL_PRICE: double (nullable = true)
 |-- FX_RATE: double (nullable = true)
 |-- LOCAL_CURR: string (nullable = true)
 |-- MARKET_VAL: double (nullable = true)
 |-- LOCAL_MARKET_VAL: double (nullable = true)
 |-- AS_OF_DATE: date (nullable = true)


In [ ]: