In [65]:
import pyspark
from pyspark.sql import SQLContext, Row
import numpy as np  # needed by parse() and var() below
import datetime
In [5]:
from collections import namedtuple
In [2]:
sc = pyspark.SparkContext('local', 'pyspark')
In [3]:
sqlContext = SQLContext(sc)
In [150]:
Simulation = namedtuple('Simulation', ('date', 'neutral', 'scenarios'))
RFScenario = namedtuple('RFScenario', ('rf', 'date', 'neutral', 'scenarios'))
In [149]:
from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector

def parse(row):
    """Parse a split CSV row into a Simulation(date, neutral, scenarios)."""
    DATE_FMT = "%Y-%m-%d"
    row[0] = datetime.datetime.strptime(row[0], DATE_FMT)
    for i in range(1, len(row)):
        row[i] = float(row[i])
    return Simulation(row[0], row[1], DenseVector(row[2:]))

def parse2(row):
    """Parse a split CSV row into an RFScenario(rf, date, neutral, scenarios)."""
    DATE_FMT = "%Y-%m-%d"
    row[1] = datetime.datetime.strptime(row[1], DATE_FMT)
    for i in range(2, len(row)):
        row[i] = float(row[i])
    # keeps only the first three scenario columns
    return RFScenario(row[0], row[1], row[2], DenseVector(row[3:6]))

# test
# s = "2015-05-08,32.42,32.864847227683306,32.50044000839989,31.962723820560473,31.920709606792094,32.528263796919845,31.86562405274838,32.136619526291824,32"
# datetime.datetime.strptime(s.split(',')[0], '%Y-%m-%d')
# float(s.split(',')[1])
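As a quick sanity check, the sample line from the comment above (shortened to four columns) can be fed through parse:
In [ ]:
s = "2015-05-08,32.42,32.864847227683306,32.50044000839989,31.962723820560473"
parse(s.split(','))
# -> Simulation(date=datetime.datetime(2015, 5, 8, 0, 0), neutral=32.42,
#               scenarios=DenseVector([32.8648, 32.5004, 31.9627]))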
The scenarios are already available; we will load the CSV file into Spark (no Big Data to see here, move along).
In [175]:
csv_filename = "../data/scenarios.csv"
In [155]:
lines = sc.textFile(csv_filename)
parts = lines.map(lambda l: l.split(","))
rows = parts.map(parse)
In [156]:
df = sqlContext.createDataFrame(rows)
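If the inferred schema needs a quick check, printSchema will list the columns (output omitted here):
In [ ]:
df.printSchema()
# expect three fields: date, neutral, scenarios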
In [158]:
df.show()
In [159]:
df.describe().show()
Now let's compute the VaR for each day. The 99% VaR is the negated 1st percentile of the P&L distribution, where each P&L is a scenario value minus the neutral (base) value.
In [43]:
def var(scenarios, level=99, neutral_scenario=0):
    """VaR at the given level: negated (100 - level)th percentile of the P&Ls."""
    pnls = scenarios - neutral_scenario
    return -np.percentile(pnls, 100 - level, interpolation='linear')
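A quick plausibility check with synthetic numbers (not from the scenario file): for standard-normal P&Ls around a neutral value of 0, the 99% VaR should land near the theoretical 2.33.
In [ ]:
np.random.seed(0)             # synthetic draw, reproducible
var(np.random.randn(100000))  # expect a value close to 2.33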
In [161]:
pnls = df.map(lambda r: {'date': r.date,
                         'neutral': r.neutral,
                         'var': float(var(r.scenarios.array, neutral_scenario=r.neutral))})
In [162]:
a = sqlContext.createDataFrame(pnls)
In [164]:
%matplotlib notebook
a.toPandas().plot();
Now for the multi-risk-factor case, where the per-risk-factor scenarios must be summed up into portfolio scenarios.
In [184]:
csv_filename = "/Users/miguel/Jottacloud/devel/osqf2015/data/scenarios2.csv"
In [185]:
scenarios_rdd = sc.textFile(csv_filename).map(lambda l: l.split(",")).map(parse2)
In [186]:
scenarios_rdd.takeSample(False, 1, 0)
In [188]:
dfs = sqlContext.createDataFrame(scenarios_rdd)
Define a portfolio: a quantity held in each risk factor.
In [189]:
pf_rdd = sc.parallelize([('RF1', 1.), ('RF2', 2.)])
dfpf = sqlContext.createDataFrame(pf_rdd, ['rf', 'qty'])
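A quick look at the portfolio table (output omitted here):
In [ ]:
dfpf.show()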
In [191]:
res = dfs.join(dfpf, dfpf.rf == dfs.rf).select(dfs.rf, dfpf.qty, dfs.date, dfs.neutral, dfs.scenarios)
In [192]:
pf_values = res.map(lambda r: Row(date=r.date,
                                  neutral=r.neutral * r.qty,
                                  scenarios=DenseVector(r.scenarios.array * r.qty)))
# key by date, then sum neutral values and scenario vectors per date
aaa = (pf_values.map(lambda x: (x[0], (x[1], x[2])))
                .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
                .map(lambda r: Row(date=r[0], neutral=r[1][0], scenarios=r[1][1])))
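The keyed reduction can be sanity-checked on a tiny in-memory RDD (made-up numbers); per key, the neutral values add up and the scenario vectors add element-wise:
In [ ]:
tiny = sc.parallelize([('d1', (1.0, DenseVector([1.0, 2.0]))),
                       ('d1', (2.0, DenseVector([3.0, 4.0])))])
tiny.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
# -> [('d1', (3.0, DenseVector([4.0, 6.0])))]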
In [193]:
df_res = sqlContext.createDataFrame(aaa)
In [195]:
pnls = df_res.map(lambda r: {'date': r.date,
                             'neutral': r.neutral,
                             'var': float(var(r.scenarios.array, neutral_scenario=r.neutral))}).toDF().toPandas()
In [196]:
%matplotlib notebook
pnls.plot();
In [198]:
df_res.groupBy('date').agg({'neutral': 'sum'}).collect()