In [25]:
!pip install --user xlrd


Requirement already satisfied: xlrd in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s1df-1767d8774d3251-73caa6cfaa60/.local/lib/python2.7/site-packages

In [26]:
# Setup constants if any

In [27]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd 

from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime
from dateutil.parser import parse

from ingest.Connectors import Connectors

In [28]:
# The code was removed by DSX for sharing.

In [29]:
# The code was removed by DSX for sharing.

In [30]:
# Load the BBH cash report from Object Storage.
# NOTE(review): getFileFromObjectStorage is not defined in any visible cell; presumably it
# lives in one of the cells DSX removed for sharing and must be restored before running this.
bbhCashDF = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'BBH - Cash.xlsx'), header=[0])

# Drop rows in which every value is NaN (axis 0 = rows). Columns are deliberately kept even
# when entirely empty, because the renamed frame must supply every field of the Spark schema.
bbhCashFilteredDF = bbhCashDF.dropna(axis=0, how='all')

# Map the spreadsheet headers onto the target table's column names.
bbhCashRenamedDF = bbhCashFilteredDF.rename(index=str, columns={
    "Head Account Number": "ACCT_NUM",
    "Actual Available Balance": "ACT_AVAIL_BAL",
    "Actual Variance Amount": "ACT_VAR_AMT",
    "Bank of Deposit": "DEPOSIT_BANK",
    "Currency Account Name": "FUND_NAME",
    "Currency Code": "CURR_CODE",
    "Opening Available + CMS Sweep Return": "OPEN_AVAIL_CMS_SWEEP_RETURN",
    "Opening Available Balance": "OPEN_AVAIL_BAL",
    "Prior Day NAV": "PRIOR_DAY_NAV",
    "Projected Closing Available Balance": "PROJ_CLOSE_AVAIL_BAL",
    "Sub Account Number": "SUB_ACCT_NUM",
    "Value Date": "AS_OF_DATE",
})

# Reduce the value-date timestamps to plain datetime.date objects (Spark DateType column).
bbhCashRenamedDF['AS_OF_DATE'] = pd.to_datetime(bbhCashRenamedDF['AS_OF_DATE']).dt.date

# Monetary columns, stored as DoubleType in the Spark schema.
BBH_CASH_AMOUNT_COLUMNS = ['ACT_AVAIL_BAL', 'ACT_VAR_AMT', 'OPEN_AVAIL_CMS_SWEEP_RETURN',
                           'OPEN_AVAIL_BAL', 'PRIOR_DAY_NAV', 'PROJ_CLOSE_AVAIL_BAL']
bbhCashRenamedDF[BBH_CASH_AMOUNT_COLUMNS] = bbhCashRenamedDF[BBH_CASH_AMOUNT_COLUMNS].astype(float)

# FUND_NAME, ACCT_NUM and AS_OF_DATE are declared non-nullable in the Spark schema;
# fail here with a readable message rather than inside spark.createDataFrame.
bbhCashNullCounts = bbhCashRenamedDF[['FUND_NAME', 'ACCT_NUM', 'AS_OF_DATE']].isnull().sum()
assert (bbhCashNullCounts == 0).all(), \
    "Null values in columns declared non-nullable in the Spark schema:\n%s" % bbhCashNullCounts

print(bbhCashRenamedDF.dtypes)


ACT_AVAIL_BAL                  float64
ACT_VAR_AMT                    float64
DEPOSIT_BANK                    object
FUND_NAME                       object
CURR_CODE                       object
ACCT_NUM                         int64
OPEN_AVAIL_CMS_SWEEP_RETURN    float64
OPEN_AVAIL_BAL                 float64
PRIOR_DAY_NAV                  float64
PROJ_CLOSE_AVAIL_BAL           float64
SUB_ACCT_NUM                     int64
AS_OF_DATE                      object
dtype: object

In [31]:
# SparkSession is not exported by the pyspark.sql.functions / pyspark.sql.types star imports,
# so import it explicitly instead of relying on a name pre-injected by the notebook kernel.
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()


def build_schema():
    """Build and return the Spark schema for the renamed BBH cash frame.

    Fields are listed in the same order as the pandas columns of bbhCashRenamedDF
    (compare the dtypes printout of the previous cell). NOTE(review): presumably
    createDataFrame pairs pandas columns with schema fields by position, so keep the
    order aligned if the spreadsheet layout changes.

    FUND_NAME, ACCT_NUM and AS_OF_DATE are declared non-nullable; every other field
    is nullable.

    Returns:
        StructType: the schema passed to spark.createDataFrame.
    """
    # (column name, Spark type, nullable)
    fields = [
        ("ACT_AVAIL_BAL", DoubleType(), True),
        ("ACT_VAR_AMT", DoubleType(), True),
        ("DEPOSIT_BANK", StringType(), True),
        ("FUND_NAME", StringType(), False),
        ("CURR_CODE", StringType(), True),
        ("ACCT_NUM", IntegerType(), False),
        ("OPEN_AVAIL_CMS_SWEEP_RETURN", DoubleType(), True),
        ("OPEN_AVAIL_BAL", DoubleType(), True),
        ("PRIOR_DAY_NAV", DoubleType(), True),
        ("PROJ_CLOSE_AVAIL_BAL", DoubleType(), True),
        ("SUB_ACCT_NUM", IntegerType(), True),
        ("AS_OF_DATE", DateType(), False),
    ]
    return StructType([StructField(name, data_type, nullable)
                       for name, data_type, nullable in fields])

