Concurrency 1 - running concurrent code on a NAO robot

Objective

Concurrent programming is an important part of robotics programming. Multiple inputs, outputs and system resources are often required to be accessed by multiple parts of the code or other parts of the system simultaneously and this can create conflicts. In my current project, TickleMeNAO, poor code design that has not included concurrency has lead to issues with event driven callbacks and resources such as speech recognition. The objective of today is to understand some concepts of conccurent programming.


In [45]:
# Imports
import this
from timeit import timeit


The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!

Concurrency problems

Problem statement:

  1. Multiple event driven callbacks, calling the same callback method, can run at the same time.
  2. Multiple methods (not callback) are enable/disabling resources e.g. speech recognition, causing exceptions and failure to enable/disable the resource correctly.

Potential solutions, general

  1. Rewrite code in more procedural form, maybe as a state machine.
  2. Threads and lock(), where each thread comes from a class or instance.
  3. Multiprocessing, similar to threading, but uses processes rather than threads.
  4. A simple approach with some standard Python libs:
  5. Greenlets (a form of coroutine).
  6. Coroutines.

  7. Asynchronous Design Techniques - using queues.

  8. Run the code in Choregraph with concurrent tasks each in their own box/timeline.

Potential solutions on a NAO robot

Concurrency vs parallelism, some definitions

  • Thread A sequence of instructions executed within the context of a process.

  • Process The stuff going on!

  • Single-threaded Restricting access to a single thread.

  • Multi-threaded Allowing access to two or more threads.

  • Concurrency is when two or more tasks can start, run, and complete in overlapping time periods. Could also be two threads that are making progress. An example is multi-tasking (time slicing).

  • Parallelism A condition that arises when at least two threads are executing simultaneously. An example would be threads on a multicore processor.

From: http://docs.oracle.com/cd/E19455-01/806-5257/6je9h032b/index.html

  • Concurrency programming as the composition of independently executing processes.

  • Parallelism programming as the simultaneous execution of, possibly related, computations.

  • Channels allow communication between processes.

So, concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once. Concurrency is a way to structure a program by breaking it into pieces that can be executed independently.

From: http://concur.rspace.googlecode.com/hg/talk/concur.html#slide-4

  • Thread is used to refer to a seperate path of execution for code.

  • Process refers to a running executable, can encompass multiple threads.

  • Task refers to the abstract concept of work that needs to be performed.

  • Dispatch queue executes tasks, either serially or concurrently, always as a FIFO (first in first out).

From: https://developer.apple.com/library/ios/documentation/general/conceptual/concurrencyprogrammingguide/Introduction/Introduction.html

Some thoughts on designing for concurrent programming

Design for concurent programming needs to be done upfront particularly with regard to tasks and data structures. Design steps:

  • Define your applications expected behaviour.
    • Enumerate the tasks, user and functional.
      • Break down further into component steps.
    • Enumerate the data structures.
  • Factor out executable units of work. These are your potential serial (must be executed in a specific order) and concurrent tasks (can be executed in parallel or concurrently).
  • Identify the queues you need, serial and concurrent.

Tips:

  • Consider computing values directly within your task if memory usage is a factor.
  • Identify serial tasks early and do what you can to make them more concurrent or remove the shared resource.
  • Avoid using locks, especially if using a queue.
  • Rely on the system frameworks wherever possible.

From: https://developer.apple.com/library/ios/documentation/general/conceptual/concurrencyprogrammingguide/ConcurrencyandApplicationDesign/ConcurrencyandApplicationDesign.html#//apple_ref/doc/uid/TP40008091-CH100-SW1

Some thoughts on functional programming

  • Functions only take inputs and produce outputs, and don’t have any internal state that affects the output produced for a given input. Put another way, input flows through a set of functions. ie it is not an object which has internal states.
  • With a list comprehension you get back a list.
  • A generator expression return an iterator that computes the values as necessary ie you can work with infinite, ongoing or large streams of data.

Advantages and disadvantages for each potential concurrent solution

  1. Procedural form e.g. a serial state machine:

    • Easy to code and debug.
    • Potentially slow.
  2. Threading:

    • Good for real time tasks that need predictable behaviour.
    • More complicated to program and debug with thread race and starvation issues.
  3. Functional programming e.g. coroutines:

    • Code is very modular, so easy to debug and test.
    • Could still have issues with shared resources.
  4. Asynchronous methods e.g. queues:

    • Can be very efficient in spreading the load across the processor.
    • Shared resources can be safely protected.
    • Not necessarily fast for a specific task.
    • Not necessarily real time.
  5. Greenlets:

    • No standard libraries on NAO, and getting libraries on NAO not always straightforward.
  6. Multiprocessing:

A simple state machine example


In [8]:
%%file statemachine.py
""" A simple state machine.
    From: http://www.ibm.com/developerworks/linux/library/l-python-state/index.html

"""
from string import upper

class StateMachine:
    def __init__(self):
        self.handlers = {}
        self.startState = None
        self.endStates = []

    def add_state(self, name, handler, end_state=0):
        name = upper(name)
        self.handlers[name] = handler
        if end_state:
            self.endStates.append(name)

    def set_start(self, name):
        self.startState = upper(name)

    def run(self, cargo):
        try:
            handler = self.handlers[self.startState]
        except:
            raise "InitializationError", "must call .set_start() before .run()"
        if not self.endStates:
            raise "InitializationError", "at least one state must be an end_state"

        while 1:
            (newState, cargo) = handler(cargo)
            if upper(newState) in self.endStates:
                break
            else:
                handler = self.handlers[upper(newState)]


Overwriting statemachine.py

In [9]:
from statemachine import StateMachine

def ones_counter(val):
    print "ONES State:    ",
    while 1:
        if val <= 0 or val >= 30:
           newState = "Out_of_Range"
           break
        elif 20 <= val < 30:
            newState = "TWENTIES"
            break
        elif 10 <= val < 20:
            newState = "TENS"
            break
        else:
            print "  @ %2.1f+" % val,
        val = math_func(val)
    print "  >>"
    return (newState, val)

def tens_counter(val):
    print "TENS State:    ",
    while 1:
        if val <= 0 or val >= 30:
           newState = "Out_of_Range"
           break
        elif 1 <= val < 10:
            newState = "ONES"
            break
        elif 20 <= val < 30:
            newState = "TWENTIES"
            break
        else:
            print "  #%2.1f+" % val,
        val = math_func(val)
    print "  >>"
    return (newState, val)

def twenties_counter(val):
    print "TWENTIES State:",
    while 1:
        if val <= 0 or val >= 30:
           newState = "Out_of_Range"
           break
        elif 1 <= val < 10:
            newState = "ONES"
            break
        elif 10 <= val < 20:
            newState = "TENS"
            break
        else:
            print "  *%2.1f+" % val,
        val = math_func(val)
    print "  >>"
    return (newState, val)

def math_func(n):
    from math import sin
    return abs(sin(n))*31

if __name__== "__main__":
    m = StateMachine()
    m.add_state("ONES", ones_counter)
    m.add_state("TENS", tens_counter)
    m.add_state("TWENTIES", twenties_counter)
    m.add_state("OUT_OF_RANGE", None, end_state=1)
    m.set_start("ONES")
    m.run(1)


ONES State:       @ 1.0+   >>
TWENTIES State:   *26.1+   *25.3+   >>
ONES State:       @ 4.2+   >>
TWENTIES State:   *26.4+   *29.5+   *28.8+   >>
TENS State:       #15.2+   #16.3+   #16.0+   >>
ONES State:       @ 9.5+   @ 3.8+   >>
TENS State:       #18.2+   #17.9+   >>
TWENTIES State:   *24.5+   >>
TENS State:       #17.7+   >>
TWENTIES State:   *28.0+   >>
ONES State:       @ 7.6+   >>
TWENTIES State:   *29.9+   >>

A thread example


In [281]:
%%file threading_example.py
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    def __init__(self, threadID, name, counter, delay):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
        self.delay = delay
    def run(self):
        print "Starting " + self.name
        print_time(self.name, self.counter, self.delay)
        print "Exiting " + self.name

def print_time(threadName, counter, delay):
    while counter:
        if exitFlag:
            print "%s exited." % threadName
            thread.exit()
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1

# Create new threads
counter = 10
delay = 0.5
thread1 = myThread(1, "Thread-1", counter, delay)
thread2 = myThread(2, "Thread-2", counter, delay)

# Start new Threads
thread1.start()
thread2.start()

print "Exiting Main Thread"


Overwriting threading_example.py

In [282]:
!python threading_example.py


Starting Thread-1
 Starting Thread-2
Exiting Main Thread
Thread-2: Wed Apr  9 22:18:23 2014
Thread-1: Wed Apr  9 22:18:23 2014
Thread-2: Wed Apr  9 22:18:24 2014
 Thread-1: Wed Apr  9 22:18:24 2014
Thread-1: Wed Apr  9 22:18:24 2014
Thread-2: Wed Apr  9 22:18:24 2014
Thread-2: Wed Apr  9 22:18:25 2014
Thread-1: Wed Apr  9 22:18:25 2014
Thread-2: Wed Apr  9 22:18:25 2014
 Thread-1: Wed Apr  9 22:18:25 2014
Thread-2: Wed Apr  9 22:18:26 2014
 Thread-1: Wed Apr  9 22:18:26 2014
Thread-1: Wed Apr  9 22:18:26 2014
Thread-2: Wed Apr  9 22:18:26 2014
Thread-1: Wed Apr  9 22:18:27 2014
Thread-2: Wed Apr  9 22:18:27 2014
Thread-1: Wed Apr  9 22:18:27 2014
Thread-2: Wed Apr  9 22:18:27 2014
Thread-1: Wed Apr  9 22:18:28 2014
 Thread-2: Wed Apr  9 22:18:28 2014
Exiting Thread-1
Exiting Thread-2

In [53]:
%%file threading_example_with_lock.py
import threading
import time

class myThread (threading.Thread):
    def __init__(self, threadID, name, counter, delay):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
        self.delay = delay
        
    def run(self):
        print "Starting " + self.name
        # Get lock to synchronize threads
        # nb use of blocking = False arg to allow threads to run in parallel.
        threadLock.acquire(BLOCKING)
        print_time(self.name, self.counter, self.delay, self.threadID)
        # Free lock to release next thread
        try:
            threadLock.release()
        except Exception, e:
            print "Error releasing threadlock: ", e
        
def print_time(threadName, counter, delay, increment):
    global TESTNUM, THREADERROR
    
    while counter:
        time.sleep(delay)
        TESTNUM += increment
        #print "TESTNUM BEFORE: ", TESTNUM
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1
        TESTNUM -= increment
        if TESTNUM != 0:
            print "Shared resource was accessed by both threads!"
            THREADERROR = 1
            
def runLockTest(counter, delay):   

    # Create new threads
    thread1 = myThread(1, "Thread-1", counter, delay)
    thread2 = myThread(2, "Thread-2", counter, delay)

    # Start new Threads
    thread1.start()
    thread2.start()

    # Add threads to thread list
    threads.append(thread1)
    threads.append(thread2)

    # Wait for all threads to complete
    for t in threads:
        t.join()
    print "Exiting Main Thread"
    if THREADERROR == 0:
        print "Shared resource properly accessed!"
        
# Test with Lock() blocking enabled.
print "Running with Lock() blocking = True"
threadLock = threading.Lock()
threads = []
BLOCKING = True
TESTNUM = 0
THREADERROR = 0
counter = 10
delay = 0.5
runLockTest(counter, delay)

# Test with Lock() blocking disabled.
print "Running with Lock() blocking = False"
threadLock = threading.Lock()
threads = []
BLOCKING = False
TESTNUM = 0
THREADERROR = 0
counter = 10
delay = 0.5
runLockTest(counter, delay)


Overwriting threading_example_with_lock.py

In [54]:
!python threading_example_with_lock.py


Running with Lock() blocking = True
Starting Thread-1
Starting Thread-2
Thread-1: Thu Apr 10 15:09:01 2014
Thread-1: Thu Apr 10 15:09:01 2014
Thread-1: Thu Apr 10 15:09:02 2014
Thread-1: Thu Apr 10 15:09:02 2014
Thread-1: Thu Apr 10 15:09:03 2014
Thread-1: Thu Apr 10 15:09:03 2014
Thread-1: Thu Apr 10 15:09:04 2014
Thread-1: Thu Apr 10 15:09:04 2014
Thread-1: Thu Apr 10 15:09:05 2014
Thread-1: Thu Apr 10 15:09:05 2014
Thread-2: Thu Apr 10 15:09:06 2014
Thread-2: Thu Apr 10 15:09:06 2014
Thread-2: Thu Apr 10 15:09:07 2014
Thread-2: Thu Apr 10 15:09:07 2014
Thread-2: Thu Apr 10 15:09:08 2014
Thread-2: Thu Apr 10 15:09:08 2014
Thread-2: Thu Apr 10 15:09:09 2014
Thread-2: Thu Apr 10 15:09:09 2014
Thread-2: Thu Apr 10 15:09:10 2014
Thread-2: Thu Apr 10 15:09:10 2014
Exiting Main Thread
Shared resource properly accessed!
Running with Lock() blocking = False
Starting Thread-1
Starting Thread-2
Thread-1: Thu Apr 10 15:09:11 2014
 Thread-2: Thu Apr 10 15:09:11 2014
Shared resource was accessed by both threads!
Thread-2: Thu Apr 10 15:09:11 2014
 Thread-1: Thu Apr 10 15:09:11 2014
Shared resource was accessed by both threads!
Thread-2: Thu Apr 10 15:09:12 2014
 Thread-1: Thu Apr 10 15:09:12 2014
Shared resource was accessed by both threads!
Thread-1: Thu Apr 10 15:09:12 2014
 Thread-2: Thu Apr 10 15:09:12 2014
Shared resource was accessed by both threads!
Thread-1: Thu Apr 10 15:09:13 2014
Thread-2: Thu Apr 10 15:09:13 2014
Thread-1: Thu Apr 10 15:09:13 2014
 Shared resource was accessed by both threads!
Thread-2: Thu Apr 10 15:09:13 2014
Thread-1: Thu Apr 10 15:09:14 2014
 Thread-2: Thu Apr 10 15:09:14 2014
Shared resource was accessed by both threads!
Thread-2: Thu Apr 10 15:09:14 2014
 Thread-1: Thu Apr 10 15:09:14 2014
Shared resource was accessed by both threads!
Thread-2: Thu Apr 10 15:09:15 2014
 Thread-1: Thu Apr 10 15:09:15 2014
Shared resource was accessed by both threads!
Thread-2: Thu Apr 10 15:09:15 2014
 Thread-1: Thu Apr 10 15:09:15 2014
Shared resource was accessed by both threads!
Error releasing threadlock:  release unlocked lock
Exiting Main Thread

Conclusion: If Lock() blocking is True then the threads aren't always well scheduled. If Lock() blocking is false then the shared resource can be accessed both by resourses when it shouldn't be. Let's try a queue.


In [277]:
%%file threading_example_with_queue.py
import Queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print "Starting " + self.name
        process_data(self.name, self.q)
        print "Exiting " + self.name

def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (threadName, data)
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five", 
            "Six", "Seven", "Eight", "Nine", "Ten"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():
    pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print "Exiting Main Thread"


Overwriting threading_example_with_queue.py

In [278]:
!python threading_example_with_queue.py


Starting Thread-1Starting Thread-2

Starting Thread-3
Thread-3 processing One
Thread-1 processing Two
Thread-2 processing Three
Thread-3 processing Four
Thread-1 processing Five
Thread-2 processing Six
Thread-3 processing Seven
Thread-1 processing Eight
Thread-2 processing Nine
Thread-3 processing Ten
Exiting Thread-1
Exiting Thread-2
Exiting Thread-3
Exiting Main Thread

Queues provide a way to exchange data between threads.

Conclusion: Threads do not seem a reliable way to handle concurrent tasks as don't run in any particular order. This could be fixed with threading.Condition() perhaps, but conditions also seem problematic.

A multiprocessing example


In [308]:
%%file multiprocessing_example.py
from multiprocessing import Process, Lock
import time
    
def print_time(processName, counter, delay, lock, increment):
    global TESTNUM
    
    while counter:
        lock.acquire()
        TESTNUM += increment
        time.sleep(delay)
        print "%s: %s with TESTNUM: %s" % (processName, time.ctime(time.time()), TESTNUM)
        counter -= 1
        TESTNUM -= increment
        if TESTNUM != 0:
            print "Shared resource was accessed by both processes!"
        lock.release()
    
if __name__ == '__main__':
    counter = 10
    delay = 0.5
    TESTNUM = 0
    
    lock = Lock()
    
    # Spawn process objects.
    p1 = Process(target = print_time, args = ('process1', counter, delay, lock, 1))
    p2 = Process(target = print_time, args = ('process2', counter, delay, lock, 2))
    
    # Start processes.
    p1.start()
    p2.start()
    
    # Wait for processes to finish.
    p1.join()
    p2.join()


Overwriting multiprocessing_example.py

In [309]:
!python multiprocessing_example.py


process1: Wed Apr  9 23:10:09 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:09 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:10 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:10 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:11 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:11 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:12 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:12 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:13 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:13 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:14 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:14 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:15 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:15 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:16 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:16 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:17 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:17 2014 with TESTNUM: 2
process1: Wed Apr  9 23:10:18 2014 with TESTNUM: 1
process2: Wed Apr  9 23:10:18 2014 with TESTNUM: 2

Conclusion: Runs great on the computer. Does not work on the NAO if a proxy is called in the process.

Some generator examples, with a couroutine example


In [3]:
# Example 1, using yield.
def countdown(n):
    print "Counting down from", n
    while n > 0:
        yield n
        n -= 1

x = countdown(10)

while 1:
    print x.next()


---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-3-b3015cb8acab> in <module>()
      9 
     10 while 1:
---> 11     print x.next()

StopIteration: 
Counting down from 10
10
9
8
7
6
5
4
3
2
1

Example 2, using yield, with data passed back to the generator. It's a couroutine! From: http://antroy.blogspot.co.uk/2007/04/python-coroutines.html


In [36]:
# First as a generator only with yield.
def rota(people):
    _people = list(people)
    current = 0
    while len(_people):
        yield _people[current]
        current = (current + 1) % len(_people)
        
people = ["Ant", "Bernard", "Carly", "Deb", "Englebert"]
r = rota (people)
for i in range(10):
    print "It's %s's turn." % r.next()


It's Ant's turn.
It's Bernard's turn.
It's Carly's turn.
It's Deb's turn.
It's Englebert's turn.
It's Ant's turn.
It's Bernard's turn.
It's Carly's turn.
It's Deb's turn.
It's Englebert's turn.

In [40]:
# Then as a coroutine with send().
def rota(people):
    _people = list(people)
    current = 0
    while len(_people):
        command = yield _people[current]
        current = (current + 1) % len(_people)
        if command:
            comm, name = command
            if comm == "add":
                _people.append(name)
            elif comm == "remove":
                _people.remove(name)
                
def printname(name):
    print "It's %s's turn." % name
    
people = ["Ant", "Bernard", "Carly", "Deb", "Englebert"]
r = rota (people)

print "\nOriginal list\n"
for i in range(6):
    printname(r.next())
    
printname(r.send(("add", "Fred")))
print "\nAdded Fred\n"

for i in range(7):
    printname(r.next())

printname(r.send(("remove", "Deb")))
print "\nRemoved Deb\n"

for i in range(6):
    printname(r.next())


Original list

It's Ant's turn.
It's Bernard's turn.
It's Carly's turn.
It's Deb's turn.
It's Englebert's turn.
It's Ant's turn.
It's Bernard's turn.

Added Fred

It's Carly's turn.
It's Deb's turn.
It's Englebert's turn.
It's Fred's turn.
It's Ant's turn.
It's Bernard's turn.
It's Carly's turn.
It's Englebert's turn.

Removed Deb

It's Fred's turn.
It's Ant's turn.
It's Bernard's turn.
It's Carly's turn.
It's Englebert's turn.
It's Fred's turn.

In [41]:
# Example 3, with a generator expression.
line_list = [' line 1\n', 'line 2\n', 'line 3\n']

# Generator expression, returns iterator. Note use of () to define as generator.
stripped_iter = (line.strip() for line in line_list)

# List comprehension, returns a list. Note use of [] to define as list comp.
stripped_list = [line.strip() for line in line_list]

In [32]:
stripped_iter


Out[32]:
<generator object <genexpr> at 0x1078d5730>

In [33]:
stripped_list


Out[33]:
['line 1', 'line 2', 'line 3']

In [34]:
while 1:
    print stripped_iter.next()


---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-34-8f0529855f33> in <module>()
      1 while 1:
----> 2     print stripped_iter.next()

StopIteration: 
line 1
line 2
line 3

Some concurrent concepts running on NAO

A few examples from the above concepts, using timeit to time how long the processes take.

The general problem - two head motor moves and data logging


In [249]:
%%file concurrency_1_general_problem.py
""" Looking at concurrency. Moving two head motors (pitch and yaw),
    and logging data simultaneously.

"""

# MEMORY_VALUE_NAMES is the list of ALMemory values names you want to save.
ALMEMORY_KEY_NAMES = [
"Device/SubDeviceList/HeadYaw/Position/Sensor/Value",
"Device/SubDeviceList/HeadYaw/Position/Actuator/Value",
"Device/SubDeviceList/HeadYaw/ElectricCurrent/Sensor/Value",
"Device/SubDeviceList/HeadYaw/Temperature/Sensor/Value",
"Device/SubDeviceList/HeadYaw/Hardness/Actuator/Value",
"Device/SubDeviceList/HeadYaw/Temperature/Sensor/Status",
"Device/SubDeviceList/HeadPitch/Position/Actuator/Value",
"Device/SubDeviceList/HeadPitch/Position/Sensor/Value",
"Device/SubDeviceList/HeadPitch/ElectricCurrent/Sensor/Value",
"Device/SubDeviceList/HeadPitch/Temperature/Sensor/Value",
"Device/SubDeviceList/HeadPitch/Hardness/Actuator/Value",
"Device/SubDeviceList/HeadPitch/Temperature/Sensor/Status"
]

NAO_IP = "mistcalf.local"
STEPS = 5

from naoqi import ALProxy

def main():
    """ Some simple robot processes.

    """
    
    motion = ALProxy("ALMotion", NAO_IP, 9559)
    posture = ALProxy("ALRobotPosture", NAO_IP, 9559)
    memory = ALProxy("ALMemory", NAO_IP, 9559)
    
    data = list()
    
    # Set stiffness on for Head motors and go to start pose.
    print "Starting move...."
    motion.setStiffnesses("Head", 1.0)
    posture.goToPosture("Crouch", 2.0)
        
    # Core processes. Do some moves and record data.
    for i in range(STEPS):
        positiveAngleStep = 1.0 / STEPS
        negativeAngleStep = -1 * positiveAngleStep
        timeStep = 20 / STEPS
        motion.angleInterpolation(
            ["HeadYaw"],
            [positiveAngleStep],
            [timeStep],
            False
        )
        
        motion.angleInterpolation(
            ["HeadPitch"],
            [negativeAngleStep],
            [timeStep],
            False
        )
        
        line = list()
        for key in ALMEMORY_KEY_NAMES:
            value = memory.getData(key)
            line.append(value)
        data.append(line)
            
    # Gently set stiff off for Head motors and relax.
    print "...Going to stop now!"
    motion.setStiffnesses("Head", 0.0)
    motion.rest()
    print data
    
if __name__ == "__main__":
    main()


Overwriting concurrency_1_general_problem.py

In [10]:
!python concurrency_1_general_problem.py


[I] 4359 qi.eventloop: Creating event loop while no qi::Application() is running
[I] 4359 qimessaging.session: Session listener created on tcp://0.0.0.0:0
[I] 4359 qi.eventloop: Creating event loop while no qi::Application() is running
[I] 4359 qimessaging.transportserver: TransportServer will listen on: tcp://10.0.1.152:50647
[I] 4359 qimessaging.transportserver: TransportServer will listen on: tcp://127.0.0.1:50647
[I] 4359 qimessaging.transportserver: TransportServer will listen on: tcp://10.137.0.22:50647
Starting move....
...Going to stop now!
[[0.18557214736938477, 0.19986635446548462, 0.14400000870227814, 39.0, 0.47084471583366394, 0, -0.0998663529753685, -0.0813438892364502, 0.2240000069141388, 39.0, 0.24613042175769806, 0], [0.3819241523742676, 0.39986634254455566, 0.0, 39.0, 1.0, 0, -0.29986631870269775, -0.28996801376342773, 0.0, 39.0, 0.4631490707397461, 0], [0.5874800682067871, 0.5998663306236267, 0.0, 39.0, 1.0, 0, -0.4998663365840912, -0.4740478992462158, 0.01600000075995922, 39.0, 0.45838305354118347, 0], [0.7592880725860596, 0.7998663187026978, 0.08000000566244125, 39.0, 0.11819390952587128, 0, -0.5276462435722351, -0.5031938552856445, 0.01600000075995922, 39.0, 0.24665920436382294, 0], [0.9771161079406738, 0.9998663663864136, 0.06400000303983688, 39.0, 0.10307526588439941, 0, -0.4540998935699463, -0.48325204849243164, 0.03200000151991844, 39.0, 0.22960776090621948, 0]]

Some concurrent solutions to the general problem - two head motor moves and data logging with timeit tests for each solution


In [43]:
%%file concurrency_1_general_problem_with_timeit.py
""" Looking at concurrency. Moving two head motors (pitch and yaw),
    and logging data simultaneously.

"""

# MEMORY_VALUE_NAMES is the list of ALMemory values names you want to save.
ALMEMORY_KEY_NAMES = [
"Device/SubDeviceList/HeadYaw/Position/Sensor/Value",
"Device/SubDeviceList/HeadYaw/Position/Actuator/Value",
"Device/SubDeviceList/HeadYaw/ElectricCurrent/Sensor/Value",
"Device/SubDeviceList/HeadYaw/Temperature/Sensor/Value",
"Device/SubDeviceList/HeadYaw/Hardness/Actuator/Value",
"Device/SubDeviceList/HeadYaw/Temperature/Sensor/Status",
"Device/SubDeviceList/HeadPitch/Position/Actuator/Value",
"Device/SubDeviceList/HeadPitch/Position/Sensor/Value",
"Device/SubDeviceList/HeadPitch/ElectricCurrent/Sensor/Value",
"Device/SubDeviceList/HeadPitch/Temperature/Sensor/Value",
"Device/SubDeviceList/HeadPitch/Hardness/Actuator/Value",
"Device/SubDeviceList/HeadPitch/Temperature/Sensor/Status"
]

# GLOBALS - as easier than passing arguments to timeit
NAO_IP = "mistcalf.local"
STEPS = 5
POSITIVEANGLE = 1.0
NEGATIVEANGLE = -1.0
TESTREPS = 1
TIME = 20.0

POSITIVEANGLESTEP = POSITIVEANGLE / STEPS
NEGATIVEANGLESTEP = NEGATIVEANGLE / STEPS
TIMESTEP = TIME / STEPS
if TIMESTEP <= 0.05:
    # memory.getData() should not be called more than every 50ms as slow.
    print "Warning, TIMESTEP too fast for memory.getData(), set to 50ms."
    TIMESTEP = 0.05

# GLOBALS - proxies and locks
motion = None
posture = None
memory = None
threadLock = None
processLock = None

import time
import threading
import multiprocessing
from timeit import timeit
from naoqi import ALProxy

##########################################################
#                   START test1Procedural CODE
##########################################################

def test1Procedural():
    """ Do some moves and record data in a procedural ie
        linear, method.
        
    """
    global motion, posture, memory
    data = list()    
    
    for i in range(STEPS):
        motion.angleInterpolation(
            ["HeadYaw"],
            [POSITIVEANGLESTEP],
            [TIMESTEP],
            False
        )
        
        motion.angleInterpolation(
            ["HeadPitch"],
            [NEGATIVEANGLESTEP],
            [TIMESTEP],
            False
        )
        
        line = list()
        for key in ALMEMORY_KEY_NAMES:
            value = memory.getData(key)
            line.append(value)
        data.append(line)
        
    #print data
    
##########################################################
#                   END test1Procedural CODE
##########################################################
    
##########################################################
#                   START test2Post CODE
##########################################################
    
def test2Post():
    """ Do some moves and record data using built in post.
        
    """
    global motion, posture, memory  
    
    id1 = motion.post.angleInterpolation(
                                    ["HeadYaw"],
                                    [POSITIVEANGLE],
                                    [TIME],
                                    False
    )
        
    id2 = motion.post.angleInterpolation(
                                    ["HeadPitch"],
                                    [NEGATIVEANGLE],
                                    [TIME],
                                    False
    )
        
    data = recordData()

    while (motion.isRunning(id1) and motion.isRunning(id2)):
        pass
    
    #print data
    
def recordData():
    """ Record the data from ALMemory.
    Returns a matrix of values

    """
    global memory
    
    print "Recording data ..."
    
    data = list()
    for i in range (STEPS):
        line = list()
        for key in ALMEMORY_KEY_NAMES:
            value = memory.getData(key)
            line.append(value)
        data.append(line)
        time.sleep(TIMESTEP)
        
    print "Done recording data"
    
    return data
    
##########################################################
#                   END test2Post CODE
##########################################################

##########################################################
#                   START test3Threading CODE
##########################################################

class myThread(threading.Thread):
    def __init__(self, threadID):
        threading.Thread.__init__(self)
        self.threadID = threadID  
        
    def run(self):
        if self.threadID == 1:
            headYawMotion()
        elif self.threadID == 2:
            headPitchMotion()
        elif self.threadID == 3:
            data = recordDataForThread()
            # print data
        else:
            print "oops, no thread"

def headYawMotion():
    global motion, posture, memory, threadLock
    
    for i in range(STEPS):
        threadLock.acquire(True)
        motion.angleInterpolation(
            ["HeadYaw"],
            [POSITIVEANGLESTEP],
            [TIMESTEP],
            False
        )
        threadLock.release()
    
def headPitchMotion():
    global motion, posture, memory, threadLock
    
    for i in range(STEPS):
        threadLock.acquire(True)
        motion.angleInterpolation(
                                ["HeadPitch"],
                                [NEGATIVEANGLESTEP],
                                [TIMESTEP],
                                False
        )
        threadLock.release()
    
def recordDataForThread():
    """ Record the data from ALMemory.
    Returns a matrix of values

    """
    global motion, posture, memory, threadLock
    
    print "Recording data ..."
    
    data = list()
    for i in range (STEPS):
        threadLock.acquire(True)
        line = list()
        for key in ALMEMORY_KEY_NAMES:
            value = memory.getData(key)
            line.append(value)
        data.append(line)
        time.sleep(TIMESTEP)
        threadLock.release()
        
    print "Done recording data"
    
    return data

def test3Threading():
    """ Do some moves and record data using the threading module.
        Runs without Lock(), will use Lock() as best practise,
        and needed in current app.
        Lock.Acquire(False) with Blocking = False arg seems to run smoother, but
        should be set to True to ensure synchronous running. Requires checking.        
        Lock can also be acquired and released using 'with lock:'.
        But without blocking the shared resource does not seem safe.
    
    """
    global motion, posture, memory, threadLock
    
    threads = []
    threadLock = threading.Lock()
    
    # Create new threads.
    thread1 = myThread(1)
    thread2 = myThread(2)
    thread3 = myThread(3)
    
    # Start new threads.
    thread1.start()
    thread2.start()
    thread3.start()
    
    # Wait for threads to end.
    # Essential, or thread calls methods in run, then returns.
    threads.append(thread1)
    threads.append(thread2)
    threads.append(thread3)
    
    for t in threads:
        t.join()
    
##########################################################
#                   END test3Threading CODE
##########################################################

##########################################################
#                   START test4Multiprocessing CODE
# Doesn't run, issues with proxies causing error:
# 5891 qimessaging.remoteobject: no promise found for req id:39  obj: 21  func: 126 type: 2
##########################################################

def headYawMotionProcess(processLock):
    global motion, posture, memory
    
    for i in range(STEPS):
        processLock.acquire()
        motion.angleInterpolation(
                ["HeadYaw"],
                [POSITIVEANGLESTEP],
                [TIMESTEP],
                False
        )
        processLock.release()
    
def headPitchMotionProcess(processLock):
    global motion, posture, memory
    
    for i in range(STEPS):
        processLock.acquire()
        motion.angleInterpolation(
                            ["HeadPitch"],
                            [NEGATIVEANGLESTEP],
                            [TIMESTEP],
                            False
        )
        processLock.release()
    
def recordDataProcess(processLock):
    """ Record the data from ALMemory.
    Returns a matrix of values

    """
    global motion, posture, memory
    
    print "Recording data ..."
    
    data = list()
    for i in range (STEPS):
        processLock.acquire()
        line = list()
        for key in ALMEMORY_KEY_NAMES:
            value = memory.getData(key)
            line.append(value)
        data.append(line)
        time.sleep(TIMESTEP)
        processLock.release()
        
    print "Done recording data"
    
    print data


def test4Multiprocessing():
    """ Do some moves and record data using the multiprocessing module.
    
    """
    global motion, posture, memory, processLock
    
    processLock = multiprocessing.Lock()
    
    # Spawn process objects.
    p1 = multiprocessing.Process(target = headYawMotionProcess, args = (processLock,))
    p2 = multiprocessing.Process(target = headPitchMotionProcess, args = (processLock,))
    p3 = multiprocessing.Process(target = recordDataProcess, args = (processLock,))
    
    # Start processes.
    p1.start()
    p2.start()
    p3.start()
    
    # Wait for processes to finish.
    p1.join()
    p2.join()
    p3.join()


##########################################################
#                   END test4Multiprocessing CODE
##########################################################

##########################################################
#                   START test5Coroutine CODE
##########################################################

def headYawMotionCoroutine(yawMotionList):
    global motion, posture, memory
    
    _yawMotionList = list(yawMotionList)
    current = 0
    
    while len(_yawMotionList):
        angle = _yawMotionList[current][0]
        time = _yawMotionList[current][1]
        motion.angleInterpolation(
                    ["HeadYaw"],
                    [angle],
                    [time],
                    False
        )
        yield
        current += 1

    
def headPitchMotionCoroutine(pitchMotionList):
    global motion, posture, memory
    
    _pitchMotionList = list(pitchMotionList)
    current = 0
    
    while len(_pitchMotionList):
        angle = _pitchMotionList[current][0]
        time = _pitchMotionList[current][1]
        motion.angleInterpolation(
                                ["HeadPitch"],
                                [angle],
                                [time],
                                False
        )
        yield
        current += 1
    
def recordDataCoroutine():
    """ Record the data from ALMemory.
    Returns a matrix of values

    """
    global motion, posture, memory
    
    print "Recording data ..."
    
    data = list()
    # Infinite list as is run every time there is a motion move.
    while 1:        
        line = list()
        for key in ALMEMORY_KEY_NAMES:
            value = memory.getData(key)
            line.append(value)
        data.append(line)
        yield data
        
def test5Coroutine():
    """ Do some moves and record data using coroutines.
    
    """
    global motion, posture, memory
    
    # Generate motion lists
    yawMotionList = []
    pitchMotionList = []            
    yawMotionList = [(POSITIVEANGLESTEP, TIMESTEP) for i in range(STEPS)]
    pitchMotionList = [(NEGATIVEANGLESTEP, TIMESTEP) for i in range(STEPS)]
    
    # Ininitiate coroutine.
    p1 = headYawMotionCoroutine(yawMotionList)
    p2 = headPitchMotionCoroutine(pitchMotionList)
    p3 = recordDataCoroutine()

    # Loop through all steps. Could also be infinite loop if this was,
    # a full coroutine where data was being sent to the lists.
    for i in range(STEPS):
        p1.next()
        p2.next()
        data = p3.next()
        
    #print data
    

##########################################################
#                   END test5Coroutine CODE
##########################################################

def main():
    """ Some simple robot processes.

    """
    global motion, posture, memory

    motion = ALProxy("ALMotion", NAO_IP, 9559)
    posture = ALProxy("ALRobotPosture", NAO_IP, 9559)
    memory = ALProxy("ALMemory", NAO_IP, 9559)    

    # Set stiffness on for Head motors and go to start pose.
    print "Starting tests...."
    motion.setStiffnesses("Head", 1.0)
    print "\n---------------------------------------\n"
    # Goto start position, and run test1Procedural
    print "test1Procedural starting ..."
    posture.goToPosture("Crouch", 2.0)
    t1 = (timeit("test1Procedural()", setup = "from __main__ import test1Procedural", number = TESTREPS))
    print "...end test1Procedural, time: ", t1
    print "\n---------------------------------------\n"
    # Goto start position, and run test2Post
    print "test2Post starting ..."
    posture.goToPosture("Crouch", 2.0)
    t2 = (timeit("test2Post()", setup = "from __main__ import test2Post", number = TESTREPS))
    print "...end test2Post, time: ", t2
    print "\n---------------------------------------\n"
    # Goto start position, and run test3Threading
    print "test3Threading starting ..."
    posture.goToPosture("Crouch", 2.0)
    t3 = (timeit("test3Threading()", setup = "from __main__ import test3Threading", number = TESTREPS))
    print "...end test3Threading, time: ", t3
    print "\n---------------------------------------\n"
    # Goto start position, and run test4Multiprocessing - NOT WORKING
    print "test4Multiprocessing - not working"
    #print "test4Multiprocessing starting ..."
    #posture.goToPosture("Crouch", 2.0)
    #test4Multiprocessing()
    #t4 = (timeit("test4Multiprocessing()", setup = "from __main__ import test3Threading", number = TESTREPS))
    #print "...end test4Multiprocessing, time: ", t4
    print "\n---------------------------------------\n"
    # Goto start position, and run test5Coroutine
    print "test5Coroutine starting ..."
    posture.goToPosture("Crouch", 2.0)
    t5 = (timeit("test5Coroutine()", setup = "from __main__ import test5Coroutine", number = TESTREPS))
    print "...end test5Coroutine, time: ", t5
    print "\n---------------------------------------\n"
    # Gently set stiff off for Head motors and relax.
    print "...ending tests!"
    motion.setStiffnesses("Head", 0.0)
    motion.rest()
    
if __name__ == "__main__":
    main()


Overwriting concurrency_1_general_problem_with_timeit.py

In [44]:
!python concurrency_1_general_problem_with_timeit.py


[I] 4359 qi.eventloop: Creating event loop while no qi::Application() is running
[I] 4359 qimessaging.session: Session listener created on tcp://0.0.0.0:0
[I] 4359 qi.eventloop: Creating event loop while no qi::Application() is running
[I] 4359 qimessaging.transportserver: TransportServer will listen on: tcp://10.0.1.152:51959
[I] 4359 qimessaging.transportserver: TransportServer will listen on: tcp://127.0.0.1:51959
[I] 4359 qimessaging.transportserver: TransportServer will listen on: tcp://10.137.0.22:51959
Starting tests....

---------------------------------------

test1Procedural starting ...
...end test1Procedural, time:  41.0207629204

---------------------------------------

test2Post starting ...
Recording data ...
Done recording data
...end test2Post, time:  20.3220949173

---------------------------------------

test3Threading starting ...
Recording data ...
Done recording data
...end test3Threading, time:  60.9526190758

---------------------------------------

test4Multiprocessing - not working

---------------------------------------

test5Coroutine starting ...
Recording data ...
...end test5Coroutine, time:  44.906514883

---------------------------------------

...ending tests!

Conclusions

  • Concurrent programming is hard! I have outlined some potential programming approaches to concurrent programming in this notebook.
  • There is a lot more depth and detail to each of these approaches and potentially many ways to use each approach. I'm not sure my outlines are optimum examples of each approach. Particularly in the use of queues and coroutines where it might have been better to have a single global queue or coroutine acting as an actioner for the whole system (or a subsystem) that each task fed.
  • The built in post object is very effective but it can not be applied to user defined methods and it can not lock shared resources.
  • A procedural (state machine) approach could be reliable but slow.
  • In theory threading should have worked well, but in practise it was slow or allowed the shared resource to be accessed inappropriately. Possibly this is a NAO thing or a Python threads thing.
  • Queues (of tasks) or coroutines (acting as a queue for tasks) could be reliable but conccurent tasks were not handled particularly smoothly or quickly.
  • If concurrent tasks are required in a program they need to be considered upfront and the program designed appropriately for the chosen approach.

In [ ]: