In [1]:
IN_PATH='/mnt/cms/version2/events/'
OUT_PATH='/mnt/cms/version2/data/'

import numpy as np
import pandas as pd
import os
import os.path as osp

from joblib import Parallel, delayed

In [2]:
def get_stream_files(path=IN_PATH):
    """Map each stream subdirectory of `path` to its list of .pickled files."""
    streams = dict()

    for d in (d for d in os.listdir(path) if osp.isdir(osp.join(path, d))):
        d_path = osp.join(path, d)
        streams[d] = [
            osp.join(d_path, f)
            for f in os.listdir(d_path) if f.endswith('.pickled')
        ]

    return streams
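
The helper above returns a dict mapping each stream name (a subdirectory of IN_PATH) to its `.pickled` files. A quick sanity check on a throwaway directory tree (hypothetical file names, illustration only):

In [ ]:
# Build a tiny fake layout and confirm only *.pickled files are picked up.
import tempfile, shutil

tmp = tempfile.mkdtemp()
os.makedirs(osp.join(tmp, 'muons'))
open(osp.join(tmp, 'muons', 'part0.pickled'), 'w').close()
open(osp.join(tmp, 'muons', 'notes.txt'), 'w').close()

print get_stream_files(path=tmp)   # {'muons': ['<tmp>/muons/part0.pickled']}
shutil.rmtree(tmp)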

In [3]:
def group_by_lumi_map(dataframe, f, feature_names_gen=lambda x: None,
                      weight_column='instantLumi_',
                      exclude_columns=('run_', 'luminosityBlock_', 'timeHigh_', 'timeLow_')):
    """Apply `f(block_values, block_weights)` to each (run_, luminosityBlock_)
    group and collect the results, one row per luminosity block."""
    grouped = dataframe.groupby(['run_', 'luminosityBlock_'])

    columns = sorted(set(dataframe.columns) - set(exclude_columns))
    feature_names = feature_names_gen(columns)

    result = list()
    run_blocks = list()
    weights = list()

    # grouped.indices maps each group key to *positional* indices, which is
    # what .iloc expects (grouped.groups holds index labels, not positions).
    for (run, luminosity_block), idx in grouped.indices.items():
        lumidata = dataframe.iloc[idx][columns].values
        ws = dataframe.iloc[idx][weight_column].values

        result.append(f(lumidata, ws))
        run_blocks.append((run, luminosity_block))
        weights.append(np.mean(ws))

    df = pd.DataFrame.from_records(result)
    if feature_names is not None:
        df.columns = feature_names
    index = np.vstack(run_blocks)

    df['run_'] = index[:, 0]
    df['luminosityBlock_'] = index[:, 1]
    df[weight_column] = np.array(weights)

    return df
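
As a quick check of the grouping logic, a toy frame with two luminosity blocks and per-column means as the aggregator (toy values; note that instantLumi_ is not in exclude_columns, so it is summarized like any other column):

In [ ]:
# Two (run_, luminosityBlock_) groups; aggregate every kept column by its mean.
toy = pd.DataFrame({
    'run_':             [1, 1, 1, 1],
    'luminosityBlock_': [10, 10, 11, 11],
    'instantLumi_':     [0.5, 0.5, 0.7, 0.7],
    'x':                [1.0, 3.0, 2.0, 4.0],
})

out = group_by_lumi_map(toy, lambda data, ws: data.mean(axis=0),
                        feature_names_gen=lambda cols: [c + '_mean' for c in cols])
print out   # one row per block: x_mean is 2.0 and 3.0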

In [4]:
percentiles = [1, 25, 50, 75, 99]

def extract_features(block_data, weights):
    """Summarize one luminosity block: 7 statistics per column
    (mean, std, 5 percentiles) plus 2 block-level features."""
    n_features_per_column = len(percentiles) + 2

    n = block_data.shape[1]

    n_features = n * n_features_per_column + 2

    result = np.empty(shape=n_features, dtype='float32')

    for j in xrange(n):
        x = block_data[:, j]

        offset = j * n_features_per_column
        result[offset] = np.mean(x)
        result[offset + 1] = np.std(x)

        for i, q in enumerate(percentiles):
            result[offset + i + 2] = np.percentile(x, q=q)

    result[-2] = block_data.shape[0]        # nEvents in the block
    result[-1] = np.mean(weights > 0.0)     # fraction of non-zero weights

    return result

def get_feature_names(columns):
    postfixes = ['mean', 'std'] + [ 'p' + str(q) for q in percentiles ]
    return [
        column + '_' + postfix for column in columns for postfix in postfixes
    ] + [ 'nEvents', 'nonZeroWeights' ]
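
Each input column contributes len(percentiles) + 2 = 7 statistics, and two block-level entries (event count and non-zero-weight fraction) are appended, so a block with n columns yields 7n + 2 features. A consistency check on random data (toy shapes only):

In [ ]:
# The feature vector length must match the generated feature names.
rng = np.random.RandomState(0)
block = rng.normal(size=(100, 3))   # 100 events, 3 columns
ws = rng.uniform(size=100)

fs = extract_features(block, ws)
names = get_feature_names(['a', 'b', 'c'])
assert len(fs) == len(names) == 3 * (len(percentiles) + 2) + 2   # 23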

In [5]:
def process(in_file, out_path):
    # Imported inside the function so it is available in joblib worker processes.
    import cPickle as pickle

    _, filename = osp.split(in_file)

    # Pickles are binary; open in 'rb' mode.
    with open(in_file, 'rb') as f:
        df = pickle.load(f)

    features = group_by_lumi_map(df, extract_features, get_feature_names)

    out_file = osp.join(out_path, filename)
    features.to_pickle(out_file)

    return out_file

In [6]:
data_files = get_stream_files()

In [7]:
data_files.keys()


Out[7]:
['muons', 'minibias', 'photons']

In [8]:
import cPickle as pickle

with open(data_files['muons'][0], 'rb') as f:   # binary mode for pickles
    df = pickle.load(f)

d = group_by_lumi_map(df, extract_features, get_feature_names, weight_column='instantLumi_')

In [9]:
d


Out[9]:
PF_Px_mean PF_Px_std PF_Px_p1 PF_Px_p25 PF_Px_p50 PF_Px_p75 PF_Px_p99 PF_Py_mean PF_Py_std PF_Py_p1 ... photons_pt__q5_p1 photons_pt__q5_p25 photons_pt__q5_p50 photons_pt__q5_p75 photons_pt__q5_p99 nEvents nonZeroWeights run_ luminosityBlock_ instantLumi_
0 0.016587 5.566961 -15.490458 -2.461476 0.080778 2.231963 15.396275 -0.144609 3.501009 -10.431609 ... 0 0 0 10.280008 34.048286 924 1 147222 131 37.095243
1 0.041701 5.952976 -14.936056 -2.248693 0.028245 2.162389 18.130831 -0.246512 3.486799 -10.444407 ... 0 0 0 10.255637 32.160465 892 1 147222 141 36.978725
2 -0.268982 6.343867 -21.439753 -2.537246 -0.056034 2.326667 15.797176 -0.303397 3.277447 -9.488536 ... 0 0 0 10.965864 38.361908 949 1 147222 133 37.072480
3 0.657939 6.687335 -14.726583 -1.872376 0.234515 2.550721 26.616590 -0.004989 3.626891 -9.095286 ... 0 0 0 10.707546 43.316406 827 1 147222 142 36.977972
4 0.246321 5.511532 -13.665376 -2.141167 0.155575 2.240264 17.127756 -0.202404 3.701674 -10.018579 ... 0 0 0 10.467780 41.317921 882 1 147222 132 37.087191
5 -0.020756 7.298374 -17.165285 -2.354466 -0.089530 2.120409 17.338858 -0.398925 3.105963 -9.190974 ... 0 0 0 11.196526 43.400600 875 1 147222 135 37.059391
6 -0.159594 6.595912 -17.551569 -2.183115 -0.085770 1.871124 16.947123 -0.229837 3.864945 -9.223512 ... 0 0 0 10.841379 42.448563 923 1 147222 136 37.050967
7 0.131800 5.585125 -16.232183 -2.032980 0.110132 2.448308 15.451523 -0.459735 2.920705 -9.486435 ... 0 0 0 4.784418 42.123413 843 1 147222 134 37.061549

8 rows × 901 columns
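
The 901 columns are consistent with the construction above: 128 event-level columns × 7 statistics each, plus nEvents and nonZeroWeights, plus the three bookkeeping columns run_, luminosityBlock_ and instantLumi_ appended at the end (896 + 2 + 3 = 901).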


In [14]:
[ f for f in d.columns if 'nEvents' in f ]


Out[14]:
['nEvents']

In [11]:
if not osp.exists(OUT_PATH):
    os.makedirs(OUT_PATH)

for stream in data_files:
    print 'Processing', stream

    files = data_files[stream]
    out_path = osp.join(OUT_PATH, stream)

    if not osp.exists(out_path):
        os.makedirs(out_path)

    # One worker per CPU core; each call reads one event file and writes
    # the per-lumi-block feature frame to OUT_PATH/<stream>/.
    Parallel(n_jobs=-1)(
        delayed(process)(f, out_path)
        for f in files
    )


Processing muons
Processing minibias
Processing photons
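
To spot-check the parallel pass, one processed file can be read back (a minimal sketch, assuming the first muon file was processed; pd.read_pickle is the counterpart of DataFrame.to_pickle used in process):

In [ ]:
# Read back one output file and confirm the expected bookkeeping columns.
check_file = osp.join(OUT_PATH, 'muons', osp.basename(data_files['muons'][0]))
check = pd.read_pickle(check_file)

print check.shape   # (n_lumi_blocks, n_features); 901 columns for this dataset
assert 'nEvents' in check.columns
assert 'instantLumi_' in check.columns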

In [ ]: