`DOFF` field to achieve interleaved writing pattern.
`DLASTSGA` field set to the address of the next TCD in memory (except for the last TCD,
which points back to the first TCD).
In [1]:
import numpy as np
# Test parameters -- edit these to change the size of the simulated capture.
channel_count = 3  # Number of channels read on every sample.
sample_count = 8  # Number of samples taken (one value per channel each).
# Total number of values in the source buffer.
N = sample_count * channel_count
# Source values 1, 2, ..., N, stored as unsigned 8-bit integers.
src_data = np.arange(1, N + 1, dtype=np.uint8)
In [2]:
# Compute, on the host, the result the device is expected to produce.
#
# Split the flat source buffer into one chunk per sample; every chunk holds
# `channel_count` consecutive values (one value per channel).
# `range` (rather than `xrange`) keeps this cell runnable on Python 2 and 3.
src_chunks = [src_data[i * channel_count:(i + 1) * channel_count]
              for i in range(sample_count)]
# Put chunk `i` in column `i`, then flatten row by row: the output lists all
# samples of the first channel, then all samples of the second channel, etc.
dst_data = np.column_stack(src_chunks).ravel()

# Single-argument `print(...)` produces identical output on Python 2 and 3.
for i, chunk in enumerate(src_chunks):
    print('SOURCE%d: %s' % (i, chunk))
# Show interleaved result
print('TARGET: %s' % (dst_data,))
In [3]:
from teensy_minimal_rpc import SerialProxy
import teensy_minimal_rpc.DMA as dma
# Drop the proxy left over from a previous run of this cell (if there is
# one) before opening a new connection.
# NOTE(review): `del` only removes the name; whether the underlying serial
# connection is released at that point depends on `SerialProxy` -- confirm.
try:
    del proxy
except NameError:
    # First run in this kernel: no earlier proxy exists, nothing to discard.
    pass

proxy = SerialProxy()
In [4]:
# Size in bytes of one Transfer Control Descriptor (TCD); TCD slots below are
# laid out back to back at this stride.
TCD_SIZE = 32

# Start from a clean slate before allocating (presumably releases device
# buffers left over from earlier runs of this notebook -- confirm).
proxy.free_all()

# Allocate source array
src_addr = proxy.mem_alloc(N)
# Allocate destination array
dst_addr = proxy.mem_alloc(N)
# Device address of the first byte of each source chunk (one chunk per sample).
src_addrs = [src_addr + i * channel_count for i in range(sample_count)]

# Reserve one TCD slot per sample in device memory.
# NOTE(review): first argument appears to be the required alignment in bytes;
# it is set to the TCD size -- confirm against `mem_aligned_alloc`.
tcds_addr = proxy.mem_aligned_alloc(TCD_SIZE, sample_count * TCD_SIZE)
# Address of the hardware TCD registers for DMA channel 0; later channels
# follow at `TCD_SIZE`-byte intervals.
hw_tcds_addr = 0x40009000
tcd_addrs = [tcds_addr + TCD_SIZE * i for i in range(sample_count)]
hw_tcd_addrs = [hw_tcds_addr + TCD_SIZE * i for i in range(sample_count)]

# Fill the source array (all N bytes) with the numbers 1-N
proxy.mem_cpy_host_to_device(src_addr, src_data)

# Read every chunk back from the device to confirm the copy.
# Single-argument `print(...)` produces identical output on Python 2 and 3.
for i in range(sample_count):
    print('SOURCE%d:  %s' % (i, proxy.mem_cpy_device_to_host(src_addrs[i],
                                                             channel_count)))
In [5]:
# Bit 4 of the TCD `CSR` word: ESG (enable scatter/gather), per the Kinetis
# eDMA TCD_CSR layout.  The same flag is set as `ESG=True` in `tcd0_msg`.
CSR_ESG = 1 << 4

# Create Transfer Control Descriptor configuration for first chunk, encoded
# as a Protocol Buffer message.
tcd0_msg = dma.TCD(
    CITER_ELINKNO=dma.R_TCD_ITER_ELINKNO(ITER=1),
    BITER_ELINKNO=dma.R_TCD_ITER_ELINKNO(ITER=1),
    # Source and destination are both accessed one byte at a time.
    ATTR=dma.R_TCD_ATTR(SSIZE=dma.R_TCD_ATTR._8_BIT,
                        DSIZE=dma.R_TCD_ATTR._8_BIT),
    # Each trigger moves one chunk, i.e. one byte per channel.
    NBYTES_MLNO=channel_count,
    SADDR=int(src_addrs[0]),
    # Read the chunk's bytes consecutively, then step back to its start.
    SOFF=1,
    SLAST=-channel_count,
    DADDR=int(dst_addr),
    # Write successive channel values `sample_count` bytes apart; this is
    # what produces the interleaved layout in the destination.
    DOFF=sample_count,
    # Point at the next TCD in memory.  The modulo makes a single-sample
    # configuration point back at TCD 0 instead of raising `IndexError`,
    # matching the wrap-around used in the loop below.
    DLASTSGA=int(tcd_addrs[1 % len(tcd_addrs)]),
    CSR=dma.R_TCD_CSR(START=0, DONE=False, ESG=True))

# Convert Protocol Buffer encoded TCD to bytes structure.
tcd0 = proxy.tcd_msg_to_struct(tcd0_msg)

# Create binary TCD struct for each TCD protobuf message and copy to device
# memory.
for i, src_addr_i in enumerate(src_addrs):
    tcd_i = tcd0.copy()
    # Chunk `i` is read from its own source address and written starting at
    # destination offset `i`.
    tcd_i['SADDR'] = src_addr_i
    tcd_i['DADDR'] = dst_addr + i
    # Chain to the next TCD; the last TCD points back to the first.
    tcd_i['DLASTSGA'] = tcd_addrs[(i + 1) % len(tcd_addrs)]
    # Keep scatter/gather enabled so the next TCD is loaded when this one
    # completes.
    # __N.B.,__ the `START` bit (0x1) is deliberately *not* set here to
    # auto-start transfers 2-n: setting it causes a destination bus error.
    tcd_i['CSR'] |= CSR_ESG
    proxy.mem_cpy_host_to_device(tcd_addrs[i], tcd_i.tostring())
In [6]:
# Fill the destination array with all zeros (to show transfer progress below).
# `//` keeps the fill count an integer on Python 3 as well as Python 2.
# NOTE(review): the count is presumably in 32-bit words, so when N is not a
# multiple of 4 the trailing `N % 4` bytes are not cleared -- confirm.
proxy.mem_fill_uint32(dst_addr, 0, N // 4)

# Show the source chunks as currently stored on the device.
# Single-argument `print(...)` produces identical output on Python 2 and 3.
for i in range(sample_count):
    print('SOURCE%d:  %s' % (i, proxy.mem_cpy_device_to_host(src_addrs[i],
                                                             channel_count)))

# Load initial TCD to DMA channel 0.
proxy.mem_cpy_host_to_device(hw_tcd_addrs[0], tcd0.tostring())

print('DEST:')
# Trigger once per chunk
for i in range(sample_count):
    # Request a start on DMA channel 0 (writes 0 to the `SSRT` register).
    proxy.update_dma_registers(dma.Registers(SSRT=0))
    # Read back the whole destination buffer to show progress so far.
    device_dst_data = proxy.mem_cpy_device_to_host(dst_addr, N)
    print(' Trigger %d: %s' % (i, device_dst_data))

# Verify device result matches expected result computed on host.  This is
# checked only after the final trigger, once every chunk has been transferred.
assert (device_dst_data == dst_data).all(), \
    'Device destination buffer does not match expected interleaved data.'
In [ ]: