This notebook provides a simple example for using asyncio I/O to interact asynchronously with multiple input devices. A task is created for each input device and coroutines used to process the results. To demonstrate, we recreate the flashing LEDs example in the getting started notebook but using interrupts to avoid polling the GPIO devices. The aim is have holding a button result in the corresponding LED flashing.
In [1]:
from pynq import Overlay, PL
from pynq.board import LED, Switch, Button
Overlay('base.bit').download()
buttons = [Button(i) for i in range(4)]
leds = [LED(i) for i in range(4)]
switches = [Switch(i) for i in range(2)]
In [2]:
import asyncio
@asyncio.coroutine
def flash_led(num):
while True:
yield from buttons[num].wait_for_value_async(1)
while buttons[num].read():
leds[num].toggle()
yield from asyncio.sleep(0.1)
leds[num].off()
In [3]:
tasks = [asyncio.ensure_future(flash_led(i)) for i in range(4)]
In [4]:
import psutil
@asyncio.coroutine
def print_cpu_usage():
# Calculate the CPU utilisation by the amount of idle time
# each CPU has had in three second intervals
last_idle = [c.idle for c in psutil.cpu_times(percpu=True)]
while True:
yield from asyncio.sleep(3)
next_idle = [c.idle for c in psutil.cpu_times(percpu=True)]
usage = [(1-(c2-c1)/3) * 100 for c1,c2 in zip(last_idle, next_idle)]
print("CPU Usage: {0:3.2f}%, {1:3.2f}%".format(*usage))
last_idle = next_idle
tasks.append(asyncio.ensure_future(print_cpu_usage()))
All of the blocking wait_for commands will run the event loop until the condition is met. All that is needed is to call the blocking wait_for_level
method on the switch we are using as the termination condition.
While waiting for switch 0 to get high, users can press any push button on the board to flash the corresponding LED. While this loop is running, try opening a terminal and running top
to see that python is consuming no CPU cycles while waiting for peripherals.
As this code runs until the switch 0 is high, make sure it is low before running the example.
In [5]:
if switches[0].read():
print("Please set switch 0 low before running")
else:
switches[0].wait_for_value(1)
In [6]:
[t.cancel() for t in tasks]
Out[6]:
Now if we re-run the event loop, nothing will happen when we press the buttons. The process will block until the switch is set back down to the low position.
In [7]:
switches[0].wait_for_value(0)