贫瘠之地

华北无浪漫,死海扬起帆
多少个夜晚,独自望着天

0%

Python 并发与异步

多进程

fork

Unix/Linux 操作系统提供了一个 fork 系统调用,它非常特殊,因为普通的函数调用,调用一次就返回一次,但是 fork 调用一次,返回两次

因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回,子进程永远返回 0,而父进程返回子进程的 ID

这样做的理由是,一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的 ID,而子进程只需要调用 getppid 就可以拿到父进程的 ID

Python 的 os 模块封装了常见的系统调用,其中就包括 fork,所以可以在 Python 程序中轻松创建子进程

1
2
3
4
5
6
7
8
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

由于 Windows 没有 fork 调用,上面的代码在 Windows 上无法运行;而 Mac 系统是基于 BSD(Unix的一种)内核,所以,在 Mac 下运行是没有问题的

multiprocessing

由于 Python 是跨平台的,自然也应该提供一个跨平台的多进程支持;multiprocessing 模块就是跨平台版本的多进程模块

multiprocessing 模块提供了一个 Process 类来代表一个进程对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
import os


# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
1
2
3
4
Parent process 588.
Child process will start.
Run child process test (19096)...
Child process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个 Process 实例,用 start 方法启动,这样创建进程比 fork 还要简单

join 方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Pool
import os, time, random


def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool()
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
1
2
3
4
5
6
7
8
9
10
11
12
13
Parent process 37332.
Waiting for all subprocesses done...
Run task 0 (21488)...
Run task 1 (35776)...
Run task 2 (19912)...
Run task 3 (18472)...
Task 0 runs 0.37 seconds.
Run task 4 (21488)...
Task 2 runs 0.94 seconds.
Task 1 runs 1.30 seconds.
Task 4 runs 0.95 seconds.
Task 3 runs 2.75 seconds.
All subprocesses done.

Pool 对象调用 join 方法会等待所有子进程执行完毕,调用 join 之前必须先调用 close,调用 close 之后就不能继续添加新的 Process

Pool 默认进程数是 CPU 核数,也可以入参 processes 进行设置

1
2
3
4
5
6
7
if processes is None:
processes = os.cpu_count() or 1
if processes < 1:
raise ValueError("Number of processes must be at least 1")
if maxtasksperchild is not None:
if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
raise ValueError("maxtasksperchild must be a positive int or None")

控制子进程

创建了子进程后,有时还需要控制子进程的输入和输出

subprocess 模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出

1
2
3
4
5
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

call 方法用运行带有参数的命令

如果子进程还需要输入,则可以通过 communicate 方法输入

1
2
3
4
5
6
7
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令 nslookup,然后手动输入如下参数和命令

1
2
3
set q=mx
python.org
exit

进程间通信

Process 之间是需要通信的,操作系统提供了很多机制来实现进程间的通信

Python 的 multiprocessing 模块包装了底层的机制,提供了 QueuePipes 等多种方式来交换数据

Queue 为例,在父进程中创建两个子进程,一个向 Queue 里写数据,一个从 Queue 里读数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())


# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True, 1)
print('Get %s from queue.' % value)


if __name__ == '__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
1
2
3
4
5
6
7
8
9
10
11
Process to write: 26020
Put A to queue...
Process to read: 916
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Process Process-2:
...
# 这里会报错,因为 q.get(True, 1) 设置了超时时间

多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成

由于线程是操作系统直接支持的执行单元,因此高级语言通常都内置多线程的支持,Python 也不例外,并且 Python 的线程是真正的 Posix Thread,而不是模拟出来的线程

Python 的标准库提供了两个模块:_threadthreading_thread 是低级模块,threading 是高级模块,对 _thread 进行了封装

绝大多数情况下,我们只需要使用 threading 这个高级模块

Thread

启动一个线程就是把一个函数传入并创建 Thread 实例,然后调用 start 开始执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time, threading


# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)


print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
1
2
3
4
5
6
7
8
9
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python 的 threading 模块有个 current_thread() 函数,它永远返回当前线程的实例

主线程实例的名字叫 MainThread,子线程的名字在创建时指定,我们用 LoopThread 命名子线程,名字仅仅在打印时用来显示,没有其他作用

Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响

而多线程中,所有变量都由所有线程共享(线程中数据共享)

所以在并发情况下,会出现并发安全问题导致数据错乱

下面举一个错乱的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading

total_count = 100000000


def buy():
global total_count
while True:
if total_count > 0:
total_count -= 1
else:
break


threads = []
for i in range(0, 1024):
threads.append(threading.Thread(target=buy()))
for t in threads:
t.start()
t.join()
print(total_count)

事实上因为运算太快,貌似很难复现并发问题,但是不代表不存在

想要保证原子性,就要上锁,创建一个锁就是通过 threading.Lock 来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# threading.Lock 创建锁
lock = threading.Lock()


def buy():
global total_count
while True:
# 上锁
lock.acquire()
try:
if total_count > 0:
total_count -= 1
else:
break
finally:
# 释放
lock.release()

多核 CPU

如果有两个死循环线程,在多核 CPU 中,可以监控到会占用 200% 的CPU,也就是占用两个 CPU 核心

试试用 Python 写个死循环

1
2
3
4
5
6
7
8
9
10
import threading, multiprocessing

def loop():
x = 0
while True:
x = x ^ 1

for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()

但事实上 Python 上的死循环跑不满所有的 CPU

因为 Python 的线程虽然是真正的线程,但解释器执行代码时,有一个 GIL 锁 Global Interpreter Lock,任何 Python 线程执行前,必须先获得 GIL 锁,然后,每执行 100 条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行

这个 GIL 全局锁实际上把所有线程的执行代码都给上了锁,所以多线程在 Python 中只能交替执行,即使 100 个线程跑在 100 核 CPU 上,也只能用到 1 个核

GIL 是 Python 解释器设计的历史遗留问题,通常我们用的解释器是官方实现的 CPython,要真正利用多核,除非重写一个不带 GIL 的解释器

不过也不用过于担心,Python 虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务,因为多个 Python 进程有各自独立的 GIL 锁,互不影响

ThreadLocal

Java 中也有 ThreadLocal 机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading

# 创建全局 ThreadLocal 对象
local_school = threading.local()

def process_student():
# 获取当前线程关联的 student
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
# 绑定 ThreadLocal 的 student
local_school.student = name
process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
1
2
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

全局变量 local_school 就是一个 ThreadLocal 对象,每个 Thread 对它都可以读写 student 属性,但互不影响

但每个属性如 local_school.student 都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal 内部会自动处理

分布式进程

Python multiprocessing模块中的 managers 子模块还支持把多进程分布到多台机器上

由于 managers 模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序

通过 managers 模块把 Queue 通过网络暴露出去,就可以让其他机器的进程访问 Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的 QueueManager
class QueueManager(BaseManager):
pass

# 由于这个 QueueManager 只从网络上获取 Queue,所以注册时只提供名字
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行 task_master.py 的机器
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与 task_master.py 设置的完全一致
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接
m.connect()
# 获取 Queue 的对象
task = m.get_task_queue()
result = m.get_result_queue()
# 从 task 队列取任务,并把结果写入result队列
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')

任务进程要通过网络连接到服务进程,所以要指定服务进程的 IP

  1. 先启动 task_master.py 服务进程

  2. task_master.py 进程发送完任务后,开始等待 result 队列的结果,现在启动 task_worker.py 进程

  3. task_worker.py 进程结束,在 task_master.py 进程中会继续打印出结果

1
2
3
4
5
6
7
8
9
10
Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

Queue 对象存储在 task_master 上,只是通过网络共享了出去(类似 RPC,对上层面屏蔽网络实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

┌─────────────────────────────────────────┐ ┌──────────────────────────────────────┐
│task_master.py │ │ │task_worker.py │
│ │ │ │
│ task = manager.get_task_queue() │ │ │ task = manager.get_task_queue() │
│ result = manager.get_result_queue() │ │ result = manager.get_result_queue() │
│ │ │ │ │ │ │
│ │ │ │ │ │
│ ▼ │ │ │ │ │
│ ┌─────────────────────────────────┐ │ │ │ │
│ │QueueManager │ │ │ │ │ │
│ │ ┌────────────┐ ┌──────────────┐ │ │ │ │ │
│ │ │ task_queue │ │ result_queue │ │<───┼──┼──┼──────────────┘ │
│ │ └────────────┘ └──────────────┘ │ │ │ │
│ └─────────────────────────────────┘ │ │ │ │
└─────────────────────────────────────────┘ └──────────────────────────────────────┘


Network

注意 Queue 的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小;比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由 Worker 进程再去共享的磁盘上读取文件

协程

协程,英文名 Coroutine

子程序,或者称为函数,在所有语言中都是层级调用,比如 A 调用 B,B 在执行过程中又调用了 C,C 执行完毕返回,B 执行完毕返回,最后是 A 执行完毕(函数栈)

而协程的调用和子程序不同,协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行

注意在一个子程序中中断后去执行其他子程序,不是函数调用,有点类似 CPU 的中断

1
2
3
4
5
6
7
8
9
def A():
print('1')
print('2')
print('3')

def B():
print('x')
print('y')
print('z')

假设由协程执行,在执行 A 的过程中,可以随时中断,去执行 B,B 也可能在执行过程中中断再去执行 A,结果可能是

1
2
3
4
5
6
1
2
x
y
3
z

但是在 A 中是没有调用 B 的,所以协程的调用比函数调用理解起来要难一些

那么协程和多线程有哪些区别?

  • 优势

    • 执行效率高:子程序切换不是线程切换,协程属于程序级别,调度开支小

    • 不需要多线程的锁机制:无需同步互斥操作

    • 并发高:没有 C10K 问题,IO 并发性好

  • 缺点

    • 多核 CPU:协程的本质是个单线程,不能很好地利用 CPU 的多个核心;协程需要和进程配合才能运行在多 CPU 上
    • 受线程影响:进行阻塞(Blocking)操作时会阻塞掉整个程序

利用多核 CPU,最简单的方法是多进程 + 协程,既充分利用多核,又充分发挥协程的高效率

Python 对协程的支持是通过 generator 实现的,yield 不但可以返回一个值,它还可以接收调用者发出的参数,同过 next 启动生成器,并在后续逻辑中调用 send 函数传递值

下面是一个经典的生产者消费者协程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'


def produce(c):
next(c)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()


c = consumer()
produce(c)

整个流程无锁,由一个线程执行,produceconsumer 协作完成任务,所以称为协程,而非线程的抢占式多任务

同样举一个多协程的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def average():
total = 0
count = 0
value = None

while True:
term = yield value
total += term
count += 1
value = total / count


if __name__ == "__main__":
c = average()
next(c)

for i in range(3):
print(f'录入数字 {i}')
avg = c.send(i)
print(f'已录入数字的平均值为 {avg}')
print("其他操作")
i = 10
print(f'录入数字 {i}')
avg = c.send(i)
print(f'已录入数字的平均值为 {avg}')
c.close()
1
2
3
4
5
6
7
8
9
录入数字 0
已录入数字的平均值为 0.0
录入数字 1
已录入数字的平均值为 0.5
录入数字 2
已录入数字的平均值为 1.0
其他操作
录入数字 10
已录入数字的平均值为 3.25

协程函数中的无限循环表明,只要调用方不断把值发给这个协程,它就会一直接收值,然后计算已录入的所有数字的平均值

可以看出和函数的区别在于,协程像是一个其他调用分支,将当前执行的数据现场保存了起来,如果这个操作使用函数调用来实现,需要对象来对历史数据进行存储(从这个角度来看,协程就保存现场的对象?)

参考

进程和线程 - 廖雪峰的官方网站 (liaoxuefeng.com)

「基础篇」Python 协程(一) - 知乎 (zhihu.com)