进程迟和线程池-回调函数

池的概念

为了实现并发,提高程序的运行效率,我们使用了多进程和多线程。但是在开启多线程和多进程的时候,由于机器本身的性能瓶颈不能无限开启,所以我引入池的概念,控制主机能够开启线程(进程)的数量,使系统不会因为开销过大而影响性能。

提交任务的两种方式:

  • 同步调用:提交任务完成后就在原地等待,等待任务执行完毕,拿到任务的返回值,才能继续下一行代码,会导致程序串行执行。
  • 异步调用+回调机制:提交任务完成后不在原地等待,并发执行,使用异步调用一般会使用到回调函数,会使用回调机制。任务一旦执行完毕,就会触发回调函数进行执行。

进程池示例

简化版本进程池示例

这里演示如何和开启进程池,进程池和线程池的调用方法一样。

# 导入线程池和进程池模块
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, random, os


def task(n):
    print(n, '%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 开启进程池,进程池大小为4,允许并发执行4个进程

    for i in range(13):
        pool.submit(task, i)  # 提交任务给线程池,i为tast()中的参数
    pool.shutdown(wait=True)  # 不允许再继续提交任务,即使用submit()方法,并且等待所有的任务都执行完毕后再执行后面的代码
    print('主')

同一时间并发执行4个进程,如果进程数量达到4个,就等待池中的任务退出之后,新的进程再继续加入。

对于进程池,一般设置的并发连接数不要超过CPU核心数量的两倍,对于线程池的大小可以通过测试获取一个合理的范围。

如果在提交任务的时候使用同步调用:

pool.submit(task,i).result() 
# 等待每一次任务运行的结果,拿到结果后再进行下一步操作,相当于串行

使用回调函数

# 导入线程池和进程池模块
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, random, os


def task(n):
    print(n, '%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n


def handle(res):
    res = res.result()  # 通过对对象使用result()方法,获取结果
    print('handle res %res' % res)


if __name__ == '__main__':
    pool = ProcessPoolExecutor(2)  # 开启进程池,进程池大小为2,允许并发执行4个进程

    for i in range(6):
        obj = pool.submit(task, i)
        obj.add_done_callback(handle)  # 使用回调函数,将执行的结果对象传给handle()函数,执行此步会等待任务执行完后获取对象

    pool.shutdown(wait=True)  # 不允许再继续提交任务,即使用submit()方法,并且等待所有的任务都执行完毕后再执行后面的代码
    print('主')

使用回调函数是程序解耦合,避免了将生产数据和消费数据的功能紧耦合在一起。

阻塞和同步调用的区别

  • 阻塞: 阻塞表示线程在遇到IO操作时,会被剥夺CPU的执行权限,时程序在原地等待。
  • 同步调用: 程序在执行时,需要等待最终的结果才能进行下一步操作,在等待的过程中,可以是非阻塞的状态(计算型任务),也可能会遇到阻塞(IO操作)

线程池实现爬虫示例

这里使用requests 模块实现爬虫,相关用法:

from concurrent.futures import ThreadPoolExecutor
import requests

def get(url):
    response=requests.get(url)    # 模拟浏览器,向站点发送请求
    print(response.status_code)   # 获取状态码
    print(response.text)          # 获取网页内容

get('http://www.baidu.com')

示例代码:

from concurrent.futures import ThreadPoolExecutor
import requests
import time
from threading import current_thread

def get(url):
    print('get %s  %s' %(current_thread().getName(),url))
    response = requests.get(url)  # 模拟浏览器,向站点发送请求
    time.sleep(2)

    if response.status_code == 200:  # 获取状态码
        return {'url': url, 'content': response.text}


def parse(res):      # 对爬取的内容进行格式化
    res=res.result()
    print('parse:[%s] res:[%s]' %(res['url'], len(res['content'])))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)  # 如果不设定线程数量,默认是CPU核数的5倍
    urls = [
        'https://baidu.com',
        'https://baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com'
    ]

    for url in urls:
        pool.submit(get, url).add_done_callback(parse)  # 回调parse,将结果传给parse
    pool.shutdown(wait=True)
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 一. 操作系统概念 操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.操作系统进行...
    月亮是我踢弯得阅读 11,228评论 3 28
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 8,978评论 0 5
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 10,566评论 0 23
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 5,669评论 0 12
  • 为什么使用线程池 当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题...
    闽越布衣阅读 9,787评论 10 45

友情链接更多精彩内容