Python爬虫实战以及多进程/多线程/多协程的效率问题

需求:抓取网站https://www.edge.org/library所有的书名、作者以及作者的维基百科简介

工具:PyCharm
第三方库:requests、BeautifulSoup

import requests
from bs4 import BeautifulSoup

爬虫有两种方式,一种是模拟请求获取返回数据即可,另一种需要解析HTML。本次爬虫方式就是后者。
BeautifulSoup是解析HTML的库。

初次写爬虫,一开始傻不拉几的用同步方式,依次获取所有的页面的数据,总共耗时2个多小时,令人发指。后来自己摸索,分别采用多进程/多线程/多协程的方式,效率惊人。

说到进程池,Python提供了两种创建方式,直接上代码:

import re
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing.pool import Pool

import requests
import sys
from bs4 import BeautifulSoup

from book.ExcelUtils import ExcelUtils

headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}


# 获取书名、作者、作者wiki简介
def get_book_info(url):
    book_dict = {'name': [], 'author': [], 'info': []}
    res = requests.get(url, headers=headers)
    if res.status_code == 200:
        find_book_name_list = re.findall('<div class="views-field views-field-title">.*?<a href=.*?>(.*?)</a>',
                                         res.text)
        book_dict['name'] = find_book_name_list

        find_author_list = re.findall(
            '<span class="views-field views-field-field-edge-author">.*?<a href=".*?">(.*?)</a>', res.text)
        book_dict['author'] = find_author_list

        # 获取作者wiki简介
        get_wiki_author_info(find_author_list, book_dict)

    else:
        print('get_book_info is failed, url:[%s]' % url)

    return book_dict


# 根据作者名字获取作者wiki简介
def get_wiki_author_info(find_author_list, book_dict):
    for name in find_author_list:
        url = 'https://en.wikipedia.org/wiki/%s' % name.replace(' ', '_')
        try:
            res = requests.get(url, headers=headers)
            if res.status_code == 200:
                soup = BeautifulSoup(res.text, "html.parser")
                soup_select = soup.select('#mw-content-text p')
                if str.strip(soup_select[0].get_text()) != '':
                    if soup_select[0].get_text().find('may refer to') != -1:
                        # 存在多种选项
                        name_list = soup.select('#mw-content-text li a')
                        for index, item in enumerate(name_list):
                            if item.get_text().find(name) != -1:
                                # 递归,默认只取第一个匹配的
                                get_wiki_author_info([item.get_text()], book_dict)
                                break
                    else:
                        book_dict['info'].append(soup_select[0].get_text())
                else:
                    book_dict['info'].append(soup_select[1].get_text())
            else:
                book_dict['info'].append('get failed, url:[%s]' % url)
                print('get_wiki_author_info is failed, url:[%s]' % url)

        except:
            book_dict['info'].append('get exception, url:[%s]' % url)

# 进程池的一种创建方式
def pool_test(url_list):
    book_list = []
    # 创建进程池
    pool = Pool(20)
    start = time.time()
    for url in url_list:
        time.sleep(0.5)
        result = pool.apply_async(get_book_info, args=(url,))
        book_list.append(result)

    # 关闭进程池,不再接受新的进程,依旧处理未处理完的任务
    pool.close()
    # 主进程等待所有子进程执行完毕,必须在close或terminate之后
    pool.join()
    print('time: ', time.time() - start)

    book_name_list = []
    author_list = []
    author_info_list = []
    print('book_list: ', len(book_list))
    for v in book_list:
        book_name_list.extend(v.get()['name'])
        author_list.extend(v.get()['author'])
        author_info_list.extend(v.get()['info'])

    ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)

# 线程池的创建方式
def thread_pool_test(url_list):
    book_list = []
    # 创建线程池
    pool = ThreadPoolExecutor(max_workers=20)
    start = time.time()
    for url in url_list:
        time.sleep(0.5)
        result = pool.submit(get_book_info, url)
        book_list.append(result)

    pool.shutdown()
    print('time: ', time.time() - start)

    book_name_list = []
    author_list = []
    author_info_list = []
    print('book_list: ', len(book_list))
    for future in book_list:
        book_name_list.extend(future.result()['name'])
        author_list.extend(future.result()['author'])
        author_info_list.extend(future.result()['info'])

    ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)

# 进程池的另外一种创建方式,跟线程池的创建方式一样。其方法等也相同。
def process_pool_test(url_list):
    book_list = []
    # 创建进程池
    pool = ProcessPoolExecutor(max_workers=20)
    start = time.time()
    for url in url_list:
        time.sleep(0.5)
        result = pool.submit(get_book_info, url)
        book_list.append(result)

    pool.shutdown()
    print('time: ', time.time() - start)

    book_name_list = []
    author_list = []
    author_info_list = []
    print('book_list: ', len(book_list))
    for future in book_list:
        book_name_list.extend(future.result()['name'])
        author_list.extend(future.result()['author'])
        author_info_list.extend(future.result()['info'])

    ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)


if __name__ == '__main__':
    sys.setrecursionlimit(10000)
    url_list = ['https://www.edge.org/library']
    for i in range(1, 52):
        url_list.append('https://www.edge.org/library?page=%s' % i)

    thread_pool_test(url_list)


Python协程爬虫,代码相对于进程线程,稍微麻烦了一点,如下:
import asyncio
import aiohttp
import re
import time

import sys
from bs4 import BeautifulSoup

from spider.ExcelUtils import ExcelUtils

headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}


# 获取书名、作者
async def get_book_info(url, semaphore):
    book_dict = {'name': [], 'author': []}
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as res:
                if res.status == 200:
                    text = await res.text()
                    find_book_name_list = re.findall(
                        '<div class="views-field views-field-title">.*?<a href=.*?>(.*?)</a>', text)
                    book_dict['name'] = find_book_name_list

                    find_author_list = re.findall(
                        '<span class="views-field views-field-field-edge-author">.*?<a href=".*?">(.*?)</a>', text)
                    book_dict['author'] = find_author_list

                else:
                    print('get_book_info is failed, url:[%s]' % url)

    return book_dict


# 根据作者名字获取作者wiki简介
async def get_wiki_author_info(name, semaphore):
    url = 'https://en.wikipedia.org/wiki/%s' % name.replace(' ', '_')
    try:
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as res:
                    if res.status == 200:
                        text = await res.text()
                        soup = BeautifulSoup(text, "html.parser")
                        soup_select = soup.select('#mw-content-text p')
                        if str.strip(soup_select[0].get_text()) != '':
                            if soup_select[0].get_text().find('may refer to') != -1:
                                # 存在多种选项
                                name_list = soup.select('#mw-content-text li a')
                                for index, item in enumerate(name_list):
                                    if item.get_text().find(name) != -1:
                                        # 递归,默认只取第一个匹配的
                                        await get_wiki_author_info([item.get_text(), semaphore])
                                        break
                            else:
                                return soup_select[0].get_text()
                        else:
                            return soup_select[1].get_text()
                    else:
                        print('get_wiki_author_info is failed, url:[%s]' % url)
                        return 'get failed, url:[%s]' % url

    except:
        return 'get exception, url:[%s]' % url


if __name__ == '__main__':
    sys.setrecursionlimit(10000)
    url_list = ['https://www.edge.org/library']
    for i in range(1, 52):
        url_list.append('https://www.edge.org/library?page=%s' % i)

    # 信号量,限制并发数,类比20个线程
    semaphore = asyncio.Semaphore(20)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(get_book_info(url, semaphore)) for url in url_list]

    start = time.time()
    loop.run_until_complete(asyncio.wait(tasks))

    book_name_list = []
    author_list = []
    print('book_list: ', len(tasks))
    for future in tasks:
        book_name_list.extend(future.result()['name'])
        author_list.extend(future.result()['author'])
    # 获取作者wiki简介
    wiki_tasks = [asyncio.ensure_future(get_wiki_author_info(name, semaphore)) for name in author_list]
    loop.run_until_complete(asyncio.wait(wiki_tasks))
    author_info_list = [future.result() for future in wiki_tasks]
    print('time: ', time.time() - start)

    ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)

ExcelUtils工具类的代码如下:

import xlwt


class ExcelUtils:

    @staticmethod
    def def_style():
        style = xlwt.XFStyle()
        alignment = xlwt.Alignment()
        alignment.horz = xlwt.Alignment.HORZ_CENTER  # 水平居中
        alignment.vert = xlwt.Alignment.VERT_CENTER  # 垂直居中
        style.alignment = alignment

        return style

    @staticmethod
    def write_data_to_excel(excel_name, book_name_list, author_list, author_info_list):
        # 实例化一个Workbook()对象(即excel文件)
        wbk = xlwt.Workbook()
        # 新建一个名为Sheet1的excel sheet。此处的cell_overwrite_ok =True是为了能对同一个单元格重复操作。
        sheet = wbk.add_sheet('Sheet1', cell_overwrite_ok=True)
        col_1 = sheet.col(0)
        col_2 = sheet.col(1)
        col_3 = sheet.col(2)
        col_1.width = 256 * 45
        col_2.width = 256 * 30
        col_3.width = 256 * 200

        sheet.write(0, 0, '书名', ExcelUtils.def_style())
        sheet.write(0, 1, '作者', ExcelUtils.def_style())
        sheet.write(0, 2, '作者简介', ExcelUtils.def_style())

        for i in range(len(author_info_list)):
            sheet.write(i + 1, 0, book_name_list[i])
            sheet.write(i + 1, 1, author_list[i], ExcelUtils.def_style())
            sheet.write(i + 1, 2, author_info_list[i])

        wbk.save(excel_name + '.xls')

四种方式每次的实验时间都不同,这里只取其中一次的执行时间,如下:

多进程 Pool(20)
time: 215.41232109069824秒
数据总数:1421

多进程 ProcessPoolExecutor(max_workers=20)
time:  202.88900017738342秒
数据总数:1481


多线程:ThreadPoolExecutor(max_workers=20)
time: 198.67899990081787秒
数据总数:1481

协程: async
time:  166.4319999217987秒
数据总数:1481

可以看出这四者的效率差别并不大,由于每次时间都不同,也不能确切的说哪种方式更优。

不过根据前面讲的进程、线程、协程,一般情况下。协程执行效率是高于线程的,Pool的效率高于ProcessPoolExecutor。但是协程是单线程的,无法利用多核CPU,组合应用的时候,推荐Pool + 协程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

推荐阅读更多精彩内容