In [1]:
import os, sys
In [27]:
from pyspark.sql import SQLContext, Row
import datetime
from collections import namedtuple
import numpy as np
import pandas as pd
Create the SQLContext
In [3]:
sql = SQLContext(sc)
Each row contains the (already computed) scenario values for one date and one risk factor:
Row = Date x RiskFactor x NeutralScenario x Scenarios
Ideally, each row would be parsed into a named tuple that keeps all scenarios in a single vector field:
In [36]:
RFScenario = namedtuple('RFScenario', ('rf', 'date', 'neutral', 'scenarios'))
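For illustration only, a single parsed row could then look like this (the risk factor name, date and values below are made up):
In [ ]:
# Hypothetical example of the "ideal" row layout -- not read from the csv file
RFScenario(rf='RF1', date=datetime.datetime(2015, 6, 1), neutral=100.0,
           scenarios=[99.2, 100.5, 101.3])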
However, because custom row aggregation is not fully supported, and because the number of scenarios is fixed, each scenario gets its own column instead:
In [6]:
def construct_scenarios_type(number_scenarios=250, name='Scenarios'):
    names = ['rf', 'date', 'neutral']
    scenario_cols = ["s%d" % x for x in range(1, number_scenarios + 1)]
    names.extend(scenario_cols)
    Scenarios = namedtuple(name, names)
    return Scenarios, scenario_cols

Scenarios, scenario_cols = construct_scenarios_type()
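The generated type simply has one field per scenario; a quick look at the first few field names (a sanity check, not part of the pipeline):
In [ ]:
# First fields of the generated namedtuple: rf, date, neutral, then s1, s2, ...
Scenarios._fields[:6]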
We can then parse the rows of the CSV file accordingly:
In [30]:
DATA_DIR = os.path.join(os.pardir, 'data')
csv_filename = os.path.join(DATA_DIR, "scenarios2.csv")
pd.read_csv(csv_filename, header=None).head()
Out[30]:
In [31]:
from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector, _convert_to_vector
def parse(row):
    """Parse a split csv row into an RFScenario, keeping only the first three scenarios (row[3:6]) as a DenseVector."""
    DATE_FMT = "%Y-%m-%d"
    # row[0] is the risk factor name and stays a string
    row[1] = datetime.datetime.strptime(row[1], DATE_FMT)
    for i in np.arange(2, len(row)):
        row[i] = float(row[i])
    return RFScenario(row[0], row[1], row[2], DenseVector(row[3:6]))

def parse_explicit(row):
    """Parse a split csv row into a Scenarios tuple with one column per scenario."""
    DATE_FMT = "%Y-%m-%d"
    # row[0] is the risk factor name and stays a string
    row[1] = datetime.datetime.strptime(row[1], DATE_FMT)
    for i in np.arange(2, len(row)):
        row[i] = float(row[i])
    return Scenarios(*row)
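As a quick illustration, parse turns a split csv line into an RFScenario (the line below is made up and much shorter than the real 250-scenario rows):
In [ ]:
# Hypothetical csv line, used only to illustrate parse()
sample = "RF1,2015-06-01,100.0,99.2,100.5,101.3,98.7".split(",")
parse(sample)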
In [21]:
lines = sc.textFile(csv_filename)
parts = lines.map(lambda l: l.split(","))
rows = parts.map(parse)
In [22]:
rows_exp = parts.map(parse_explicit)
df_exp = sql.createDataFrame(rows_exp)
In [23]:
df_exp.head(1)
Out[23]:
For each day, we want to aggregate the scenarios across risk factors and then compute the Value at Risk of the aggregate.
In [38]:
def var(scenarios, level=99, neutral_scenario=0):
    # P&L of each scenario relative to the neutral scenario; the VaR is the
    # negated (100 - level)% quantile of the P&L distribution
    pnls = scenarios - neutral_scenario
    return -np.percentile(pnls, 100 - level, interpolation='linear')
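As a small sanity check of the formula (pure NumPy, made-up P&L values): the 99% VaR is the negated 1% quantile of the scenario P&Ls.
In [ ]:
# Hypothetical scenarios around a neutral value of 0
sample_scenarios = np.array([-3.0, -1.0, 0.0, 2.0, 5.0])
var(sample_scenarios, level=99, neutral_scenario=0)
# -> 2.92, i.e. the linearly interpolated 1% quantile of the P&Ls is -2.92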
In [39]:
# After groupBy('date').sum(): r[0] = date, r[1] = summed neutral, r[2:] = summed scenario columns
scenario_dates = df_exp.groupBy('date').sum()
var_rdd = scenario_dates.map(lambda r: (r[0], r[1], float(var(np.array(r[2:]) - r[1]))))
df_var = sql.createDataFrame(var_rdd, schema=['date', 'neutral', 'var'])
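As an optional cross-check (assuming the data set is small enough to collect to the driver), the same per-date figures can be recomputed in pandas:
In [ ]:
# Cross-check on the driver with pandas -- only feasible for small data sets
pdf = df_exp.toPandas()
daily = pdf.groupby('date')[['neutral'] + scenario_cols].sum()
daily.apply(lambda r: var(r[scenario_cols].values, neutral_scenario=r['neutral']), axis=1).head()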
In [40]:
%matplotlib notebook
df_var.toPandas().plot()
Out[40]:
Define portfolios and put them into a Spark DataFrame
In [63]:
pf_rdd = sc.parallelize([('P1', 'RF1', 1.), ('P1', 'RF2', 2.), ('P2', 'RF1', 0.2), ('P2', 'RF2', -0.8)])
dfpf = sql.createDataFrame(pf_rdd, ['portfolio', 'rf', 'qty'])
In [64]:
dfpf.collect()
Out[64]:
In [65]:
res = df_exp.join(dfpf, dfpf.rf == df_exp.rf)
In [66]:
res.head(1)
Out[66]:
In [72]:
# scenario_dates = df_exp.groupBy('date').sum()
var_per_portfolio = res.groupBy('date', 'portfolio').sum()
# var_per_portfolio.toPandas().plot()
var_per_portfolio = var_per_portfolio.map(lambda r: (r[0], r[1], r[2], float(var(np.array(r[3:]) - r[2]))))
var_per_portfolio = sql.createDataFrame(var_per_portfolio, schema=['date', 'portfolio', 'neutral', 'var'])
In [82]:
%matplotlib notebook
df1 = var_per_portfolio.toPandas()
df2 = df1.set_index(['date', 'portfolio'])
# ['neutral'].plot(subplots=True)
In [95]:
df3 = df2.unstack(1) #['var'].plot(subplots=True)
df3
Out[95]:
In [ ]:
from pyspark.sql.types import ArrayType, FloatType
f = sql.udf.register("fadd", lambda x: (np.array(x[3]) * 3.1).tolist(), ArrayType(FloatType()))
fagg = sql.udf.register("fagg", lambda x, y: (np.array(x[3]) + np.array(y[3])).tolist(), ArrayType(FloatType()))
In [76]:
sql.registerDataFrameAsTable(df_exp, 'scen')
In [128]:
sql.sql('select date, fadd(scenarios) from scen group by date').collect()
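For reference, a plain (non-aggregating) UDF applied to a single column of the registered 'scen' table works like this; the +1.0 shift is arbitrary and only illustrates the call pattern:
In [ ]:
# Minimal sketch of a scalar SQL UDF on one scenario column
shift = sql.udf.register("shift_s1", lambda v: float(v) + 1.0, FloatType())
sql.sql("select date, shift_s1(s1) from scen limit 5").collect()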