Spark/Thunder implementation of Humphries 2011 spike train community detection

Mathew H. Evans October 2014. Built on top of a python script developed by Thomas Sharp @ Riken

This notebook needs to be executed within a Spark/Thunder shell. Run Thunder as normal from the Community_detection folder, but with the option for notebooks turned on (note, we'll use both the Spark context (sc) and Thunder Spark context (tsc)):

IPYTHON_OPTS= "notebook" thunder


In [1]:
# from thunder.timeseries import Stats
from thunder.utils import pack, getdims, matrices
from thunder.factorization import PCA, SVD
# from thunder.clustering.kmeans import KMeans, KMeansModel
from thunder.viz import Colorize
import numpy as np
import itertools
import pylab
import scipy.spatial
import scipy.stats
import scipy.cluster
import time 

import matplotlib.pyplot as plt
%matplotlib inline

import mpld3

sc
# mpld3.enable_notebook()
# mpld3.disable_notebook()


Out[1]:
<pyspark.context.SparkContext at 0x1074bc6d0>

First things first: Load data as in standalone python, reformat, then convert to RDD.

Be careful as running code twice makes all the variables increase in size


In [2]:
# Load from file
spikes = np.load('test_data.npy')

In [3]:
%whos


Variable    Type        Data/Info
---------------------------------
Colorize    type        <class 'thunder.viz.colorize.Colorize'>
PCA         type        <class 'thunder.factorization.pca.PCA'>
SVD         type        <class 'thunder.factorization.svd.SVD'>
getdims     function    <function getdims at 0x10a133cf8>
itertools   module      <module 'itertools' from <...>ib-dynload/itertools.so'>
matrices    module      <module 'thunder.utils.ma<...>nder/utils/matrices.pyc'>
mpld3       module      <module 'mpld3' from '/us<...>ages/mpld3/__init__.pyc'>
np          module      <module 'numpy' from '/Us<...>ages/numpy/__init__.pyc'>
pack        function    <function pack at 0x10a1380c8>
plt         module      <module 'matplotlib.pyplo<...>s/matplotlib/pyplot.pyc'>
pylab       module      <module 'pylab' from '/Us<...>site-packages/pylab.pyc'>
scipy       module      <module 'scipy' from '/Us<...>ages/scipy/__init__.pyc'>
spikes      ndarray     621x2: 1242 elems, type `float64`, 9936 bytes
time        module      <module 'time' from '/Use<...>2.7/lib-dynload/time.so'>

In [4]:
spikes[:,0] -= 1 # To establish zero-indexing
spikes[:,1] *= 1e3 # Convert spike times into milliseconds
train_period = 1000. # Period of spike train, for plotting later
n_trains = 105     # Number of spike trains
spikes[:,[0,1]] = spikes[:,[1,0]] # Switch column order

# Create binary vectors for each neuron. This is the Spark friendly format 
trains = np.zeros((n_trains, train_period))
spk = spikes.astype(np.int)
trains[spk[:,1], spk[:,0]] = 1

# Keep only those trains that have spikes (all here, though how to keep track of un-clustered trains?)
active = trains.any(axis=1)
act_trains = trains[active,:]


-c:8: DeprecationWarning: using a non-integer number instead of an integer will result in an error in the future

In [5]:
fig, ax = plt.subplots(1,1,figsize=(12, 4))
plt.plot(spikes[:,0],spikes[:,1],'.',ms=4.5)


Out[5]:
[<matplotlib.lines.Line2D at 0x10a94b250>]

Convert to RDD


In [6]:
keys = np.arange(1, np.shape(trains)[0]+1)
spks = sc.parallelize(zip(keys, trains))

Compute ISI distribution.

ISIs and stats are computed separately. May be worth computing the stats in one shot

Defining a function for working out ISIs for each spike train in parallel, then returning these to the driver node. This can be trivially computed at the driver node on datasets this size, but it's good getting in the habit early.

Would be cool to work out how to estimate the median and 99th percentile ISI iteratively, without having to collect all the values at the driver (would be impossible if N ISIs is large). Mean is easy, though..


In [7]:
def compISIs(self):
    """
    Compute ISIs for each spike train
    """
    x = np.where(np.array(self)>=1)
    
    return np.diff(x)

Now apply to all spike trains in parallel


In [8]:
ISIs = spks.mapValues(compISIs)

Compute ISI median of medians to approximate true median, and also approximate 1%-ile min ISI. Median is better than mean for variable ISIs + low firing rates. Initially returns a median ISI for each key, which is then sorted at the driver. This saves loading the whole dataset into memory: especially important when each time series is long.

Compute mean ISI for whole array. Median would be better but is non-trivial


In [9]:
ISIstats = ISIs.map(lambda (k,v): (k, [np.median(v),np.min(v)] )).values().collect()
# bin_max = 
x,y = zip(*ISIstats)
bin_max = np.median(x)
bin_min = np.min(y)
print bin_max
print bin_min


104.5
1

Compute ISI stats and generate candidate bin widths


In [10]:
# Set up binwidths
t_reps = 7
bins = np.linspace(bin_min, bin_max, t_reps)

# For binless version, divide binwidth by sqrt(12), and remove timescales of < 1ms
timescales = bins / np.sqrt(12)
timescales = timescales[timescales >= 1.]
timescales


Out[10]:
array([  5.26832121,  10.24796728,  15.22761335,  20.20725942,
        25.18690549,  30.16655157])

Convolve spiketrains with a gaussian

Define a function for the convolution. This is done separately for each spike train at the nodes. Gaussians are computed at the nodes (trivial).

Note on persistance. At the moment the spike train RDD is re-loaded from disk each time it is needed (once for ISI computation, once for each gaussian binwidth). This might be the best thing to do if the spike train is too large to just sit around in memory. This may not be a problem though (each train, in particular, is pretty small).


In [11]:
def convolve(self, timescale):
    """
    Convolve each spike train with a Gaussian of width 'timescale'. 
    Returns the convolved spike train
    """
    import scipy.stats
    
    # Generate the filter and output arrays
    sigma = np.int(np.ceil(timescale))
    window = np.arange(-5*sigma, +5*sigma)
    gaussian = scipy.stats.norm.pdf(window, scale=sigma)
    conv_trains = np.empty(0) #(0,(np.array(self).shape[1])+10*sigma-1))
    
    # Make a continuous signal for each train by convolution

    conv_train = np.convolve(np.array(self), gaussian)

    return conv_train

Convolve all spike trains in parallel, using just one timescale. Needed to use lambda in order to pass an extra variable to the function


In [12]:
i = 1
spks_conv = spks.map(lambda (k,v): (k, convolve(v, timescales[i])))

Compute similarity matrix

Current implementation: for each entry in the RDD, compute similarity between itself and all the others. Return an array of similarities which can be formatted into a matrix at the driver (or computed upon directly by Spark, not sure how that works...)

For certain dataset sizes it would be a good idea to ship whole dataset to each node, to keep communication down.


In [13]:
def compute_similarity(self,SC):
    """
    Computes pairwise similarity between all spike trains using scipy cosine similarity
    """
    import scipy.spatial
    
    x = self #np.transpose(self)
    
    # Find similarity between continuous spike-train vectors
    A = np.empty([SC.shape[0]])
    
    for i in range(SC.shape[0]):
        A[i] = scipy.spatial.distance.pdist([x,SC[i]], 'cosine')
        
    # Similarity = 1 - distance
    A = 1 - A

    return A

Collect the convoluted spike trains, then ship the matrix to each node with the function. This method may be cheaper (depending on matrix size i.e. communication costs) this way, than for each node to have to query one another all the time.

NOTE: Could also try broadcasting SC without collect(). Is this handled more efficiently?


In [14]:
SC = spks_conv.collect()
SC = zip(*SC)[1]
SC = np.array(SC)

Now compute the similarity matrix


In [15]:
A = spks_conv.map(lambda (k,v): (k, compute_similarity(v, SC)))

Compute null matrix then subtract

First, remove diagonal elements. Need a function as lambdas don't allow assigments


In [16]:
def zero_diag(k,v): 
#     k = self[0][0]
#     v = self[0][1]
    v[k-1] = 0 
    return k,v

In [17]:
A = A.map(lambda (k,v): zero_diag(k,v))
# a = A.map(lambda (k1,v1): v1[k1] = 0)

Convert A into a Thunder RowMatrix for numerical manipulation


In [18]:
A_rm = matrices.RowMatrix(A)

Compute Expected matrix

Compute sum(A_rm, axis=0) and sum(A_rm), to get the outer product of the two axis sums without having to sum across pipelined RDD rows


In [19]:
r = A_rm.rows().sum()

n = sum(r)

outer_A = A_rm.rdd.mapValues(lambda x: sum(x)*r)

E = outer_A.mapValues(lambda x: x/n)

Finally, subtract E from A to generate B, the modularity matrix


In [20]:
E_rm = matrices.RowMatrix(E)
B = A_rm.elementwise(E_rm,'minus')

Eigendecomposition

Thunder's SVD method


In [21]:
svd = SVD(k=20, method="direct")
svd.calc(B)


Out[21]:
<thunder.factorization.svd.SVD at 0x10ac5e410>

Cluster with MLlib in feature space, trying all +ve eigenvalue N clusterings


In [22]:
from pyspark.mllib.clustering import KMeans
from math import sqrt

# Load and parse the data
# n_units, n_features = features.shape
n_units, n_features = svd.v.T.shape
data = svd.u
# data = sc.parallelize(features).cache()

Collect a local copy of the comparison matrix, for computing Q later.

NOTE: Still need to work out a way of doing this with RDDs


In [23]:
b_local = B.collect()

In [24]:
# Define functions for cluster quality measurement
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

def label(point):
    return clusters.predict(point)

In [25]:
Q = 0 
L = np.empty((n_features, n_units))

# Build the model (cluster the data)
for i in range(n_features):
    K = i+2
    clusters = KMeans.train(data.values(), K, maxIterations=10, runs=10, initializationMode="k-means||")

    Labels = data.mapValues(lambda point: label(point)).collect()

    # Construct membership matrix
    S = np.zeros((n_units, K))
    for n in xrange(n_units):
        S[n, Labels[n][1]] = 1

    # Compute clustering quality (eq. 11 in the paper)
    q = np.matrix.trace(np.dot(np.dot(S.T, b_local), S))

    # Save best result
    if q > Q:
        Q, L, C = q, Labels, clusters.centers

print Q


5583.03292402

Assign labels to original spike train and plot


In [26]:
n_clusters = np.array(C).shape[0]
outsp, outn = [[] for x in xrange(n_clusters)], np.zeros(n_clusters)
for i in xrange(np.array(L).shape[0]):
    label = L[i][1]
    for j in xrange(spikes.shape[0]):
        if spikes[j,1] == i:
            spt = spikes[j,0]
            spidx = outn[label]
            outsp[label].append((spt, spidx))
    outn[label] += 1

In [27]:
# Plot results
fig, ax = plt.subplots(2,1,figsize=(12, 8))
colors = itertools.cycle(['b', 'g', 'r', 'c', '', 'y', 'k', 'w'])

ax[0].plot(spikes[:,0],spikes[:,1],'.',ms=4.5)
ax[0].set_xlabel('Time (ms)')
ax[0].set_ylabel('Cell ID')



for i in xrange(n_clusters):
    outsp[i] = np.array(outsp[i])
    ax[1].scatter(outsp[i][:,0], outsp[i][:,1] + np.sum(outn[:i]),
        color=next(colors), s=2.5)
    ax[1].set_xlabel('Time (ms)')
    ax[1].set_ylabel('Cell ID')
    
plt.xlim([0,1000])
plt.ylim([0,120])
plt.show()



In [27]:


In [27]:


In [27]:


In [27]:


In [27]:

Below are number of optional sanity checks, involing plots of collected RDDs etc


In [28]:
### Sanity check spike train convolution by taking a few examples  and plotting

sp2 = spks_conv.take(5)
x = zip(*sp2)[1]
plt.plot(np.transpose(x))


Out[28]:
[<matplotlib.lines.Line2D at 0x10b74e350>,
 <matplotlib.lines.Line2D at 0x10b74e5d0>,
 <matplotlib.lines.Line2D at 0x10b74e810>,
 <matplotlib.lines.Line2D at 0x10b74e9d0>,
 <matplotlib.lines.Line2D at 0x10b74eb90>]

Compute pairwise similarity for all convolved spike trains.


In [29]:
t1 = time.time()
A_1 = spks_conv.map(lambda (k,v): (k, compute_similarity(v, SC))).collect()
t2 = time.time()
print t2-t1


8.15215587616

Alternatively use a one-line lambda with .cartesian CARTESIAN NOT WORKING, CORRCOEF DOES


In [30]:
t1 = time.time()
A_2 = spks_conv.cartesian(spks_conv).map(lambda ((k1,v1),(k2,v2)): ((k1,k2),np.corrcoef(v1,v2)[0,1])).collect()

t2 = time.time()
print t2-t1


114.141867161

Plot both to sanity check.

It seems like the one-line version didn't behave as it should have done

Note: cartesian shuffles the keys. Need to sort before plotting for the result to make any sense


In [31]:
A_2.sort()

fig, ax = plt.subplots(1,2,figsize=(10, 5))
ax[0].imshow(np.array(zip(*A_1)[1])) 
ax[0].set_title('Cosine similarity')

x = zip(*A_2)[1]
ax[1].imshow(np.array(x).reshape(105,105))
ax[1].set_title('Cross correlation')


Out[31]:
<matplotlib.text.Text at 0x10ba671d0>

Sanity check plotting of Thunder-based modularity matrix computation


In [32]:
a_local = A.collect()

x,y = zip(*a_local)

z = E.collect()
z = zip(*z)[1]

b_local = B.collect()

fig, ax = plt.subplots(1,3,figsize=(12, 8))

ax[0].imshow(np.array(y))
ax[0].set_title('A: similarity matrix')

ax[1].imshow(np.array(z))
ax[1].set_title('P: expected similarity matrix')

ax[2].imshow(np.array(b_local))
ax[2].set_title('B: modularity matrix')


Out[32]:
<matplotlib.text.Text at 0x10c721b90>

Old Modularity matrix code starts here. NOTE: For small matrices, computing all this stuff locally is easier and faster.


In [33]:
A = A_mat.collect()

A = np.array(A_mat)[:,:,0]

np.fill_diagonal(A, 0)

# # Distance = 1-Similarity
A = 1-A


# Compute 'expected' matrix following eq(4) of Humphries 2011. Need to work out how to do this without the earlier .collect()

P = np.outer(np.sum(A, axis=0), np.sum(A, axis=1)) / np.sum(A)

# Compute modularity matrix. 

# Plotting an printing summary stats for sanity check

fig, ax = plt.subplots(1,3,figsize=(12, 8))
ax[0].imshow(A)
ax[0].set_title('A: similarity matrix')


ax[1].imshow(P)
ax[1].set_title('P: expected similarity matrix')

# Compute modularity matrix
B = A-P

ax[2].imshow(B)
ax[2].set_title('B: modularity matrix')


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-33-17cf16b67377> in <module>()
----> 1 A = A_mat.collect()
      2 
      3 A = np.array(A_mat)[:,:,0]
      4 
      5 np.fill_diagonal(A, 0)

NameError: name 'A_mat' is not defined

Comparing SVD to eigendecomposition


In [ ]:
np.array(b_local).shape
b = np.array(b_local)[:,:,0]
b.shape
# B?
plt.imshow(b_local)

# Comparing numpy svd, numpy eig and thunder svd

# import scipy.linalg as LinAlg
# u_true, s_true, v_true = LinAlg.svd(np.array(B))
U, s, V = np.linalg.svd(b_local, full_matrices=True)
eigenvalues, eigenvectors = np.linalg.eig(b_local)

In [ ]:
mpld3.enable_notebook()

fig, ax = plt.subplots(1,1,figsize=(12, 8))
plt.plot(eigenvalues)
plt.plot(s)
plt.plot(svd.s)

plt.plot(np.diff(s))
np.max(np.diff(s))
#mpld3.disable_notebook()

In [ ]:
mpld3.disable_notebook()
fig, ax = plt.subplots(1,3,figsize=(12, 8))
ax[0].imshow(U)
ax[0].set_title('U')
ax[1].imshow(V)
ax[1].set_title('V')
ax[2].imshow(eigenvectors)
ax[2].set_title('eigenvectors')

In [ ]:
revEig = eigenvectors[::-1]
plt.plot(U[:,5])
plt.plot(V.T[:,5])
plt.plot(eigenvectors[:,5])
plt.plot(svd.v.T[:,5])
# plt.plot(revEig[0])
# plt.plot(U[0]*V.T[0])

In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]: