前言

这篇文章主要分享如何使用 Python 的 concurrent 库进行多进程运行以加速分析。在约一年前我在跑多个物种的 ABBA-BABA test 时遇到过分析缓慢的问题,因为涉及到的物种有九十多种,所以在经过筛选以后四物种组合的数量依然高达数百万,但由于那个分析并不是很要紧所以也就没有多管(最后跑了一个多月)。

现在的状况与那时有些许不同,最近跑的分析大多涉及到大量计算,由于比较急迫地想看到最终结果,我不得不学习多进程方法来进行加速,很多时候进步都需要一些些小小的契机,now it comes。


由于 Python 并不是从底层框架开始让人写代码,所以它在拥有友好入门门槛的同时也牺牲了一定的性能,特别是在处理计算密集型任务时,Python 比起 C 来说就显得有些捉襟见肘。

当然我肯定是不愿意写 C 的代码的,一是因为我只学了一些皮毛,二是因为相比之下 Python 实在太方便,俗话说性能不够进程来凑,既然我跑的不够快,那我就多跑一些,以量胜质。

在使用多进程前,建议先检查一下自己的代码是否可以进行优化,有时候换成合适的数据操作方式就可以让分析的速度有质的飞跃,使用多进程应该视为后备选择。

本文使用的为 Python 的 concurrent 库,官方网址:https://docs.python.org/zh-cn/3/library/concurrent.html

本文使用多进程,适用于计算密集型任务。如果是 I/O 密集型任务建议使用多线程。

俗话说实践出真知,这里以一个具体的任务作为例子来凸显多进程在处理计算密集型任务方面的优势。

实战

寻找一定范围内所有质数

以下代码由 ChatGPT4.0 生成并由博主进行了补充矫正。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import concurrent.futures
import math
import time


def is_prime(n):
"""判断一个数是否是质数"""
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def compute_primes(n_start, n_end):
"""在指定范围内找到所有的质数"""
primes = [n for n in range(n_start, n_end) if is_prime(n)]
return primes

# executor.map 不支持直接将多个参数应用于一个函数,因此这里需要一个包装函数
def compute_primes_range(args):
return compute_primes(*args)

def multi_process_compute(n_start, n_end):
"""使用多进程在指定范围内计算质数"""
range_size = (n_end - n_start) // 4
ranges = [(n_start + i*range_size, n_start + (i+1)*range_size) for i in range(4)]

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(compute_primes_range, ranges)

primes = []
for result in results:
primes.extend(result)

return primes

if __name__ == "__main__":
n_start, n_end = 10000000, 10100000

start = time.time()
primes = multi_process_compute(n_start, n_end)
mid = time.time()
print(f"Multi-process duration: {mid - start}")

primes_single = compute_primes(n_start, n_end)
end = time.time()
print(f"Single-threaded duration: {end - mid}")

assert primes == primes_single # 确保多进程和单线程计算的结果相同
1
2
Multi-process duration: 0.3506944179534912
Single-threaded duration: 0.7943074703216553

能看到使用四个核计算时(max_workers=4)所需时间为单线程的一半左右。由于多进程本身就具有一定的性能开销(进程创建的开销、进程间通信的延迟,以及数据分割和再整合的额外成本等),因此在这种本身不需要太久时间的情况下使用多进程带来的收益并不明显,对于一些简单的计算问题而言,多进程所需要的时间甚至会多于单线程,因此多进程的使用需要考虑到具体情况。

以下是关于上述代码中重要的部分解析:

  • with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    • 该部分相当于创造了一个进程池,允许并行执行四个进程(max_workers),进程池中的任务在 with 语句中完成,确保执行任务后正确关闭进程池。相应地,如果想要的是多线程则使用 ThreadPoolExecutor 进行初始化。
  • results = executor.map(compute_primes_range, ranges)
    • 该步骤中 executor.mapcompute_primes_range 并行映射到分割的 ranges 子列表上,每个进程处理一部分并在所有运行结束后汇总结果并关闭进程池。
    • 此处的 executor.map 是 “阻塞” 且 “顺序” 的,它返回的结果列表会按照输入序列的顺序进行排列,即使某个任务早于其他任务完成,其也会等待所有的任务都完成后按照提交顺序统一返回结果。

以上设计适用于需要结果记录保留原有 “序列性” 的情况,但它不能立即获得结果,因此下面再介绍一种不需要按照提交顺序且能在任务完成后直接输出结果的方法。

不要求返回结果有序

这个问题同上,不过将换一种方式处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import concurrent.futures
import math
import time
import random

def is_prime(n):
"""判断一个数是否是质数"""
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def compute_primes(num):
"""在指定范围内找到所有的质数"""
time.sleep(random.uniform(0.005, 0.01))
attri = '质数' if is_prime(num) else '非质数'
return attri

def multi_process_compute(n_start, n_end):
"""使用多进程在指定范围内计算质数"""
lst = {}

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

future_to_num = {executor.submit(compute_primes, num): num for num in range(n_start, n_end)}

for future in concurrent.futures.as_completed(future_to_num):

num = future_to_num[future]
res = future.result()
lst[num] = res

return lst

if __name__ == "__main__":
n_start, n_end = 10000000, 10000500
st_prime = {}

start = time.time()
primes = multi_process_compute(n_start, n_end)
mid = time.time()
print(f"Multi-process duration: {mid - start}")

for num in range(n_start, n_end):
time.sleep(random.uniform(0.005, 0.01))
st_prime[num] = '质数' if is_prime(num) else '非质数'

end = time.time()
print(f"Single-threaded duration: {end - mid}")
print(list(primes) == list(st_prime))
1
2
3
Multi-process duration: 1.1387457847595215
Single-threaded duration: 4.003631353378296
False

由于对单个数字进行质数判断并不是计算密集型,直接使用多进程带来的开销反而更大,因此为了凸显多进程的优势我对每一个循环引入了 time.sleep(random.uniform(0.005, 0.01)) 作为平衡,引入后就相当于对每一个数进行计算都需要花费至少 0.005s 的时间。

以下是关于上述代码中重要的部分解析:

  • future_to_num = {executor.submit(compute_primes, num): num for num in range(n_start, n_end)}

    • executor.submitcompute_primes 映射到 num 上提交给进程池,并返回一个 Future 对象,该对象代表异步执行的操作,这里将每一个 Future 都对应到其提交的数字 num 上方便后续调取。
  • for future in concurrent.futures.as_completed(future_to_num):

    • as_completed() 在这里接受了一个 Future 对象的集合,一旦有 Future 对象完成,它就产生该 Future 并等待下一个完成(直到所有都完成)。这里的描述可能会有些模糊,可以自行查阅 Python yield 的相关知识。

    • 当有 Future 完成,则通过后续代码得到对应数字及结果(future.result()),同时更新到字典 lst(虽然 lst 是列表的意思但不用在意那么多细节)

最后结果表明,多进程花费的时间约为单线程的四分之一,可以看出来当涉及到的计算非常密集需要大量时间时,多进程带来的加速会更加明显。

需要注意的是,这种做法并没有考虑原有数字的序列性,当有一个 Future 完成时它就会即刻将结果保存到 lst 中,因此最后得到的字典顺序与直接使用 For 循环得到的并不相同,但相应地,性能方面也会有更高的提升(因为它不需要等待所有结果,而是可以一边得到结果一边对结果进行处理)。

延申 —— 异常处理

当然,有时我们不能确保所有传入给进程池中的任务都不发生错误,因此我们需要将这些错误记录下来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import concurrent.futures
import math
import time
import random

def is_prime(n):
"""判断一个数是否是质数"""
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def compute_primes(num):
"""在指定范围内找到所有的质数"""
time.sleep(random.uniform(0.005, 0.01))
attri = '质数' if is_prime(num) else '非质数'
return attri

def multi_process_compute(n_start, n_end):
"""使用多进程在指定范围内计算质数"""
lst = {}

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

future_to_num = {executor.submit(compute_primes, num): num for num in range(n_start, n_end)}

for future in concurrent.futures.as_completed(future_to_num):

num = future_to_num[future]
try:
res = future.result()
lst[num] = res
except Exception as e:
print(f'{num} have error: {e}')

return lst

if __name__ == "__main__":
n_start, n_end = -1, 10
primes = multi_process_compute(n_start, n_end)
print(primes)
1
2
-1 have error: math domain error
{0: '非质数', 1: '质数', 3: '质数', 2: '非质数', 6: '非质数', 5: '质数', 7: '质数', 4: '非质数', 9: '非质数', 8: '非质数'}

这里可以看到对于 -1 来说任务发生了错误,而该值也并未被记录进字典中,因此合适的异常处理也是很重要的,它可以帮助记录下那些出错的任务同时保证其他正常任务能够继续运行。

当然这里的 0 和 1 也算是异常情况。

看完了以上内容后再让我们来思考以下问题吧:

  1. 什么情况下使用多进程会缩短分析时间?什么情况下反而会延长分析时间?是否能用代码复现出后一种情况?
  2. 在使用 time.sleep 时,使用多进程更快还是使用多线程更快?为什么?
  3. 如果小明想要使用爬虫爬取多个页面的内容,他应该使用多进程还是多线程?为什么?