In [1]:
# Paths to the raw per-event pickles and to the output directory
# for the per-lumisection feature files.
IN_PATH = '/mnt/cms/version2/events/'
OUT_PATH = '/mnt/cms/version2/data/'

import numpy as np
import pandas as pd
import os
import os.path as osp
from joblib import Parallel, delayed
In [2]:
def get_stream_files(path=IN_PATH):
    # Map each stream (a subdirectory of `path`) to the list of
    # pickled event files it contains.
    streams = dict()
    for d in (d for d in os.listdir(path) if osp.isdir(osp.join(path, d))):
        d_path = osp.join(path, d)
        streams[d] = [
            osp.join(d_path, f)
            for f in os.listdir(d_path) if f.endswith('.pickled')
        ]
    return streams
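For reference, the returned mapping groups pickled files by stream; the stream and file names below are hypothetical, just to illustrate the structure:

# {
#     'muons':   ['/mnt/cms/version2/events/muons/part0.pickled', ...],
#     'photons': ['/mnt/cms/version2/events/photons/part0.pickled', ...],
# }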
In [3]:
def group_by_lumi_map(dataframe, f, feature_names_gen=lambda x: None,
                      weight_column='instantLumi_',
                      exclude_columns=('run_', 'luminosityBlock_', 'timeHigh_', 'timeLow_')):
    # Apply `f` to the events of each (run, lumisection) block and
    # collect the results into one row per block.
    grouped = dataframe.groupby(['run_', 'luminosityBlock_'])
    columns = sorted(set(dataframe.columns) - set(exclude_columns))
    feature_names = feature_names_gen(columns)
    result = list()
    run_blocks = list()
    weights = list()
    # grouped.indices maps each group key to positional indices, which is
    # what .iloc expects (grouped.groups holds index labels instead).
    for (run, luminosity_block), idx in grouped.indices.items():
        lumidata = dataframe.iloc[idx][columns].values
        ws = dataframe.iloc[idx][weight_column].values
        result.append(f(lumidata, ws))
        run_blocks.append((run, luminosity_block))
        weights.append(np.mean(ws))
    df = pd.DataFrame.from_records(result)
    if feature_names is not None:
        df.columns = feature_names
    index = np.vstack(run_blocks)
    df['run_'] = index[:, 0]
    df['luminosityBlock_'] = index[:, 1]
    df[weight_column] = np.array(weights)
    return df
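A minimal sketch on synthetic data (all values here are made up) of what the grouping produces; the mapped function simply counts the events in each block:

toy = pd.DataFrame({
    'run_':             [1, 1, 2],
    'luminosityBlock_': [10, 10, 10],
    'instantLumi_':     [0.5, 0.7, 1.2],
    'pt_':              [20.0, 35.0, 50.0],
})
toy_counts = group_by_lumi_map(toy, lambda data, ws: [data.shape[0]],
                               feature_names_gen=lambda cols: ['nEvents'])
# toy_counts has one row per (run_, luminosityBlock_) pair:
# (1, 10) with nEvents=2 and (2, 10) with nEvents=1, each carrying
# the mean instantLumi_ of its block.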
In [4]:
percentiles = [1, 25, 50, 75, 99]

def extract_features(block_data, weights=None):
    # Per column: mean, std and the percentiles above (7 values), plus two
    # global features: event count and fraction of non-zero weights.
    n_features_per_column = len(percentiles) + 2
    n = block_data.shape[1]
    n_features = n * n_features_per_column + 2
    result = np.ndarray(shape=n_features, dtype='float32')
    for j in xrange(n):
        x = block_data[:, j]
        offset = j * n_features_per_column
        result[offset] = np.mean(x)
        result[offset + 1] = np.std(x)
        for i, q in enumerate(percentiles):
            result[offset + i + 2] = np.percentile(x, q=q)
    result[-2] = block_data.shape[0]
    result[-1] = np.mean(weights > 0.0) if weights is not None else np.nan
    return result

def get_feature_names(columns):
    # Names in the same order as extract_features fills them.
    postfixes = ['mean', 'std'] + ['p' + str(q) for q in percentiles]
    return [
        column + '_' + postfix for column in columns for postfix in postfixes
    ] + ['nEvents', 'nonZeroWeights']
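A quick sanity check on synthetic input: each of the n columns contributes len(percentiles) + 2 = 7 features, and two global features are appended, so values and names line up as n * 7 + 2:

block = np.random.RandomState(0).normal(size=(100, 3)).astype('float32')
fs = extract_features(block, weights=np.ones(100))
names = get_feature_names(['a_', 'b_', 'c_'])
assert fs.shape[0] == len(names) == 3 * 7 + 2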
In [5]:
def process(in_file, out_path):
    # Imported here so the function stays self-contained when it is
    # shipped to a joblib worker process.
    import cPickle as pickle
    _, filename = osp.split(in_file)
    with open(in_file, 'rb') as f:
        df = pickle.load(f)
    features = group_by_lumi_map(df, extract_features, get_feature_names)
    out_file = osp.join(out_path, filename)
    features.to_pickle(out_file)
    return out_file
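Before launching the parallel run below, it can be worth smoke-testing process on a single file; the input path and (pre-existing) output directory here are hypothetical:

# process('/mnt/cms/version2/events/muons/part0.pickled', '/tmp/features')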
In [6]:
data_files = get_stream_files()
In [7]:
data_files.keys()
Out[7]:
In [8]:
import cPickle as pickle
with open(data_files['muons'][0], 'rb') as f:
    df = pickle.load(f)
d = group_by_lumi_map(df, extract_features, get_feature_names, weight_column='instantLumi_')
In [9]:
d
Out[9]:
In [14]:
[ f for f in d.columns if 'nEvents' in f ]
Out[14]:
In [11]:
if not osp.exists(OUT_PATH):
    os.makedirs(OUT_PATH)

for stream in data_files:
    print 'Processing', stream
    files = data_files[stream]
    out_path = osp.join(OUT_PATH, stream)
    if not osp.exists(out_path):
        os.makedirs(out_path)
    # One worker per core; each call writes its own output file.
    Parallel(n_jobs=-1)(
        delayed(process)(f, out_path)
        for f in files
    )
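A minimal read-back check (picking the first muons file as an example): features.to_pickle pairs with pd.read_pickle, so the per-lumisection frame should load directly:

out_file = osp.join(OUT_PATH, 'muons', osp.split(data_files['muons'][0])[1])
pd.read_pickle(out_file).shape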