In [40]:
!pip install --user xlrd
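# xlrd supplies the Excel engine that pd.read_excel relies on below.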
In [41]:
# Setup constants if any
In [42]:
import pandas as pd
from io import BytesIO
import requests
import json
import xlrd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from dateutil.parser import parse
from ingest.Connectors import Connectors
In [43]:
# The code was removed by DSX for sharing.
In [44]:
# The code was removed by DSX for sharing.
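In [ ]:
# The two cells above were scrubbed by DSX for sharing; they presumably held
# the object-storage credentials and the getFileFromObjectStorage helper used
# below. This is a minimal sketch of such a helper, not the original code:
# objectStorageCredentials and the identity endpoint are assumptions.
def getFileFromObjectStorage(container, filename):
    """Return the named object from Bluemix Object Storage as a BytesIO,
    suitable for pd.read_excel."""
    # Authenticate against the Identity v3 endpoint (hypothetical credentials).
    auth_url = 'https://identity.open.softlayer.com/v3/auth/tokens'
    payload = {'auth': {'identity': {'methods': ['password'], 'password': {
        'user': {'name': objectStorageCredentials['username'],
                 'domain': {'id': objectStorageCredentials['domainId']},
                 'password': objectStorageCredentials['password']}}}}}
    resp = requests.post(auth_url, data=json.dumps(payload),
                         headers={'Content-Type': 'application/json'})
    token = resp.headers['x-subject-token']
    # Locate the public object-store endpoint in the service catalog.
    for entry in resp.json()['token']['catalog']:
        if entry['type'] == 'object-store':
            for ep in entry['endpoints']:
                if ep['interface'] == 'public':
                    base_url = ep['url']
    # Fetch the object itself and wrap the raw bytes for pandas.
    obj = requests.get('/'.join([base_url, container, filename]),
                       headers={'X-Auth-Token': token})
    return BytesIO(obj.content)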
In [45]:
fundMappingDF = pd.read_excel(getFileFromObjectStorage('MizuhoPOC', 'FundMapping.xlsm'), header=0)
# Drop rows and columns that are entirely NaN (axis=0 drops rows, axis=1
# drops columns), then blank out any remaining NaN cells.
fundMappingFilteredDF = fundMappingDF.dropna(axis=0, how='all').dropna(axis=1, how='all').fillna('')
fundMappingRenamedDF = fundMappingFilteredDF.rename(index=str, columns={
    "ALADDIN": "ALADDIN_ID",
    "Fund Name": "FUND_NAME",
    "FUTURES ACCOUNT": "FUTURES_ACCT_ID",
    "BTIG": "BTIG_ID",
    "FUND": "FUND_ID",
    "NEWEDGE": "NEWEDGE_ID",
    "BASE": "BASE",
    "BARC": "BARC_ID",
})
fundMappingRenamedDF.head(10)
# print fundMappingRenamedDF.dtypes
Out[45]:
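In [ ]:
# Optional sanity check: the dashDB write further down uses 'merge' mode,
# which presumes a usable key. Flag duplicate fund identifiers early
# (treating FUND_ID as the key is an assumption).
duplicateRows = fundMappingRenamedDF[fundMappingRenamedDF.duplicated(subset=['FUND_ID'], keep=False)]
print("Duplicate FUND_ID rows: {}".format(len(duplicateRows)))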
In [49]:
spark = SparkSession.builder.getOrCreate()
def build_schema():
    """Build and return a schema to use for the sample data."""
    schema = StructType(
        [
            StructField("ALADDIN_ID", StringType(), True),
            StructField("FUND_NAME", StringType(), True),
            StructField("FUTURES_ACCT_ID", StringType(), True),
            StructField("BTIG_ID", StringType(), True),
            StructField("FUND_ID", StringType(), True),
            StructField("NEWEDGE_ID", StringType(), True),
            StructField("BASE", StringType(), True),
            StructField("BARC_ID", StringType(), True),
        ]
    )
    return schema
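# Excel frequently parses ID columns as numeric dtypes, while build_schema()
# declares every field as StringType; createDataFrame would then reject the
# frame. Defensive cast (assumes string-typed IDs are fine downstream):
fundMappingRenamedDF = fundMappingRenamedDF.astype(str)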
fundMappingSparkDF = spark.createDataFrame(fundMappingRenamedDF, schema=build_schema())
fundMappingSparkDF.printSchema()
# dashDB connection options for writing the data ('merge' write mode merges
# into the existing target table rather than appending).
dashdbsaveoption = {
Connectors.DASHDB.HOST : dashCredentials["host"],
Connectors.DASHDB.DATABASE : dashCredentials["db"],
Connectors.DASHDB.USERNAME : dashCredentials["username"],
Connectors.DASHDB.PASSWORD : dashCredentials["password"],
Connectors.DASHDB.TARGET_TABLE_NAME : dashCredentials["tableName"],
Connectors.DASHDB.TARGET_WRITE_MODE : 'merge'
}
# save() performs the write and returns None, so there is nothing to assign.
fundMappingSparkDF.write.format("com.ibm.spark.discover").options(**dashdbsaveoption).save()
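# Lightweight confirmation of what was pushed: re-count the source Spark
# DataFrame (this does not query the target table itself).
print("Rows written to {}: {}".format(dashCredentials["tableName"], fundMappingSparkDF.count()))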
In [ ]: