In [3]:
labVersion = 'cs190_week5_v_1_2'
In [4]:
import matplotlib.pyplot as plt
import numpy as np
def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
gridWidth=1.0):
"""Template for generating the plot layout."""
plt.close()
fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
ax.axes.tick_params(labelcolor='#999999', labelsize='10')
for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
axis.set_ticks_position('none')
axis.set_ticks(ticks)
axis.label.set_color('#999999')
if hideLabels: axis.set_ticklabels([])
plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
return fig, ax
def create2DGaussian(mn, sigma, cov, n):
"""Randomly sample points from a two-dimensional Gaussian distribution"""
np.random.seed(142)
return np.random.multivariate_normal(np.array([mn, mn]), np.array([[sigma, cov], [cov, sigma]]), n)
In [5]:
dataRandom = create2DGaussian(mn=50, sigma=1, cov=0, n=100)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45, 54.5), ax.set_ylim(45, 54.5)
plt.scatter(dataRandom[:,0], dataRandom[:,1], s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
pass
In [6]:
dataCorrelated = create2DGaussian(mn=50, sigma=1, cov=.9, n=100)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.scatter(dataCorrelated[:,0], dataCorrelated[:,1], s=14**2, c='#d6ebf2',
edgecolors='#8cbfd0', alpha=0.75)
pass
correlatedData
is an RDD of NumPy arrays. This allows us to perform certain operations more succinctly. For example, we can sum the columns of our dataset using correlatedData.sum()
.
In [7]:
# TODO: Replace <FILL IN> with appropriate code
correlatedData = sc.parallelize(dataCorrelated)
meanCorrelated = correlatedData.mean()
correlatedDataZeroMean = correlatedData.map(lambda x: x - meanCorrelated)
print meanCorrelated
print correlatedData.take(1)
print correlatedDataZeroMean.take(1)
In [8]:
# TEST Interpreting PCA (1a)
from test_helper import Test
Test.assertTrue(np.allclose(meanCorrelated, [49.95739037, 49.97180477]),
'incorrect value for meanCorrelated')
Test.assertTrue(np.allclose(correlatedDataZeroMean.take(1)[0], [-0.28561917, 0.10351492]),
'incorrect value for correlatedDataZeroMean')
In [9]:
# TODO: Replace <FILL IN> with appropriate code
# Compute the covariance matrix using outer products and correlatedDataZeroMean
correlatedCov = correlatedDataZeroMean.map(lambda x: np.outer(x, x)).mean()
print correlatedCov
In [10]:
# TEST Sample covariance matrix (1b)
covResult = [[ 0.99558386, 0.90148989], [0.90148989, 1.08607497]]
Test.assertTrue(np.allclose(covResult, correlatedCov), 'incorrect value for correlatedCov')
In [11]:
# TODO: Replace <FILL IN> with appropriate code
def estimateCovariance(data):
"""Compute the covariance matrix for a given rdd.
Note:
The multi-dimensional covariance array should be calculated using outer products. Don't
forget to normalize the data by first subtracting the mean.
Args:
data (RDD of np.ndarray): An `RDD` consisting of NumPy arrays.
Returns:
np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
length of the arrays in the input `RDD`.
"""
mean = data.mean()
norm = data.map(lambda x: x - mean)
return norm.map(lambda x: np.outer(x, x)).mean()
correlatedCovAuto= estimateCovariance(correlatedData)
print correlatedCovAuto
In [12]:
# TEST Covariance function (1c)
correctCov = [[ 0.99558386, 0.90148989], [0.90148989, 1.08607497]]
Test.assertTrue(np.allclose(correctCov, correlatedCovAuto),
'incorrect value for correlatedCovAuto')
numpy.linalg
called eigh to perform the eigendecomposition. Next, sort the eigenvectors based on their corresponding eigenvalues (from high to low), yielding a matrix where the columns are the eigenvectors (and the first column is the top eigenvector). Note that np.argsort can be used to obtain the indices of the eigenvalues that correspond to the ascending order of eigenvalues. Finally, set the topComponent
variable equal to the top eigenvector or prinicipal component, which is a $\scriptsize 2 $-dimensional vector (array with two values).eigh
appear in the columns and not the rows. For example, the first eigenvector of eigVecs
would be found in the first column and could be accessed using eigVecs[:,0]
.
In [13]:
# TODO: Replace <FILL IN> with appropriate code
from numpy.linalg import eigh
# Calculate the eigenvalues and eigenvectors from correlatedCovAuto
eigVals, eigVecs = eigh(correlatedCovAuto)
print 'eigenvalues: {0}'.format(eigVals)
print '\neigenvectors: \n{0}'.format(eigVecs)
# Use np.argsort to find the top eigenvector based on the largest eigenvalue
inds = np.argsort(eigVals)
topComponent = eigVecs[inds[-1]]
print '\ntop principal component: {0}'.format(topComponent)
In [14]:
# TEST Eigendecomposition (1d)
def checkBasis(vectors, correct):
return np.allclose(vectors, correct) or np.allclose(np.negative(vectors), correct)
Test.assertTrue(checkBasis(topComponent, [0.68915649, 0.72461254]),
'incorrect value for topComponent')
In [15]:
# TODO: Replace <FILL IN> with appropriate code
# Use the topComponent and the data from correlatedData to generate PCA scores
correlatedDataScores = correlatedData.map(lambda x: x.dot(topComponent))
print 'one-dimensional data (first three):\n{0}'.format(np.asarray(correlatedDataScores.take(3)))
In [16]:
# TEST PCA Scores (1e)
firstThree = [70.51682806, 69.30622356, 71.13588168]
Test.assertTrue(checkBasis(correlatedDataScores.take(3), firstThree),
'incorrect value for correlatedDataScores')
pca
, and run it with correlatedData
and $\scriptsize k = 2$. Hint: Use results from Part (1c), Part (1d), and Part (1e).
In [17]:
# TODO: Replace <FILL IN> with appropriate code
def pca(data, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
data (RDD of np.ndarray): An `RDD` consisting of NumPy arrays.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarrayray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
# Validate input
if(k < 1):
return None
# Calculate principal components
cov = estimateCovariance(data)
eigVals, eigVecs = eigh(cov)
ind = np.argsort(eigVals)[::-1]
# Extract values
sortedVals = eigVals[ind]
kVecs = eigVecs[:,ind[:k]]
kScores = data.map(lambda x: x.dot(kVecs))
# End
return (kVecs, kScores, sortedVals)
# Run pca on correlatedData with k = 2
topComponentsCorrelated, correlatedDataScoresAuto, eigenvaluesCorrelated = pca(correlatedData, 2)
# Note that the 1st principal component is in the first column
print 'topComponentsCorrelated: \n{0}'.format(topComponentsCorrelated)
print ('\ncorrelatedDataScoresAuto (first three): \n{0}'
.format('\n'.join(map(str, correlatedDataScoresAuto.take(3)))))
print '\neigenvaluesCorrelated: \n{0}'.format(eigenvaluesCorrelated)
# Create a higher dimensional test set
pcaTestData = sc.parallelize([np.arange(x, x + 4) for x in np.arange(0, 20, 4)])
componentsTest, testScores, eigenvaluesTest = pca(pcaTestData, 3)
print '\npcaTestData: \n{0}'.format(np.array(pcaTestData.collect()))
print '\ncomponentsTest: \n{0}'.format(componentsTest)
print ('\ntestScores (first three): \n{0}'
.format('\n'.join(map(str, testScores.take(3)))))
print '\neigenvaluesTest: \n{0}'.format(eigenvaluesTest)
In [18]:
# TEST PCA Function (2a)
Test.assertTrue(checkBasis(topComponentsCorrelated.T,
[[0.68915649, 0.72461254], [-0.72461254, 0.68915649]]),
'incorrect value for topComponentsCorrelated')
firstThreeCorrelated = [[70.51682806, 69.30622356, 71.13588168], [1.48305648, 1.5888655, 1.86710679]]
Test.assertTrue(np.allclose(firstThreeCorrelated,
np.vstack(np.abs(correlatedDataScoresAuto.take(3))).T),
'incorrect value for firstThreeCorrelated')
Test.assertTrue(np.allclose(eigenvaluesCorrelated, [1.94345403, 0.13820481]),
'incorrect values for eigenvaluesCorrelated')
topComponentsCorrelatedK1, correlatedDataScoresK1, eigenvaluesCorrelatedK1 = pca(correlatedData, 1)
Test.assertTrue(checkBasis(topComponentsCorrelatedK1.T, [0.68915649, 0.72461254]),
'incorrect value for components when k=1')
Test.assertTrue(np.allclose([70.51682806, 69.30622356, 71.13588168],
np.vstack(np.abs(correlatedDataScoresK1.take(3))).T),
'incorrect value for scores when k=1')
Test.assertTrue(np.allclose(eigenvaluesCorrelatedK1, [1.94345403, 0.13820481]),
'incorrect values for eigenvalues when k=1')
Test.assertTrue(checkBasis(componentsTest.T[0], [ .5, .5, .5, .5]),
'incorrect value for componentsTest')
Test.assertTrue(np.allclose(np.abs(testScores.first()[0]), 3.),
'incorrect value for testScores')
Test.assertTrue(np.allclose(eigenvaluesTest, [ 128, 0, 0, 0 ]), 'incorrect value for eigenvaluesTest')
In [19]:
# TODO: Replace <FILL IN> with appropriate code
randomData = sc.parallelize(dataRandom)
# Use pca on randomData
topComponentsRandom, randomDataScoresAuto, eigenvaluesRandom = pca(randomData, 2)
print 'topComponentsRandom: \n{0}'.format(topComponentsRandom)
print ('\nrandomDataScoresAuto (first three): \n{0}'
.format('\n'.join(map(str, randomDataScoresAuto.take(3)))))
print '\neigenvaluesRandom: \n{0}'.format(eigenvaluesRandom)
In [20]:
# TEST PCA on `dataRandom` (2b)
Test.assertTrue(checkBasis(topComponentsRandom.T,
[[-0.2522559 , 0.96766056], [-0.96766056, -0.2522559]]),
'incorrect value for topComponentsRandom')
firstThreeRandom = [[36.61068572, 35.97314295, 35.59836628],
[61.3489929 , 62.08813671, 60.61390415]]
Test.assertTrue(np.allclose(firstThreeRandom, np.vstack(np.abs(randomDataScoresAuto.take(3))).T),
'incorrect value for randomDataScoresAuto')
Test.assertTrue(np.allclose(eigenvaluesRandom, [1.4204546, 0.99521397]),
'incorrect value for eigenvaluesRandom')
In [21]:
def projectPointsAndGetLines(data, components, xRange):
"""Project original data onto first component and get line details for top two components."""
topComponent= components[:, 0]
slope1, slope2 = components[1, :2] / components[0, :2]
means = data.mean()[:2]
demeaned = data.map(lambda v: v - means)
projected = demeaned.map(lambda v: (v.dot(topComponent) /
topComponent.dot(topComponent)) * topComponent)
remeaned = projected.map(lambda v: v + means)
x1,x2 = zip(*remeaned.collect())
lineStartP1X1, lineStartP1X2 = means - np.asarray([xRange, xRange * slope1])
lineEndP1X1, lineEndP1X2 = means + np.asarray([xRange, xRange * slope1])
lineStartP2X1, lineStartP2X2 = means - np.asarray([xRange, xRange * slope2])
lineEndP2X1, lineEndP2X2 = means + np.asarray([xRange, xRange * slope2])
return ((x1, x2), ([lineStartP1X1, lineEndP1X1], [lineStartP1X2, lineEndP1X2]),
([lineStartP2X1, lineEndP2X1], [lineStartP2X2, lineEndP2X2]))
In [22]:
((x1, x2), (line1X1, line1X2), (line2X1, line2X2)) = \
projectPointsAndGetLines(correlatedData, topComponentsCorrelated, 5)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2), figsize=(7, 7))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.plot(line1X1, line1X2, linewidth=3.0, c='#8cbfd0', linestyle='--')
plt.plot(line2X1, line2X2, linewidth=3.0, c='#d6ebf2', linestyle='--')
plt.scatter(dataCorrelated[:,0], dataCorrelated[:,1], s=14**2, c='#d6ebf2',
edgecolors='#8cbfd0', alpha=0.75)
plt.scatter(x1, x2, s=14**2, c='#62c162', alpha=.75)
pass
In [23]:
((x1, x2), (line1X1, line1X2), (line2X1, line2X2)) = \
projectPointsAndGetLines(randomData, topComponentsRandom, 5)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2), figsize=(7, 7))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.plot(line1X1, line1X2, linewidth=3.0, c='#8cbfd0', linestyle='--')
plt.plot(line2X1, line2X2, linewidth=3.0, c='#d6ebf2', linestyle='--')
plt.scatter(dataRandom[:,0], dataRandom[:,1], s=14**2, c='#d6ebf2',
edgecolors='#8cbfd0', alpha=0.75)
plt.scatter(x1, x2, s=14**2, c='#62c162', alpha=.75)
pass
In [24]:
from mpl_toolkits.mplot3d import Axes3D
m = 100
mu = np.array([50, 50, 50])
r1_2 = 0.9
r1_3 = 0.7
r2_3 = 0.1
sigma1 = 5
sigma2 = 20
sigma3 = 20
c = np.array([[sigma1 ** 2, r1_2 * sigma1 * sigma2, r1_3 * sigma1 * sigma3],
[r1_2 * sigma1 * sigma2, sigma2 ** 2, r2_3 * sigma2 * sigma3],
[r1_3 * sigma1 * sigma3, r2_3 * sigma2 * sigma3, sigma3 ** 2]])
np.random.seed(142)
dataThreeD = np.random.multivariate_normal(mu, c, m)
from matplotlib.colors import ListedColormap, Normalize
from matplotlib.cm import get_cmap
norm = Normalize()
cmap = get_cmap("Blues")
clrs = cmap(np.array(norm(dataThreeD[:,2])))[:,0:3]
fig = plt.figure(figsize=(11, 6))
ax = fig.add_subplot(121, projection='3d')
ax.azim=-100
ax.scatter(dataThreeD[:,0], dataThreeD[:,1], dataThreeD[:,2], c=clrs, s=14**2)
xx, yy = np.meshgrid(np.arange(-15, 10, 1), np.arange(-50, 30, 1))
normal = np.array([0.96981815, -0.188338, -0.15485978])
z = (-normal[0] * xx - normal[1] * yy) * 1. / normal[2]
xx = xx + 50
yy = yy + 50
z = z + 50
ax.set_zlim((-20, 120)), ax.set_ylim((-20, 100)), ax.set_xlim((30, 75))
ax.plot_surface(xx, yy, z, alpha=.10)
ax = fig.add_subplot(122, projection='3d')
ax.azim=10
ax.elev=20
#ax.dist=8
ax.scatter(dataThreeD[:,0], dataThreeD[:,1], dataThreeD[:,2], c=clrs, s=14**2)
ax.set_zlim((-20, 120)), ax.set_ylim((-20, 100)), ax.set_xlim((30, 75))
ax.plot_surface(xx, yy, z, alpha=.1)
plt.tight_layout()
pass
In [25]:
# TODO: Replace <FILL IN> with appropriate code
threeDData = sc.parallelize(dataThreeD)
componentsThreeD, threeDScores, eigenvaluesThreeD = pca(threeDData, 2)
print 'componentsThreeD: \n{0}'.format(componentsThreeD)
print ('\nthreeDScores (first three): \n{0}'
.format('\n'.join(map(str, threeDScores.take(3)))))
print '\neigenvaluesThreeD: \n{0}'.format(eigenvaluesThreeD)
In [26]:
# TEST 3D to 2D (2c)
Test.assertEquals(componentsThreeD.shape, (3, 2), 'incorrect shape for componentsThreeD')
Test.assertTrue(np.allclose(np.sum(eigenvaluesThreeD), 969.796443367),
'incorrect value for eigenvaluesThreeD')
Test.assertTrue(np.allclose(np.abs(np.sum(componentsThreeD)), 1.77238943258),
'incorrect value for componentsThreeD')
Test.assertTrue(np.allclose(np.abs(np.sum(threeDScores.take(3))), 237.782834092),
'incorrect value for threeDScores')
In [27]:
scoresThreeD = np.asarray(threeDScores.collect())
# generate layout and plot data
fig, ax = preparePlot(np.arange(20, 150, 20), np.arange(-40, 110, 20))
ax.set_xlabel(r'New $x_1$ values'), ax.set_ylabel(r'New $x_2$ values')
ax.set_xlim(5, 150), ax.set_ylim(-45, 50)
plt.scatter(scoresThreeD[:,0], scoresThreeD[:,1], s=14**2, c=clrs, edgecolors='#8cbfd0', alpha=0.75)
pass
In [28]:
# TODO: Replace <FILL IN> with appropriate code
def varianceExplained(data, k=1):
"""Calculate the fraction of variance explained by the top `k` eigenvectors.
Args:
data (RDD of np.ndarray): An RDD that contains NumPy arrays which store the
features for an observation.
k: The number of principal components to consider.
Returns:
float: A number between 0 and 1 representing the percentage of variance explained
by the top `k` eigenvectors.
"""
components, scores, eigenvalues = pca(data, k)
return float(eigenvalues[:k].sum())/eigenvalues.sum()
varianceRandom1 = varianceExplained(randomData, 1)
varianceCorrelated1 = varianceExplained(correlatedData, 1)
varianceRandom2 = varianceExplained(randomData, 2)
varianceCorrelated2 = varianceExplained(correlatedData, 2)
varianceThreeD2 = varianceExplained(threeDData, 2)
print ('Percentage of variance explained by the first component of randomData: {0:.1f}%'
.format(varianceRandom1 * 100))
print ('Percentage of variance explained by both components of randomData: {0:.1f}%'
.format(varianceRandom2 * 100))
print ('\nPercentage of variance explained by the first component of correlatedData: {0:.1f}%'.
format(varianceCorrelated1 * 100))
print ('Percentage of variance explained by both components of correlatedData: {0:.1f}%'
.format(varianceCorrelated2 * 100))
print ('\nPercentage of variance explained by the first two components of threeDData: {0:.1f}%'
.format(varianceThreeD2 * 100))
In [29]:
# TEST Variance explained (2d)
Test.assertTrue(np.allclose(varianceRandom1, 0.588017172066), 'incorrect value for varianceRandom1')
Test.assertTrue(np.allclose(varianceCorrelated1, 0.933608329586),
'incorrect value for varianceCorrelated1')
Test.assertTrue(np.allclose(varianceRandom2, 1.0), 'incorrect value for varianceRandom2')
Test.assertTrue(np.allclose(varianceCorrelated2, 1.0), 'incorrect value for varianceCorrelated2')
Test.assertTrue(np.allclose(varianceThreeD2, 0.993967356912), 'incorrect value for varianceThreeD2')
In [30]:
import os
baseDir = os.path.join('data')
inputPath = os.path.join('cs190', 'neuro.txt')
inputFile = os.path.join(baseDir, inputPath)
lines = sc.textFile(inputFile)
print lines.first()[0:100]
# Check that everything loaded properly
assert len(lines.first()) == 1397
assert lines.count() == 46460
tuple
, np.ndarray
) pair. Then apply this function to each record in the RDD, and inspect the first entry of the new parsed data set. Now would be a good time to cache the data, and force a computation by calling count, to ensure the data are cached.
In [31]:
# TODO: Replace <FILL IN> with appropriate code
def parse(line):
"""Parse the raw data into a (`tuple`, `np.ndarray`) pair.
Note:
You should store the pixel coordinates as a tuple of two ints and the elements of the pixel intensity
time series as an np.ndarray of floats.
Args:
line (str): A string representing an observation. Elements are separated by spaces. The
first two elements represent the coordinates of the pixel, and the rest of the elements
represent the pixel intensity over time.
Returns:
tuple of tuple, np.ndarray: A (coordinate, pixel intensity array) `tuple` where coordinate is
a `tuple` containing two values and the pixel intensity is stored in an NumPy array
which contains 240 values.
"""
splitline = line.split(' ')
return (tuple(map(int,splitline[:2])), np.array(splitline[2:], dtype=float))
rawData = lines.map(parse)
rawData.cache()
entry = rawData.first()
print 'Length of movie is {0} seconds'.format(len(entry[1]))
print 'Number of pixels in movie is {0:,}'.format(rawData.count())
print ('\nFirst entry of rawData (with only the first five values of the NumPy array):\n({0}, {1})'
.format(entry[0], entry[1][:5]))
In [32]:
# TEST Parse the data (3b)
Test.assertTrue(isinstance(entry[0], tuple), "entry's key should be a tuple")
Test.assertEquals(len(entry), 2, 'entry should have a key and a value')
Test.assertTrue(isinstance(entry[0][1], int), 'coordinate tuple should contain ints')
Test.assertEquals(len(entry[0]), 2, "entry's key should have two values")
Test.assertTrue(isinstance(entry[1], np.ndarray), "entry's value should be an np.ndarray")
Test.assertTrue(isinstance(entry[1][0], np.float), 'the np.ndarray should consist of np.float values')
Test.assertEquals(entry[0], (0, 0), 'incorrect key for entry')
Test.assertEquals(entry[1].size, 240, 'incorrect length of entry array')
Test.assertTrue(np.allclose(np.sum(entry[1]), 24683.5), 'incorrect values in entry array')
In [33]:
# TODO: Replace <FILL IN> with appropriate code
mn = rawData.flatMap(lambda x: x[1]).min()
mx = rawData.flatMap(lambda x: x[1]).max()
print mn, mx
In [34]:
# TEST Min and max flouresence (3c)
Test.assertTrue(np.allclose(mn, 100.6), 'incorrect value for mn')
Test.assertTrue(np.allclose(mx, 940.8), 'incorrect value for mx')
In [35]:
example = rawData.filter(lambda (k, v): np.std(v) > 100).values().first()
# generate layout and plot data
fig, ax = preparePlot(np.arange(0, 300, 50), np.arange(300, 800, 100))
ax.set_xlabel(r'time'), ax.set_ylabel(r'flouresence')
ax.set_xlim(-20, 270), ax.set_ylim(270, 730)
plt.plot(range(len(example)), example, c='#8cbfd0', linewidth='3.0')
pass
In [42]:
# TODO: Replace <FILL IN> with appropriate code
def rescale(ts):
"""Take a np.ndarray and return the standardized array by subtracting and dividing by the mean.
Note:
You should first subtract the mean and then divide by the mean.
Args:
ts (np.ndarray): Time series data (`np.float`) representing pixel intensity.
Returns:
np.ndarray: The times series adjusted by subtracting the mean and dividing by the mean.
"""
mean = np.mean(ts)
out = []
for val in ts:
out.append(float(val - mean)/mean)
return np.array(out)
scaledData = rawData.mapValues(lambda v: rescale(v))
mnScaled = scaledData.map(lambda (k, v): v).map(lambda v: min(v)).min()
mxScaled = scaledData.map(lambda (k, v): v).map(lambda v: max(v)).max()
print mnScaled, mxScaled
In [43]:
# TEST Fractional signal change (3d)
Test.assertTrue(isinstance(scaledData.first()[1], np.ndarray), 'incorrect type returned by rescale')
Test.assertTrue(np.allclose(mnScaled, -0.27151288), 'incorrect value for mnScaled')
Test.assertTrue(np.allclose(mxScaled, 0.90544876), 'incorrect value for mxScaled')
In [44]:
example = scaledData.filter(lambda (k, v): np.std(v) > 0.1).values().first()
# generate layout and plot data
fig, ax = preparePlot(np.arange(0, 300, 50), np.arange(-.1, .6, .1))
ax.set_xlabel(r'time'), ax.set_ylabel(r'flouresence')
ax.set_xlim(-20, 260), ax.set_ylim(-.12, .52)
plt.plot(range(len(example)), example, c='#8cbfd0', linewidth='3.0')
pass