This notebook needs to be executed within a Spark/Thunder shell. Run Thunder as normal from the Community_detection folder, but with the notebook option turned on (note that we'll use both the Spark context (sc) and the Thunder Spark context (tsc)):
IPYTHON_OPTS="notebook" thunder
In [1]:
# from thunder.timeseries import Stats
from thunder.utils import pack, getdims, matrices
from thunder.factorization import PCA, SVD
# from thunder.clustering.kmeans import KMeans, KMeansModel
from thunder.viz import Colorize
import numpy as np
import itertools
import pylab
import scipy.spatial
import scipy.stats
import scipy.cluster
import time
import matplotlib.pyplot as plt
%matplotlib inline
import mpld3
sc
# mpld3.enable_notebook()
# mpld3.disable_notebook()
Out[1]:
In [2]:
# Load from file
spikes = np.load('test_data.npy')
In [3]:
%whos
In [4]:
spikes[:,0] -= 1 # Convert cell IDs to zero-indexing
spikes[:,1] *= 1e3 # Convert spike times into milliseconds
train_period = 1000. # Duration of each spike train in ms (also used for plotting later)
n_trains = 105 # Number of spike trains
spikes[:,[0,1]] = spikes[:,[1,0]] # Switch column order: column 0 = spike time (ms), column 1 = cell ID
# Create a binary vector for each neuron. This is the Spark-friendly format
trains = np.zeros((n_trains, train_period))
spk = spikes.astype(np.int)
trains[spk[:,1], spk[:,0]] = 1
# Keep only those trains that have spikes (all of them here, though how to keep track of un-clustered trains? See the sketch below)
active = trains.any(axis=1)
act_trains = trains[active,:]
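One minimal answer to the bookkeeping question in the comment above is to keep the original indices of the trains that survive the filter, so that cluster labels assigned to act_trains can be mapped back to raw train IDs later. A sketch (active_ids is a hypothetical name, not used elsewhere in this notebook):
active_ids = np.flatnonzero(active) # original train indices of the rows kept in act_trains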
In [5]:
fig, ax = plt.subplots(1,1,figsize=(12, 4))
plt.plot(spikes[:,0],spikes[:,1],'.',ms=4.5)
Out[5]:
In [6]:
keys = np.arange(1, np.shape(trains)[0]+1)
spks = sc.parallelize(zip(keys, trains))
Define a function for working out ISIs (inter-spike intervals) for each spike train in parallel, then return these to the driver node. On a dataset this size this could trivially be computed at the driver, but it's good to get in the habit early.
It would be nice to estimate the median and 99th-percentile ISI iteratively, without having to collect all the values at the driver (which would be impossible if the number of ISIs were large). The mean is easy, though; a rough sketch of one approach appears a few cells below, after the ISI statistics are computed.
In [7]:
def compISIs(train):
    """
    Compute the inter-spike intervals (ISIs) for a single spike train
    """
    # Indices of time bins containing a spike; ISIs are the gaps between them
    spike_times = np.flatnonzero(np.array(train) >= 1)
    return np.diff(spike_times)
Now apply to all spike trains in parallel
In [8]:
ISIs = spks.mapValues(compISIs)
Compute the median of the per-train median ISIs to approximate the true median ISI, and the minimum of the per-train minimum ISIs to approximate a low (roughly 1st-percentile) ISI. The median is better than the mean here given variable ISIs and low firing rates. Each key first returns a per-train summary, and those summaries are then combined at the driver. This saves loading the whole dataset into memory, which is especially important when each time series is long.
In [9]:
ISIstats = ISIs.map(lambda (k,v): (k, [np.median(v),np.min(v)] )).values().collect()
x,y = zip(*ISIstats)
bin_max = np.median(x)
bin_min = np.min(y)
print bin_max
print bin_min
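The cell above still collects one summary value per train. To follow up the earlier note about estimating percentiles without collecting every ISI, one rough sketch is to flatten the ISIs into a single RDD, take the mean directly, and approximate quantiles from per-bin counts accumulated at the driver (all names below are hypothetical and nothing here is used later):
# Flatten all ISIs into one RDD of values; the mean needs no local copy
all_isis = ISIs.values().flatMap(lambda v: list(np.ravel(v)))
mean_isi = all_isis.mean()
# Approximate quantiles: count ISIs per fixed-width bin, then scan the (small) counts dict at the driver
bin_width = 1.0 # assumed 1 ms resolution
counts = all_isis.map(lambda isi: int(isi // bin_width)).countByValue()
total = float(sum(counts.values()))
cum, approx_median, approx_p99 = 0, None, None
for b in sorted(counts):
    cum += counts[b]
    if approx_median is None and cum >= 0.5 * total:
        approx_median = (b + 0.5) * bin_width
    if approx_p99 is None and cum >= 0.99 * total:
        approx_p99 = (b + 0.5) * bin_width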
Generate candidate bin widths from the ISI stats
In [10]:
# Set up binwidths
t_reps = 7
bins = np.linspace(bin_min, bin_max, t_reps)
# For binless version, divide binwidth by sqrt(12), and remove timescales of < 1ms
timescales = bins / np.sqrt(12)
timescales = timescales[timescales >= 1.]
timescales
Out[10]:
Define a function for the convolution. This is done separately for each spike train at the worker nodes; the Gaussian filters themselves are computed at the nodes (trivial).
A note on persistence: at the moment the spike-train RDD is rebuilt each time it is needed (once for the ISI computation, once for each Gaussian binwidth). This might be the right thing to do if the spike trains are too large to sit around in memory, but that may not be a problem here (each train, in particular, is pretty small).
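If the trains do fit comfortably in memory, one option (a sketch, not used below) is simply to persist the RDD so that each timescale reuses the same cached copy rather than rebuilding it:
from pyspark import StorageLevel
spks.persist(StorageLevel.MEMORY_AND_DISK) # or just spks.cache() for memory-only storage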
In [11]:
def convolve(train, timescale):
    """
    Convolve a spike train with a Gaussian of width 'timescale' (in ms).
    Returns the convolved (continuous) spike train.
    """
    import scipy.stats
    # Generate the Gaussian filter
    sigma = np.int(np.ceil(timescale))
    window = np.arange(-5*sigma, +5*sigma)
    gaussian = scipy.stats.norm.pdf(window, scale=sigma)
    # Make a continuous signal for the train by convolution
    conv_train = np.convolve(np.array(train), gaussian)
    return conv_train
Convolve all spike trains in parallel, using just one timescale for now. A lambda is used here to pass the extra timescale argument to the function.
In [12]:
i = 1
spks_conv = spks.map(lambda (k,v): (k, convolve(v, timescales[i])))
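An equivalent, slightly more explicit alternative to the lambda is to bind the timescale with functools.partial (a sketch; spks_conv_alt is a hypothetical name):
from functools import partial
spks_conv_alt = spks.mapValues(partial(convolve, timescale=timescales[i]))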
Current implementation: for each entry in the RDD, compute the similarity between itself and all the others. Return an array of similarities which can be assembled into a matrix at the driver (or operated on directly by Spark; not sure how that works yet).
For certain dataset sizes it would be a good idea to ship the whole dataset to each node, to keep communication down.
In [13]:
def compute_similarity(train, SC):
    """
    Compute the similarity between one convolved spike train and all the others,
    using scipy's cosine distance (similarity = 1 - distance)
    """
    import scipy.spatial
    # Find similarity between continuous spike-train vectors
    A = np.empty([SC.shape[0]])
    for i in range(SC.shape[0]):
        A[i] = scipy.spatial.distance.pdist([train, SC[i]], 'cosine')
    # Similarity = 1 - distance
    A = 1 - A
    return A
NOTE: could also try shipping SC as an explicit broadcast variable rather than capturing it in the task closure after collect(). Is that handled more efficiently? (A sketch follows two cells below.)
In [14]:
SC = spks_conv.collect()
SC = zip(*SC)[1]
SC = np.array(SC)
In [15]:
A = spks_conv.map(lambda (k,v): (k, compute_similarity(v, SC)))
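Following the note above, a broadcast variant might look like this (a sketch; A_bc is a hypothetical name). collect() is still needed to build SC locally, but sc.broadcast ships it to each executor once rather than serialising it into every task closure:
SC_bc = sc.broadcast(SC)
A_bc = spks_conv.map(lambda (k,v): (k, compute_similarity(v, SC_bc.value)))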
In [16]:
def zero_diag(k, v):
    """Zero out the self-similarity entry (the diagonal) of row k"""
    v[k-1] = 0
    return k, v
In [17]:
A = A.map(lambda (k,v): zero_diag(k,v))
Convert A into a Thunder RowMatrix for numerical manipulation
In [18]:
A_rm = matrices.RowMatrix(A)
In [19]:
# Column sums of A (A is symmetric, so these are also the row sums)
r = A_rm.rows().sum()
# Total similarity (sum over all entries of A)
n = sum(r)
# Expected similarity under the null model: E_ij = (row sum i) * (column sum j) / total
outer_A = A_rm.rdd.mapValues(lambda x: sum(x)*r)
E = outer_A.mapValues(lambda x: x/n)
Finally, subtract E from A to generate B, the modularity matrix
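In symbols (cf. eq. 4 of Humphries 2011, referenced later in the notebook):
$$E_{ij} = \frac{\left(\sum_k A_{ik}\right)\left(\sum_k A_{kj}\right)}{\sum_{kl} A_{kl}}, \qquad B = A - E$$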
In [20]:
E_rm = matrices.RowMatrix(E)
B = A_rm.elementwise(E_rm,'minus')
In [21]:
svd = SVD(k=20, method="direct")
svd.calc(B)
Out[21]:
In [22]:
from pyspark.mllib.clustering import KMeans
from math import sqrt
# Cluster the spike trains in the low-dimensional SVD space
n_units, n_features = svd.v.T.shape
data = svd.u
In [23]:
b_local = B.collect()
In [24]:
# Define functions for cluster quality measurement
def error(point):
    """Euclidean distance from a point to its assigned cluster centre"""
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

def label(point):
    """Cluster assignment for a point under the current model"""
    return clusters.predict(point)
In [25]:
Q = 0
L = np.empty((n_features, n_units))
# Build the model (cluster the data) for a range of cluster counts K
for i in range(n_features):
    K = i+2
    clusters = KMeans.train(data.values(), K, maxIterations=10, runs=10, initializationMode="k-means||")
    Labels = data.mapValues(lambda point: label(point)).collect()
    # Construct membership matrix
    S = np.zeros((n_units, K))
    for n in xrange(n_units):
        S[n, Labels[n][1]] = 1
    # Compute clustering quality (eq. 11 in the paper)
    q = np.matrix.trace(np.dot(np.dot(S.T, b_local), S))
    # Save best result
    if q > Q:
        Q, L, C = q, Labels, clusters.centers
print Q
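As an extra sanity check, the error() helper defined earlier (never called above) could be used to total the point-to-centroid distances for whichever model is currently in scope; a sketch, assuming data.values() behaves as a plain RDD (as in KMeans.train above):
# NOTE: after the loop, 'clusters' is the last model trained, not necessarily the best-Q one
total_error = data.values().map(error).reduce(lambda a, b: a + b)
print total_error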
In [26]:
n_clusters = np.array(C).shape[0]
# Regroup spikes by cluster for plotting, re-numbering cells within each cluster
outsp, outn = [[] for x in xrange(n_clusters)], np.zeros(n_clusters)
for i in xrange(np.array(L).shape[0]):
    clust = L[i][1]
    for j in xrange(spikes.shape[0]):
        if spikes[j,1] == i:
            spt = spikes[j,0]
            spidx = outn[clust]
            outsp[clust].append((spt, spidx))
    outn[clust] += 1
In [27]:
# Plot results
fig, ax = plt.subplots(2,1,figsize=(12, 8))
colors = itertools.cycle(['b', 'g', 'r', 'c', 'm', 'y', 'k', 'w'])
ax[0].plot(spikes[:,0],spikes[:,1],'.',ms=4.5)
ax[0].set_xlabel('Time (ms)')
ax[0].set_ylabel('Cell ID')
for i in xrange(n_clusters):
    outsp[i] = np.array(outsp[i])
    ax[1].scatter(outsp[i][:,0], outsp[i][:,1] + np.sum(outn[:i]),
                  color=next(colors), s=2.5)
ax[1].set_xlabel('Time (ms)')
ax[1].set_ylabel('Cell ID')
plt.xlim([0,1000])
plt.ylim([0,120])
plt.show()
In [28]:
### Sanity check spike train convolution by taking a few examples and plotting
sp2 = spks_conv.take(5)
x = zip(*sp2)[1]
plt.plot(np.transpose(x))
Out[28]:
In [29]:
t1 = time.time()
A_1 = spks_conv.map(lambda (k,v): (k, compute_similarity(v, SC))).collect()
t2 = time.time()
print t2-t1
In [30]:
t1 = time.time()
A_2 = spks_conv.cartesian(spks_conv).map(lambda ((k1,v1),(k2,v2)): ((k1,k2),np.corrcoef(v1,v2)[0,1])).collect()
t2 = time.time()
print t2-t1
In [31]:
A_2.sort()
fig, ax = plt.subplots(1,2,figsize=(10, 5))
ax[0].imshow(np.array(zip(*A_1)[1]))
ax[0].set_title('Cosine similarity')
x = zip(*A_2)[1]
ax[1].imshow(np.array(x).reshape(105,105))
ax[1].set_title('Cross correlation')
Out[31]:
In [32]:
a_local = A.collect()
x,y = zip(*a_local)
z = E.collect()
z = zip(*z)[1]
b_local = B.collect()
fig, ax = plt.subplots(1,3,figsize=(12, 8))
ax[0].imshow(np.array(y))
ax[0].set_title('A: similarity matrix')
ax[1].imshow(np.array(z))
ax[1].set_title('P: expected similarity matrix')
ax[2].imshow(np.array(b_local))
ax[2].set_title('B: modularity matrix')
Out[32]:
In [33]:
# Scratch: driver-side version of the same modularity pipeline, for comparison
A = A_mat.collect() # NOTE: A_mat is not defined above; this looks left over from an earlier pass
A = np.array(A)[:,:,0]
np.fill_diagonal(A, 0)
# Distance = 1 - Similarity
A = 1-A
# Compute 'expected' matrix following eq. (4) of Humphries 2011. Need to work out how to do this without the earlier .collect()
P = np.outer(np.sum(A, axis=0), np.sum(A, axis=1)) / np.sum(A)
# Plotting and printing summary stats for sanity check
fig, ax = plt.subplots(1,3,figsize=(12, 8))
ax[0].imshow(A)
ax[0].set_title('A: similarity matrix')
ax[1].imshow(P)
ax[1].set_title('P: expected similarity matrix')
# Compute modularity matrix
B = A-P
ax[2].imshow(B)
ax[2].set_title('B: modularity matrix')
In [ ]:
np.array(b_local).shape
b = np.array(b_local)[:,:,0]
b.shape
# B?
plt.imshow(b)
# Comparing numpy svd, numpy eig and thunder svd
# import scipy.linalg as LinAlg
# u_true, s_true, v_true = LinAlg.svd(np.array(B))
U, s, V = np.linalg.svd(b, full_matrices=True)
eigenvalues, eigenvectors = np.linalg.eig(b)
In [ ]:
mpld3.enable_notebook()
fig, ax = plt.subplots(1,1,figsize=(12, 8))
plt.plot(eigenvalues)
plt.plot(s)
plt.plot(svd.s)
plt.plot(np.diff(s))
np.max(np.diff(s))
#mpld3.disable_notebook()
In [ ]:
mpld3.disable_notebook()
fig, ax = plt.subplots(1,3,figsize=(12, 8))
ax[0].imshow(U)
ax[0].set_title('U')
ax[1].imshow(V)
ax[1].set_title('V')
ax[2].imshow(eigenvectors)
ax[2].set_title('eigenvectors')
In [ ]:
revEig = eigenvectors[::-1]
plt.plot(U[:,5])
plt.plot(V.T[:,5])
plt.plot(eigenvectors[:,5])
plt.plot(svd.v.T[:,5])
# plt.plot(revEig[0])
# plt.plot(U[0]*V.T[0])