中间件实现任务去重与精细化分发:设计模式与常见陷阱

爬虫代理

一、从设计模式谈起:任务去重这件小事,其实不小

在大多数抓取项目里,任务去重看上去是个再普通不过的小功能。可当采集规模一旦上到成千上万条请求,它的影响就不止是“多爬了几次网页”这么简单,而是直接关系到 系统性能、代理池成本、甚至数据准确性。

在软件架构的世界里,这类问题其实早就有对应的思路,那就是 责任链模式(Chain of Responsibility)。

想象一下,你在流水线上检验产品。每一层质检员都负责不同的环节,有人检查尺寸,有人看外观,有人测性能。任务从一头进来,层层检查、层层传递。在我们的抓取系统里,请求就像那条流水线上的产品,而每个“质检员”——也就是中间件——都承担不同的逻辑,比如:

* 有的负责判断这个请求是不是重复的;

* 有的负责选择该用哪个代理去抓;

* 还有的可能决定,这个任务是不是优先级更高。

这样的解耦让整个系统更干净、更有条理,也方便以后扩展或替换任何一个模块。

二、把架构映射到抓取系统:去重与分发的隐形逻辑

如果你做过分布式采集,肯定见过那种“任务爆炸”的场面:同一个关键词被多个节点同时抓,结果浪费了带宽、代理、CPU,还导致数据库里一堆重复数据。

要解决这个问题,单纯在抓取逻辑里加个 if not in 远远不够。更优雅的方式,是借助设计模式,把复杂逻辑拆分成一组能协同工作的“模块”,各司其职。

比如可以这么类比:

模块

对应的设计模式

作用

去重中间件

单例 + 责任链

负责判定请求是否重复,并阻止重复抓取

分发中间件

策略模式

根据关键词、代理负载等条件分配任务

调度控制器

观察者模式

实时监控任务完成率,并动态调整策略

这样的设计并不只是让代码更“学术”,它能让系统在面对复杂情况时更稳、更灵活。比如,一个节点挂了,任务可以自动被分发到其他代理上;某个关键词短时间内重复提交,也会被过滤掉。

三、动手实践:让抓取系统学会“思考”

说理论容易,关键是怎么落地。我们可以用 eBay 的商品搜索页面作为实验场景,试着让一个简单的抓取具备“去重 + 分发”的能力。

它的运行逻辑大致是这样:

1. 先生成一批关键词搜索任务;

2. 每个任务在执行前先经过去重中间件过滤;

3. 通过分发中间件选择代理(这里我们使用爬虫代理服务);

4. 携带代理和伪装信息发起请求;

5. 最后解析商品标题、价格、发货地点和上架时间。

代码如下:

import hashlib

import json

import random

import requests

from bs4 import BeautifulSoup

from urllib.parse import quote

# ========= 代理配置(亿牛云示例 www.16yun.cn) =========

PROXY_HOST = "proxy.16yun.com"

PROXY_PORT = "3100"

PROXY_USER = "16YUN"

PROXY_PASS = "16IP"

def get_proxy():

    """生成带认证的代理配置"""

    return {

        "http": f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}",

        "https": f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"

    }

# ========= 去重中间件 =========

class DeduplicationMiddleware:

    """任务去重逻辑"""

    def __init__(self):

        self.visited = set()

    def is_duplicate(self, url):

        key = hashlib.md5(url.encode('utf-8')).hexdigest()

        if key in self.visited:

            return True

        self.visited.add(key)

        return False

# ========= 分发中间件 =========

class DistributionMiddleware:

    """根据任务特征选择代理"""

    def select_proxy(self, keyword):

        # 简单策略:根据关键词长度动态分配

        return get_proxy()

# ========= 爬虫核心逻辑 =========

class EbaySpider:

    def __init__(self, keywords):

        self.keywords = keywords

        self.dedup = DeduplicationMiddleware()

        self.dist = DistributionMiddleware()

        self.headers = {

            "User-Agent": random.choice([

                "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",

                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"

            ]),

            "Accept-Language": "en-US,en;q=0.9"

        }

    def fetch(self, keyword):

        search_url = f"https://www.ebay.com/sch/i.html?_nkw={quote(keyword)}"

        if self.dedup.is_duplicate(search_url):

            print(f"跳过重复任务:{keyword}")

            return


        proxies = self.dist.select_proxy(keyword)

        try:

            response = requests.get(search_url, headers=self.headers, proxies=proxies, timeout=10)

            response.raise_for_status()

            self.parse(response.text, keyword)

        except Exception as e:

            print(f"抓取失败:{keyword} -> {e}")

    def parse(self, html, keyword):

        soup = BeautifulSoup(html, "html.parser")

        items = soup.select(".s-item")

        for item in items:

            title = item.select_one(".s-item__title")

            price = item.select_one(".s-item__price")

            location = item.select_one(".s-item__location")

            time = item.select_one(".s-item__listingDate")


            data = {

                "keyword": keyword,

                "title": title.text.strip() if title else None,

                "price": price.text.strip() if price else None,

                "location": location.text.strip() if location else None,

                "time": time.text.strip() if time else None

            }

            print(json.dumps(data, ensure_ascii=False))

    def run(self):

        for kw in self.keywords:

            self.fetch(kw)

# ========= 运行入口 =========

if __name__ == "__main__":

    keywords = ["laptop", "headphones", "watch", "camera"]

    spider = EbaySpider(keywords)

    spider.run()

运行之后,你会发现它的行为更像一个有判断力的系统:不会重复抓取同一个搜索词,也会在代理之间智能分配流量。

四、从工具到思维:中间件的价值不止于“省事”

写中间件的过程,某种程度上像是在给系统装上“神经系统”。它能判断、能决策、还能不断学习。

对于采集来说,这种能力非常宝贵,因为网络环境变化太快,网站结构、反爬机制、请求参数都在不停调整。如果系统本身能通过中间件层做策略分离,就能在修改逻辑时保持底层稳定。

更深层的意义在于——我们开始不只是“写代码去抓数据”,而是 在构建一个有调度、有反馈、有演化能力的数据系统。去重,是让系统不浪费;分发,是让资源更高效;中间件,就是让这一切井然有序的关键角色。

五、结语:技术的艺术感,藏在细节里

中间件的设计,不只是写几行逻辑判断,更是一种系统思维的体现。当我们用设计模式去思考抓取问题,代码就从“能跑”变成了“能成长”。

很多人觉得抓取只是技术活,但其实做得久了你会发现,它更像一门关于 秩序与复杂性管理的艺术。而中间件——正是维持这种秩序的那双看不见的手。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容