进程和线程是提高CPU利用率的两种方法。

1. 多进程(multiprocessing)

- os.fork()
    - 可以用来创建新的子进程
    - 在父进程中返回一个正数进程号
    - 在子进程中返回0
    - windows没有fork()系统调用
- multiprocessing.Process跨平台的多进程支持
    - start()
    - join():等待子进程结束后再继续往下运行
- Pool进程池
    - 当启动的进程的数目大于进程池中创建的进程的数目的时候,多出来的进程需要等待其他进程推出之后才能继续执行
- 子进程(subprocess)
    - 启动进程,控制它的输入和输出
- 进程间通信

2. 多线程

- _thread和threading两个模块支持。_thread时低级模块,threading是高级模块,对_thread的封装
- Lock: 多线程中的不同线程有时候会有共享的数据。对于共享数据的修改,需要按照一定的顺序。否则,读到的数据可能会不正确。
- 由于GIL的存在,python不能利用多核CPU来进行加速。此时可以使用多进程来实现多核多任务。

3. ThreadLocal

- 全局变量如果没有适当的加锁处理,在多线程运行中有可能会干扰
- 局部变量在进行参数传递的时候可能显得有些麻烦
- 使用线程ID作为key的全局字典dict可以避免多线程的干扰,同时参数的传递也可以变得稍微简单。但是它并不能真正的隔离数据,以为每个线程还是可以读取全局的字典,并对字典进行修改
- ThreadLocal对象可以避免全局dict的缺点

4. 分布式进程TODO

- 

In [4]:
# os.fork()
import os

print('Process(%s) start...' %os.getpid())
pid = os.fork()
if(pid == 0):
    print('This is child process(%s), and the parent is %s' %(os.getpid(), os.getppid()))
else:
    print('This is parent process(%s), and the created child pid is %s' %(os.getpid(), pid))


Process(24235) start...
This is parent process(24235), and the created child pid is 8643
This is child process(8643), and the parent is 24235

In [6]:
# multiprocessing.Process

from multiprocessing import Process
import os

def run_proc(name):
    print('Run child process %s(%s).' % (name, os.getpid()))

print('Parent process is %s.' %os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')


Parent process is 24235.
Child process will start.
Run child process test(29533).
Child process end.

In [9]:
import os, time, random
from multiprocessing import Pool

def long_time_task(name):
    print('Run task %s(%s)' %(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s(%s) runs %0.2f seconds' %(name, (end - start)))

p = Pool(5)
for i in range(9):
    p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocessses to end...')
p.close()
p.join()
print('All subprocesses done')


Run task 0(29907)
Run task 4(29909)
Run task 2(29910)
Run task 1(29908)
Run task 3(29911)
Waiting for all subprocessses to end...
Run task 5(29911)
Run task 6(29911)
Run task 7(29908)
Run task 8(29909)
All subprocesses done

In [12]:
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate(b'set q=mx\npython.org\nnexit\n')
print(out.decode('utf-8'))
print('Exit code: ', p.returncode)


$ nslookup www.python.org
Exit code: 0
$ nslookup
Server:		127.0.0.1
Address:	127.0.0.1#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
python.org	nameserver = ns3.p11.dynect.net.
python.org	nameserver = ns1.p11.dynect.net.
python.org	nameserver = ns2.p11.dynect.net.
python.org	nameserver = ns4.p11.dynect.net.
mail.python.org	internet address = 188.166.95.178
mail.python.org	has AAAA address 2a03:b0c0:2:d0::71:1
ns1.p11.dynect.net	internet address = 208.78.70.11
ns2.p11.dynect.net	internet address = 204.13.250.11
ns3.p11.dynect.net	internet address = 208.78.71.11
ns4.p11.dynect.net	internet address = 204.13.251.11
;; connection timed out; no servers could be reached


Exit code:  0

In [13]:
from multiprocessing import Process, Queue
import os, time, random

def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' %value)
        q.put(value)
        time.sleep(random.random())
        
def read(q):
    print('Process to read: %s' %os.getpid)
    while True:
        value = q.get(True)
        print('Get %s from queue.' %value)
        
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()


Process to read: <built-in function getpid>
Process to write: 4503
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

In [17]:
import time, threading

def loop():
    print('Thread %s is  running...' %threading.current_thread().name)
    n = 0
    while n < 5:
        n += 1
        print('Thread %s >>> %s' %(threading.current_thread().name, n))
        time.sleep(1)
    print('Thread %s ended.' %threading.current_thread().name)
    
print('Thread %s is running.' %threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('Thread %s end.' %threading.current_thread().name)


Thread MainThread is running.
Thread LoopThread is  running...
Thread LoopThread >>> 1
Thread LoopThread >>> 2
Thread LoopThread >>> 3
Thread LoopThread >>> 4
Thread LoopThread >>> 5
Thread LoopThread ended.
Thread MainThread end.

In [37]:
import time, threading
import multiprocessing

global_num = 0

def thread_cal():
    global global_num
    for i in range(1000):
        global_num += 1

threads = []
for i in range(10):
    threads.append(threading.Thread(target=thread_cal))
    threads[i].start()

for i in range(10):
    threads[i].join()

print(global_num)
print(multiprocessing.cpu_count())


10000
8

In [39]:
import threading

local_school = threading.local()

def process_student():
    std = local_school.student
    print('Hello, %s (in %s)' %(std, threading.current_thread().name))
    
def process_thread(name):
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()


Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

In [40]:
import random, time, queue
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

manager = QueueManager(address=('', 5000), authkey=b'abc')

manager.start()

task = manager.get_task_queue()
result = manager.get_result_queue()

for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' %n)
    task.put(n)

print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    
manager.shutdown()
print('master exit.')


Put task 4360...
Put task 1267...
Put task 7718...
Put task 1142...
Put task 1907...
Put task 7101...
Put task 5648...
Put task 9327...
Put task 3660...
Put task 6721...
Try get results...
---------------------------------------------------------------------------
Empty                                     Traceback (most recent call last)
<ipython-input-40-71bf602c11c7> in <module>()
     25 print('Try get results...')
     26 for i in range(10):
---> 27     r = result.get(timeout=10)
     28 
     29 manager.shutdown()

<string> in get(self, *args, **kwds)

/usr/lib/python3.5/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
    730             dispatch(conn, None, 'decref', (token.id,))
    731             return proxy
--> 732         raise convert_to_error(kind, result)
    733 
    734     def _getvalue(self):

Empty: 

In [ ]:


In [ ]: