In order to fully understand the following analysis, some key terms must be defined:
The Error between two volumes is the sum of squared errors between their voxels
Given images a and b, and function f(), the Error Reduction Rate is calculated as: $\frac{\bigg{|}\text{squaredError}(a,b) - \text{squaredError}(a, f(b))\bigg{|}}{\text{squaredError}(a, b)}$
I will obtain the functionality testing data from the NDReg tutorial notebooks. The functionality testing data will be an image of a brain, and an atlas that must be aligned to said image.
The solution for this registration should be the same as the solution detailed in the jupyter documentation here: https://github.com/neurodata/ndreg/blob/master/doc/3D_STP_RegistrationTest.ipynb
I expect this test to perform very well as it is the test used in the example notebook detailing how to run the NDReg pipeline
In order to test the validity of the code, I will be creating code that generates a random synthetic synapse volume, and then performs a random rigid body transformationon the data. The randomly generated volume will have similar size and density statistics as the real world data. Specifically, it will have:
Additionally, the random rigid body transformation will account for having up to 5% imaging error in any singular dimension.
The code below prepares the functionality testing data by pulling it from NDio
In [2]:
import sys
sys.path.insert(0, '../../../ndreg/')
import ndreg
refImg = ndreg.imgDownload('ara_ccf2')
refAnnoImg = ndreg.imgDownload('ara_ccf2', channel="annotation")
inImg = ndreg.imgDownload('eacker01', 'r', 5)
#reorient and rescale the test image
inImgReor = ndreg.imgReorient(inImg, 'lsp', 'rsa')
inImg_ds = ndreg.imgResample(inImgReor, spacing=[.25, .25, .25])
refImg_ds = ndreg.imgResample(refImg, spacing=[.25, .25, .25])
In [103]:
import math
import numpy as np
import SimpleITK as itk
from random import randrange as rand
from random import uniform as floatRand
from affine import Affine
def generatePointSet():
center = (rand(0, 99), rand(0, 99), rand(0, 99))
toPopulate = []
for z in range(-1, 2):
for y in range(-1, 2):
for x in range(-1, 2):
curPoint = (center[0]+z, center[1]+y, center[2]+x)
#only populate valid points
valid = True
for dim in range(3):
if curPoint[dim] < 0 or curPoint[dim] >= 100:
valid = False
if valid:
toPopulate.append(curPoint)
return set(toPopulate)
def getTransform():
#generate a random rigid body transform
#error is assumed to be 5% rotationally
#and laterally
a = floatRand(-.15, .15)
b = floatRand(-.15, .15)
c = floatRand(-.15, .15)
a, b, c = -0.15, -0.15, -0.15
xt = floatRand(-1.5, 1.5)
yt = floatRand(-1.5, 1.5)
zt = floatRand(-1.5, 1.5)
xt, yt, zt = 0., 0., 0.
transform = np.stack([
[math.cos(a)*math.cos(b), math.cos(a)*math.sin(b)*math.sin(c)-math.sin(a)*math.cos(c), math.cos(a)*math.sin(b)*math.cos(c)+math.sin(a)*math.sin(c), xt],
[math.sin(a)*math.cos(b), math.sin(a)*math.sin(b)*math.sin(c)+math.cos(a)*math.cos(c), math.sin(a)*math.sin(b)*math.cos(c)-math.cos(a)*math.sin(c), yt],
[-math.sin(b), math.cos(b)*math.sin(c), math.cos(b)*math.cos(c), zt],
[0., 0., 0., 1.]
])
return transform
def applyRigid(initialVolume, transform):
rigidMatrix = np.zeros_like(initialVolume)
for z in range(initialVolume.shape[0]):
for y in range(initialVolume.shape[1]):
for x in range(initialVolume.shape[2]):
shift = [initialVolume.shape[2]/2., initialVolume.shape[1]/2., initialVolume.shape[0]/2.]
shiftedPoint = list(np.subtract([x, y, z], shift))
shiftedPoint.append(1.)
new = np.dot(transform, shiftedPoint)[:3]
final = np.add(new, shift)
try:
rigidMatrix[int(final[2]), int(final[1]), int(final[0])] = initialVolume[z, y, x]
#if transformed place is out of bounds, dont deal with it
except IndexError:
continue
return rigidMatrix
def generateTestVolume():
#create a test volume
volume = np.zeros((100, 100, 100))
myPointSet = set()
for _ in range(rand(500, 800)):
potentialPointSet = generatePointSet()
#be sure there is no overlap
while len(myPointSet.intersection(potentialPointSet)) > 0:
potentialPointSet = generatePointSet()
for elem in potentialPointSet:
myPointSet.add(elem)
#populate the true volume
for elem in myPointSet:
volume[elem[0], elem[1], elem[2]] = 60000
#introduce noise
noiseVolume = np.copy(volume)
for z in range(noiseVolume.shape[0]):
for y in range(noiseVolume.shape[1]):
for x in range(noiseVolume.shape[2]):
if not (z, y, x) in myPointSet:
noiseVolume[z][y][x] = rand(0, 10000)
#convert numpy array to itk image
itkVolume = itk.GetImageFromArray(noiseVolume)
#perform a random rigid body transform to generate the second volume
transform = getTransform()
rigidMatrix = applyRigid(noiseVolume, transform)
rigidVolume = itk.GetImageFromArray(rigidMatrix)
return itkVolume, rigidVolume, transform
This generation routing will return two volumes, the itkVolume and the rigidVolume. The itkVolume is the untranslated data, which can be used as the groundtruth for the registration operation.
I expect the algorithm to perfrom very well since the magnitude of the transform is extremely low at 5%
I expect the functionality results to be identical to the results outlined in the NDReg documentation notebook. The notebook is linked in Simulation Data - Functionality Testing - 1 - i.
I expect the validation data to have an Error Reduction Rate that is proportional to the number of iterations of gradient descent performed in the algorithm. I do not, however, have a prediction for the cap error reduction rate that will be achieved.
The following code will apply random rigid body transformations to an initial image with two simple clusters in it, and display the results to confirm that small, valid transformations are being made.
In [22]:
import pickle
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from skimage.measure import label
twoCluster = pickle.load(open('../code/tests/synthDat/twoPerfectClusters.synth'))
#pre transform
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
z, y, x = twoCluster.nonzero()
ax.scatter(x, y, z, zdir='z', c='r')
plt.show()
In [45]:
for i in range(3):
#post transform
transform = getTransform()
post = applyRigid(twoCluster, transform)
print transform
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
z, y, x = post.nonzero()
ax.scatter(x, y, z, zdir='z', c='r')
plt.show()
From the visualizations provided, I can confirm that the transformation function is successfully transforming the data with a small rigid body transform.
The validity of the random cluster generation routine (i.e. generateTestVolume) has already been investigated and reported on throrughly here:
I am not going to test NDReg's failure cases through data; rather, I am going to test them through hyperparameter variation. It is my expectation that modulating the number of gradient descent iterations of the algorithm will cause failure and success cases to appear on their own.
Below is the pseudocode for NDReg's (poorly named) imgAffineComposite funciton.
The main inputs to this algorithm are 2 3D volumes of the same data type inImg_ds and refImage_ds
The output of this algorithm is a matrix superposition of 4 transformations in this order:
The pseudocode for the algorithm is below:
In [ ]:
######################################
###THIS IS PSEUDOCODE; WILL NOT RUN###
######################################
def algorithm(inImg_ds, refImg_ds, iterations, useMI = False, verbose = False):
translation = randomTranslation()
for iteration in iterations:
newImg = applyTranslation(inImg_ds, translation)
loss = squaredError(newImg, refImg_ds)
translation = gradientDescentStep(translation, loss)
baseImg = newImg
scale = randomScale()
for iteration in iterations:
newImg = applyScale(baseImg, scale)
loss = squaredError(newImg, refImg_ds)
scale = gradientDescentStep(scale, loss)
baseImg = newImg
rigid = randomRigid()
for iteration in iterations:
newImg = applyRigid(baseImg, rigid)
loss = squaredError(newImg, refImg_ds)
rigid = gradientDescentStep(rigid, loss)
baseImg = newImg
affine = randomAffine()
for iteration in iterations:
newImg = applyAffine(baseImg, affine)
loss = squaredError(newImg, refImg_ds)
affine = gradientDescentStep(affine, loss)
return translation * scale * rigid * affine
In [3]:
def register(inImg, baseImg, iterations, MI=False):
return ndreg.imgAffineComposite(inImg,
baseImg,
iterations=iterations,
useMI=MI,
verbose=False)
Please refer to Simulation Data - Section 1 - i
Since attempting to visualize the transformation on the simulated volumes is very difficult, the transformations will be applied to this function in order to help with visualizing the exact distortions of the transformation
In [4]:
def visualizeTransform(transform):
twoCluster = pickle.load(open('../code/tests/synthDat/twoPerfectClusters.synth'))
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
z, y, x = twoCluster.nonzero()
ax.scatter(x, y, z, zdir='z', c='r')
plt.show()
post = applyRigid(twoCluster, transform)
print transform
fig2 = plt.figure()
ax = fig2.add_subplot(111, projection='3d')
z, y, x = post.nonzero()
ax.scatter(x, y, z, zdir='z', c='r')
plt.show()
In [4]:
#display the images to be registered
ndreg.imgShow(inImg_ds, vmax=10000)
ndreg.imgShow(refImg_ds, vmax=500)
In [46]:
baseVolumeImg, transformedVolumeImg, transform = generateTestVolume()
#base volume image
ndreg.imgShow(baseVolumeImg)
#transformed volume image
ndreg.imgShow(transformedVolumeImg)
#transformation visualization
print 'Transformation Visualization:'
visualizeTransform(transform)
In [5]:
transform = register(inImg_ds, refImg_ds, 200, True)
resultImage = ndreg.imgApplyAffine(inImg_ds,
transform,
size=refImg_ds.GetSize(),
spacing=refImg_ds.GetSpacing())
ndreg.imgShow(resultImage, vmax=10000)
The output of the functionality test verifies that the algorithm can be called successfully and is functioning as expected on a base case.
The validation data will be tested as a function of iterations of gradient descent, as well as a function of its loss metric (L2 vs MI). The tests will be evaluated on their error reduction ratio, and their error reduction ratio variance.
In [5]:
def executeValidationTest(iterations, MI):
print '\nExecuting Validation Test With : ', iterations, ' iterations'
#generate an display data
itkVolume, rigidVolume, curTransform = generateTestVolume()
print '\tTransformation Visualization:'
visualizeTransform(curTransform)
ndreg.imgShow(itkVolume, vmax=60000)
ndreg.imgShow(rigidVolume, vmax=60000)
#calculate initial error
initialErr = np.sum((itk.GetArrayFromImage(itkVolume)-itk.GetArrayFromImage(rigidVolume))**2)
print '\tInitial Error: ', initialErr
#perform transform
transform = register(rigidVolume, itkVolume, 100, MI)
resultVolume = ndreg.imgApplyAffine(rigidVolume, transform, size=itkVolume.GetSize(), spacing=itkVolume.GetSpacing())
ndreg.imgShow(resultVolume, vmax=60000)
#perform random transform
randMatrix = applyRigid(itk.GetArrayFromImage(rigidVolume), getTransform())
#calculate final error
finalPipeErr = np.sum((itk.GetArrayFromImage(itkVolume)-itk.GetArrayFromImage(resultVolume))**2)
finalRandErr = np.sum((itk.GetArrayFromImage(itkVolume)-randMatrix)**2)
print '\tFinal Pipeline Error: ', finalPipeErr
print '\tFinal Random Error: ', finalRandErr
#get the reduction in error
pipeRatio = (initialErr - finalPipeErr)/initialErr
randRatio = (initialErr - finalRandErr)/initialErr
print '\tPipeline Reduction Ratio: ', pipeRatio
print '\tRandom Reduction Ratio: ', randRatio
print '\n\n\n'
return pipeRatio, randRatio
In [54]:
pipeDat = []
randDat = []
for iterationNum in (150, 200, 250, 300, 350):
pipeList = []
randList = []
for i in range(3):
pipeline, random = executeValidationTest(iterationNum, False)
pipeList.append(pipeline)
randList.append(random)
print '\nAverage Error Reduction Ratio:'
print '\tPipeline: ', np.average(pipeList)
print '\tRandom: ', np.average(randList)
print '\nError Reduction Ratio Variance'
print '\tPipeline: ', np.var(pipeList)
print '\tRandom: ', np.var(randList)
#save the data for later exploration
pipeDat.append(pipeList)
randDat.append(randList)
In [65]:
fig = plt.figure()
plt.title('Error Reduction Rate vs Iterations')
plt.scatter([150, 200, 250, 300, 350], [np.average(elem) for elem in pipeDat])
plt.show()
In [60]:
fig = plt.figure()
plt.title('Variance vs Iterations')
plt.scatter([150, 200, 250, 300, 350], [np.var(elem) for elem in pipeDat])
plt.show()
In [53]:
miPipeDat = []
miRandDat = []
for iterationNum in (150, 200, 250, 300, 350):
pipeList = []
randList = []
for i in range(3):
pipeline, random = executeValidationTest(iterationNum, True)
pipeList.append(pipeline)
randList.append(random)
print '\nAverage Error Reduction Ratio:'
print '\tPipeline: ', np.average(pipeList)
print '\tRandom: ', np.average(randList)
print '\nError Reduction Ratio Variance'
print '\tPipeline: ', np.var(pipeList)
print '\tRandom: ', np.var(randList)
#save the data for later exploration
miPipeDat.append(pipeList)
miRandDat.append(randList)
In [55]:
fig = plt.figure()
plt.title('Error Reduction Rate vs Iterations')
plt.scatter([150, 200, 250, 300, 350], [np.average(elem) for elem in miPipeDat])
plt.show()
In [56]:
fig = plt.figure()
plt.title('Variance vs Iterations')
plt.scatter([150, 200, 250, 300, 350], [np.var(elem) for elem in miPipeDat])
plt.show()
In [66]:
fig = plt.figure()
plt.scatter([150, 200, 250, 300, 350],[np.average(elem) for elem in miPipeDat], c='r')
plt.scatter([150, 200, 250, 300, 350],[np.average(elem) for elem in pipeDat], c='b')
plt.title('Error reduction for MI Loss(r) and L2 loss(b) vs descent iterations')
plt.show()
In [68]:
fig = plt.figure()
plt.scatter([150, 200, 250, 300, 350],[np.var(elem) for elem in miPipeDat], c='r')
plt.scatter([150, 200, 250, 300, 350],[np.var(elem) for elem in pipeDat], c='b')
plt.title('Error Reduction Variance for MI Loss(r) and L2 loss(b) vs descent iterations')
plt.show()
The results of both NDREg tests were underwhelming. The aveage error reduction of the pipeline is hovering around 16%, regardless of iterations of gradient descent or loss metric.
One interesting relationship that I saw was that the data sets with the most variation tended to be the ones with the highest reduction in error. I am going to attempt to introduce variation by running NDReg with a small number of iterations many times, and selecting the best output (as determined by loss reduction rate)
In [69]:
def executeNDReg2Test(iterations, MI):
print '\nExecuting Validation Test With : ', iterations, ' NDReg iterations'
#generate an display data
itkVolume, rigidVolume, curTransform = generateTestVolume()
print '\tTransformation Visualization:'
visualizeTransform(curTransform)
ndreg.imgShow(itkVolume, vmax=60000)
ndreg.imgShow(rigidVolume, vmax=60000)
#calculate initial error
initialErr = np.sum((itk.GetArrayFromImage(itkVolume)-itk.GetArrayFromImage(rigidVolume))**2)
print '\tInitial Error: ', initialErr
finalVolume = None
bestReduction = 0.
for i in range(iterations):
#perform transform
transform = register(rigidVolume, itkVolume, 250, MI)
resultVolume = ndreg.imgApplyAffine(rigidVolume, transform, size=itkVolume.GetSize(), spacing=itkVolume.GetSpacing())
finalPipeErr = np.sum((itk.GetArrayFromImage(itkVolume)-itk.GetArrayFromImage(resultVolume))**2)
if finalPipeErr > bestReduction:
finalVolume = resultVolume
bestReduction = finalPipeErr
#perform random transform
randMatrix = applyRigid(itk.GetArrayFromImage(rigidVolume), getTransform())
#calculate final error
finalPipeErr = np.sum((itk.GetArrayFromImage(itkVolume)-itk.GetArrayFromImage(finalVolume))**2)
finalRandErr = np.sum((itk.GetArrayFromImage(itkVolume)-randMatrix)**2)
print '\tFinal Pipeline Error: ', finalPipeErr
print '\tFinal Random Error: ', finalRandErr
#get the reduction in error
pipeRatio = (initialErr - finalPipeErr)/initialErr
randRatio = (initialErr - finalRandErr)/initialErr
print '\tPipeline Reduction Ratio: ', pipeRatio
print '\tRandom Reduction Ratio: ', randRatio
print '\n\n\n'
return pipeRatio, randRatio
In [71]:
multiPipeDat = []
multiRandDat = []
for iterationNum in range(1, 6):
pipeList = []
randList = []
for i in range(3):
pipeline, random = executeNDReg2Test(iterationNum, False)
pipeList.append(pipeline)
randList.append(random)
print '\nAverage Error Reduction Ratio:'
print '\tPipeline: ', np.average(pipeList)
print '\tRandom: ', np.average(randList)
print '\nError Reduction Ratio Variance'
print '\tPipeline: ', np.var(pipeList)
print '\tRandom: ', np.var(randList)
#save the data for later exploration
multiPipeDat.append(pipeList)
multiRandDat.append(randList)
In [73]:
fig = plt.figure()
plt.title('Error Reduction Rate vs Iterations')
plt.scatter([1, 2, 3, 4, 5], [np.average(elem) for elem in multiPipeDat])
plt.show()
In [76]:
fig = plt.figure()
plt.title('Error Reduction Rate Variance vs Iterations')
plt.scatter([1, 2, 3, 4, 5], [np.var(elem) for elem in multiPipeDat])
plt.show()
In [75]:
print 'Average of Average Error Reduction for iterative NDReg: ', np.average([np.average(elem) for elem in multiPipeDat])
This repetition of NDReg has improved the reduction of error ratio substantially from the .18 reduction ratio of the non repeated test. Now, I will repeat the same test with the MI loss metric as opposed to the L2 loss metric
In [72]:
multiMIPipeDat = []
multiMIRandDat = []
for iterationNum in range(1, 6):
pipeList = []
randList = []
for i in range(3):
pipeline, random = executeNDReg2Test(iterationNum, True)
pipeList.append(pipeline)
randList.append(random)
print '\nAverage Error Reduction Ratio:'
print '\tPipeline: ', np.average(pipeList)
print '\tRandom: ', np.average(randList)
n Ratio:'
print '\tPipeline: ', np.averag
print '\nError Reduction Ratio Variance'
print '\tPipeline: ', np.var(pipeList)
print '\tRandom: ', np.var(randList)
#save the data for later exploration
multiMIPipeDat.append(pipeList)
multiMIRandDat.append(randList)
In [79]:
fig = plt.figure()
plt.title('Error Reduction Rate vs Iterations')
plt.scatter([1, 2, 3, 4, 5], [np.average(elem) for elem in multiMIPipeDat])
plt.show()
In [80]:
fig = plt.figure()
plt.title('Error Reduction Rate Variance vs Iterations')
plt.scatter([1, 2, 3, 4, 5], [np.var(elem) for elem in multiMIPipeDat])
plt.show()
In [81]:
print 'Average of Average Error Reduction for iterative NDReg: ', np.average([np.average(elem) for elem in multiMIPipeDat])
The results of the iterative pipeline are underwhelming. The high variance in the 4th trial verifies that that high error reduction rate of the 4th trial is likely a result of luck, and not of actual algorithmic integrity.
The variance in these registration metods remains quite high. This is a clue to me that we are getting stuck in a local, non global minima during gradient descent. In order to combat this, I am going to redefine the registration method such that it attempts to align the centroids of the images before running the registration pipline. This will likely cause a large time increase,but hopefully prevent local minima convergence.
However, before going too crazy with new methods, I am going to apply the current transforms to some real data and generate some qualitative results as a 'they actually dont work' sanity check
In [83]:
#load the real data
realDat = pickle.load(open('../code/tests/synthDat/realDat_tpi_5-10.io'))
In [84]:
for i in range(5):
fig = plt.figure()
plt.imshow(realDat[i], cmap='gray')
plt.show()
In [104]:
transform = getTransform()
print transform
rigidVol = applyRigid(realDat, transform)
In [97]:
for i in range(5):
fig = plt.figure()
plt.imshow(rigidVol[i], cmap='gray')
plt.show()
In [ ]: