In [1]:
import itertools
import random
from collections import deque
from copy import deepcopy
import numpy
from nupic.bindings.math import SparseBinaryMatrix, GetNTAReal
Functionality that could be implemented in SparseBinaryMatrix
In [2]:
def makeSparseBinaryMatrix(numRows, numCols):
"""
Construct a SparseBinaryMatrix.
There is a C++ constructor that does this, but it's currently not available
to Python callers.
"""
matrix = SparseBinaryMatrix(numCols)
matrix.resize(numRows, numCols)
return matrix
def rightVecSumAtNZ_sparse(sparseMatrix, sparseBinaryArray):
"""
Like rightVecSumAtNZ, but it supports sparse binary arrays.
@param sparseBinaryArray (sequence)
A sorted list of indices.
Note: this Python implementation doesn't require the list to be sorted, but
an eventual C++ implementation would.
"""
denseArray = numpy.zeros(sparseMatrix.nCols(), dtype=GetNTAReal())
denseArray[sparseBinaryArray] = 1
return sparseMatrix.rightVecSumAtNZ(denseArray)
def setOuterToOne(sparseMatrix, rows, cols):
"""
Equivalent to:
SparseMatrix.setOuter(rows, cols,
                      numpy.ones((len(rows), len(cols))))
But it works with the SparseBinaryMatrix. If this functionality is added to
the SparseBinaryMatrix, it will have the added benefit of not having to
construct a big array of ones.
"""
for rowNumber in rows:
sparseRow = sorted(set(sparseMatrix.getRowSparse(rowNumber)).union(cols))
sparseMatrix.replaceSparseRow(rowNumber, sparseRow)
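For a quick feel for these helpers, here is a small sketch (not part of the original notebook) that exercises them on a toy matrix. The expected outputs in the comments follow from the semantics described in the docstrings above:
# Toy usage of the helpers above (hypothetical example).
m = makeSparseBinaryMatrix(4, 10)
setOuterToOne(m, [0, 2], [1, 3, 5])
# Rows 0 and 2 should now contain columns 1, 3, and 5.
print [list(m.getRowSparse(row)) for row in xrange(4)]
# Expected: [[1, 3, 5], [], [1, 3, 5], []]
# Count each row's connected synapses that overlap the active input {3, 5}.
print rightVecSumAtNZ_sparse(m, [3, 5])
# Expected: [ 2.  0.  2.  0.]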
In [3]:
class SetMemory(object):
"""
Uses proximal synapses, distal dendrites, and inhibition to implement "set
memory" with neurons. Set Memory can recognize a set via a series of
inputs. It associates an SDR with each set, growing proximal synapses from
each cell in the SDR to each proximal input. When the SetMemory receives an
ambiguous input, it activates a union of these SDRs. As it receives other
inputs, each SDR stays active only if it has both feedforward and lateral
support. Each SDR has lateral connections to itself, so an SDR has lateral
support if it was active in the previous time step. Over time, the union is
narrowed down to a single SDR.
Requiring feedforward and lateral support is functionally similar to computing
the intersection of the feedforward support and the previous active cells.
The advantages of this approach are:
1. Better noise robustness. If a cell is randomly inactive, it isn't excluded
in the next time step.
2. It doesn't require any new neural phenomena. It accomplishes all this
through distal dendrites and inhibition.
3. It combines well with other parallel layers. A cell can grow one distal
dendrite segment for each layer, connect each segment to an object SDR, and
use the number of active dendrite segments to drive inhibition.
This doesn't model:
- Synapse permanences. When it grows a synapse, it's immediately connected.
- Subsampling. When growing synapses to active cells, it simply grows
synapses to every active cell.
These aren't needed for this experiment.
"""
def __init__(self,
layerID,
feedforwardID,
lateralIDs,
layerSizes,
sdrSize,
minThresholdProximal,
minThresholdDistal):
"""
@param layerID
The layer whose activity this SetMemory should update.
@param feedforwardID
The layer that this layer might form feedforward connections to.
@param lateralIDs (iter)
The layers that this layer might form lateral connections to.
If this layer will form internal lateral connections, this list must include
this layer's layerID.
@param layerSizes (dict)
A dictionary from layerID to number of cells. It must contain a size for
layerID, feedforwardID, and each of the lateralIDs.
@param sdrSize (int)
The number of cells in an SDR.
@param minThresholdProximal (int)
The number of active feedforward synapses required for a cell to have
"feedforward support".
@param minThresholdDistal (int)
The number of active distal synapses required for a segment to be active.
"""
self.layerID = layerID
self.feedforwardID = feedforwardID
self.sdrSize = sdrSize
self.minThresholdProximal = minThresholdProximal
self.minThresholdDistal = minThresholdDistal
# Matrix of connected synapses. Permanences aren't modelled.
self.proximalConnections = makeSparseBinaryMatrix(layerSizes[layerID],
layerSizes[feedforwardID])
# Synapses to lateral layers. Each matrix represents one segment per cell.
# A cell won't grow more than one segment to another layer. If the cell
# appears in multiple object SDRs, it will connect its segments to a union
# of object SDRs.
self.lateralConnections = dict(
(lateralID, makeSparseBinaryMatrix(layerSizes[layerID],
layerSizes[lateralID]))
for lateralID in lateralIDs)
self.numCells = layerSizes[layerID]
self.isReset = True
def learningCompute(self, activity):
"""
Chooses active cells using the previous active cells and the reset signal.
Grows proximal synapses to the feedforward layer's current active cells, and
grows lateral synapses to each lateral layer's previous active cells.
Reads:
- activity[0][feedforwardID]["activeCells"]
- activity[1][lateralID]["activeCells"] for each lateralID
Writes to:
- activity[0][layerID]["activeCells"]
- The feedforward connections matrix
- The lateral connections matrices
"""
# Select active cells
if self.isReset:
activeCells = sorted(random.sample(xrange(self.numCells), self.sdrSize))
self.isReset = False
else:
activeCells = activity[1][self.layerID]["activeCells"]
# Lateral learning
if len(activity) > 1:
for lateralID, connections in self.lateralConnections.iteritems():
setOuterToOne(connections, activeCells,
activity[1][lateralID]["activeCells"])
# Proximal learning
setOuterToOne(self.proximalConnections, activeCells,
activity[0][self.feedforwardID]["activeCells"])
# Write the activity
activity[0][self.layerID]["activeCells"] = activeCells
def inferenceCompute(self, activity):
"""
Chooses active cells using feedforward and lateral input.
Reads:
- activity[0][feedforwardID]["activeCells"]
- activity[1][lateralID]["activeCells"] for each lateralID
Writes to:
- activity[0][layerID]["activeCells"]
"""
# Calculate feedforward support
overlaps = rightVecSumAtNZ_sparse(self.proximalConnections,
activity[0][self.feedforwardID]["activeCells"])
feedforwardSupportedCells = set(
numpy.where(overlaps >= self.minThresholdProximal)[0])
# Calculate lateral support
numActiveSegmentsByCell = numpy.zeros(self.numCells)
if self.isReset:
# Don't activate any segments
self.isReset = False
elif len(activity) >= 2:
for lateralID, connections in self.lateralConnections.iteritems():
overlaps = rightVecSumAtNZ_sparse(connections,
activity[1][lateralID]["activeCells"])
numActiveSegmentsByCell[overlaps >= self.minThresholdDistal] += 1
# Inference
activeCells = []
# First, activate cells that have feedforward support
orderedCandidates = sorted((cell for cell in feedforwardSupportedCells),
key=lambda x: numActiveSegmentsByCell[x],
reverse=True)
for _, cells in itertools.groupby(orderedCandidates,
lambda x: numActiveSegmentsByCell[x]):
activeCells.extend(cells)
if len(activeCells) >= self.sdrSize:
break
# If necessary, activate cells that were previously active and have lateral
# support
if len(activeCells) < self.sdrSize and len(activity) >= 2:
prevActiveCells = activity[1][self.layerID]["activeCells"]
orderedCandidates = sorted((cell for cell in prevActiveCells
if cell not in feedforwardSupportedCells
and numActiveSegmentsByCell[cell] > 0),
key=lambda x: numActiveSegmentsByCell[x],
reverse=True)
for _, cells in itertools.groupby(orderedCandidates,
lambda x: numActiveSegmentsByCell[x]):
activeCells.extend(cells)
if len(activeCells) >= self.sdrSize:
break
# Write the activity
activity[0][self.layerID]["activeCells"] = sorted(activeCells)
def reset(self):
"""
Signal that we're now going to observe a different set.
With learning, this signals that we're going to observe a never-before-seen
set.
With inference, this signals to start inferring a new object, ignoring
recent inputs.
"""
self.isReset = True
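Before wiring SetMemory into a multi-column experiment, here is a minimal, self-contained sketch of its interface (the layer names are hypothetical; a single layer is laterally connected to itself):
# Minimal SetMemory usage sketch (hypothetical layer names).
layerSizes = {"L2": 4096, "Input": 1024}
sm = SetMemory(layerID="L2", feedforwardID="Input", lateralIDs=["L2"],
               layerSizes=layerSizes, sdrSize=40,
               minThresholdProximal=10, minThresholdDistal=10)
feedforwardInput = sorted(random.sample(xrange(1024), 40))
activity = deque(maxlen=2)
# Learn: the first step picks a random SDR, the second grows lateral
# synapses from that SDR to itself.
for _ in xrange(2):
  activity.appendleft({"L2": {}, "Input": {"activeCells": feedforwardInput}})
  sm.learningCompute(activity)
objectSDR = activity[0]["L2"]["activeCells"]
# Infer: the same feedforward input should reactivate the learned SDR.
sm.reset()
activity.appendleft({"L2": {}, "Input": {"activeCells": feedforwardInput}})
sm.inferenceCompute(activity)
assert activity[0]["L2"]["activeCells"] == objectSDR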
Experiment code
Train an array of cortical columns to recognize a set of objects, then show it Object 1. On each touch, it randomly moves its sensors to different feature-locations on the object, never placing two sensors on the same feature-location at the same time.
In [4]:
LAYER_4_SIZE = 2048 * 8
def createFeatureLocationPool(size=10):
duplicateFound = False
for _ in xrange(5):
candidateFeatureLocations = [frozenset(random.sample(xrange(LAYER_4_SIZE), 40))
for featureNumber in xrange(size)]
# Sanity check that they're pretty unique.
duplicateFound = False
for pattern1, pattern2 in itertools.combinations(candidateFeatureLocations, 2):
if len(pattern1 & pattern2) >= 5:
duplicateFound = True
break
if not duplicateFound:
break
if duplicateFound:
raise ValueError("Failed to generate unique feature-locations")
featureLocationPool = {}
for i, featureLocation in enumerate(candidateFeatureLocations):
if i < 26:
name = chr(ord('A') + i)
else:
name = "Feature-location %d" % i
featureLocationPool[name] = featureLocation
return featureLocationPool
def experiment(objects, numColumns, selectRandom=True):
#
# Initialize
#
layer2IDs = ["Column %d Layer 2" % i for i in xrange(numColumns)]
layer4IDs = ["Column %d Layer 4" % i for i in xrange(numColumns)]
layerSizes = dict((layerID, 4096) for layerID in layer2IDs)
layerSizes.update((layerID, LAYER_4_SIZE) for layerID in layer4IDs)
layer2s = dict((l2, SetMemory(layerID=l2,
feedforwardID=l4,
lateralIDs=layer2IDs,
layerSizes=layerSizes,
sdrSize=40,
minThresholdProximal=20,
minThresholdDistal=20))
for l2, l4 in zip(layer2IDs, layer4IDs))
#
# Learn
#
layer2ObjectSDRs = dict((layerID, {}) for layerID in layer2IDs)
activity = deque(maxlen=2)
step = dict((layerID, {})
for layerID in itertools.chain(layer2IDs, layer4IDs))
for objectName, objectFeatureLocations in objects.iteritems():
for featureLocationName in objectFeatureLocations:
l4ActiveCells = sorted(featureLocationPool[featureLocationName])
for _ in xrange(2):
activity.appendleft(deepcopy(step))
# Compute Layer 4
for layerID in layer4IDs:
activity[0][layerID]["activeCells"] = l4ActiveCells
activity[0][layerID]["featureLocationName"] = featureLocationName
# Compute Layer 2
for setMemory in layer2s.itervalues():
setMemory.learningCompute(activity)
for layerID, setMemory in layer2s.iteritems():
layer2ObjectSDRs[layerID][objectName] = activity[0][layerID]["activeCells"]
setMemory.reset()
#
# Infer
#
objectName = "Object 1"
objectFeatureLocations = objects[objectName]
# Start fresh for inference. No max length because we're also using it as a log.
activity = deque()
success = False
for attempt in xrange(60):
if selectRandom:
featureLocationNames = random.sample(objectFeatureLocations, numColumns)
else:
# Naively move the sensors to touch every point as soon as possible.
start = (attempt * numColumns) % len(objectFeatureLocations)
end = start + numColumns
featureLocationNames = list(objectFeatureLocations)[start:end]
overflow = end - len(objectFeatureLocations)
if overflow > 0:
featureLocationNames += list(objectFeatureLocations)[0:overflow]
# Give the feedforward input 3 times so that the lateral inputs have time to spread.
for _ in xrange(3):
activity.appendleft(deepcopy(step))
# Compute Layer 4
for layerID, name in zip(layer4IDs, featureLocationNames):
activity[0][layerID]["activeCells"] = sorted(featureLocationPool[name])
activity[0][layerID]["featureLocationName"] = name
# Compute Layer 2
for setMemory in layer2s.itervalues():
setMemory.inferenceCompute(activity)
if all(activity[0][layer2]["activeCells"] == layer2ObjectSDRs[layer2][objectName]
for layer2 in layer2IDs):
success = True
print "Converged after %d touches" % (attempt + 1)
break
if not success:
print "Failed to converge after %d touches" % (attempt + 1)
return (objectName, activity, layer2ObjectSDRs)
Create 8 objects, each with 7 feature-locations. Each pair of objects differs by exactly one feature-location.
In [5]:
featureLocationPool = createFeatureLocationPool(size=8)
objects = {"Object 1": set(["A", "B", "C", "D", "E", "F", "G"]),
"Object 2": set(["A", "B", "C", "D", "E", "F", "H"]),
"Object 3": set(["A", "B", "C", "D", "E", "G", "H"]),
"Object 4": set(["A", "B", "C", "D", "F", "G", "H"]),
"Object 5": set(["A", "B", "C", "E", "F", "G", "H"]),
"Object 6": set(["A", "B", "D", "E", "F", "G", "H"]),
"Object 7": set(["A", "C", "D", "E", "F", "G", "H"]),
"Object 8": set(["B", "C", "D", "E", "F", "G", "H"])}
We're testing L2 in isolation, so these "A", "B", etc. patterns are L4 representations, i.e. "feature-locations".
In [6]:
results = experiment(objects, numColumns=1)
Move sensors deterministically, trying to touch every point with some sensor as quickly as possible.
In [7]:
results = experiment(objects, numColumns=1, selectRandom=False)
In [8]:
results = experiment(objects, numColumns=7)
Move sensors randomly.
In [9]:
for numColumns in xrange(1, 8):
print "With %d columns:" % numColumns
results = experiment(objects, numColumns)
print
Move sensors deterministically, trying to touch every point with some sensor as quickly as possible.
In [10]:
for numColumns in xrange(1, 8):
print "With %d columns:" % numColumns
results = experiment(objects, numColumns, selectRandom=False)
print
In [11]:
(testObject,
activity,
layer2ObjectSDRs) = results
In [12]:
for t, step in enumerate(reversed(activity)):
print "Step %d" % t
for column in xrange(len(step) / 2):
layer2ID = "Column %d Layer 2" % column
layer4ID = "Column %d Layer 4" % column
featureLocationName = step[layer4ID]["featureLocationName"]
activeCells = set(step[layer2ID]["activeCells"])
layer2Contents = {}
for objectName, objectCells in layer2ObjectSDRs[layer2ID].iteritems():
containsRatio = len(activeCells & set(objectCells)) / float(len(objectCells))
if containsRatio >= 0.20:
layer2Contents[objectName] = containsRatio
print "Column %d: Input: %s, Active cells: %d %s" % (column,
featureLocationName,
len(activeCells),
layer2Contents)
print
Each step is a timestep. We spend 3 timesteps on each touch.
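To see the narrowing happen at a glance, the same log can be condensed to one number per timestep. This is a rough sketch (not from the original notebook) that counts, for Column 0, how many object SDRs still have at least 20% of their cells active:
# Condense the trace above: candidate objects remaining per timestep.
layer2ID = "Column 0 Layer 2"
for t, step in enumerate(reversed(activity)):
  activeCells = set(step[layer2ID]["activeCells"])
  numCandidates = sum(
    1 for objectCells in layer2ObjectSDRs[layer2ID].itervalues()
    if len(activeCells & set(objectCells)) >= 0.2 * len(objectCells))
  print "Step %d: %d candidate objects" % (t, numCandidates)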