In [1]:
import numpy as np
from matplotlib import pyplot as plt
from __future__ import division
from itertools import product
%matplotlib inline
In [111]:
class CA(object):
'''
1-D cellular automaton object
'''
def __init__(self, rule, alphabet_size, radius, initial_condition):
'''
Initializes the CA.
Parameters
----------
rule: int
Defines mapping from current neighborhood to next state. Generalized of Wolfram ECA rule numbering.
Use max_rule(alphabet_size, radius) function to get maximum rule number allowed for given
alphabet size and radius.
alphabet_size: int
Number of local states for each site on CA.
radius: int
Size of neighborhood used for update rule. Neighborhood size is 2*radius +1.
initial_condition: array_like
Initializes initial condition for ECA.
Instance Variables
------------------
self._initial:
Stored initial conditions.
self._spacetime:
Array of spacetime values. Initialized to initial_condition. Space is in horizontal direction,
time is vertical.
self._rule:
CA lookup table rule. Initialized to rule.
self._alphabet:
Local state alphabet size. Initialized to alphabet_size
self._radius:
Neighborhood radius. Initialized to radius.
self._table:
Hypercube array for CA lookup table using CA rule number. Neighborhoods represented as
coordinates on hypercube, and value at the coordinate is the corresponding mapping of that
neighborhood.
self._transduced:
Array of transduced spacetime data (i.e. output of specific transducer run over raw spacetime
data).
Initialized to all zeros.
self._filtered:
Array of filtered spacetime data. All domain sites are 0 and all non-domain sites are 1.
Initialized to all zeros.
'''
size = np.size(initial_condition)
self._initial = initial_condition
self._spacetime = initial_condition
self._rule = rule
self._alphabet = alphabet_size
self._radius = radius
self._table = lookup_table(rule, alphabet_size, radius)
self._transduced = np.zeros(size, dtype = int)
self._masked = np.zeros(size, dtype = int)
self._filtered = np.zeros(size, dtype = int)
def evolve(self, time):
'''
Evolves the CA for specified number of steps.
Parameters
---------
time: int
Number of time steps to evolve the CA.
Updates
-------
self._spacetime
Adds new spacetime data to array of spacetime data.
'''
T = time
#Initial conditions are 1-D arrays but spacetime data is 2D array, so must treat each case
#separately. The first case is for when CA is first initialzed or reset and spacetime is
#equal to initial condition, and thus only 1-D.
if len(np.shape(self._spacetime)) == 1:
size = np.size(self._spacetime)
#get neighborhood values
neighbors = neighborhood(self._spacetime, self._radius)
#initialize array for new configuration
new = np.zeros(size, dtype = int)
#use lookup table to set values for new configuration
new = self._table[neighbors]
#stack new configuration into spacetime data
self._spacetime = np.vstack((self._spacetime, new))
#now that spacetime is 2-D, time loop will run so remove a time step
T = time -1
#time step loop for 2-D spacetime array
rows, columns = np.shape(self._spacetime)
for t in xrange(T):
#get array of current neighborhood values
neighbors = neighborhood(self._spacetime[rows - 1], self._radius)
#initialize array for new configuration string
new = np.zeros(columns, dtype = int)
#run the current neighborhood values through the lookup table to get next configuration
new = self._table[neighbors]
#add new configuration values to spacetime array
self._spacetime = np.vstack((self._spacetime, new))
#increase number of rows since this is initialized outside of time loop
rows += 1
def reset(self, new_state = None):
'''
Resets the spacetime data back to the given state or original initial conditions.
Parameters
----------
new_state: array_like
Configuration of new initial conditions to reset the CA to.
If None, CA is returned to original initial conditions.
'''
if new_state is None:
#reset spacetime data to the initial conditions
self._spacetime = np.copy(self._initial)
else:
self._spacetime = np.copy(new_state)
def rewind(self, steps):
'''
Rewind spacetime data the given number of steps from current configuration
'''
rows, columns = np.shape(self._spacetime)
self._spacetime = self._spacetime[:rows - steps]
def get_spacetime(self):
return self._spacetime
def current_state(self):
'''
Returns current configuration of the CA.
'''
if len(np.shape(self._spacetime)) == 1:
return self._spacetime
else:
rows, columns = np.shape(self._spacetime)
return self._spacetime[rows - 1]
def lookup_table(self, full = False):
'''
Provides the lookup table for the CA.
Parameters
----------
full: bool
If True, will print full lookup table in lexicographic order. Best if used for ECAs and other small
neighborhood size rules.
If False, returns the lookup table as a dict.
'''
#simplify variable names. R = neighborhood size
A = self._alphabet
R = 2*self._radius + 1
scan = tuple(np.arange(0,A)[::-1])
if full:
for a in product(scan, repeat = R):
print a , self._table[a]
return None
else:
lookup = {a : self._table[a] for a in product(scan, repeat = R)}
return lookup
def perturb(self, N_flips):
'''
Randomly flips specified number of bits on current spatial configuration of CA.
Parameters
----------
N_flips: int
Number of sites on spatial configuration to flip
Updates
-------
self._spacetime
Last row of spacetime array (current configuration) has 'N_flips' bits randomly selected
and flipped.
'''
rows, columns = np.shape(self._spacetime)
#indices for current configuration array
indices = np.arange(0, columns, dtype = int)
#array to keep track of location of bit flips so a single site is not flipped multiple times
flip_check = np.zeros(columns)
count = 0
#loop to flip bits while making sure to not flip bits more than once
while count < N_flips:
flip = np.random.choice(indices)
if flip_check[flip] == 0:
flip_check[flip] += 1
count += 1
self._spacetime[rows -1][flip] = -self._spacetime[rows -1][flip] + 1
def transduce(self, t, direction = 'right'):
'''
Uses given transducer to scan spacetime data and record output to instance variable
self._transduced. Result can be displayed with CA.diagram('transduced') method.
Parameters
----------
t : tuple(d, start_state, synch)
d: dict
Mapping current machine state and input symbol to next machine state
and output symbol.
start_state: str
Start state for the transducer.
synch: int
Number of transient steps need for transducer to synchronize to input data.
'''
#reset / initialize self._transduced to correct shpae
rows, columns = np.shape(self._spacetime)
self._transduced = np.zeros((rows, columns), dtype = int)
#initialize transducer object with given transducer input
machine = transducer(*t)
#scan each row of spacetime data with transducer
for r in xrange(rows):
self._transduced[r] = machine.scan(self._spacetime[r], direction)
def mask(self, t, direction ='right'):
'''
Uses given transducer to mask spacetime data and stores output to instance variable
self._masked. Result can be displayed with CA.diagram('masked') method.
Parameters
----------
t: transducer tuple (d, start_state, synch)
d: dict
Mapping of current machine state and input symbol to next machine state
and output symobl.
start_state: str
Start state for the transducer.
synch: int
Number of transient steps needed for transducer to synchronize to input data.
'''
rows, columns = np.shape(self._spacetime)
self._masked = np.zeros((rows, columns), dtype = int)
machine = transducer(*t)
for r in xrange(rows):
self._masked[r] = machine.mask(self._spacetime[r], self._alphabet, direction)
def domain_filter(self, t, fill = False):
'''
Uses given transducer to filter spacetime data and stores output to instance variable
self._filtered. Result can be displayed with CA.diagram('filtered') method.
Parameters
----------
t: transducer tuple (d, start_state, synch)
d: dict
Mapping of current machine state and input symbol to next machine state
and output symobl.
start_state: str
Start state for the transducer.
synch: int
Number of transient steps needed for transducer to synchronize to input data.
fill: bool
Set to True if left and right transducer scans are different; fill == True will combine and fill-in
scans from both directions. If False, will set self._filtered to simplified transducer output
with 0 domain and 1 for not domain.
'''
rows, coluns = np.shape(self._spacetime)
self._filtered = np.zeros((rows, coluns), dtype = int)
machine = transducer(*t)
for r in xrange(rows):
self._filtered[r] = machine.domain_filter(self._spacetime[r], fill)
def diagram(self, option = None, size = None):
'''
Plots raw, transduced, or filtered spacetime diagram.
Parameters
----------
option: 'spacetime' , 'transduced', or 'filtered'
Input to chose which spacetime data to plot. If 'None', plots raw spacetime data.
size: float
Deteremines figure size.
Returns
-------
Matplotlib imshow of self._spacetime, self._transduced, or self._filtered depending on 'option' input
'''
if size == None:
s = 15
else:
s = size
if option == 'spacetime' or option is None:
plt.figure(figsize=(s,s))
plt.imshow(self._spacetime, cmap=plt.cm.bone, interpolation='nearest')
plt.show()
elif option == 'transduced':
plt.figure(figsize=(s,s))
plt.imshow(self._transduced, cmap=plt.cm.bone, interpolation='nearest')
plt.show()
elif option == 'filtered':
plt.figure(figsize=(s,s))
plt.imshow(self._filtered, cmap=plt.cm.Blues, interpolation='nearest')
plt.show()
elif option == 'masked':
plt.figure(figsize=(s,s))
plt.imshow(self._masked, cmap=plt.cm.bone, interpolation='nearest')
plt.show()
In [112]:
class ECA(CA):
'''
Elementary cellular automata subclass. Same as CA subclass but with alphabet size of 2 and
neighborhood radius 1.
'''
alphabet_size = 2
radius = 1
def __init__(self, rule, initial_condition):
'''
Initializes the ECA.
Parameters
----------
rule: int between 0 and 255
Defines the lookup table using Wolfram lexicographic numbering scheme.
initial_condition: array_like
Initial configuration for the ECA.
'''
super(ECA, self).__init__(rule, ECA.alphabet_size, ECA.radius, initial_condition)
In [4]:
def max_rule(alphabet, radius):
'''
Returns the maximum rule number for a given alphabet size and neighborhood radius.
Parameters
----------
alphabet: int
Alphabet size for the CA.
radius: int
Neighborhood radius length for the CA.
Returns
-------
rule number: int
The maximum allowed rule number for the given alphabet size and radius.
'''
rule = 0
#R = neighborhood size
R = 2*radius + 1
#add up max possible output for every neighborhood
for i in xrange(alphabet ** R):
rule += (alphabet - 1) * (alphabet)**i
return rule
In [5]:
import string
digs = string.digits + string.letters
def int2base(x, base, radius):
if x < 0: sign = -1
elif x == 0: return digs[0]
else: sign = 1
x *= sign
digits = []
while x:
digits.append(digs[int(x % base)])
x /= base
if sign < 0:
digits.append('-')
digits.reverse()
result = ''.join(digits)
leng = len(result)
R = base ** (2*radius + 1)
#return result
return result[leng - R:]
In [6]:
test = int2base(3838393, 3,4)
print len(test)
In [7]:
def lookup_table(rule, alphabet_size, radius):
'''
Produces the lookup table for the specified CA rule of the given alphabet size and radius.
Parameters
----------
rule: int
CA rule number that defines the lookup table. For neighborhood in lexicographic order, the
corresponding CA update rule mapping is added up in the base of the alphabet size. This is the
rule number.
alphabet_size: int
Number of local site states available to the CA.
radius: int
Radius of the neighorhood used for the update rule. For radius R, neighborhood size is 2R+1.
Returns
-------
table: ndarray
Multidimensional array representation of lookup table as a hypercube. The neighborhood is
represented as coordinates on the hypercube and the associated update mapping of that neighborhood
is the value of the hypercube at those coordinates.
'''
#R = neighborhood size
R = 2*radius + 1
A = alphabet_size
#convert rule number to appropriate base using int2base function
rule_number = int2base(rule, alphabet_size, radius)
#build shape tuple to get corret shape for lookuptable ndarray from the given radius and alphabet size
shape = ()
for i in xrange(R):
shape += (A,)
#initialize empty ndarray for table with the correct shape
table = np.zeros(shape, dtype = int)
i = 0
#scan is tuple (A, A-1, A-2, ..., 0) used to build neighborhoods in lexicographic order
scan = tuple(np.arange(0,A)[::-1])
#run loop through neighborhoods in lexicographic order and fill in the lookup table outputs
#on the ndarray at coordinates corresponding to the neighborhood value
for a in product(scan, repeat = R):
table[a] = rule_number[i]
i+=1
return table
In [8]:
def neighborhood(data, radius):
'''
Returns list of arrays of neighborhood values for input data. Formatted to index the multidimensional
array lookup table.
Parameters
----------
data: array_like
Input string of data.
radius: int
Radius of the neighborood.
Returns
-------
neighborhood: list of arrays
List of neighbor arrays formatted to index multidimensional array lookup table. Of the form
[array(leftmost neighbors), ..., array(left nearest neighbors), array(data), array(right nearest
neighbors), ..., array(rightmost neighbors)]
'''
#initialize lists for left enighbor values and right neighbor values
left = []
right = []
#run loop through radial values up to the radius. left values are run in reverse so that both sides build
#from inside out. r = 0 is skipped and added on in the last step
for r in xrange(radius):
left += [np.roll(data, radius-r)]
right += [np.roll(data, -(r+1))]
neighbors = left + [data] + right
return neighbors
In [9]:
def random_state(length, alphabet_size):
'''
Returns array of randomly selected states of given length with given alphabet size.
Parameters
----------
length: int
Lenght of the array
alphabet_size: int
Size of the alphabet to be randomly selected from in creating array.
Returns
-------
state: array
Array of given length of states randomly selected from {0,...,A -1} for alphabet size A.
'''
state = np.random.choice(np.arange(0, alphabet_size), length)
return state
In [10]:
def uniform_state(length, symbol):
'''
Returns homogeneous array of given symbol at given length
Parameters
----------
length: int
Length of the array to be created.
symbol: int
Symbol which will uniformly populate the array.
Returns
-------
out: array
Homogeneous array of given length uniformly populated by given symbol.
'''
if symbol == 1:
state = np.ones(length, dtype = int)
elif symbol == 0:
state = np.zeros(length, dtype = int)
else:
state = np.zeros(length, dtype = int)
state[...] = symbol
return state
In [11]:
def variable_density_state(length, p):
state = np.zeros(length, dtype = int)
for i in xrange(length):
if np.random.rand() < p:
state[i] = 1
return state
In [12]:
def n_ones(length, n ):
state = np.zeros(length, dtype = int)
ind = np.arange(0,length,1)
count = 0
while count < n:
test = np.random.choice(ind)
if state[test] == 0:
state[test] =1
count += 1
return state
In [13]:
def domain_18(length, bias, wildcard = 'even'):
state = np.zeros(length, dtype = int)
for s in xrange(length):
if wildcard == 'even':
if s % 2 == 0 and np.random.rand() < bias:
state[s] = 1
elif wildcard == 'odd':
if (s+1) % 2 == 0 and np.random.rand() < bias:
state[s] = 1
return state
In [15]:
def transducer_18():
'''
Zero-wildcard transducer. Domain in ECA rule 18.
Returns
-------
Tuple: (transducer, start_state, synch_time)
transducer: mapping of current machine state and string input to next machine state and output
start_state: starting state of the transducer
synch_time: number of transient steps needed for transducer to synchronize with input data
'''
transducer = { ('a', 0):('a', 0) , ('a', 1):('b', 0) , ('b', 1):('b', 1) , ('b', 0):('c', 0) , \
('c', 0):('b', 0) , ('c',1):('b',0)}
start_state = 'a'
synch_time = 0
return (transducer, start_state, synch_time)
In [16]:
def transducer_54():
'''
Domain transducer for ECA rule 54. 8 state, period 2 domain.
Returns
-------
Tuple: (transducer, start_state, synch_time)
transducer: mapping of current machine state and string input to next machine state and output
start_state: starting state of the transducer
synch_time: number of transient steps needed for transducer to synchronize with input data.
'''
transducer = { ('ta', 0):('tb', 0) , ('ta', 1):('tc', 0) , ('tb', 0):('td', 0) , ('tb', 1):('te', 0) , \
('tc', 0):('tf', 0) , ('tc',1):('tg',0) , ('td', 0):('d',0) , ('td', 1):('a',0) , \
('te', 0):('b',0) , ('te',1):('g',0) , ('tf', 0):('c', 0) , ('tf', 1):('f',0) , \
('tg', 0):('h', 0) , ('tg', 1):('e', 0) , \
('a', 0):('b', 0) , ('a', 1):('g', 1) , ('b', 0):('c', 0) , ('b', 1):('f', 2) , \
('c', 0):('d', 0) , ('c', 1):('a', 3) , ('d', 0):('d', 4) , ('d', 1):('a', 0) , \
('e', 0):('c', 5) , ('e', 1):('f', 0) , ('f', 0):('b', 6) , ('f', 1):('g', 0) , \
('g' , 0):('e', 7) , ('g', 1):('h', 0) , ('h', 0):('e', 0) , ('h', 1):('h', 8)}
start_state = 'ta'
synch_time = 3
return (transducer, start_state, synch_time)
In [17]:
class transducer(object):
def __init__(self, d, start, synch):
'''Instantiates transducer.
Parameters
----------
d : Python dict
Tuple to Tuple mapping. Maps (current machine state, input symbol) to
(next machine state, output symbol).
start: str
Start state for the transducer.
synch: int
Number of synchronization steps needed to reach recurrent component of transducer.
Instance Variables
------------------
self._transducer:
Python dict mapping current machine state and input symbol to next machine state and output symbol
self._start_state:
Start state for the machine.
self._synch:
Number of synchronization steps.
'''
self._transducer = d
self._start = start
self._synch = synch
def scan(self, data, direction = 'right'):
'''
Takes data string as transducer input and returns corresponding transducer output
Parameters
----------
data: array_like
Input string to be scanned by transducer
direction: str
Direction 'right' or 'left' which transducer will scan the input data. Default to 'right'.
Returns
-------
output: array
Array of raw transducer output.
'''
length = np.size(data)
output = np.zeros(length, dtype = int)
#initialize machine state to start state value
machine = self._start
#run loop in appropriate direction to scan the data with the transducer using the trandsucers
#dictionary / mapping, updating machine state and output at each step
for c in xrange(length + self._synch):
if direction == 'right':
machine, output[c % length] = self._transducer[(machine, data[c % length])]
elif direction == 'left':
i = length - (c+1)
machine, output[i % length] = self._transducer[(machine, data[i % length])]
return output
def mask(self, data, alphabet_size, direction = 'right'):
'''
Takes data string as transducer input and returns corresponding transducer output overlayed
ontop of original data, i.e. domains retain their structure.
Parameters
----------
data: array_like
Input string to be scanned by transducer.
direction: str
Direction 'right' or 'left' which transducer will scan the input data. Default to 'right'.
Returns
-------
output: array
Array of transducer output overlayed ontop of original data.
'''
length = np.size(data)
mask = np.zeros(length, dtype = int)
hold = np.copy(data)
#initialize machine state
machine = self._start
#scan data with transducer, updating machine state and output each step
for c in xrange(length + self._synch):
if direction == 'right':
machine, mask[c % length] = self._transducer[(machine, data[c % length])]
elif direction == 'left':
i = length - (c+1)
machine, mask[i % length] = self._transducer[(machine, data[i % length])]
#clear non-domain values so mask can be simply added on top of original data, but not have
#interference with non-zero non-domain values.
hold[mask != 0] = 0
#transducer output is zero for all state values in domain, so must add (alphabet_size - 1)
#onto transducer output to give unique values to defects above the original alphabet, which
#may be present in the domain.
mask[mask != 0] += (alphabet_size - 1)
return hold + mask
def domain_filter(self, data, fill = False):
'''
Takes data string as transducer input and returns simplified transducer output with only two symbols;
domain (0) or not domain (1).
Parameters
----------
data: array_like
Input string to be scanned by transducer.
fill: bool
Set to True if there is a discrepency between left and right transducer scan. This will
run both directions and fill-in between.
False will return simplified output of single-direction scan.
Returns
-------
output: array
Array of simplified transducer output.
'''
scanL = self.scan(data, 'left')
scanR = self.scan(data, 'right')
length = np.size(data)
hold = np.zeros(length, dtype = int)
if fill:
go = False
for c in xrange(length):
if scanL[c] != 0 and go == False:
go = True
hold[c] = 1
elif (scanL[c] == 0) and (scanR[c] == 0) and (go == True):
hold[c] = 1
elif scanR[c] != 0 and scanL[c] == 0 and go == True:
hold[c] = 1
go = False
else:
hold = np.copy(scanR)
hold[hold != 0] = 1
return hold
In [ ]: