Using Interrupts and asyncio for Buttons and Switches

This notebook provides a simple example for using asyncio I/O to interact asynchronously with multiple input devices. A task is created for each input device and coroutines used to process the results. To demonstrate, we recreate the flashing LEDs example in the getting started notebook but using interrupts to avoid polling the GPIO devices. The aim is have holding a button result in the corresponding LED flashing.

Initialising the Enviroment

First we import an instantiate all required classes to interact with the buttons, switches and LED and ensure the base overlay is loaded.


In [1]:
from pynq import Overlay, PL
from pynq.board import LED, Switch, Button

Overlay('base.bit').download()

buttons = [Button(i) for i in range(4)]
leds = [LED(i) for i in range(4)]
switches = [Switch(i) for i in range(2)]

Define the flash LED task

Next step is to create a task that waits for the button to be pressed and flash the LED until the button is released. The while True loop ensures that the coroutine keeps running until cancelled so that multiple presses of the same button can be handled.


In [2]:
import asyncio

@asyncio.coroutine
def flash_led(num):
    while True:
        yield from buttons[num].wait_for_value_async(1)
        while buttons[num].read():
            leds[num].toggle()
            yield from asyncio.sleep(0.1)
        leds[num].off()

Create the task

As there are four buttons we want to check, we create four tasks. The function asyncio.ensure_future is used to convert the coroutine to a task and schedule it in the event loop. The tasks are stored in an array so they can be referred to later when we want to cancel them.


In [3]:
tasks = [asyncio.ensure_future(flash_led(i)) for i in range(4)]

Monitoring the CPU Usage

One of the advantages of interrupt-based I/O is to minimised CPU usage while waiting for events. To see how CPU usages is impacted by the flashing LED tasks we create another task that prints out the current CPU utilisation every 3 seconds.


In [4]:
import psutil

@asyncio.coroutine
def print_cpu_usage():
    # Calculate the CPU utilisation by the amount of idle time
    # each CPU has had in three second intervals
    last_idle = [c.idle for c in psutil.cpu_times(percpu=True)]
    while True:
        yield from asyncio.sleep(3)
        next_idle = [c.idle for c in psutil.cpu_times(percpu=True)]
        usage = [(1-(c2-c1)/3) * 100 for c1,c2 in zip(last_idle, next_idle)]
        print("CPU Usage: {0:3.2f}%, {1:3.2f}%".format(*usage))
        last_idle = next_idle

tasks.append(asyncio.ensure_future(print_cpu_usage()))

Run the event loop

All of the blocking wait_for commands will run the event loop until the condition is met. All that is needed is to call the blocking wait_for_level method on the switch we are using as the termination condition.

While waiting for switch 0 to get high, users can press any push button on the board to flash the corresponding LED. While this loop is running, try opening a terminal and running top to see that python is consuming no CPU cycles while waiting for peripherals.

As this code runs until the switch 0 is high, make sure it is low before running the example.


In [5]:
if switches[0].read():
    print("Please set switch 0 low before running")
else:
    switches[0].wait_for_value(1)


CPU Usage: 2.33%, 2.67%
CPU Usage: 1.67%, 1.00%
CPU Usage: 0.33%, 0.33%
CPU Usage: 0.33%, 1.00%

Clean up

Even though the event loop has stopped running, the tasks are still active and will run again when the event loop is next used. To avoid this, the tasks should be cancelled when they are no longer needed.


In [6]:
[t.cancel() for t in tasks]


Out[6]:
[True, True, True, True, True]

Now if we re-run the event loop, nothing will happen when we press the buttons. The process will block until the switch is set back down to the low position.


In [7]:
switches[0].wait_for_value(0)