Python 线程与协程(1)

要说到线程(Thread)与协程(Coroutine)似乎总是需要从并行(Parallelism)与并发(Concurrency)谈起,关于并行与并发的问题,Rob Pike 用 Golang 小地鼠烧书的例子给出了非常生动形象的说明。简单来说并行就是我们现实世界运行的样子,每个人都是独立的执行单元,各自完成自己的任务,这对应着计算机中的分布式(多台计算机)或多核(多个CPU)运作模式;而对于并发,我看到最生动的解释来自Quora 上 Jan Christian Meyer 回答的这张图

并发对应计算机中充分利用单核(一个CPU)实现(看起来)多个任务同时执行。我们在这里将要讨论的 Python 中的线程与协程仅是基于单核的并发实现,随便去网上搜一搜(Thread vs Coroutine)可以找到一大批关于它们性能的争论、benchmark,这次话题的目的不在于讨论谁好谁坏,套用一句非常套路的话来说,抛开应用场景争好坏都是耍流氓。当然在硬件支持的条件下(多核)也可以利用线程和协程实现并行计算,而且 Python 2.6 之后新增了标准库 multiprocessingPEP 371)突破了 GIL 的限制可以充分利用多核,但由于协程是基于单个线程的,因此多进程的并行对它们来说情况是类似的,因此这里只讨论单核并发的实现。

要了解线程以及协程的原理和由来可以查看参考链接中的前两篇文章。Python 3.5 中关于线程的标准库是 threading,之前在 2.x 版本中的 thread 在 3.x 之后更名为 _thread ,无论是2.7还是3.5都应该尽量避免使用较为底层的 thread/_thread 而应该使用 threading

创建一个线程可以通过实例化一个 threading.Thread 对象:


In [1]:
from threading import Thread
import time

def _sum(x, y):
    print("Compute {} + {}...".format(x, y))
    time.sleep(2.0)
    return x+y
def compute_sum(x, y):
    result = _sum(x, y)
    print("{} + {} = {}".format(x, y, result))

start = time.time()    
threads = [
    Thread(target=compute_sum, args=(0,0)),
    Thread(target=compute_sum, args=(1,1)),
    Thread(target=compute_sum, args=(2,2)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Total elapsed time {} s".format(time.time() - start))

# Do not use Thread
start = time.time()
compute_sum(0,0)
compute_sum(1,1)
compute_sum(2,2)
print("Total elapsed time {} s".format(time.time() - start))


Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 = 0
1 + 1 = 2
2 + 2 = 4
Total elapsed time 2.002729892730713 s
Compute 0 + 0...
0 + 0 = 0
Compute 1 + 1...
1 + 1 = 2
Compute 2 + 2...
2 + 2 = 4
Total elapsed time 6.004806041717529 s

除了通过将函数传递给 Thread 创建线程实例之外,还可以直接继承 Thread 类:


In [2]:
from threading import Thread
import time
class ComputeSum(Thread):
    def __init__(self, x, y):
        super().__init__()
        self.x = x
        self.y = y
    def run(self):
        result = self._sum(self.x, self.y)
        print("{} + {} = {}".format(self.x, self.y, result))
    def _sum(self, x, y):
        print("Compute {} + {}...".format(x, y))
        time.sleep(2.0)
        return x+y 
threads = [ComputeSum(0,0), ComputeSum(1,1), ComputeSum(2,2)]
start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Total elapsed time {} s".format(time.time() - start))


Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 = 0
1 + 1 = 2
2 + 2 = 4
Total elapsed time 2.001662015914917 s

根据上面代码执行的结果可以发现,compute_sum/t.run 函数的执行是按照 start() 的顺序,但 _sum 结果的输出顺序却是随机的。因为 _sum 中加入了 time.sleep(2.0) ,让程序执行到这里就会进入阻塞状态,但是几个线程的执行看起来却像是同时进行的(并发)。

有时候我们既需要并发地“跳过“阻塞的部分,又需要有序地执行其它部分,例如操作共享数据的时候,这时就需要用到”锁“。在上述”求和线程“的例子中,假设每次求和都需要加上额外的 _base 并把计算结果累积到 _base 中。尽管这个例子不太恰当,但它说明了线程锁的用途:


In [4]:
from threading import Thread, Lock
import time
_base = 1
_lock = Lock()
class ComputeSum(Thread):
    def __init__(self, x, y):
        super().__init__()
        self.x = x
        self.y = y
    def run(self):
        result = self._sum(self.x, self.y)
        print("{} + {} + base = {}".format(self.x, self.y, result))
    def _sum(self, x, y):
        print("Compute {} + {}...".format(x, y))
        time.sleep(2.0)
        global _base
        with _lock:
            result = x + y + _base
            _base = result
        return result
threads = [ComputeSum(0,0), ComputeSum(1,1), ComputeSum(2,2)]

start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Total elapsed time {} s".format(time.time() - start))


Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 + base = 1
1 + 1 + base = 3
2 + 2 + base = 7
Total elapsed time 2.0064051151275635 s

这里用上下文管理器来管理锁的获取和释放,相当于:

_lock.acquire()
try:
    result = x + y + _base
    _base  = result
finally:
    _lock.release()

死锁

线程的一大问题就是通过加锁来”抢夺“共享资源的时候有可能造成死锁,例如下面的程序:

from threading import Lock
_base_lock = Lock()
_pos_lock  = Lock()
_base = 1

def _sum(x, y):
    # Time 1
    with _base_lock:
        # Time 3
        with _pos_lock:
            result = x + y
    return result
def _minus(x, y):
    # Time 0
    with _pos_lock:
        # Time 2
        with _base_lock:
            result = x - y
    return result

由于线程的调度执行顺序是不确定的,在执行上面两个线程 _sum/_minus 的时候就有可能出现注释中所标注的时间顺序,即 # Time 0 的时候运行到 with _pos_lock 获取了 _pos_lock 锁,而接下来由于阻塞马上切换到了 _sum 中的 # Time 1 ,并获取了 _base_lock,接下来由于两个线程互相锁定了彼此需要的下一个锁,将会导致死锁,即程序无法继续运行。根据 我是一个线程 中所描述的,为了避免死锁,需要所有的线程按照指定的算法(或优先级)来进行加锁操作。不管怎么说,死锁问题都是一件非常伤脑筋的事,原因之一在于不管线程实现的是并发还是并行,在编程模型和语法上看起来都是并行的,而我们的大脑虽然是一个(内隐的)绝对并行加工的机器,却非常不善于将并行过程具象化(至少在未经足够训练的时候)。而与线程相比,协程(尤其是结合事件循环)无论在编程模型还是语法上,看起来都是非常友好的单线程同步过程。后面第二部分我们再来讨论 Python 中协程是如何从”小三“一步步扶正上位的:D