In [40]:
# One-time dependency install for this DSX environment: xlrd is the Excel
# engine pd.read_excel needs for the .xlsm workbook loaded below.
# NOTE(review): pin a version (e.g. xlrd==1.1.0) for reproducibility, and
# keep install cells like this at the very top of the notebook.
!pip install --user xlrd


Requirement already satisfied: xlrd in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s1df-1767d8774d3251-73caa6cfaa60/.local/lib/python2.7/site-packages

In [41]:
# Setup constants if any

In [42]:
# -- standard library --
import json
from datetime import datetime
from io import BytesIO

# -- third party --
import pandas as pd
import requests
import xlrd
from dateutil.parser import parse

# -- PySpark --
from pyspark.sql import SparkSession  # needed by SparkSession.builder below; not provided by the wildcard imports
from pyspark.sql.functions import *
from pyspark.sql.types import *

# -- project local --
from ingest.Connectors import Connectors

In [43]:
# The code was removed by DSX for sharing.

In [44]:
# The code was removed by DSX for sharing.

In [45]:
# Load the fund-mapping workbook from object storage into a pandas DataFrame.
# NOTE(review): getFileFromObjectStorage is defined in a hidden DSX cell above.
fundMappingDF = pd.read_excel(
    getFileFromObjectStorage('MizuhoPOC', 'FundMapping.xlsm'),
    header=[0],
)

# Drop rows, then columns, that are entirely NaN, and blank-fill what's left.
# dropna(axis=[0, 1]) was deprecated in pandas 0.23 and removed in 1.0; two
# chained single-axis calls are the supported equivalent for how='all'
# (an all-NaN column is still all-NaN after the all-NaN rows are removed).
fundMappingFilteredDF = (
    fundMappingDF
    .dropna(axis=0, how='all')
    .dropna(axis=1, how='all')
    .fillna('')
)

# Rename the spreadsheet headers to DB-friendly column identifiers that match
# the Spark schema built in a later cell.
fundMappingRenamedDF = fundMappingFilteredDF.rename(
    index=str,
    columns={
        "ALADDIN": "ALADDIN_ID",
        "Fund Name": "FUND_NAME",
        "FUTURES ACCOUNT": "FUTURES_ACCT_ID",
        "BTIG": "BTIG_ID",
        "FUND": "FUND_ID",
        "NEWEDGE": "NEWEDGE_ID",
        "BASE": "BASE",
        "BARC": "BARC_ID",
    },
)
fundMappingRenamedDF.head(10)


Out[45]:
ALADDIN_ID FUND_NAME FUTURES_ACCT_ID BTIG_ID FUND_ID NEWEDGE_ID BASE BARC_ID
0 I-CJF CRYSTAL JAPAN FUND C6500 I-ASIAPAC 72590 JY 4902C
1 I-CGF RRG99268 I-ESPER 799353 JY 4903c
2 I-HFR 12070 I-SABF1 799350 US 4904C
3 I-HMC 799271 I-TRIPRISM 775730 JY 4905C
4 I-SABF1 RRG99350 Sapphire Alternative Beta Fund I I-CGF 799268 US 4901C
5 I-SABF2 99285 Sapphire Alternative Beta Fund II I-CJF 789855 US 4900C
6 I-ESPER H2301 Esper Alternative Fund I-HFR 775780 US 4907C
7 I-TRIPRISM 775730 I-HMC 799271 US 4906C
8 I-ASIAPAC 72590 Asia Pacific Alternative Beta Plus Fund I-ST1 C6503 US
9 MAI Bridge Fund - Discretionary CTA C6503 I-ST2 C6503 US

In [49]:
# Reuse the existing Spark session if the DSX runtime already created one.
spark = SparkSession.builder.getOrCreate()  

def build_schema():
    """Build and return the all-string schema for the fund-mapping data.

    Every column is declared as a nullable StringType because the source
    spreadsheet mixes numeric-looking codes and free text in each column.
    """
    column_names = (
        "ALADDIN_ID",
        "FUND_NAME",
        "FUTURES_ACCT_ID",
        "BTIG_ID",
        "FUND_ID",
        "NEWEDGE_ID",
        "BASE",
        "BARC_ID",
    )
    return StructType(
        [StructField(name, StringType(), True) for name in column_names]
    )


fundMappingSparkDF = spark.createDataFrame(fundMappingRenamedDF, schema=build_schema())


fundMappingSparkDF.printSchema()


# Connection to Dash DB for writing the data
dashdbsaveoption = {
                     Connectors.DASHDB.HOST              : dashCredentials["host"],
                     Connectors.DASHDB.DATABASE          : dashCredentials["db"],
                     Connectors.DASHDB.USERNAME          : dashCredentials["username"],
                     Connectors.DASHDB.PASSWORD          : dashCredentials["password"],
                     Connectors.DASHDB.TARGET_TABLE_NAME : dashCredentials["tableName"],
                     Connectors.DASHDB.TARGET_WRITE_MODE : 'merge' 
}

fundMappingDashDBDF = fundMappingSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()


root
 |-- ALADDIN_ID: string (nullable = true)
 |-- FUND_NAME: string (nullable = true)
 |-- FUTURES_ACCT_ID: string (nullable = true)
 |-- BTIG_ID: string (nullable = true)
 |-- FUND_ID: string (nullable = true)
 |-- NEWEDGE_ID: string (nullable = true)
 |-- BASE: string (nullable = true)
 |-- BARC_ID: string (nullable = true)


In [ ]: