Processes and threads are two ways to improve CPU utilization.
- os.fork()
    - Creates a new child process
    - Returns the child's PID (a positive integer) in the parent process
    - Returns 0 in the child process
    - Windows has no fork() system call
- multiprocessing.Process: cross-platform multiprocessing support
    - start()
    - join(): wait for the child process to finish before continuing
- Pool (process pool)
    - When more tasks are started than there are worker processes in the pool, the extra tasks must wait for a worker to exit its current task before they can run
- subprocess
    - Launch an external process and control its input and output
- Inter-process communication
- Threads are supported by two modules: _thread and threading. _thread is the low-level module; threading is the high-level module that wraps _thread.
- Lock: different threads may share data, and modifications to shared data must happen in a well-defined order; otherwise the data read back may be incorrect.
- Because of the GIL, Python threads cannot use multiple CPU cores for speedup. Use multiple processes to run multi-core, multi-task workloads.
- Global variables without proper locking may be corrupted when multiple threads run concurrently.
- Local variables are safe, but passing them around as parameters can be cumbersome.
- A global dict keyed by thread ID avoids interference between threads and also simplifies parameter passing somewhat. But it does not truly isolate the data, because every thread can still read the global dict and modify it.
- ThreadLocal objects avoid the drawbacks of the global dict.
In [4]:
# os.fork() (POSIX only; not available on Windows)
import os

print('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid == 0:
    print('This is child process (%s), and the parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('This is parent process (%s), and the created child pid is %s.' % (os.getpid(), pid))
In [6]:
# multiprocessing.Process
from multiprocessing import Process
import os

def run_proc(name):
    print('Run child process %s (%s).' % (name, os.getpid()))

print('Parent process is %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
In [9]:
import os, time, random
from multiprocessing import Pool

def long_time_task(name):
    print('Run task %s (%s)' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, end - start))

p = Pool(5)
for i in range(9):
    p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses to end...')
p.close()
p.join()
print('All subprocesses done.')
In [12]:
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(out.decode('utf-8'))
print('Exit code:', p.returncode)
In [13]:
from multiprocessing import Process, Queue
import os, time, random

def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
# read() loops forever, so the reader process must be terminated forcibly
pr.terminate()
In [17]:
import time, threading

def loop():
    print('Thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n += 1
        print('Thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('Thread %s ended.' % threading.current_thread().name)

print('Thread %s is running.' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('Thread %s end.' % threading.current_thread().name)
In [37]:
import time, threading
import multiprocessing

global_num = 0

def thread_cal():
    global global_num
    for i in range(1000):
        # unsynchronized read-modify-write: a race condition
        global_num += 1

threads = []
for i in range(10):
    threads.append(threading.Thread(target=thread_cal))
    threads[i].start()
for i in range(10):
    threads[i].join()
# without a lock the result may be less than 10000
print(global_num)
print(multiprocessing.cpu_count())
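The race above can be eliminated with the threading.Lock mentioned in the notes. A minimal sketch (counter and safe_cal are illustrative names):

```python
import threading

counter = 0
lock = threading.Lock()

def safe_cal():
    global counter
    for _ in range(1000):
        # the with-statement acquires the lock and always releases it,
        # even if the body raises an exception
        with lock:
            counter += 1

threads = [threading.Thread(target=safe_cal) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # always 10000: the lock serializes the increments
```

The trade-off is that the locked section runs one thread at a time, so locks should guard as little code as possible.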
In [39]:
import threading

local_school = threading.local()

def process_student():
    # each thread sees only its own local_school.student
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
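For comparison, the global dict keyed by thread ID described in the notes can be sketched like this; unlike ThreadLocal, any thread could still read or modify other threads' entries (global_dict and the two functions are illustrative names):

```python
import threading

# one entry per thread, keyed by the thread's ident
global_dict = {}

def process_student():
    # look up this thread's own data via its thread ID
    std = global_dict[threading.get_ident()]
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    global_dict[threading.get_ident()] = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
```

Note that thread idents can be reused after a thread exits, which is another reason ThreadLocal is the safer choice.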
In [40]:
import random, time, queue
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

# expose the two local queues over the network
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
manager = QueueManager(address=('', 5000), authkey=b'abc')
manager.start()

task = manager.get_task_queue()
result = manager.get_result_queue()
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
manager.shutdown()
print('master exit.')
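The master above only makes sense together with a worker process. A hedged sketch of such a worker, assuming the master runs on 127.0.0.1:5000 with the same authkey (worker_main and its task logic are illustrative, not part of the original notes):

```python
import queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

# on the worker side only the names are registered;
# the actual queues live in the master process
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

def worker_main(server='127.0.0.1', port=5000, authkey=b'abc'):
    m = QueueManager(address=(server, port), authkey=authkey)
    m.connect()  # the master must already be running
    task = m.get_task_queue()
    result = m.get_result_queue()
    for _ in range(10):
        try:
            n = task.get(timeout=1)
            print('run task %d * %d...' % (n, n))
            result.put('%d * %d = %d' % (n, n, n * n))
        except queue.Empty:
            print('task queue is empty.')
    print('worker exit.')

# start the master first, then run e.g.:
# worker_main('127.0.0.1', 5000, b'abc')
```

Because the worker connects instead of starting a manager, it registers the queue names without callables; the proxies it gets back forward put/get calls over the network.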