Python多进程multiprocessing模块全解析

Python的multiprocessing模块是突破GIL(全局解释器锁)限制、充分利用多核CPU的核心工具,不管是批量处理数据、并行执行耗时任务,还是搭建多进程服务,都离不开它。

一、核心认知:为什么用多进程?

1. 多进程vs多线程(新手必懂)

特性 多进程(multiprocessing) 多线程(threading)
GIL限制 不受限制(每个进程有独立GIL) 受限制(同一进程内线程共享GIL)
CPU密集型任务 效率高(充分利用多核) 效率低(多核无法并行)
内存占用 较高(进程独立内存空间) 较低(线程共享进程内存)
通信难度 稍高(需借助队列、管道等) 较低(直接共享变量,需加锁)

2. 适用场景

  • CPU密集型:数据计算、矩阵运算、图片处理、密码破解等;
  • 批量任务:多文件解析、多接口并发请求、多设备监控等;
  • 独立任务:任务间无依赖,可完全并行执行。

二、基础用法:创建和启动进程

1. 核心类:Process

multiprocessing.Process是创建进程的基础类,核心参数:

  • target:进程要执行的函数;
  • args:传给函数的参数(元组格式);
  • kwargs:传给函数的关键字参数(字典格式);
  • name:进程名称(可选,便于调试)。

2. 场景1:简单多进程示例

import multiprocessing
import time

# 定义进程执行的函数
def task(num):
    print(f"进程{num}开始执行,PID:{multiprocessing.current_process().pid}")
    time.sleep(2)  # 模拟耗时任务
    print(f"进程{num}执行完成")

if __name__ == "__main__":
    # 注意:Windows系统必须放在if __name__ == "__main__"下,避免递归创建进程
    start_time = time.time()
    
    # 创建2个进程
    p1 = multiprocessing.Process(target=task, args=(1,))
    p2 = multiprocessing.Process(target=task, args=(2,))
    
    # 启动进程
    p1.start()
    p2.start()
    
    # 等待进程结束(主进程阻塞)
    p1.join()
    p2.join()
    
    end_time = time.time()
    print(f"总耗时:{end_time - start_time:.2f}秒")

执行结果

进程1开始执行,PID:12345
进程2开始执行,PID:12346
进程1执行完成
进程2执行完成
总耗时:2.01秒

(单进程执行需4秒,多进程仅需2秒,充分体现并行优势)

3. 关键方法说明

  • start():启动进程(真正创建子进程,执行target函数);
  • join([timeout]):主进程等待子进程结束,timeout为可选超时时间;
  • is_alive():判断进程是否还在运行;
  • terminate():强制终止进程(慎用,可能导致资源未释放);
  • pid:获取进程ID,name:获取进程名称。

三、进程通信:实现数据共享

进程间内存相互隔离,无法直接共享变量,需通过QueuePipeManager等工具实现通信。

1. 场景1:Queue(队列)—— 安全的进程间数据传递

Queue是多进程安全的队列,支持“先进先出”,适合批量数据传递。

import multiprocessing

# 生产者函数:向队列写入数据
def producer(queue):
    for i in range(5):
        queue.put(f"数据{i}")
        print(f"生产者写入:数据{i}")
    queue.put(None)  # 写入结束标志

# 消费者函数:从队列读取数据
def consumer(queue):
    while True:
        data = queue.get()
        if data is None:  # 检测到结束标志
            break
        print(f"消费者读取:{data}")

if __name__ == "__main__":
    # 创建队列(可指定最大容量,如Queue(10))
    q = multiprocessing.Queue()
    
    # 创建生产者和消费者进程
    p_producer = multiprocessing.Process(target=producer, args=(q,))
    p_consumer = multiprocessing.Process(target=consumer, args=(q,))
    
    # 启动进程
    p_producer.start()
    p_consumer.start()
    
    # 等待进程结束
    p_producer.join()
    p_consumer.join()
    print("数据传递完成")

执行结果

生产者写入:数据0
生产者写入:数据1
消费者读取:数据0
消费者读取:数据1
生产者写入:数据2
生产者写入:数据3
消费者读取:数据2
消费者读取:数据3
生产者写入:数据4
消费者读取:数据4
数据传递完成

2. 场景2:Pipe(管道)—— 双向/单向数据通信

Pipe适合两个进程间的高效通信,支持双向(默认)或单向传输。

import multiprocessing

# 进程1:向管道写入数据,再读取回应
def process1(conn):
    conn.send("来自进程1的消息")  # 发送数据
    response = conn.recv()  # 接收回应
    print(f"进程1收到回应:{response}")
    conn.close()  # 关闭管道

# 进程2:从管道读取数据,再发送回应
def process2(conn):
    data = conn.recv()  # 接收数据
    print(f"进程2收到消息:{data}")
    conn.send("来自进程2的回应")  # 发送回应
    conn.close()

if __name__ == "__main__":
    # 创建管道(duplex=True为双向,False为单向)
    conn1, conn2 = multiprocessing.Pipe(duplex=True)
    
    # 创建进程
    p1 = multiprocessing.Process(target=process1, args=(conn1,))
    p2 = multiprocessing.Process(target=process2, args=(conn2,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()

执行结果

进程2收到消息:来自进程1的消息
进程1收到回应:来自进程2的回应

3. 场景3:Manager — 共享变量/列表/字典

Manager支持创建跨进程的共享数据结构(如列表、字典、数值),适合需要修改共享数据的场景。

import multiprocessing

# 修改共享字典
def modify_dict(shared_dict):
    shared_dict["count"] += 1
    shared_dict["name"] = "修改后的值"

# 修改共享列表
def modify_list(shared_list):
    shared_list.append(4)
    shared_list[0] = 100

if __name__ == "__main__":
    # 创建Manager对象
    manager = multiprocessing.Manager()
    
    # 创建共享字典和列表
    shared_dict = manager.dict({"count": 0, "name": "初始值"})
    shared_list = manager.list([1, 2, 3])
    
    # 创建进程
    p1 = multiprocessing.Process(target=modify_dict, args=(shared_dict,))
    p2 = multiprocessing.Process(target=modify_list, args=(shared_list,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    # 打印共享数据
    print("共享字典:", shared_dict)
    print("共享列表:", shared_list)

执行结果

共享字典: {'count': 1, 'name': '修改后的值'}
共享列表: [100, 2, 3, 4]

四、进程池:批量管理进程(实用核心)

当需要创建大量进程时,手动创建Process会导致资源耗尽,Pool(进程池)可限制进程数量、复用进程,是批量任务的首选。

1. 核心方法

  • apply():同步执行(主进程等待子进程完成);
  • apply_async():异步执行(非阻塞,推荐);
  • map():批量执行,类似Python内置map,自动分配任务;
  • close():关闭进程池,不再接受新任务;
  • join():等待所有进程完成。

2. 场景1:apply_async — 异步执行批量任务

import multiprocessing
import time

# 耗时任务:计算平方
def square(num):
    time.sleep(1)  # 模拟耗时
    return num * num

if __name__ == "__main__":
    start_time = time.time()
    
    # 创建进程池(指定4个进程,适配4核CPU)
    pool = multiprocessing.Pool(processes=4)
    
    # 异步提交任务(10个任务)
    results = []
    for i in range(10):
        res = pool.apply_async(square, args=(i,))
        results.append(res)
    
    # 关闭进程池(不再接受新任务)
    pool.close()
    # 等待所有任务完成
    pool.join()
    
    # 获取结果
    final_results = [res.get() for res in results]
    print("计算结果:", final_results)
    print(f"总耗时:{time.time() - start_time:.2f}秒")

执行结果

计算结果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
总耗时:3.01秒

(10个任务,4个进程并行,每个任务1秒,总耗时≈3秒,单进程需10秒)

3. 场景2:map — 简化批量任务提交

map适合任务参数为可迭代对象的场景,代码更简洁:

import multiprocessing
import time

def square(num):
    time.sleep(1)
    return num * num

if __name__ == "__main__":
    start_time = time.time()
    
    # 创建进程池
    pool = multiprocessing.Pool(processes=4)
    
    # 批量提交任务(参数为列表)
    final_results = pool.map(square, range(10))
    
    pool.close()
    pool.join()
    
    print("计算结果:", final_results)
    print(f"总耗时:{time.time() - start_time:.2f}秒")

五、进程锁:解决资源竞争问题

当多个进程同时修改共享资源(如文件、打印输出)时,会出现“资源竞争”导致结果混乱,需用Lock加锁。

示例:多进程写入文件(加锁vs不加锁)

不加锁(结果混乱):

import multiprocessing
import time

def write_file(num):
    with open("test.txt", "a") as f:
        f.write(f"进程{num}开始写入\n")
        time.sleep(0.1)  # 模拟写入耗时
        f.write(f"进程{num}写入完成\n")

if __name__ == "__main__":
    # 清空文件
    with open("test.txt", "w") as f:
        pass
    
    # 创建5个进程
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=write_file, args=(i,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    print("写入完成")

test.txt(混乱结果)

进程0开始写入
进程1开始写入
进程0写入完成
进程2开始写入
进程1写入完成
进程3开始写入
进程2写入完成
进程4开始写入
进程3写入完成
进程4写入完成

加锁(结果有序):

import multiprocessing
import time

def write_file(num, lock):
    lock.acquire()  # 加锁
    try:
        with open("test.txt", "a") as f:
            f.write(f"进程{num}开始写入\n")
            time.sleep(0.1)
            f.write(f"进程{num}写入完成\n")
    finally:
        lock.release()  # 释放锁(必须执行,避免死锁)

if __name__ == "__main__":
    with open("test.txt", "w") as f:
        pass
    
    # 创建进程锁
    lock = multiprocessing.Lock()
    
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=write_file, args=(i, lock))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    print("写入完成")

test.txt(有序结果)

进程0开始写入
进程0写入完成
进程1开始写入
进程1写入完成
进程2开始写入
进程2写入完成
进程3开始写入
进程3写入完成
进程4开始写入
进程4写入完成

六、避坑指南:新手常见错误与解决方案

1. 坑1:Windows系统未加if __name__ == "__main__"

  • 现象:报错RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase
  • 原因:Windows系统创建进程时会重新导入脚本,导致递归创建进程;
  • 解决方案:所有创建进程的代码必须放在if __name__ == "__main__"下。

2. 坑2:进程池join()前未执行close()

  • 现象:程序卡死,无法结束;
  • 原因:join()需在close()terminate()后执行;
  • 解决方案:先pool.close(),再pool.join()

3. 坑3:共享变量未用Manager

  • 现象:子进程修改变量后,主进程看不到变化;
  • 原因:进程间内存隔离,普通变量无法共享;
  • 解决方案:用multiprocessing.Manager()创建共享数据结构。

4. 坑4:进程数量设置过大

  • 现象:程序运行变慢,甚至崩溃;
  • 原因:进程数超过CPU核心数,频繁切换进程导致开销增加;
  • 解决方案:进程池数量建议等于CPU核心数(multiprocessing.cpu_count()获取)。

5. 坑5:死锁(锁未释放)

  • 现象:程序卡死,进程无法继续执行;
  • 原因:acquire()加锁后未release()
  • 解决方案:用try...finally确保锁释放,或用with lock:自动释放:
    def write_file(num, lock):
        with lock:  # 自动加锁/释放
            with open("test.txt", "a") as f:
                f.write(f"进程{num}写入\n")
    

七、实战综合案例:多进程解析大量CSV文件

需求

批量解析100个CSV文件(data1.csv ~ data100.csv),提取每个文件的“销售额”列,计算总和,将结果汇总到result.csv。

实现代码

import multiprocessing
import csv
import os

# 解析单个CSV文件,返回销售额总和
def parse_csv(filepath):
    try:
        total = 0
        with open(filepath, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                total += float(row["销售额"])
        return (filepath, total, "成功")
    except Exception as e:
        return (filepath, 0, f"失败:{str(e)}")

if __name__ == "__main__":
    # 生成文件列表
    file_list = [f"data{i}.csv" for i in range(1, 101)]
    # 过滤不存在的文件
    file_list = [f for f in file_list if os.path.exists(f)]
    
    # 创建进程池(CPU核心数)
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    # 批量解析文件
    results = pool.map(parse_csv, file_list)
    pool.close()
    pool.join()
    
    # 写入汇总结果
    with open("result.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["文件名", "销售额总和", "状态"])
        writer.writerows(results)
    
    print(f"解析完成,共处理{len(results)}个文件,结果已写入result.csv")
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容