用Playwright打造可靠的企业级采集方案

爬虫代理

为什么要做这个项目(背景与动机)

在公司做数据产品时,我们常常遇到三个痛点:脚本跑不稳、页面渲染抓不到数据、以及规模化后调度和重试逻辑变得难以维护。最开始通常是一两个 Python 脚本在开发机上跑通,但到了生产环境,问题就会接踵而来:单机负载、被目标站点限速、以及不同页面结构带来的脆弱性。本项目的目标是把一个可验证的单机 Playwright 爬虫,逐步演进成能在多台机器上并行工作的集群化方案,并把关键实现、思路与示例代码记录下来,方便求职/展示或直接在企业内复用。

我们要拿到什么数据(数据目标)

本文以招聘网站为示例,目标字段很常见也很实用,便于后续做分析:

* 职位名称(title)

* 公司名(company)

* 城市 / 工作地点(city)

* 薪资(salary)

* 发布时间(publish_time)

这些字段既能做趋势分析,也能用于构建岗位搜索引擎或行业报告。

技术栈与选型理由(为什么选 Playwright)

* Playwright:相比传统的 requests,它能原生处理 JS 渲染、等待节点、拦截资源等,稳定性和可控性更好。

* 代理 IP(示例:亿牛云):高频访问需要代理来分摊流量并降低被限制的概率。

* Redis(任务队列) + 多进程/多机器:用于实现任务分发、去重与伸缩。

* MongoDB / Elasticsearch:分别用于原始数据落盘和后续的检索/分析/可视化。

这种组合能在保证可维护性的同时兼顾扩展与性能。

核心实现(从单机到分布式的演进)

1)单机版:先把逻辑跑通

这是最直接的版本,用来确认页面结构、JS 渲染及提取逻辑是否稳健。代码示例(异步 Playwright):

# single_playwright.py

import asyncio

from playwright.async_api import async_playwright

# ====== 代理配置(示例:亿牛云www.16yun.cn) ======

proxy_host = "proxy.16yun.cn"

proxy_port = "3100"

proxy_user = "16YUN"

proxy_pass = "16IP"

async def crawl_page(keyword: str, page_num: int = 1):

    """

    单页抓取示例函数:访问并解析职位卡片,返回字典列表

    """

    async with async_playwright() as pw:

        # 启动浏览器(无头/有头可根据调试需要切换)

        browser = await pw.chromium.launch(headless=True,

                                          proxy={

                                              "server": f"http://{proxy_host}:{proxy_port}",

                                              "username": proxy_user,

                                              "password": proxy_pass

                                          })

        page = await browser.new_page()

        url = f"https://example-job-site.com/search?kw={keyword}&p={page_num}"

        await page.goto(url, timeout=30000)

        # 等待职位列表出现(根据实际页面调整选择器)

        await page.wait_for_selector('.job-card', timeout=15000)

        cards = await page.query_selector_all('.job-card')

        results = []

        for c in cards:

            # 下面的选择器只是示例,实际按目标站点调整

            title = await c.query_selector_eval('.job-title', 'el => el.textContent.trim()')

            company = await c.query_selector_eval('.company', 'el => el.textContent.trim()')

            salary = await c.query_selector_eval('.salary', 'el => el.textContent.trim()')

            city = await c.query_selector_eval('.city', 'el => el.textContent.trim()')

            results.append({

                "title": title,

                "company": company,

                "salary": salary,

                "city": city

            })

        await browser.close()

        return results

if __name__ == "__main__":

    rows = asyncio.run(crawl_page("Python 爬虫", page_num=1))

    for r in rows:

        print(r)

说明:先把单页、单关键词跑通,再处理翻页、异常与重试逻辑。单机阶段以稳定为先,别急着并发。

2)改造为可并发的单机版本(充分利用单机资源)

当单页逻辑稳定后,我们会把单页函数包装成任务池,利用 asyncio 并发多个标签页(或多个浏览器实例)来提高吞吐。示例要点:

* 限制并发并设置合理的超时

* 处理网络异常与元素不存在的情况(捕获并重试)

* 记录日志与关键指标(成功率、耗时)

(此处略去并发包装的完整代码,实际项目中可用 asyncio.Semaphore 控制并发数。)

3)走向分布式:Redis 作任务队列(生产者 / 消费者模型)

把任务放到 Redis 列表或流(stream)里,多台机器从队列里取任务并执行。示例消费端伪代码:

# worker.py (伪代码,需加异常处理与日志)

import asyncio

import aioredis

from single_playwright import crawl_page  # 引入前面实现的单页函数

REDIS_URL = "redis://127.0.0.1:6379/0"

TASK_QUEUE = "job_tasks"

async def worker_loop():

    redis = await aioredis.from_url(REDIS_URL)

    while True:

        task = await redis.lpop(TASK_QUEUE)

        if not task:

            await asyncio.sleep(1)

            continue

        # 假设任务是 JSON 字符串,包含 keyword 和 page_num

        task = task.decode("utf-8")

        # 这里简单演示,生产环境请解析 JSON 并做失败重试、幂等处理

        keyword = task

        print("取到任务:", keyword)

        results = await crawl_page(keyword)

        # 结果入库/发送到下游(例如 MongoDB/Elasticsearch)

        # save_results(results)

关键点:

* 任务应包含去重标识(比如 URL 哈希或唯一 ID)以避免重复抓取

* 设计任务超时与重试策略(比如 N 次重试后写入失败队列由人工检查)

* 监控队列深度、消费速率以判断是否需要扩容

运营与工程化要点(实用经验)

* 代理策略:不要把所有流量都打到一个代理 IP,建议轮换或使用多个出口;对静态资源设置拦截,减少不必要带宽消耗。

* 资源拦截:Playwright 支持 route.abort() 拦截图片、视频、广告等资源,显著减轻网络压力。

* 健康检查:为每个爬虫进程加上心跳上报(Prometheus / Pushgateway)便于发现挂死或慢速任务。

* 错误收集:记录页面快照(在非敏感场景),异常堆栈与请求/响应,便于定位问题。

* 容器化:把爬虫打包为 Docker 镜像,用 Kubernetes 编排能显著简化部署与弹性伸缩。

* 数据质量控制:入库前做字段校验、去重与时间窗口过滤,避免脏数据污染分析维度。

数据落盘与可视化(展示成就)

落库示例(MongoDB 文档结构):

{

  "title": "Python 爬虫工程师",

  "company": "某某科技",

  "salary": "20k-35k",

  "city": "上海",

  "publish_time": "2025-09-01",

  "source": "example-job-site",

  "crawl_at": "2025-09-23T10:00:00"

}

落库后可以做的展示(列举思路):

* 城市分布热力图(Kibana / Superset)

* 薪资区间柱状图(按岗位/城市分组)

* 岗位关键词云(用于挖掘热门技能)

* 时间序列分析(岗位量随时间变化)

这些可视化既能作为产品端仪表盘,也可以作为简历/作品集中突出的数据展示部分。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容