In [ ]:
import cffi
import numpy as np
from pynq import MMIO
from pynq import Overlay
from pynq import PL
from pynq.drivers import DMA
from time import sleep, time
# Classifier Dimensions
BATCH = 8192
FEAT = 256
CLASSES = 10
# Addresses
ACCEL_CTRL = 0x43C00000
AXI_DMA_0 = 0x40400000
AXI_DMA_1 = 0x40410000
AXI_TIMER = 0x42800000
# C FFI
ffi = cffi.FFI()
# DMA Configs
DMAConfig1 = {
'DeviceId' : 0,
'BaseAddr' : ffi.cast("uint32_t *",AXI_DMA_0),
'HasStsCntrlStrm' : 0,
'HasMm2S' : 1,
'HasMm2SDRE' : 1,
'Mm2SDataWidth' : 64,
'HasS2Mm' : 0,
'HasS2MmDRE' : 0,
'S2MmDataWidth' : 32,
'HasSg' : 0,
'Mm2sNumChannels' : 1,
'S2MmNumChannels' : 1,
'Mm2SBurstSize' : 256,
'S2MmBurstSize' : 16,
'MicroDmaMode' : 0,
'AddrWidth' : 32
}
DMAConfig2 = {
'DeviceId' : 1,
'BaseAddr' : ffi.cast("uint32_t *",AXI_DMA_1),
'HasStsCntrlStrm' : 0,
'HasMm2S' : 0,
'HasMm2SDRE' : 0,
'Mm2SDataWidth' : 32,
'HasS2Mm' : 1,
'HasS2MmDRE' : 1,
'S2MmDataWidth' : 64,
'HasSg' : 0,
'Mm2sNumChannels' : 1,
'S2MmNumChannels' : 1,
'Mm2SBurstSize' : 16,
'S2MmBurstSize' : 256,
'MicroDmaMode' : 0,
'AddrWidth' : 32
}
# Download the custom overlay
ol = Overlay("classifier_fixed.bit")
ol.download()
# Initialize HLS IP
mmult_ip = MMIO(ACCEL_CTRL,0x10000)
# Start the accelerator
ctrl=mmult_ip.read(0x00)&0x08
mmult_ip.write(0x00, (ctrl|0x81))
ctrl=mmult_ip.read(0x00)
hex(ctrl)
# Initialize DMA1 (mem to FPGA)
dma1 = DMA(AXI_DMA_0, direction=0, attr_dict=DMAConfig1)
dma1.create_buf((CLASSES*4+CLASSES*FEAT+BATCH*FEAT), cacheable=0)
# Initialize DMA2 (FPGA to mem)
dma2 = DMA(AXI_DMA_1, direction=1, attr_dict=DMAConfig2)
dma2.create_buf(BATCH*CLASSES*4, cacheable=0)
# Start DMA transfer from FPGA to memory
dma2.transfer(BATCH*CLASSES*4, direction=1)
In [ ]:
# Initialize offsets, weights and inputs
o = np.load('model_offsets_fixed.npy').astype(np.int32)
w = np.load('model_weights_fixed.npy').astype(np.int8)
i = np.load('test_data.npy').astype(np.uint8)[0:BATCH]
l = np.load('test_labels.npy').astype(np.int32)[0:BATCH]
In [ ]:
# Move offset, weight and input data to DMA buffer
ffi.memmove(dma1.get_buf(), ffi.cast("uint32_t *", o.ctypes.data), CLASSES*4)
ffi.memmove(dma1.get_buf()+CLASSES, ffi.cast("uint32_t *", w.ctypes.data), CLASSES*FEAT)
ffi.memmove(dma1.get_buf()+CLASSES+(CLASSES*FEAT)//4, ffi.cast("uint32_t *", i.ctypes.data), BATCH*FEAT)
# Perform FPGA offloading
start_t = time()
dma1.transfer(CLASSES*4+CLASSES*FEAT+BATCH*FEAT, direction=0)
dma2.wait()
fpga_time = time()-start_t
# Dump FPGA result to a numpy array
c = np.frombuffer(ffi.buffer(
dma2.get_buf(),BATCH*CLASSES*4),
dtype=np.int32).reshape(BATCH,CLASSES)
In [ ]:
# Prepare input and weight matrices for matrix multiplication on CPU
ones = np.ones(BATCH).reshape((BATCH,1))
i_p = np.append(ones, i, axis=1)
w_p = np.append(o.reshape(CLASSES,1), w, axis=1)
# Compute CPU result
start_t = time()
c_ref = np.dot(i_p,w_p.T)
cpu_time = time()-start_t
In [ ]:
# Evaluate validation accuracy
cpu_errors = 0
fpga_errors = 0
for idx in range(BATCH):
fpga_label = np.argmax(c[idx])
cpu_label = np.argmax(c_ref[idx])
actual_label = np.argmax(l[idx])
if (fpga_label!=actual_label):
fpga_errors += 1.
if (cpu_label!=actual_label):
cpu_errors += 1.
# Report results
print("FPGA accuracy: {0:.2f}% validation error".format(fpga_errors/BATCH*100))
print("CPU accuracy: {0:.2f}% validation error".format(cpu_errors/BATCH*100))
if (cpu_time < fpga_time):
print("FPGA has a {0:.2f}x slowdown".format(fpga_time/cpu_time))
else:
print("FPGA has a {0:.2f}x speedup".format(cpu_time/fpga_time))
In [ ]:
# Render a given numpy 2D array of pixel data.
def show(image):
from matplotlib import pyplot
import matplotlib as mpl
fig = pyplot.figure()
ax = fig.add_subplot(1,1,1)
imgplot = ax.imshow(image, cmap=mpl.cm.Greys)
imgplot.set_interpolation('nearest')
ax.xaxis.set_ticks_position('top')
ax.yaxis.set_ticks_position('left')
pyplot.show()
# Inspect one of the hand digits classified by the FPGA
idx = 1
show(i[idx].reshape(16,16))
print("Classified as {} by the FPGA".format(np.argmax(c[idx])))