This is a demonstration of a video filter that lets you play "Pynq Ponq" (based on a certain classic game). The FPGA takes care of the video logic, while this Python script handles the game logic. The filter is a proof of concept that shows Python can keep up with a simple real-time computing task.
In [1]:
from pynq.drivers.video import HDMI
from pynq import Bitstream_Part
from pynq.board import Register
from pynq import Overlay
from random import choice
from random import randint
from threading import Lock as Mutex
# Named colors as 24-bit values written to a rectangle's color register.
# NOTE(review): the values are consistent with a red/blue/green byte order
# (green lives in the low byte, blue in the middle byte) rather than the
# usual RGB - presumably what the video hardware expects; confirm before
# "correcting" them.
color = dict(
    red=0xFF0000,
    green=0x0000FF,
    blue=0x00FF00,
    magenta=0xFFFF00,
    yellow=0xFF00FF,
    cyan=0x00FFFF,
    black=0x000000,
    gray=0xBEBEBE,
    white=0xFFFFFF,
)
In [2]:
# Unplug the camera before running this cell.
# Program the FPGA: first the "demo.bit" overlay, then "ponq_p.bit"
# (presumably the partial bitstream that carries the Ponq video filter -
# TODO confirm). The order of the two downloads is kept as written.
Overlay("demo.bit").download()
Bitstream_Part("ponq_p.bit").download()
In [3]:
# Plug in the camera and turn it on before running this cell.
hdmi_in = HDMI('in')
# Used both as the HDMI output video_mode and as the scale factor for every
# game dimension defined in the constants cell.
resolution = 2
# The output is constructed with the input's frame_list (presumably so
# captured frames are passed straight through to the display - confirm).
hdmi_out = HDMI('out', video_mode=resolution, frame_list=hdmi_in.frame_list)
# Output is started before input, as in the original sequence.
hdmi_out.start()
hdmi_in.start()
In [ ]:
In [4]:
# Game Constants
# Every pixel dimension below is a multiple of `resolution`.

# Playfield size, taken from the HDMI input
game_width = hdmi_in.frame_width()
game_height = hdmi_in.frame_height()

# Ball
ball_size = resolution * 8
# Margin kept between a paddle and the top/bottom of the playfield
deadzone = ball_size + resolution

# Paddles
paddle_width = resolution * 5
paddle_height = resolution * 65
padding = resolution * 10
# x coordinates of the paddle faces that the ball bounces off
paddle_edge_left = paddle_width + padding
paddle_edge_right = game_width - (paddle_width + padding)

# Score bars
score_height = resolution * 10
point_width = resolution * 20

# Next free rectangle slot; shared resource protected by id_lock
rectangle_id = 0
id_lock = Mutex()
class Rectangle:
    """
    A rectangle drawn by the video hardware and controlled through registers.

    Each instance claims the next free slot from the module-level
    rectangle_id counter and maps five consecutive registers
    (5 * id + 0..4) to x_pos, y_pos, width, height and color.

    args is an optional dictionary used to initialize the rectangle:
        'type':     'paddle', 'ball' or 'score'. Any other value, or no
                    args at all, creates a zero-sized white rectangle at
                    the origin.
        'location': 'left' or 'right'. Only read for paddles and scores;
                    any other value leaves x_pos/color (paddle) or color
                    (score) unwritten.
    """

    def __init__(self, args=None):
        global rectangle_id
        # The default args=None used to crash on args['type']; treat it as
        # "no options" so the fallback branch below is reachable.
        if args is None:
            args = {}
        kind = args.get('type')
        location = args.get('location')

        # Guards every access to this rectangle's registers
        self.reg_lock = Mutex()
        with self.reg_lock:
            # id_lock keeps slot allocation atomic across threads
            with id_lock:
                base = 5 * rectangle_id
                self.x_pos = Register(base + 0)
                self.y_pos = Register(base + 1)
                self.width = Register(base + 2)
                self.height = Register(base + 3)
                self.color = Register(base + 4)
                rectangle_id += 1

        with self.reg_lock:
            if kind == 'paddle':
                self.width.write(paddle_width)
                self.height.write(paddle_height)
                # Start vertically centered
                self.y_pos.write((game_height // 2) - (paddle_height // 2))
                if location == 'left':
                    self.x_pos.write(padding)
                    self.color.write(color['blue'])
                elif location == 'right':
                    self.x_pos.write(paddle_edge_right)
                    self.color.write(color['red'])
            elif kind == 'ball':
                self.width.write(ball_size)
                self.height.write(ball_size)
                self.x_pos.write(game_width // 2)
                self.y_pos.write(game_height // 2)
                self.color.write(randint(0, 0xffffff))
            elif kind == 'score':
                # Score bars start with zero width at the top center
                self.x_pos.write(game_width // 2)
                self.y_pos.write(0)
                self.width.write(0)
                self.height.write(score_height)
                if location == 'left':
                    self.color.write(color['blue'])
                elif location == 'right':
                    self.color.write(color['red'])
            else:
                self.width.write(0)
                self.height.write(0)
                self.x_pos.write(0)
                self.y_pos.write(0)
                self.color.write(color['white'])

    def erase(self):
        """
        Erase this object from the display by making its width and height 0
        """
        with self.reg_lock:
            self.width.write(0)
            self.height.write(0)
# Creation order matters: each Rectangle claims the next slot from
# rectangle_id, so these four occupy slots 0-3 (the ball cells later
# set rectangle_id to 4 and 5 before creating the balls).
paddle_1 = Rectangle({'type':'paddle', 'location':'left'})
paddle_2 = Rectangle({'type':'paddle', 'location':'right'})
score_1 = Rectangle({'type':'score', 'location':'left'})
score_2 = Rectangle({'type':'score', 'location':'right'})
In [5]:
def player_score(player=0, points=0):
    """
    Redraw one player's score bar.

    player: 1 (bar grows leftwards from the center) or 2 (bar grows
            rightwards from the center). Any other value only prints a
            message.
    points: number of points to display; each point is point_width wide.
    """
    if player not in (1, 2):
        print("Player %d doesn't exist" % player)
        return

    # Neither bar may grow past its own half of the screen. Previously only
    # player 1 was clamped, so player 2's bar could run off the right edge.
    half_width = game_width // 2
    score_width = min(points * point_width, half_width)

    if player == 1:
        # Left bar: its right edge stays at the center, so x moves with width
        with score_1.reg_lock:
            score_1.width.write(score_width)
            score_1.x_pos.write(half_width - score_width)
    else:
        # Right bar: x_pos stays at the center, only the width changes
        with score_2.reg_lock:
            score_2.width.write(score_width)
In [6]:
from threading import Thread
from time import sleep
from random import choice, randrange
# Running point totals, incremented by the Ball threads and cleared by the
# Reset button.
player_1_score = 0
player_2_score = 0
# One mutex per total; Ball.run holds the matching lock while incrementing.
score_1_lock = Mutex()
score_2_lock = Mutex()
class Ball(Thread):
    """
    A ball that moves on its own thread.

    All ball movement can be paused with the running boolean.
    The thread can be stopped by setting halt to True.
    """

    def __init__(self):
        # Initialize super class
        Thread.__init__(self)
        self.ball = Rectangle({'type': 'ball'})
        self.halt = False
        # Seconds to sleep between game updates
        self.update_delay = 0.02
        # note: x uses choice rather than randrange, even though it's less
        # concise, to avoid a case of zero x velocity
        self.velocity = [choice([-5, -4, -3, -2, -1, 1, 2, 3, 4, 5]),
                         randrange(-5, 5, 1)]
        # Scalar applied to velocity on every update; grows on paddle hits
        self.speed = 2 * resolution
        self.ospeed = self.speed
        self.maxspeed = 6 * resolution
        self.running = False

    def run(self):
        """
        Thread body: sleep update_delay, then advance the game by one step
        whenever running is True, until halt becomes True.
        """
        while not self.halt:
            sleep(self.update_delay)
            if self.running:
                self._step()

    def _step(self):
        """Advance the ball by one update: bounce, move, score, write back."""
        global player_1_score
        global player_2_score

        with self.ball.reg_lock:
            x_pos = self.ball.x_pos.read()
            y_pos = self.ball.y_pos.read()

        # Determine which paddle (if any) the ball has reached
        side = None
        if x_pos + ball_size >= paddle_edge_right:
            side = "right"
        elif x_pos <= paddle_edge_left:
            side = "left"

        if side:
            paddle = paddle_1 if side == "left" else paddle_2
            with paddle.reg_lock:
                paddle_min = paddle.y_pos.read()
            paddle_max = paddle_min + paddle_height
            # The ball overlaps the paddle's height range ...
            in_reach = paddle_min - ball_size <= y_pos <= paddle_max
            # ... and is travelling towards that paddle (so a ball that has
            # already bounced is not flipped a second time)
            approaching = ((side == "right" and self.velocity[0] > 0) or
                           (side == "left" and self.velocity[0] < 0))
            if in_reach and approaching:
                self._bounce_off_paddle()

        # Bounce off Ceiling and Floor
        if y_pos >= (game_height - ball_size) or y_pos <= 0:
            self.velocity[1] *= -1

        # Multiply velocity by the speed scalar
        x_pos += int(self.velocity[0] * self.speed)
        y_pos += int(self.velocity[1] * self.speed)

        # Error correction: never hand a negative coordinate to a register
        if x_pos < 0:
            x_pos = 0
        if y_pos < 0:
            y_pos = 0

        new_color = None
        # Player 2 Scores (ball reached the left wall)
        if x_pos <= 0:
            with score_2_lock:
                player_2_score += 1
                player_score(2, player_2_score)
            # Ball starts off towards winner
            x_pos, y_pos, new_color = self._serve(1)
        # Player 1 Scores (ball reached the right wall)
        if x_pos >= game_width:
            with score_1_lock:
                player_1_score += 1
                player_score(1, player_1_score)
            # Ball starts off towards winner
            x_pos, y_pos, new_color = self._serve(-1)

        # Write new data to registers; the color goes through the same lock
        # as the position instead of being written unguarded
        with self.ball.reg_lock:
            if new_color is not None:
                self.ball.color.write(new_color)
            self.ball.x_pos.write(x_pos)
            self.ball.y_pos.write(y_pos)

    def _bounce_off_paddle(self):
        """Randomize and invert the x velocity, then speed the ball up."""
        # Add up to +/- (|velocity| - 1) to randomize the angle. Bounding the
        # change by the magnitude keeps the sign and keeps it non-zero; the
        # old abs(velocity - 1) bound was too large for negative velocities
        # and could leave the ball with zero x velocity.
        limit = abs(self.velocity[0]) - 1
        delta = randint(-limit, limit) if limit > 0 else 0
        self.velocity[0] = -(self.velocity[0] + delta)
        # Increase speed on each hit, up to maxspeed
        self.speed = min(self.speed + resolution / 4, self.maxspeed)

    def _serve(self, direction):
        """
        Reset velocity and speed after a point.

        direction is +1 to send the ball right or -1 to send it left.
        Returns (x_pos, y_pos, color): the center of the playfield and a
        fresh random color for the ball.
        """
        self.velocity = [direction * choice([1, 2, 3, 4, 5]),
                         randrange(-5, 5, 1)]
        self.speed = self.ospeed
        return game_width // 2, game_height // 2, randint(0, 0xffffff)
In [7]:
# Placeholders so the ball cells below can be re-run: each one halts the
# previous Ball (if any) before creating a new one.
ball_1 = None
ball_2 = None
# Balls driven by the Start / Pause / Reset / Stop buttons
balls = []
In [8]:
balls.clear()
rectangle_id = 4  # TODO for testing only
if ball_1:
    ball_1.halt = True
    # Wait for the old thread to leave its loop; otherwise it can still
    # write a stale position to the registers the new ball is about to use.
    ball_1.join()
ball_1 = Ball()
ball_1.start()
balls.append(ball_1)
In [9]:
rectangle_id = 5  # TODO for testing only
if ball_2:
    ball_2.halt = True
    # Wait for the old thread to leave its loop; otherwise it can still
    # write a stale position to the registers the new ball is about to use.
    ball_2.join()
ball_2 = Ball()
ball_2.start()
balls.append(ball_2)
In [10]:
import ipywidgets as widgets
# Define the player paddles
def _paddle_slider(label, tint):
    """Build the vertical slider that controls one player's paddle."""
    return widgets.IntSlider(
        description=label,
        slider_color=tint,
        value=255,
        # Keep the paddle `deadzone` pixels away from the top and bottom
        min=deadzone,
        max=game_height - paddle_height - deadzone,
        step=1,
        orientation='vertical',
        continuous_update=True,
        disabled=False,
        readout=True,
        readout_format='i',
    )


# player one
player_1 = _paddle_slider('Player 1', 'blue')
# player two
player_2 = _paddle_slider('Player 2', 'red')
# Size the slider widgets. The original assigned to paddle_1/paddle_2, which
# replaced each Rectangle's width Register with a string (breaking erase())
# and left the sliders unchanged.
player_1.layout.width = '400px'
player_2.layout.width = '400px'
def move_paddle_1(*args):
    """Slider callback: copy Player 1's slider value onto the left paddle."""
    # The slider counts upwards while screen y counts downwards, so invert
    target_y = game_height - player_1.value - paddle_height
    with paddle_1.reg_lock:
        paddle_1.y_pos.write(target_y)


player_1.observe(move_paddle_1, 'value')
def move_paddle_2(*args):
    """Slider callback: copy Player 2's slider value onto the right paddle."""
    # The slider counts upwards while screen y counts downwards, so invert
    target_y = game_height - player_2.value - paddle_height
    with paddle_2.reg_lock:
        paddle_2.y_pos.write(target_y)


player_2.observe(move_paddle_2, 'value')
In [11]:
from IPython.display import clear_output
from ipywidgets import Button, HBox, VBox, Label
# Button captions, in the order the click handlers are attached below
words = ['Start', 'Pause', 'Reset', 'Stop']
# One button per caption, same order
items = [Button(description=caption) for caption in words]
def on_start_clicked(b):
    """Start button: let every ball in `balls` move."""
    for active_ball in balls:
        active_ball.running = True
def on_pause_clicked(b):
    """Pause button: freeze every ball in `balls` (threads keep running)."""
    for active_ball in balls:
        active_ball.running = False
def on_stop_clicked(b):
    """Stop button: end all ball threads and blank every rectangle."""
    # Stop all ball threads
    for ball in balls:
        ball.halt = True
    # Rectangle lays out five registers per rectangle (x, y, width, height,
    # color). The previous stride of 4 did not match that layout, so the
    # writes landed on the wrong registers.
    registers_per_rectangle = 5
    # NOTE(review): 8 rectangles is kept from the original loop; confirm the
    # overlay really exposes 8 rectangles x 5 registers.
    rectangle_count = 8
    for slot in range(rectangle_count):
        base = registers_per_rectangle * slot
        Register(base + 0).write(0)  # xpos
        Register(base + 1).write(0)  # ypos
        Register(base + 2).write(0)  # width
        Register(base + 3).write(0)  # height
def on_reset_clicked(b):
    """
    Reset button: clear both scores, re-center the paddles and put every
    ball back in the middle with a new random color.

    All shared state is written under the same locks the Ball threads and
    slider callbacks use.
    """
    # Reset player score
    global player_1_score
    global player_2_score
    with score_1_lock:
        player_1_score = 0
    with score_2_lock:
        player_2_score = 0
    player_score(1, 0)
    player_score(2, 0)
    # Reset paddle position
    centered_y = (game_height // 2) - (paddle_height // 2)
    for paddle in (paddle_1, paddle_2):
        with paddle.reg_lock:
            paddle.y_pos.write(centered_y)
    # Put ball in center, change its color
    for ball in balls:
        with ball.ball.reg_lock:
            ball.ball.x_pos.write(game_width // 2)
            ball.ball.y_pos.write(game_height // 2)
            ball.ball.color.write(randint(0, 0xffffff))
# Attach the handlers in caption order: Start, Pause, Reset, Stop
for button, handler in zip(
        items[:4],
        (on_start_clicked, on_pause_clicked, on_reset_clicked, on_stop_clicked)):
    button.on_click(handler)
In [12]:
# Sliders on the top row, the four buttons underneath
slider_row = HBox([player_1, player_2])
button_row = HBox(items[0:4])
GUI = VBox([slider_row, button_row])
GUI.layout.justify_content = 'center'
# Last expression of the cell, so the notebook renders the widget
GUI
In [17]:
# Have AI Play
# Bounds computed from the same expressions as the sliders' min and max.
# The trailing commas that used to end these lines turned both values
# into 1-tuples instead of integers.
min_height = 0 + deadzone
max_height = game_height - paddle_height - deadzone
class AI(Thread):
    """
    This will create an AI to control a specific paddle

    paddle is the Rectangle to move. Set running to True to let the AI
    move it, and halt to True to end the thread.
    """

    def __init__(self, paddle):
        # Initialize super class
        Thread.__init__(self)
        self.halt = False
        self.paddle = paddle
        # Seconds to sleep between AI updates
        self.update_delay = 0.02
        self.running = False

    def run(self):
        """
        This is where AI logic happens: while running, nudge the paddle one
        pixel towards the top of the screen on every update.
        """
        while not self.halt:
            sleep(self.update_delay)
            if self.running:
                # Read-modify-write under the paddle's lock, like every
                # other writer of these registers
                with self.paddle.reg_lock:
                    current_pos = self.paddle.y_pos.read()
                    # Stop at the top dead zone (the highest position the
                    # sliders allow) instead of decrementing forever and
                    # writing negative coordinates.
                    if current_pos > deadzone:
                        self.paddle.y_pos.write(current_pos - 1)
In [18]:
# Placeholder so the next cell can be re-run: it halts the previous AI
# thread (if any) before creating a new one.
computer_1 = None
In [19]:
if computer_1:
    computer_1.halt = True
    # Wait for the old AI thread to exit so two threads never drive the
    # same paddle at once.
    computer_1.join()
computer_1 = AI(paddle_1)
computer_1.start()
In [16]:
from IPython.display import HTML
# Inject a small script that hides the notebook's input cells once the page
# is ready, plus a link that toggles them on and off. The script uses `$`,
# so it presumably relies on jQuery being available in the notebook page.
HTML('''<script>
code_show=true;
function code_toggle() {
if (code_show){
$('div.input').hide();
} else {
$('div.input').show();
}
code_show = !code_show
}
$( document ).ready(code_toggle);
</script>
The raw code for this IPython notebook is by default hidden for easier reading.
To toggle on/off the raw code, click <a href="javascript:code_toggle()">here</a>.''')
Out[16]:
In [20]:
# Shut the video pipeline down: stop the output, then the input, and drop
# both references.
hdmi_out.stop()
hdmi_in.stop()
del hdmi_out
del hdmi_in