Python多线程(threading模块)教程:从入门到实战

在Python编程中,多线程是提升程序并发执行效率的核心手段之一,尤其适用于I/O密集型任务(如网络请求、文件读写、数据库操作)。threading模块作为Python内置的多线程实现工具,无需额外安装,语法简洁且贴近原生编程习惯,是开发者处理并发任务的首选。本文从基础概念、核心用法到实战案例,系统讲解threading模块的使用方法,同时规避常见坑点,适合Python初学者和需要优化程序执行效率的开发者学习。

一、多线程基础:为什么用threading模块?

1.1 线程与进程的核心区别

  • 进程:操作系统分配资源的最小单位,每个进程有独立的内存空间,进程间通信复杂(如管道、队列),创建和销毁开销大;
  • 线程:进程内的执行单元,共享进程的内存空间,创建/切换开销远小于进程,适合轻量级并发;
  • Python的GIL限制:需注意,CPython解释器的全局解释器锁(GIL)导致同一时刻仅能有一个线程执行Python字节码,因此threading模块更适合I/O密集型任务(线程等待I/O时释放GIL,其他线程可执行),而非CPU密集型任务(CPU密集型建议用multiprocessing模块)。

1.2 threading模块的核心优势

  • 内置模块,无需额外依赖,兼容性强;
  • 提供线程创建、管理、同步的完整API;
  • 支持守护线程、线程锁、事件等高级特性,满足复杂并发场景。

二、threading模块基础用法

2.1 最简单的线程创建:Thread类

threading.Thread是创建线程的核心类,最常用的两种方式:传入函数创建、继承Thread类重写run方法。

方式1:传入函数创建线程(推荐)

import threading
import time

# 定义线程要执行的函数
def task(name, delay):
    print(f"线程{name}开始执行")
    time.sleep(delay)  # 模拟I/O操作(如网络请求、文件读写)
    print(f"线程{name}执行完成")

if __name__ == "__main__":
    # 记录程序开始时间
    start_time = time.time()
    
    # 创建线程对象
    t1 = threading.Thread(target=task, args=("t1", 2))
    t2 = threading.Thread(target=task, args=("t2", 2))
    
    # 启动线程
    t1.start()
    t2.start()
    
    # 等待所有线程执行完成
    t1.join()
    t2.join()
    
    # 计算总耗时
    end_time = time.time()
    print(f"程序总耗时:{end_time - start_time:.2f}秒")

执行结果

线程t1开始执行
线程t2开始执行
(等待2秒)
线程t1执行完成
线程t2执行完成
程序总耗时:2.01秒

关键说明

  • target:指定线程要执行的函数;
  • args:传入函数的参数(元组格式,单个参数需加逗号,如(1,));
  • start():启动线程(此时线程进入就绪状态,等待CPU调度),而非直接执行;
  • join():主线程等待子线程执行完成,若无join(),主线程会直接结束,子线程可能被强制终止。

方式2:继承Thread类创建线程

适合线程逻辑复杂、需要封装的场景:

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay
    
    # 重写run方法,线程启动后自动执行该方法
    def run(self):
        print(f"线程{self.name}开始执行")
        time.sleep(self.delay)
        print(f"线程{self.name}执行完成")

if __name__ == "__main__":
    start_time = time.time()
    
    t1 = MyThread("t1", 2)
    t2 = MyThread("t2", 2)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(f"程序总耗时:{time.time() - start_time:.2f}秒")

2.2 线程的常用属性和方法

方法/属性 作用
start() 启动线程,调用run()方法
join(timeout) 主线程等待子线程结束,timeout为可选超时时间(秒)
is_alive() 判断线程是否处于活动状态(已启动且未结束)
name 线程名称,可通过Thread(name="xxx")设置
daemon 是否为守护线程(布尔值),主线程结束时,守护线程会被强制终止
ident 线程ID,线程启动后有效

守护线程示例

守护线程适合做后台任务(如日志收集、监控),主线程结束后无需等待守护线程:

import threading
import time

def daemon_task():
    while True:
        print("守护线程运行中...")
        time.sleep(1)

if __name__ == "__main__":
    # 创建守护线程
    t = threading.Thread(target=daemon_task)
    t.daemon = True  # 设置为守护线程(必须在start()前设置)
    t.start()
    
    # 主线程执行3秒后结束
    time.sleep(3)
    print("主线程结束,守护线程被终止")

执行结果

守护线程运行中...
(等待1秒)
守护线程运行中...
(等待1秒)
守护线程运行中...
(等待1秒)
主线程结束,守护线程被终止

三、线程同步:解决资源竞争问题

多线程共享进程内存空间,当多个线程同时修改同一变量/资源时,会出现“资源竞争”,导致数据错误。threading模块提供了锁、信号量等工具解决该问题。

3.1 互斥锁(Lock):最常用的同步方式

Lock保证同一时刻只有一个线程能执行锁定代码块,核心方法:acquire()(加锁)、release()(解锁)。

未加锁的问题示例

import threading

# 共享变量
count = 0

def increment():
    global count
    for _ in range(100000):
        count += 1  # 非原子操作(读取→加1→写入),多线程下会出错

if __name__ == "__main__":
    t1 = threading.Thread(target=increment)
    t2 = threading.Thread(target=increment)
    
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    
    print(f"最终count值:{count}")  # 预期200000,实际远小于该值

加锁后的正确示例

import threading

count = 0
# 创建锁对象
lock = threading.Lock()

def increment():
    global count
    for _ in range(100000):
        lock.acquire()  # 加锁
        try:
            count += 1  # 临界区代码,仅一个线程能执行
        finally:
            lock.release()  # 解锁(放在finally中,确保即使出错也能解锁)

if __name__ == "__main__":
    t1 = threading.Thread(target=increment)
    t2 = threading.Thread(target=increment)
    
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    
    print(f"最终count值:{count}")  # 正确输出200000

3.2 可重入锁(RLock)

Lock是不可重入的(同一线程多次acquire()会死锁),RLock允许同一线程多次加锁,需保证加锁次数等于解锁次数:

import threading

rlock = threading.RLock()

def nested_task():
    rlock.acquire()
    print("内层加锁")
    rlock.release()
    print("内层解锁")

def main_task():
    rlock.acquire()
    print("外层加锁")
    nested_task()
    rlock.release()
    print("外层解锁")

if __name__ == "__main__":
    t = threading.Thread(target=main_task)
    t.start()
    t.join()

3.3 信号量(Semaphore):控制并发数

Semaphore允许指定数量的线程同时执行临界区代码,适合限制资源访问数(如最多3个线程同时访问数据库):

import threading
import time

# 最多允许2个线程同时执行
sem = threading.Semaphore(2)

def task(name):
    sem.acquire()
    print(f"线程{name}获取资源")
    time.sleep(2)  # 模拟资源占用
    sem.release()
    print(f"线程{name}释放资源")

if __name__ == "__main__":
    for i in range(5):
        t = threading.Thread(target=task, args=(f"t{i}",))
        t.start()

执行结果

线程t0获取资源
线程t1获取资源
(等待2秒)
线程t0释放资源
线程t1释放资源
线程t2获取资源
线程t3获取资源
(等待2秒)
线程t2释放资源
线程t3释放资源
线程t4获取资源
(等待2秒)
线程t4释放资源

四、实战案例:多线程爬取网页数据

以爬取多个网页内容为例,演示threading模块在I/O密集型任务中的应用(相比单线程,效率提升显著):

import threading
import requests
import time

# 待爬取的网页列表
urls = [
    "https://www.baidu.com",
    "https://www.taobao.com",
    "https://www.jd.com",
    "https://www.163.com",
    "https://www.sina.com.cn"
]

# 存储爬取结果
result = []
lock = threading.Lock()

def crawl_url(url):
    try:
        response = requests.get(url, timeout=5)
        # 加锁写入共享列表,避免资源竞争
        lock.acquire()
        result.append({"url": url, "status_code": response.status_code, "length": len(response.text)})
        lock.release()
        print(f"爬取完成:{url}")
    except Exception as e:
        lock.acquire()
        result.append({"url": url, "error": str(e)})
        lock.release()
        print(f"爬取失败:{url},错误:{e}")

if __name__ == "__main__":
    start_time = time.time()
    
    # 创建线程列表
    threads = []
    for url in urls:
        t = threading.Thread(target=crawl_url, args=(url,))
        threads.append(t)
        t.start()
    
    # 等待所有线程完成
    for t in threads:
        t.join()
    
    end_time = time.time()
    print(f"\n爬取完成,总耗时:{end_time - start_time:.2f}秒")
    print("爬取结果:")
    for item in result:
        print(item)

执行结果(示例)

爬取完成:https://www.baidu.com
爬取完成:https://www.taobao.com
爬取完成:https://www.jd.com
爬取完成:https://www.163.com
爬取完成:https://www.sina.com.cn

爬取完成,总耗时:0.85秒
爬取结果:
{'url': 'https://www.baidu.com', 'status_code': 200, 'length': 2443}
{'url': 'https://www.taobao.com', 'status_code': 200, 'length': 102456}
...

对比单线程:单线程爬取上述5个网页耗时约4秒,多线程仅需1秒左右,效率提升明显。

五、常见坑点与避坑指南

5.1 避免直接修改共享变量

即使加锁,频繁修改共享变量也会降低并发效率,建议通过queue模块(线程安全队列)传递数据:

import threading
import queue
import time

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        print(f"生产数据:{i}")
        time.sleep(0.5)

def consumer():
    while True:
        if q.empty():
            break
        data = q.get()
        print(f"消费数据:{data}")
        q.task_done()  # 标记任务完成
        time.sleep(1)

if __name__ == "__main__":
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    
    t1.start()
    t2.start()
    
    t1.join()
    q.join()  # 等待队列所有任务完成
    t2.join()

5.2 避免线程死锁

死锁原因:多个线程互相持有对方需要的锁。避免方法:

  1. 按固定顺序加锁;
  2. acquire()设置超时时间;
  3. 尽量使用RLockwith语句(自动解锁)。

with语句简化锁操作

LockRLockSemaphore均支持with语句,无需手动release()

lock = threading.Lock()

def safe_increment():
    global count
    for _ in range(100000):
        with lock:  # 自动加锁,代码块结束后自动解锁
            count += 1

5.3 不要用多线程处理CPU密集型任务

由于GIL限制,多线程执行CPU密集型任务(如大量计算)时,效率甚至低于单线程。此时应使用multiprocessing模块(多进程,绕过GIL)。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容