# Turn the cleaned pandas frame into a Spark DataFrame using the explicit schema,
# then echo the schema and the rows for a visual check.
bbhCashSparkDF = spark.createDataFrame(
    bbhCashRenamedDF,
    schema=build_schema(),
)

bbhCashSparkDF.printSchema()
bbhCashSparkDF.show()


root
 |-- ACT_AVAIL_BAL: double (nullable = true)
 |-- ACT_VAR_AMT: double (nullable = true)
 |-- DEPOSIT_BANK: string (nullable = true)
 |-- FUND_NAME: string (nullable = false)
 |-- CURR_CODE: string (nullable = true)
 |-- ACCT_NUM: integer (nullable = false)
 |-- OPEN_AVAIL_CMS_SWEEP_RETURN: double (nullable = true)
 |-- OPEN_AVAIL_BAL: double (nullable = true)
 |-- PRIOR_DAY_NAV: double (nullable = true)
 |-- PROJ_CLOSE_AVAIL_BAL: double (nullable = true)
 |-- SUB_ACCT_NUM: integer (nullable = true)
 |-- AS_OF_DATE: date (nullable = false)

+-------------+-----------+------------+--------------------+---------+--------+---------------------------+--------------+-------------+--------------------+------------+----------+
|ACT_AVAIL_BAL|ACT_VAR_AMT|DEPOSIT_BANK|           FUND_NAME|CURR_CODE|ACCT_NUM|OPEN_AVAIL_CMS_SWEEP_RETURN|OPEN_AVAIL_BAL|PRIOR_DAY_NAV|PROJ_CLOSE_AVAIL_BAL|SUB_ACCT_NUM|AS_OF_DATE|
+-------------+-----------+------------+--------------------+---------+--------+---------------------------+--------------+-------------+--------------------+------------+----------+
|   2296497.63|        0.0|         BBH|  CRYSTAL JAPAN FUND|      USD| 2203719|                 2300372.63|           0.0|          0.0|          2296497.63|     2203719|2017-07-27|
| 5.60875494E8|        0.0|         BBH|JPY CRYSTAL JAPAN...|      JPY| 2203719|               5.60875494E8|           0.0|          0.0|        5.60875494E8|     1838812|2017-07-27|
+-------------+-----------+------------+--------------------+---------+--------+---------------------------+--------------+-------------+--------------------+------------+----------+


In [32]:
# Read the fund reference mapping table from dashDB through the com.ibm.spark.discover connector.
# NOTE(review): dashCredentials is not defined in any visible cell; presumably it is created in a
# cell DSX removed for sharing, with keys "host", "db", "username", "password" and
# "REF_FUND_MAPPING_TABLE".
dashDBloadOptions = {
    Connectors.DASHDB.HOST: dashCredentials["host"],
    Connectors.DASHDB.DATABASE: dashCredentials["db"],
    Connectors.DASHDB.USERNAME: dashCredentials["username"],
    Connectors.DASHDB.PASSWORD: dashCredentials["password"],
    Connectors.DASHDB.SOURCE_TABLE_NAME: dashCredentials["REF_FUND_MAPPING_TABLE"],
}

# Read through the SparkSession created earlier in the notebook rather than the
# kernel-injected sqlContext global, which this notebook never creates itself.
refFundMappingDF = spark.read.format("com.ibm.spark.discover").options(**dashDBloadOptions).load()
refFundMappingDF.printSchema()
refFundMappingDF.show(1)


root
 |-- ALADDIN_ID: string (nullable = true)
 |-- FUND_NAME: string (nullable = true)
 |-- FUTURES_ACCT_ID: string (nullable = true)
 |-- BTIG_ID: string (nullable = true)
 |-- FUND_ID: string (nullable = true)
 |-- NEWEDGE_ID: string (nullable = true)
 |-- BASE: string (nullable = true)
 |-- BARC_ID: string (nullable = true)

+----------+------------------+---------------+-------+---------+----------+----+-------+
|ALADDIN_ID|         FUND_NAME|FUTURES_ACCT_ID|BTIG_ID|  FUND_ID|NEWEDGE_ID|BASE|BARC_ID|
+----------+------------------+---------------+-------+---------+----------+----+-------+
|     I-CJF|CRYSTAL JAPAN FUND|          C6500|       |I-ASIAPAC|     72590|  JY|  4902C|
+----------+------------------+---------------+-------+---------+----------+----+-------+
only showing top 1 row


In [34]:
# Attach the Aladdin fund ID (exposed as FUND_ID) to each cash row by matching on FUND_NAME.
# This is an INNER join: cash rows whose FUND_NAME has no entry in the reference mapping are
# dropped. List those names first so the loss is visible instead of silent.
unmatchedFundNames = sorted(
    row.FUND_NAME
    for row in bbhCashSparkDF.select("FUND_NAME")
                             .subtract(refFundMappingDF.select("FUND_NAME"))
                             .collect()
)
if unmatchedFundNames:
    print("WARNING: %d fund name(s) have no reference mapping and are excluded from the join: %s"
          % (len(unmatchedFundNames), ", ".join(unmatchedFundNames)))

bbhCashJoinSparkDF = (
    bbhCashSparkDF
    .join(refFundMappingDF, bbhCashSparkDF.FUND_NAME == refFundMappingDF.FUND_NAME, "inner")
    .select(
        bbhCashSparkDF.ACCT_NUM,
        bbhCashSparkDF.FUND_NAME,
        refFundMappingDF.ALADDIN_ID.alias("FUND_ID"),
        bbhCashSparkDF.ACT_AVAIL_BAL,
        bbhCashSparkDF.ACT_VAR_AMT,
        bbhCashSparkDF.DEPOSIT_BANK,
        bbhCashSparkDF.CURR_CODE,
        bbhCashSparkDF.OPEN_AVAIL_CMS_SWEEP_RETURN,
        bbhCashSparkDF.OPEN_AVAIL_BAL,
        bbhCashSparkDF.PRIOR_DAY_NAV,
        bbhCashSparkDF.PROJ_CLOSE_AVAIL_BAL,
        bbhCashSparkDF.SUB_ACCT_NUM,
        bbhCashSparkDF.AS_OF_DATE,
    )
)

bbhCashJoinSparkDF.show(1)


+--------+------------------+-------+-------------+-----------+------------+---------+---------------------------+--------------+-------------+--------------------+------------+----------+
|ACCT_NUM|         FUND_NAME|FUND_ID|ACT_AVAIL_BAL|ACT_VAR_AMT|DEPOSIT_BANK|CURR_CODE|OPEN_AVAIL_CMS_SWEEP_RETURN|OPEN_AVAIL_BAL|PRIOR_DAY_NAV|PROJ_CLOSE_AVAIL_BAL|SUB_ACCT_NUM|AS_OF_DATE|
+--------+------------------+-------+-------------+-----------+------------+---------+---------------------------+--------------+-------------+--------------------+------------+----------+
| 2203719|CRYSTAL JAPAN FUND|  I-CJF|   2296497.63|        0.0|         BBH|      USD|                 2300372.63|           0.0|          0.0|          2296497.63|     2203719|2017-07-27|
+--------+------------------+-------+-------------+-----------+------------+---------+---------------------------+--------------+-------------+--------------------+------------+----------+


In [35]:
# dashDB connection options for writing the joined cash data to the target table.
# NOTE(review): dashCredentials is not defined in any visible cell; presumably it is created in a
# cell DSX removed for sharing. 'merge' is the connector's target write mode — its exact
# semantics are defined by the com.ibm.spark.discover connector, not by this notebook.
dashdbsaveoption = {
    Connectors.DASHDB.HOST: dashCredentials["host"],
    Connectors.DASHDB.DATABASE: dashCredentials["db"],
    Connectors.DASHDB.USERNAME: dashCredentials["username"],
    Connectors.DASHDB.PASSWORD: dashCredentials["password"],
    Connectors.DASHDB.TARGET_TABLE_NAME: dashCredentials["tableName"],
    Connectors.DASHDB.TARGET_WRITE_MODE: 'merge',
}

bbhCashJoinSparkDF.printSchema()

# DataFrameWriter.save() returns None, so its result is not bound to a variable.
bbhCashJoinSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()


root
 |-- ACCT_NUM: integer (nullable = false)
 |-- FUND_NAME: string (nullable = false)
 |-- FUND_ID: string (nullable = true)
 |-- ACT_AVAIL_BAL: double (nullable = true)
 |-- ACT_VAR_AMT: double (nullable = true)
 |-- DEPOSIT_BANK: string (nullable = true)
 |-- CURR_CODE: string (nullable = true)
 |-- OPEN_AVAIL_CMS_SWEEP_RETURN: double (nullable = true)
 |-- OPEN_AVAIL_BAL: double (nullable = true)
 |-- PRIOR_DAY_NAV: double (nullable = true)
 |-- PROJ_CLOSE_AVAIL_BAL: double (nullable = true)
 |-- SUB_ACCT_NUM: integer (nullable = true)
 |-- AS_OF_DATE: date (nullable = false)


In [ ]: