在Python编程中,多线程是提升程序并发执行效率的核心手段之一,尤其适用于I/O密集型任务(如网络请求、文件读写、数据库操作)。threading模块作为Python内置的多线程实现工具,无需额外安装,语法简洁且贴近原生编程习惯,是开发者处理并发任务的首选。本文从基础概念、核心用法到实战案例,系统讲解threading模块的使用方法,同时规避常见坑点,适合Python初学者和需要优化程序执行效率的开发者学习。
一、多线程基础:为什么用threading模块?
1.1 线程与进程的核心区别
- 进程:操作系统分配资源的最小单位,每个进程有独立的内存空间,进程间通信复杂(如管道、队列),创建和销毁开销大;
- 线程:进程内的执行单元,共享进程的内存空间,创建/切换开销远小于进程,适合轻量级并发;
-
Python的GIL限制:需注意,CPython解释器的全局解释器锁(GIL)导致同一时刻仅能有一个线程执行Python字节码,因此
threading模块更适合I/O密集型任务(线程等待I/O时释放GIL,其他线程可执行),而非CPU密集型任务(CPU密集型建议用multiprocessing模块)。
1.2 threading模块的核心优势
- 内置模块,无需额外依赖,兼容性强;
- 提供线程创建、管理、同步的完整API;
- 支持守护线程、线程锁、事件等高级特性,满足复杂并发场景。
二、threading模块基础用法
2.1 最简单的线程创建:Thread类
threading.Thread是创建线程的核心类,最常用的两种方式:传入函数创建、继承Thread类重写run方法。
方式1:传入函数创建线程(推荐)
import threading
import time
# 定义线程要执行的函数
def task(name, delay):
print(f"线程{name}开始执行")
time.sleep(delay) # 模拟I/O操作(如网络请求、文件读写)
print(f"线程{name}执行完成")
if __name__ == "__main__":
# 记录程序开始时间
start_time = time.time()
# 创建线程对象
t1 = threading.Thread(target=task, args=("t1", 2))
t2 = threading.Thread(target=task, args=("t2", 2))
# 启动线程
t1.start()
t2.start()
# 等待所有线程执行完成
t1.join()
t2.join()
# 计算总耗时
end_time = time.time()
print(f"程序总耗时:{end_time - start_time:.2f}秒")
执行结果:
线程t1开始执行
线程t2开始执行
(等待2秒)
线程t1执行完成
线程t2执行完成
程序总耗时:2.01秒
关键说明:
-
target:指定线程要执行的函数; -
args:传入函数的参数(元组格式,单个参数需加逗号,如(1,)); -
start():启动线程(此时线程进入就绪状态,等待CPU调度),而非直接执行; -
join():主线程等待子线程执行完成,若无join(),主线程会直接结束,子线程可能被强制终止。
方式2:继承Thread类创建线程
适合线程逻辑复杂、需要封装的场景:
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name, delay):
super().__init__()
self.name = name
self.delay = delay
# 重写run方法,线程启动后自动执行该方法
def run(self):
print(f"线程{self.name}开始执行")
time.sleep(self.delay)
print(f"线程{self.name}执行完成")
if __name__ == "__main__":
start_time = time.time()
t1 = MyThread("t1", 2)
t2 = MyThread("t2", 2)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"程序总耗时:{time.time() - start_time:.2f}秒")
2.2 线程的常用属性和方法
| 方法/属性 | 作用 |
|---|---|
start() |
启动线程,调用run()方法 |
join(timeout) |
主线程等待子线程结束,timeout为可选超时时间(秒) |
is_alive() |
判断线程是否处于活动状态(已启动且未结束) |
name |
线程名称,可通过Thread(name="xxx")设置 |
daemon |
是否为守护线程(布尔值),主线程结束时,守护线程会被强制终止 |
ident |
线程ID,线程启动后有效 |
守护线程示例
守护线程适合做后台任务(如日志收集、监控),主线程结束后无需等待守护线程:
import threading
import time
def daemon_task():
while True:
print("守护线程运行中...")
time.sleep(1)
if __name__ == "__main__":
# 创建守护线程
t = threading.Thread(target=daemon_task)
t.daemon = True # 设置为守护线程(必须在start()前设置)
t.start()
# 主线程执行3秒后结束
time.sleep(3)
print("主线程结束,守护线程被终止")
执行结果:
守护线程运行中...
(等待1秒)
守护线程运行中...
(等待1秒)
守护线程运行中...
(等待1秒)
主线程结束,守护线程被终止
三、线程同步:解决资源竞争问题
多线程共享进程内存空间,当多个线程同时修改同一变量/资源时,会出现“资源竞争”,导致数据错误。threading模块提供了锁、信号量等工具解决该问题。
3.1 互斥锁(Lock):最常用的同步方式
Lock保证同一时刻只有一个线程能执行锁定代码块,核心方法:acquire()(加锁)、release()(解锁)。
未加锁的问题示例
import threading
# 共享变量
count = 0
def increment():
global count
for _ in range(100000):
count += 1 # 非原子操作(读取→加1→写入),多线程下会出错
if __name__ == "__main__":
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终count值:{count}") # 预期200000,实际远小于该值
加锁后的正确示例
import threading
count = 0
# 创建锁对象
lock = threading.Lock()
def increment():
global count
for _ in range(100000):
lock.acquire() # 加锁
try:
count += 1 # 临界区代码,仅一个线程能执行
finally:
lock.release() # 解锁(放在finally中,确保即使出错也能解锁)
if __name__ == "__main__":
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终count值:{count}") # 正确输出200000
3.2 可重入锁(RLock)
Lock是不可重入的(同一线程多次acquire()会死锁),RLock允许同一线程多次加锁,需保证加锁次数等于解锁次数:
import threading
rlock = threading.RLock()
def nested_task():
rlock.acquire()
print("内层加锁")
rlock.release()
print("内层解锁")
def main_task():
rlock.acquire()
print("外层加锁")
nested_task()
rlock.release()
print("外层解锁")
if __name__ == "__main__":
t = threading.Thread(target=main_task)
t.start()
t.join()
3.3 信号量(Semaphore):控制并发数
Semaphore允许指定数量的线程同时执行临界区代码,适合限制资源访问数(如最多3个线程同时访问数据库):
import threading
import time
# 最多允许2个线程同时执行
sem = threading.Semaphore(2)
def task(name):
sem.acquire()
print(f"线程{name}获取资源")
time.sleep(2) # 模拟资源占用
sem.release()
print(f"线程{name}释放资源")
if __name__ == "__main__":
for i in range(5):
t = threading.Thread(target=task, args=(f"t{i}",))
t.start()
执行结果:
线程t0获取资源
线程t1获取资源
(等待2秒)
线程t0释放资源
线程t1释放资源
线程t2获取资源
线程t3获取资源
(等待2秒)
线程t2释放资源
线程t3释放资源
线程t4获取资源
(等待2秒)
线程t4释放资源
四、实战案例:多线程爬取网页数据
以爬取多个网页内容为例,演示threading模块在I/O密集型任务中的应用(相比单线程,效率提升显著):
import threading
import requests
import time
# 待爬取的网页列表
urls = [
"https://www.baidu.com",
"https://www.taobao.com",
"https://www.jd.com",
"https://www.163.com",
"https://www.sina.com.cn"
]
# 存储爬取结果
result = []
lock = threading.Lock()
def crawl_url(url):
try:
response = requests.get(url, timeout=5)
# 加锁写入共享列表,避免资源竞争
lock.acquire()
result.append({"url": url, "status_code": response.status_code, "length": len(response.text)})
lock.release()
print(f"爬取完成:{url}")
except Exception as e:
lock.acquire()
result.append({"url": url, "error": str(e)})
lock.release()
print(f"爬取失败:{url},错误:{e}")
if __name__ == "__main__":
start_time = time.time()
# 创建线程列表
threads = []
for url in urls:
t = threading.Thread(target=crawl_url, args=(url,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
end_time = time.time()
print(f"\n爬取完成,总耗时:{end_time - start_time:.2f}秒")
print("爬取结果:")
for item in result:
print(item)
执行结果(示例):
爬取完成:https://www.baidu.com
爬取完成:https://www.taobao.com
爬取完成:https://www.jd.com
爬取完成:https://www.163.com
爬取完成:https://www.sina.com.cn
爬取完成,总耗时:0.85秒
爬取结果:
{'url': 'https://www.baidu.com', 'status_code': 200, 'length': 2443}
{'url': 'https://www.taobao.com', 'status_code': 200, 'length': 102456}
...
对比单线程:单线程爬取上述5个网页耗时约4秒,多线程仅需1秒左右,效率提升明显。
五、常见坑点与避坑指南
5.1 避免直接修改共享变量
即使加锁,频繁修改共享变量也会降低并发效率,建议通过queue模块(线程安全队列)传递数据:
import threading
import queue
import time
q = queue.Queue()
def producer():
for i in range(5):
q.put(i)
print(f"生产数据:{i}")
time.sleep(0.5)
def consumer():
while True:
if q.empty():
break
data = q.get()
print(f"消费数据:{data}")
q.task_done() # 标记任务完成
time.sleep(1)
if __name__ == "__main__":
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
q.join() # 等待队列所有任务完成
t2.join()
5.2 避免线程死锁
死锁原因:多个线程互相持有对方需要的锁。避免方法:
- 按固定顺序加锁;
- 给
acquire()设置超时时间; - 尽量使用
RLock或with语句(自动解锁)。
with语句简化锁操作
Lock、RLock、Semaphore均支持with语句,无需手动release():
lock = threading.Lock()
def safe_increment():
global count
for _ in range(100000):
with lock: # 自动加锁,代码块结束后自动解锁
count += 1
5.3 不要用多线程处理CPU密集型任务
由于GIL限制,多线程执行CPU密集型任务(如大量计算)时,效率甚至低于单线程。此时应使用multiprocessing模块(多进程,绕过GIL)。