Python的multiprocessing模块是突破GIL(全局解释器锁)限制、充分利用多核CPU的核心工具,不管是批量处理数据、并行执行耗时任务,还是搭建多进程服务,都离不开它。
一、核心认知:为什么用多进程?
1. 多进程vs多线程(新手必懂)
| 特性 | 多进程(multiprocessing) | 多线程(threading) |
|---|---|---|
| GIL限制 | 不受限制(每个进程有独立GIL) | 受限制(同一进程内线程共享GIL) |
| CPU密集型任务 | 效率高(充分利用多核) | 效率低(多核无法并行) |
| 内存占用 | 较高(进程独立内存空间) | 较低(线程共享进程内存) |
| 通信难度 | 稍高(需借助队列、管道等) | 较低(直接共享变量,需加锁) |
2. 适用场景
- CPU密集型:数据计算、矩阵运算、图片处理、密码破解等;
- 批量任务:多文件解析、多接口并发请求、多设备监控等;
- 独立任务:任务间无依赖,可完全并行执行。
二、基础用法:创建和启动进程
1. 核心类:Process
multiprocessing.Process是创建进程的基础类,核心参数:
-
target:进程要执行的函数; -
args:传给函数的参数(元组格式); -
kwargs:传给函数的关键字参数(字典格式); -
name:进程名称(可选,便于调试)。
2. 场景1:简单多进程示例
import multiprocessing
import time
# 定义进程执行的函数
def task(num):
print(f"进程{num}开始执行,PID:{multiprocessing.current_process().pid}")
time.sleep(2) # 模拟耗时任务
print(f"进程{num}执行完成")
if __name__ == "__main__":
# 注意:Windows系统必须放在if __name__ == "__main__"下,避免递归创建进程
start_time = time.time()
# 创建2个进程
p1 = multiprocessing.Process(target=task, args=(1,))
p2 = multiprocessing.Process(target=task, args=(2,))
# 启动进程
p1.start()
p2.start()
# 等待进程结束(主进程阻塞)
p1.join()
p2.join()
end_time = time.time()
print(f"总耗时:{end_time - start_time:.2f}秒")
执行结果:
进程1开始执行,PID:12345
进程2开始执行,PID:12346
进程1执行完成
进程2执行完成
总耗时:2.01秒
(单进程执行需4秒,多进程仅需2秒,充分体现并行优势)
3. 关键方法说明
-
start():启动进程(真正创建子进程,执行target函数); -
join([timeout]):主进程等待子进程结束,timeout为可选超时时间; -
is_alive():判断进程是否还在运行; -
terminate():强制终止进程(慎用,可能导致资源未释放); -
pid:获取进程ID,name:获取进程名称。
三、进程通信:实现数据共享
进程间内存相互隔离,无法直接共享变量,需通过Queue、Pipe、Manager等工具实现通信。
1. 场景1:Queue(队列)—— 安全的进程间数据传递
Queue是多进程安全的队列,支持“先进先出”,适合批量数据传递。
import multiprocessing
# 生产者函数:向队列写入数据
def producer(queue):
for i in range(5):
queue.put(f"数据{i}")
print(f"生产者写入:数据{i}")
queue.put(None) # 写入结束标志
# 消费者函数:从队列读取数据
def consumer(queue):
while True:
data = queue.get()
if data is None: # 检测到结束标志
break
print(f"消费者读取:{data}")
if __name__ == "__main__":
# 创建队列(可指定最大容量,如Queue(10))
q = multiprocessing.Queue()
# 创建生产者和消费者进程
p_producer = multiprocessing.Process(target=producer, args=(q,))
p_consumer = multiprocessing.Process(target=consumer, args=(q,))
# 启动进程
p_producer.start()
p_consumer.start()
# 等待进程结束
p_producer.join()
p_consumer.join()
print("数据传递完成")
执行结果:
生产者写入:数据0
生产者写入:数据1
消费者读取:数据0
消费者读取:数据1
生产者写入:数据2
生产者写入:数据3
消费者读取:数据2
消费者读取:数据3
生产者写入:数据4
消费者读取:数据4
数据传递完成
2. 场景2:Pipe(管道)—— 双向/单向数据通信
Pipe适合两个进程间的高效通信,支持双向(默认)或单向传输。
import multiprocessing
# 进程1:向管道写入数据,再读取回应
def process1(conn):
conn.send("来自进程1的消息") # 发送数据
response = conn.recv() # 接收回应
print(f"进程1收到回应:{response}")
conn.close() # 关闭管道
# 进程2:从管道读取数据,再发送回应
def process2(conn):
data = conn.recv() # 接收数据
print(f"进程2收到消息:{data}")
conn.send("来自进程2的回应") # 发送回应
conn.close()
if __name__ == "__main__":
# 创建管道(duplex=True为双向,False为单向)
conn1, conn2 = multiprocessing.Pipe(duplex=True)
# 创建进程
p1 = multiprocessing.Process(target=process1, args=(conn1,))
p2 = multiprocessing.Process(target=process2, args=(conn2,))
p1.start()
p2.start()
p1.join()
p2.join()
执行结果:
进程2收到消息:来自进程1的消息
进程1收到回应:来自进程2的回应
3. 场景3:Manager — 共享变量/列表/字典
Manager支持创建跨进程的共享数据结构(如列表、字典、数值),适合需要修改共享数据的场景。
import multiprocessing
# 修改共享字典
def modify_dict(shared_dict):
shared_dict["count"] += 1
shared_dict["name"] = "修改后的值"
# 修改共享列表
def modify_list(shared_list):
shared_list.append(4)
shared_list[0] = 100
if __name__ == "__main__":
# 创建Manager对象
manager = multiprocessing.Manager()
# 创建共享字典和列表
shared_dict = manager.dict({"count": 0, "name": "初始值"})
shared_list = manager.list([1, 2, 3])
# 创建进程
p1 = multiprocessing.Process(target=modify_dict, args=(shared_dict,))
p2 = multiprocessing.Process(target=modify_list, args=(shared_list,))
p1.start()
p2.start()
p1.join()
p2.join()
# 打印共享数据
print("共享字典:", shared_dict)
print("共享列表:", shared_list)
执行结果:
共享字典: {'count': 1, 'name': '修改后的值'}
共享列表: [100, 2, 3, 4]
四、进程池:批量管理进程(实用核心)
当需要创建大量进程时,手动创建Process会导致资源耗尽,Pool(进程池)可限制进程数量、复用进程,是批量任务的首选。
1. 核心方法
-
apply():同步执行(主进程等待子进程完成); -
apply_async():异步执行(非阻塞,推荐); -
map():批量执行,类似Python内置map,自动分配任务; -
close():关闭进程池,不再接受新任务; -
join():等待所有进程完成。
2. 场景1:apply_async — 异步执行批量任务
import multiprocessing
import time
# 耗时任务:计算平方
def square(num):
time.sleep(1) # 模拟耗时
return num * num
if __name__ == "__main__":
start_time = time.time()
# 创建进程池(指定4个进程,适配4核CPU)
pool = multiprocessing.Pool(processes=4)
# 异步提交任务(10个任务)
results = []
for i in range(10):
res = pool.apply_async(square, args=(i,))
results.append(res)
# 关闭进程池(不再接受新任务)
pool.close()
# 等待所有任务完成
pool.join()
# 获取结果
final_results = [res.get() for res in results]
print("计算结果:", final_results)
print(f"总耗时:{time.time() - start_time:.2f}秒")
执行结果:
计算结果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
总耗时:3.01秒
(10个任务,4个进程并行,每个任务1秒,总耗时≈3秒,单进程需10秒)
3. 场景2:map — 简化批量任务提交
map适合任务参数为可迭代对象的场景,代码更简洁:
import multiprocessing
import time
def square(num):
time.sleep(1)
return num * num
if __name__ == "__main__":
start_time = time.time()
# 创建进程池
pool = multiprocessing.Pool(processes=4)
# 批量提交任务(参数为列表)
final_results = pool.map(square, range(10))
pool.close()
pool.join()
print("计算结果:", final_results)
print(f"总耗时:{time.time() - start_time:.2f}秒")
五、进程锁:解决资源竞争问题
当多个进程同时修改共享资源(如文件、打印输出)时,会出现“资源竞争”导致结果混乱,需用Lock加锁。
示例:多进程写入文件(加锁vs不加锁)
不加锁(结果混乱):
import multiprocessing
import time
def write_file(num):
with open("test.txt", "a") as f:
f.write(f"进程{num}开始写入\n")
time.sleep(0.1) # 模拟写入耗时
f.write(f"进程{num}写入完成\n")
if __name__ == "__main__":
# 清空文件
with open("test.txt", "w") as f:
pass
# 创建5个进程
processes = []
for i in range(5):
p = multiprocessing.Process(target=write_file, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
print("写入完成")
test.txt(混乱结果):
进程0开始写入
进程1开始写入
进程0写入完成
进程2开始写入
进程1写入完成
进程3开始写入
进程2写入完成
进程4开始写入
进程3写入完成
进程4写入完成
加锁(结果有序):
import multiprocessing
import time
def write_file(num, lock):
lock.acquire() # 加锁
try:
with open("test.txt", "a") as f:
f.write(f"进程{num}开始写入\n")
time.sleep(0.1)
f.write(f"进程{num}写入完成\n")
finally:
lock.release() # 释放锁(必须执行,避免死锁)
if __name__ == "__main__":
with open("test.txt", "w") as f:
pass
# 创建进程锁
lock = multiprocessing.Lock()
processes = []
for i in range(5):
p = multiprocessing.Process(target=write_file, args=(i, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print("写入完成")
test.txt(有序结果):
进程0开始写入
进程0写入完成
进程1开始写入
进程1写入完成
进程2开始写入
进程2写入完成
进程3开始写入
进程3写入完成
进程4开始写入
进程4写入完成
六、避坑指南:新手常见错误与解决方案
1. 坑1:Windows系统未加if __name__ == "__main__"
- 现象:报错
RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase; - 原因:Windows系统创建进程时会重新导入脚本,导致递归创建进程;
- 解决方案:所有创建进程的代码必须放在
if __name__ == "__main__"下。
2. 坑2:进程池join()前未执行close()
- 现象:程序卡死,无法结束;
- 原因:
join()需在close()或terminate()后执行; - 解决方案:先
pool.close(),再pool.join()。
3. 坑3:共享变量未用Manager
- 现象:子进程修改变量后,主进程看不到变化;
- 原因:进程间内存隔离,普通变量无法共享;
- 解决方案:用
multiprocessing.Manager()创建共享数据结构。
4. 坑4:进程数量设置过大
- 现象:程序运行变慢,甚至崩溃;
- 原因:进程数超过CPU核心数,频繁切换进程导致开销增加;
- 解决方案:进程池数量建议等于CPU核心数(
multiprocessing.cpu_count()获取)。
5. 坑5:死锁(锁未释放)
- 现象:程序卡死,进程无法继续执行;
- 原因:
acquire()加锁后未release(); - 解决方案:用
try...finally确保锁释放,或用with lock:自动释放:def write_file(num, lock): with lock: # 自动加锁/释放 with open("test.txt", "a") as f: f.write(f"进程{num}写入\n")
七、实战综合案例:多进程解析大量CSV文件
需求
批量解析100个CSV文件(data1.csv ~ data100.csv),提取每个文件的“销售额”列,计算总和,将结果汇总到result.csv。
实现代码
import multiprocessing
import csv
import os
# 解析单个CSV文件,返回销售额总和
def parse_csv(filepath):
try:
total = 0
with open(filepath, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
total += float(row["销售额"])
return (filepath, total, "成功")
except Exception as e:
return (filepath, 0, f"失败:{str(e)}")
if __name__ == "__main__":
# 生成文件列表
file_list = [f"data{i}.csv" for i in range(1, 101)]
# 过滤不存在的文件
file_list = [f for f in file_list if os.path.exists(f)]
# 创建进程池(CPU核心数)
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
# 批量解析文件
results = pool.map(parse_csv, file_list)
pool.close()
pool.join()
# 写入汇总结果
with open("result.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(["文件名", "销售额总和", "状态"])
writer.writerows(results)
print(f"解析完成,共处理{len(results)}个文件,结果已写入result.csv")