Don't forget to stop and delete hdmi_out and hdmi_in when you are finished (see step 7).

Pynq Ponq Demonstration

This is a demonstration of a video filter that lets you play "Pynq Ponq" (based on a certain classic game). The FPGA handles the video logic, while this Python script handles the game logic. The filter is a proof of concept that shows Python can keep up with a simple real-time computing task.

1. Download base overlay and partial bitstream to the board

Make sure the camera is not connected to the board, then run the following cells to load the base overlay and the partial bitstream onto the PYNQ.


In [1]:
from pynq.drivers.video import HDMI
from pynq import Bitstream_Part
from pynq.board import Register
from pynq import Overlay
from random import choice
from random import randint
from threading import Lock as Mutex

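# Note the byte order: the values below appear to be packed as 0xRRBBGG
# (red, blue, green) rather than the more common 0xRRGGBB.
color = {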
    'red':0xff0000,
    'green':0x0000ff,
    'blue':0x00ff00,
    'magenta':0xffff00,
    'yellow':0xff00ff,
    'cyan':0x00ffff,
    'black':0x000000,
    'gray':0xbebebe,
    'white':0xffffff
}
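
The color table above suggests that the filter packs each pixel as red, blue, green from the most significant byte down ('green' sits in the low byte and 'blue' in the middle one). Assuming that reading is correct, a small helper can build register values from ordinary 8-bit channels. The name `pack_rbg` is hypothetical and not part of the PYNQ API; this is only a sketch.

```python
def pack_rbg(red, green, blue):
    """Pack 8-bit channels as 0xRRBBGG, the order implied by the color table (an assumption)."""
    for channel in (red, green, blue):
        if not 0 <= channel <= 0xff:
            raise ValueError("channel values must be in 0..255")
    # Red in the high byte, blue in the middle byte, green in the low byte
    return (red << 16) | (blue << 8) | green

print(hex(pack_rbg(255, 255, 0)))  # yellow -> 0xff00ff, matching color['yellow']
```

With such a helper, a custom ball color could be written as `pack_rbg(r, g, b)` instead of a hand-assembled hex constant.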

In [2]:
# Unplug the camera before running this cell
Overlay("demo.bit").download()
Bitstream_Part("ponq_p.bit").download()

2. Plug in the camera and turn it on

This feeds the camera input through the PYNQ to the output monitor.


In [3]:
#Plug in Camera and turn on

hdmi_in = HDMI('in')
resolution = 2
hdmi_out = HDMI('out', video_mode=resolution, frame_list=hdmi_in.frame_list)

hdmi_out.start()
hdmi_in.start()

3. Game Logic

The next several cells implement the game logic for Pynq Ponq. Run each of them in order.


In [4]:
# Game Constants
ball_size = 8 * resolution
deadzone = ball_size + 1 * resolution
paddle_height = 65 * resolution
paddle_width = 5 * resolution
padding = 10 * resolution
game_width = hdmi_in.frame_width()
game_height = hdmi_in.frame_height()

paddle_edge_left = padding + paddle_width
paddle_edge_right = (game_width - paddle_width) - padding

rectangle_id = 0
# Mutex for shared resource
id_lock = Mutex()

score_height = 10 * resolution
point_width = 20 * resolution

class Rectangle:
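    """
    A rectangle drawn by the video filter.
    The global rectangle_id is a unique number that identifies this rectangle.
    It selects the five registers (x, y, width, height, color) used to manipulate the rectangle.
    args is a dictionary that can be used to initialize rectangles with specific attributes
    """
    def __init__(self, args=None):
        global rectangle_id
        # Without args, fall through to the blank white rectangle in the final else branch
        if args is None:
            args = {'type': None}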
        # Write to the registers using a mutex
        self.reg_lock = Mutex()
        with self.reg_lock:
            with id_lock:
                self.x_pos  = Register((5*rectangle_id) + 0)
                self.y_pos  = Register((5*rectangle_id) + 1)
                self.width  = Register((5*rectangle_id) + 2)
                self.height = Register((5*rectangle_id) + 3)
                self.color  = Register((5*rectangle_id) + 4)
                rectangle_id += 1
        
        if args['type'] == 'paddle':
            with self.reg_lock:
                self.width.write(paddle_width)
                self.height.write(paddle_height)
                self.y_pos.write((game_height // 2) - (paddle_height // 2))
            if args['location'] == 'left':
                with self.reg_lock:
                    self.x_pos.write(padding)
                    self.color.write(color['blue'])
            elif args['location'] == 'right':
                with self.reg_lock:
                    self.x_pos.write(paddle_edge_right)
                    self.color.write(color['red'])
        elif args['type'] == 'ball':
            with self.reg_lock:
                self.width.write(ball_size)
                self.height.write(ball_size)
                self.x_pos.write(game_width // 2)
                self.y_pos.write(game_height // 2)
                self.color.write(randint(0,0xffffff))
        elif args['type'] == 'score':
            with self.reg_lock:
                self.x_pos.write(game_width // 2)
                self.y_pos.write(0)
                self.width.write(0)
                self.height.write(score_height)
            if args['location'] == 'left':
                with self.reg_lock:
                    self.color.write(color['blue'])
            elif args['location'] == 'right':
                with self.reg_lock:
                    self.color.write(color['red'])
        else:
            with self.reg_lock:
                self.width.write(0)
                self.height.write(0)
                self.x_pos.write(0)
                self.y_pos.write(0)
                self.color.write(color['white'])
        pass
            
    def erase(self):
        """
        Erase this object from the display by making its width and height 0
        """
        with self.reg_lock:
            self.width.write(0)
            self.height.write(0)
        pass
        
paddle_1 = Rectangle({'type':'paddle', 'location':'left'})
paddle_2 = Rectangle({'type':'paddle', 'location':'right'})
score_1 = Rectangle({'type':'score', 'location':'left'})
score_2 = Rectangle({'type':'score', 'location':'right'})
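
The Rectangle class above gives every rectangle five consecutive registers, starting at 5 times its id. The sketch below spells out that mapping as a pure function so the layout can be checked without hardware. The names `REGISTERS_PER_RECTANGLE`, `FIELDS`, and `register_indices` are hypothetical helpers introduced for illustration only.

```python
# Mirrors the layout used in Rectangle.__init__: x, y, width, height, color
REGISTERS_PER_RECTANGLE = 5
FIELDS = ('x_pos', 'y_pos', 'width', 'height', 'color')

def register_indices(rectangle_id):
    """Return the register index of each field for the given rectangle id."""
    base = REGISTERS_PER_RECTANGLE * rectangle_id
    return {name: base + offset for offset, name in enumerate(FIELDS)}

# The four rectangles created above take ids 0..3, so the first ball gets id 4
print(register_indices(4))
```

Any code that addresses registers directly, rather than through a Rectangle, needs to use the same stride of five.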

In [5]:
def player_score(player=0, points=0):
    """Resize a player's score bar; each point adds point_width pixels."""
    # Never let a bar grow past its half of the screen
    score_width = min(points * point_width, game_width // 2)
    if player == 1:
        # This player's score is displayed on the left side of the board,
        # so the bar grows leftward from the center
        with score_1.reg_lock:
            score_1.width.write(score_width)
            score_1.x_pos.write((game_width // 2) - score_width)
    elif player == 2:
        # This player's score is displayed on the right side of the board
        # x_pos doesn't need to change for this one
        with score_2.reg_lock:
            score_2.width.write(score_width)
    else:
        print("Player %d doesn't exist" % player)
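
The score bars grow outward from the center of the screen: player 1's bar extends to the left, player 2's to the right. The geometry can be sketched as a pure function that returns the (x_pos, width) pair to write. `score_bar` is a hypothetical name, and the screen and point sizes in the example are illustrative values, not read from the board.

```python
def score_bar(player, points, game_width, point_width):
    """Return (x_pos, width) of a score bar anchored at the screen center."""
    half = game_width // 2
    # Clamp so a bar never extends past its half of the screen
    width = min(points * point_width, half)
    if player == 1:
        return half - width, width   # grows leftward from the center
    if player == 2:
        return half, width           # grows rightward from the center
    raise ValueError("player must be 1 or 2")

print(score_bar(1, 3, 1280, 40))  # (520, 120)
print(score_bar(2, 3, 1280, 40))  # (640, 120)
```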

In [6]:
from threading import Thread
from time import sleep
from random import choice, randrange

player_1_score = 0
player_2_score = 0
score_1_lock = Mutex()
score_2_lock = Mutex()

class Ball(Thread):
    """
    This will create a ball object that runs on its own thread
    All Ball movement can be paused with the game_running boolean
    This thread can be stopped by setting halt to True
    """
    def __init__(self):
        # Initialize super class
        Thread.__init__(self)
        self.ball = Rectangle({'type':'ball'})
        self.halt = False
        # This float determines how often the game should update
        self.update_delay = 0.02
        #note: this uses choice rather than randrange, even though it's less concise, to avoid a case of zero x velocity
        self.velocity = [choice([-5,-4,-3,-2,-1,1,2,3,4,5]),randrange(-5,5,1)]
        self.speed = 2 * resolution
        self.ospeed = self.speed
        self.maxspeed = 6 * resolution

        self.running = False
        pass
    
    def run(self):
        """
        This is where the physics of ball movement will be handled
        It is also where any game logic (such as updating scores) will happen
        """
        global player_1_score
        global player_2_score
        while not self.halt:
            sleep(self.update_delay)
            if self.running:
                ##################
                # Game Logic
                ##################
                x_pos = 0
                y_pos = 0
                
                with self.ball.reg_lock:
                    x_pos = self.ball.x_pos.read()
                    y_pos = self.ball.y_pos.read()
                
                # Determine which paddle the ball is touching
                side = None
                if x_pos + ball_size >= paddle_edge_right:
                    side = "right"
                elif x_pos <= paddle_edge_left:
                    side = "left"
                    
                # Are we even near a paddle?
                if side:
                    paddle_min = 0                   
                    if side == "left":
                        with paddle_1.reg_lock:
                            paddle_min = paddle_1.y_pos.read()
                    if side == "right":
                        with paddle_2.reg_lock:
                            paddle_min = paddle_2.y_pos.read()
                    paddle_max = paddle_min + paddle_height
                    
                    # Find out if the ball is in the same height range as the paddle and moving toward it
                    moving_toward = (side == "right" and self.velocity[0] > 0) or (side == "left" and self.velocity[0] < 0)
                    if paddle_min - ball_size <= y_pos <= paddle_max and moving_toward:
                        # Randomize the angle by adding up to +/- (|velocity| - 1).
                        # That limit keeps the sign and keeps the result non-zero; the *= -1 then inverts it.
                        limit = abs(self.velocity[0]) - 1
                        if limit > 0:
                            delta = randint(-limit, limit)
                        else:
                            delta = 0
                        self.velocity[0] = self.velocity[0] + delta
                        self.velocity[0] *= -1
                        # increase speed on each hit, up to maxspeed
                        self.speed = self.speed + resolution / 4
                        if self.speed >= self.maxspeed:
                            self.speed = self.maxspeed

                # Bounce off Ceiling and Floor
                if y_pos >= (game_height - ball_size) or y_pos <= 0:
                    self.velocity[1] *= -1
                
                # Multiply velocity by the speed scalar
                x_pos += int(self.velocity[0] * self.speed)
                y_pos += int(self.velocity[1] * self.speed)

                
                
                # Error correction
                if x_pos < 0: x_pos = 0
                if y_pos < 0: y_pos = 0
                    
                # Player 2 Scores
                if x_pos <= 0:
                    with score_2_lock:
                        player_2_score += 1
                        player_score(2,player_2_score)
                    x_pos = game_width // 2
                    y_pos = game_height // 2
                    # Ball starts off towards winner
                    self.velocity = [choice([1,2,3,4,5]),randrange(-5,5,1)]
                    self.speed = self.ospeed
                    self.ball.color.write(randint(0,0xffffff))
                
                # Player 1 Scores
                if x_pos >= game_width:
                    with score_1_lock:
                        player_1_score += 1
                        player_score(1,player_1_score)
                    x_pos = game_width // 2
                    y_pos = game_height // 2
                    # Ball starts off towards winner
                    self.velocity = [choice([-5,-4,-3,-2,-1]),randrange(-5,5,1)]
                    self.speed = self.ospeed
                    self.ball.color.write(randint(0,0xffffff))
                
                # Write new data to registers
                with self.ball.reg_lock:
                    self.ball.x_pos.write(x_pos)
                    self.ball.y_pos.write(y_pos)
                # End Game Logic
        pass
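
The paddle bounce in Ball.run perturbs the horizontal velocity and then reverses it, and the perturbation must never bring the velocity to zero or flip its sign before the reversal. A minimal sketch of that rule as a standalone function, with the hypothetical name `bounce_x`, makes the invariant easy to test away from the board.

```python
from random import randint

def bounce_x(velocity_x):
    """Perturb a non-zero horizontal velocity by at most |v| - 1, then reverse it."""
    limit = abs(velocity_x) - 1
    # With |v| == 1 there is no room to perturb without reaching zero
    delta = randint(-limit, limit) if limit > 0 else 0
    return -(velocity_x + delta)

# A ball moving right at 3 always leaves moving left at a speed between 1 and 5
print(bounce_x(3))
```

Because the perturbation is bounded by |v| - 1, the sum keeps the sign of the incoming velocity, so the result always points away from the paddle.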

In [7]:
ball_1 = None
ball_2 = None
balls = []

In [8]:
balls.clear()
rectangle_id = 4  # Pin this ball to slot 4 so re-running the cell reuses it; TODO for testing only
if ball_1:
    ball_1.halt = True
ball_1 = Ball()
ball_1.start()

balls.append(ball_1)

In [9]:
rectangle_id = 5  # Pin this ball to slot 5 so re-running the cell reuses it; TODO for testing only
if ball_2:
    ball_2.halt = True
ball_2 = Ball()
ball_2.start()
balls.append(ball_2)

4. Create the user interface

Here's a rudimentary user interface to the game.


In [10]:
import ipywidgets as widgets

# Define the player paddles
# player one
player_1 = widgets.IntSlider(
    value=255,
    min=0 + deadzone,
    max=game_height - paddle_height - deadzone,
    step=1,
    description='Player 1',
    disabled=False,
    continuous_update=True,
    orientation='vertical',
    readout=True,
    readout_format='i',
    slider_color='blue'
)
# player two
player_2 = widgets.IntSlider(
    value=255,
    min=0 + deadzone,
    max=game_height - paddle_height - deadzone,
    step=1,
    description='Player 2',
    disabled=False,
    continuous_update=True,
    orientation='vertical',
    readout=True,
    readout_format='i',
    slider_color='red'
)

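# Size the slider widgets. Assigning to paddle_1.width here would replace
# the paddle's width Register with a string.
player_1.width = '400px'
player_2.width = '400px'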


def move_paddle_1(*args):
    with paddle_1.reg_lock:
        paddle_1.y_pos.write((game_height - player_1.value) - paddle_height)
player_1.observe(move_paddle_1, 'value')
    
def move_paddle_2(*args):
    with paddle_2.reg_lock:
        paddle_2.y_pos.write((game_height - player_2.value) - paddle_height)
player_2.observe(move_paddle_2, 'value')
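
The slider callbacks above invert the slider value before writing it, which is consistent with the slider growing upward while the y register grows downward (the latter is an assumption based on the score bars sitting at y_pos 0). The mapping can be sketched as a pure function; `slider_to_y` is a hypothetical name and the numbers are illustrative rather than read from the board.

```python
def slider_to_y(value, game_height, paddle_height):
    """Map a slider value (larger = higher on screen) to the paddle's y register value."""
    return (game_height - value) - paddle_height

# Illustrative numbers: 720-pixel-high frame, 130-pixel paddle, 18-pixel deadzone
game_height, paddle_height, deadzone = 720, 130, 18
slider_min = 0 + deadzone
slider_max = game_height - paddle_height - deadzone

print(slider_to_y(slider_min, game_height, paddle_height))  # 572, lowest paddle position
print(slider_to_y(slider_max, game_height, paddle_height))  # 18, highest paddle position
```

The slider limits keep the paddle at least one deadzone away from both the top and bottom edges.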

In [11]:
from IPython.display import clear_output
from ipywidgets import Button, HBox, VBox, Label

words = ['Start', 'Pause', 'Reset', 'Stop']
items = [Button(description=w) for w in words]

def on_start_clicked(b):
    for ball in balls:
        ball.running = True
        
def on_pause_clicked(b):
    for ball in balls:
        ball.running = False
    
def on_stop_clicked(b):
    # Stop all ball threads
    for ball in balls:
        ball.halt = True
    # Write zeroes to the position and size of every rectangle.
    # Each rectangle owns five registers (see Rectangle), so the stride is 5.
    for x in range(0,8):
        Register((5*x)+0).write(0) # xpos
        Register((5*x)+1).write(0) # ypos
        Register((5*x)+2).write(0)  # width
        Register((5*x)+3).write(0)  # height
    
def on_reset_clicked(b):
    # Reset player scores, holding the same locks the ball threads use
    global player_1_score
    global player_2_score
    with score_1_lock:
        player_1_score = 0
        player_score(1,0)
    with score_2_lock:
        player_2_score = 0
        player_score(2,0)
    # Reset paddle position
    with paddle_1.reg_lock:
        paddle_1.y_pos.write((game_height // 2) - (paddle_height // 2))
    with paddle_2.reg_lock:
        paddle_2.y_pos.write((game_height // 2) - (paddle_height // 2))
    # Put each ball in the center and change its color
    for ball in balls:
        with ball.ball.reg_lock:
            ball.ball.x_pos.write(game_width // 2)
            ball.ball.y_pos.write(game_height // 2)
            ball.ball.color.write(randint(0,0xffffff))

    
items[0].on_click(on_start_clicked)
items[1].on_click(on_pause_clicked)
items[2].on_click(on_reset_clicked)
items[3].on_click(on_stop_clicked)

In [12]:
GUI = VBox([ HBox([player_1, player_2]), HBox([items[0], items[1], items[2], items[3]])])
GUI.layout.justify_content = 'center'
GUI



5. Play

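The left and right sliders control the left and right paddles respectively.

Start sets the balls in motion and Pause freezes them. Reset clears both scores, centers the paddles, and returns each ball to the center with a new color. Stop halts the ball threads and clears the rectangles from the screen; to play again after stopping, re-run the game logic cells from step 3.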
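
The buttons work by flipping two flags that each game thread polls: running (pause and resume) and halt (exit the loop for good). The sketch below reproduces that pattern with a stand-in thread that only counts ticks, so it runs without the board. `Ticker` and `wait_for` are hypothetical names used for illustration.

```python
from threading import Thread
from time import sleep, monotonic

class Ticker(Thread):
    """Stand-in for Ball: counts updates instead of moving a rectangle."""
    def __init__(self, update_delay=0.001):
        super().__init__(daemon=True)
        self.update_delay = update_delay
        self.running = False   # toggled by Start / Pause
        self.halt = False      # set once by Stop
        self.ticks = 0

    def run(self):
        while not self.halt:
            sleep(self.update_delay)
            if self.running:
                self.ticks += 1

def wait_for(predicate, timeout=2.0):
    """Poll until predicate() is true or the timeout expires."""
    deadline = monotonic() + timeout
    while monotonic() < deadline:
        if predicate():
            return True
        sleep(0.001)
    return predicate()

ticker = Ticker()
ticker.start()
ticker.running = True                  # what Start does
wait_for(lambda: ticker.ticks > 0)
ticker.running = False                 # what Pause does
ticker.halt = True                     # what Stop does
ticker.join()
print(ticker.ticks > 0, ticker.is_alive())
```

A halted thread cannot be restarted, which is why a new Ball object has to be created after Stop.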

6. Play with an AI

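The controls above are not very friendly for two players, so run the next few cells to play against a simple AI. The AI here is deliberately minimal: on every update it only nudges its paddle one pixel toward the top of the screen.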


In [17]:
# Have AI Play
# Paddle limits (no trailing commas, which would turn these into tuples)
min_height = 0 + deadzone
max_height = game_height - paddle_height - deadzone
class AI(Thread):
    """
    This will create an AI to control a specific paddle
    """
    def __init__(self, paddle):
        # Initialize super class
        Thread.__init__(self)
        self.halt = False
        self.paddle = paddle
        # This float determines how often the game should update
        self.update_delay = 0.02
        self.running = False
    
    def run(self):
        """
        This is where AI logic happens
        """
        while not self.halt:
            sleep(self.update_delay)
            if self.running:
                # Move the paddle one pixel toward y = 0, stopping at min_height
                with self.paddle.reg_lock:
                    current_pos = self.paddle.y_pos.read()
                    self.paddle.y_pos.write(max(min_height, current_pos - 1))

In [18]:
computer_1 = None

In [19]:
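if computer_1:
    computer_1.halt = True
computer_1 = AI(paddle_1)
computer_1.start()
# The AI is not in the balls list, so the Start button does not enable it
computer_1.running = True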
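
The AI above only moves its paddle in one direction. One possible next step, sketched here as a pure function under the assumption that the thread can read the ball's y position, is to step the paddle toward the ball and clamp it to the allowed range. `ai_next_y` and its parameters are hypothetical and not part of the original notebook.

```python
def ai_next_y(paddle_y, ball_y, paddle_height, ball_size, min_y, max_y, step=1):
    """Return the paddle's next y position, stepping its center toward the ball's center."""
    paddle_center = paddle_y + paddle_height // 2
    ball_center = ball_y + ball_size // 2
    if ball_center > paddle_center:
        paddle_y += step
    elif ball_center < paddle_center:
        paddle_y -= step
    # Keep the paddle inside the same limits the sliders enforce
    return max(min_y, min(max_y, paddle_y))

# Illustrative numbers: ball below the paddle, so the paddle moves down by one
print(ai_next_y(100, 300, 130, 16, 18, 572))  # 101
```

Inside AI.run, the result would be written to `self.paddle.y_pos` while holding the paddle's `reg_lock`.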

In [16]:
from IPython.display import HTML
HTML('''<script>
code_show=true; 
function code_toggle() {
 if (code_show){
 $('div.input').hide();
 } else {
 $('div.input').show();
 }
 code_show = !code_show
} 
$( document ).ready(code_toggle);
</script>
The raw code for this IPython notebook is by default hidden for easier reading.
To toggle on/off the raw code, click <a href="javascript:code_toggle()">here</a>.''')


Out[16]:
The raw code for this IPython notebook is by default hidden for easier reading. To toggle on/off the raw code, click here.

7. Clean up

When you are done playing Pynq Ponq, run the following code to stop the video stream.


In [20]:
hdmi_out.stop()
hdmi_in.stop()
del hdmi_out
del hdmi_in
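
Since forgetting this step leaves the video stream running, one option is to wrap start and stop in a context manager so that cleanup happens even if a cell raises. The sketch below uses a stand-in class instead of the real HDMI objects; `running_streams` and `FakeStream` are hypothetical names, and the only assumption about the real objects is that they expose start() and stop() as used in this notebook.

```python
from contextlib import contextmanager

@contextmanager
def running_streams(*streams):
    """Start each stream, then stop every started stream on exit, even after an error."""
    started = []
    try:
        for stream in streams:
            stream.start()
            started.append(stream)
        yield streams
    finally:
        for stream in started:
            stream.stop()

class FakeStream:
    """Stand-in for an HDMI object; records calls instead of touching hardware."""
    def __init__(self, name, log):
        self.name, self.log = name, log
    def start(self):
        self.log.append(('start', self.name))
    def stop(self):
        self.log.append(('stop', self.name))

log = []
with running_streams(FakeStream('out', log), FakeStream('in', log)):
    log.append(('play', None))
print(log)
```

The streams are stopped in the same order as in the cleanup cell above: output first, then input.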