Python 进阶之并发编程中的多线程

Two events are concurrent if neither can causally affect the other.

从编程的角度讲,某个问题是可并发的,即代表它可以被完全或部分地分解成多个组件,且这几个组件之间是顺序独立的。
换句话说,一个事件被分解成多个相互之间无依赖关系的具体步骤,这些步骤可以独立地被完成,且不管各自完成的顺序如何,都不影响最终的结果。

就像华罗庚先生在《统筹方法》中提到的例子,喝茶需要清洗茶具和烧热水,这是两个相互独立的事件。可以先清洗茶具再烧壶热水,则最终的等待时间是两者独立完成所需的时长之和。
更有效率的方法为,先执行比较耗费时间的烧热水动作,并且在等待热水烧开的同时清洗好茶具,效果类似于两个准备步骤同时执行而不是按顺序依次执行。

在程序的世界中,类似的需求同样屡见不鲜。比如用户点击链接下载一个大的文件,通常会把文件下载任务放在后台执行,使得用户可以继续浏览网页,不需要等待下载任务完成。

多线程

线程是操作系统能够进行运算调度的最小单位,程序设计者可以将其工作划分成多个可同步运行的线程以应对某些场景和需求。不过在单核系统中,多线程并不会加速程序的运行。
而多核或多处理器系统可以将不同的线程分配给多个 CPU 核心同时进行处理,因而能够带来性能上的提升(但 Python 由于 GIL 机制使得这种提升仍有一定的限制)。

在单核系统中,可以通过一种名为时间分片(timeslicing)的机制来实现多线程。即 CPU 可以在多个线程间非常迅速地进行切换,达成一种多个线程“同时”在运行的假象。这种虚拟的并行方式虽然不能带来性能上的提升,但是仍然能够非常好地应对某些特定的需求场景。如:

  • 构建响应式接口:比如将长时间运行的任务放在后台执行,使其等待的过程不会阻塞用户其他的交互动作
  • 任务的分发与委派:对于依赖于第三方资源的进程(比如需要对远程 Web 服务进行大量的请求),多线程可以起到很好的加速效果
  • 构建多用户应用:多用户状态下的多线程类似于独立执行的进程,只不过在某些层面上更加易于管理(共享内存)

示例:多线程请求 Web 数据

requests 获取天气信息

以下是一个简单的 Web 请求示例,通过 requests 模块访问中国天气网,获取部分城市的天气信息:

import requests

def get_weather(cityid):
    api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
    results = requests.get(api_url)
    results.encoding = 'utf-8'
    weather_info = results.json()['weatherinfo']
    return weather_info

print(get_weather('101210101'))

# => {'city': '杭州', 'cityid': '101210101', 'temp': '24.8', 'WD': '东北风', 'WS': '小于3级', 'SD':
# '81%', 'AP': '1000.3hPa', 'njd': '暂无实况', 'WSE': '<3', 'time': '17:50', 'sm': '2.1', 'isRadar': '1', 'Radar': 'JC_RADAR_AZ9571_JB'}
获取多个城市天气
import requests
import time

def get_weather(cityid):
    api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
    results = requests.get(api_url)
    results.encoding = 'utf-8'
    weather_info = results.json()['weatherinfo']
    
    print("%s (tmp/humi): %s/%s" % (
        weather_info['city'],
        weather_info['temp'], 
        weather_info['SD']
    ))

cityids = (
    '101210101', '101010100', '101090201',
    '101020100', '101280101', '101230201'
)

def main():
    for id in cityids:
        get_weather(id)

if __name__ == '__main__':
    started = time.time()
    main()
    elapsed = time.time() - started
    print("Time elapsed: {:.2f}s".format(elapsed))

# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 保定 (tmp/humi): 27.5/43%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.16s
多线程获取多个城市的天气

完整代码如下,主要修改了 main 函数:

from threading import Thread
import requests
import time

def get_weather(cityid):
    api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
    results = requests.get(api_url)
    results.encoding = 'utf-8'
    weather_info = results.json()['weatherinfo']
    
    print("%s (tmp/humi): %s/%s" % (
        weather_info['city'],
        weather_info['temp'], 
        weather_info['SD']
    ))

cityids = (
    '101210101', '101010100', '101090201',
    '101020100', '101280101', '101230201'
)

def main():
    threads = []
    for id in cityids:
        thread = Thread(target=get_weather, args=[id])
        thread.start()
        threads.append(thread)

    while threads:
        threads.pop().join()

if __name__ == '__main__':
    started = time.time()
    main()
    elapsed = time.time() - started
    print("Time elapsed: {:.2f}s".format(elapsed))

# => 保定 (tmp/humi): 27.5/43%
# => 广州 (tmp/humi): 26.6/83%
# => 杭州 (tmp/humi): 24.8/81%
# => 上海 (tmp/humi): 23.5/80%
# => 北京 (tmp/humi): 27.9/28%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.08s

与前一个版本(在单个线程中顺序地依次请求多个城市的天气信息)相比,此版本通过不同的线程同时请求多个城市的天气信息,每个线程负责一个城市。
虽然多线程在系统资源上增加了一定程度的消耗,但是相对于网络资源响应以及 IO 传输产生的延迟和等待,仍然在效率上有了一定程度的提升。

同时也可以从对输出结果的比较中看出,多线程方案获取到的天气信息并不是按顺序返回的。即表明在请求某个数据时,程序并没有等待上一个请求完全解决,而是在结果返回之前又发起了新的请求。从而在一定程度上减弱了网络延迟对整个程序的阻塞效果。

线程池

一个程序所能运行的线程数量并不是毫无限制的,很多时候需要构建一个固定大小的线程池来处理所有带并行需求的工作任务。这些并行任务可以先存储在一种名为队列(queue)的数据结构中,以先进先出(FIFO)的原则分配给线程池中固定数量的线程去处理。

from queue import Queue, Empty
from threading import Thread
import requests
import time

THREAD_POOL_SIZE = 4
cityids = (
    '101210101', '101010100', '101090201',
    '101020100', '101280101', '101230201'
)

def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            get_weather(item)
            work_queue.task_done()

def get_weather(cityid):
    api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
    results = requests.get(api_url)
    results.encoding = 'utf-8'
    weather_info = results.json()['weatherinfo']
    
    print("%s (tmp/humi): %s/%s" % (
        weather_info['city'],
        weather_info['temp'], 
        weather_info['SD']
    ))

def main():
    work_queue = Queue()

    for id in cityids:
        work_queue.put(id)

    threads = [
        Thread(target=worker, args=(work_queue,)) for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

if __name__ == '__main__':
    started = time.time()
    main()
    elapsed = time.time() - started
    print("Time elapsed: {:.2f}s".format(elapsed))

# => 保定 (tmp/humi): 27.5/43%
# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.08s

其中 get_weather 函数可以通过 id 值获取对应城市的天气信息,所有需要请求的 id 参数保存在 main 中定义的队列 work_queue 里;
worker 函数是与线程相关联的工作代码,它可以逐个获取队列中存储的 id 参数并传递给 get_weather 函数;
main 函数中的 threads 列表则初始化了固定数量(THREAD_POOL_SIZE)的线程对象,所有的请求任务最终都由这些线程去处理。
队列中的任何一个 id 参数交由任何一个线程处理时都会立即从队列中弹出,因而保证了各线程间的相互独立(同一个任务不会被多个线程获取)。

简单来说,固定数量的线程完成了队列中大量任务的并行处理。可以适当修改线程数量以达到系统资源和执行效率的平衡。

双向队列
import time
from queue import Queue, Empty
from threading import Thread
import requests

THREAD_POOL_SIZE = 4
cityids = (
    '101210101', '101010100', '101090201',
    '101020100', '101280101', '101230201'
)

def get_weather(cityid):
    api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
    results = requests.get(api_url)
    results.encoding = 'utf-8'
    weather_info = results.json()['weatherinfo']
    return weather_info

def present_result(weather_info):
    print("%s (tmp/humi): %s/%s" % (
        weather_info['city'],
        weather_info['temp'], 
        weather_info['SD']
    ))

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            results_queue.put(
                get_weather(item)
            )
            work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()

    for id in cityids:
        work_queue.put(id)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue)) for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        present_result(results_queue.get())

if __name__ == '__main__':
    started = time.time()
    main()
    elapsed = time.time() - started
    print("Time elapsed: {:.2f}s".format(elapsed))

# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 保定 (tmp/humi): 27.5/43%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.08s

与上一个版本的方案不同,此处的程序除了创建包含请求信息的 work_queue 队列外,还创建了一个用于保存返回结果的 results_queue 队列。
即工作线程只用于对远程 API 发起请求并获取结果,而最终结果的整理及打印输出等则交由主线程来处理。此举可以减弱某些无关的操作可能对工作线程产生的不利影响。

错误处理

为了应对请求数据的过程中可能出现的错误,可以在之前代码的基础上添加异常捕获功能。即在 worker 函数中添加 try...except 语句,当执行成功时将返回的结果保存至 results_queue 队列;如有异常发生,则将异常对象保存至 results_queue 队列。
然后在 main 函数中对 results_queue 中的内容进行判断,是直接输出结果还是抛出异常对象。
最终代码如下:

import time
from queue import Queue, Empty
from threading import Thread
import requests

THREAD_POOL_SIZE = 4
cityids = (
    '101210101', '101010100', '101090201',
    '101020100', '101280101', '101230201'
)

def get_weather(cityid):
    api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
    results = requests.get(api_url)
    results.encoding = 'utf-8'
    weather_info = results.json()['weatherinfo']
    return weather_info

def present_result(weather_info):
    print("%s (tmp/humi): %s/%s" % (
        weather_info['city'],
        weather_info['temp'], 
        weather_info['SD']
    ))

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            try:
                result = get_weather(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()

    for id in cityids:
        work_queue.put(id)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue)) for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        result = results_queue.get()

        if isinstance(result, Exception):
            raise result
        present_result(result)

if __name__ == '__main__':
    started = time.time()
    main()
    elapsed = time.time() - started
    print("Time elapsed: {:.2f}s".format(elapsed))

# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 保定 (tmp/humi): 27.5/43%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.07s

参考资料

Expert Python Programming - Second Edition

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容