Go 语言爬虫从并发式到分布式

Go 语言作为一门为编写网络应用程序而生的编程语言,在拥有比 Java 更强的并发性的同时,有拥有比 C 和 C++ 更快的开发速度(得益于简洁的语法和丰富的标准库),非常适合用于开发爬虫程序。笔者基于 Go 语言开发了一个爬虫程序,并从单任务版本改良为并发式版本,最后演进为分布式版本。下面就分享出并发式版本和分布式版的架构和设计思想。


并发式爬虫

并发式版本利用了 Go 语言强大的并发性能,解决了单任务版本爬取速度慢的问题。并发式版本由四大部分构成,分别是EngineSchedulerWorkerSaver

  1. 爬虫程序输入一个由 URL 和 Parser 构成的种子 Request
  2. Engine 会将接收的每个 Request 传递给维护着一个 Request 队列和一个 可用 Worker 队列的 Scheduler
  3. Scheduler 依据 FIFO 的原则将 Request 任务分配给 Worker
  4. 每个包含 Fetcher 和 Parser 的 Goroutine 代表一个 Worker,每个 Worker 并发执行任务,任务完成后重新加入 Scheduler 中的可用 Worker 队列。Fetcher 首先网络请求 Request 中的 URL 地址,并将返回的网络响应传递给 Parser,Parser 再讲网络响应中需要的信息解析并提取出来,再返回给 Engine
  5. Worker 返回给 Engine 的 ParseResult 中既包含我们需要得到的信息 Item,又包括需要进一步爬取的 Request 任务,Engine 再将任务提交给 Scheduler 之前需要先进行去重处理,以防重复爬取
  6. Engine 将需要保存的 Item 通过通道发送给 Saver
并发式爬虫架构

Model

// 请求,包括URL和指定的解析函数
type Request struct {
    Url    string
    Parser Parser
}
// 解析结果
type ParseResult struct {
    Requests []Request
    Items    []Item
}
// 最终要保存的条目
type Item struct {
    Url     string
    Id      string
    Type    string
    Payload interface{}
}

Engine

type ConcurrentEngine struct {
    MaxWorkerCount int // 工作协程的数量
    Scheduler      Scheduler // 任务调度器
    ItemChan       chan Item // 与ItemSaver之间的通道
    RequestWorker  Processor // Worker的处理器
}

type Processor func(request Request) (ParseResult, error)

type Scheduler interface {
    Submit(request Request)
    GetWorkerChan() chan Request
    Run()
    Ready
}

type Ready interface {
    WorkerReady(chan Request)
}

func (e *ConcurrentEngine) Run(seed ...Request) {
    out := make(chan ParseResult, 1024) // Worker返回任务结果的通道,缓冲区大小为1024
    e.Scheduler.Run()

    // 根据配置的Worker数量创建Goroutin
    for i := 0; i < e.MaxWorkerCount; i++ {
        e.createWorker(e.Scheduler.GetWorkerChan(), out, e.Scheduler)
    }

    for _, r := range seed {
        // 先去重,再提交任务给Scheduler
        if IsDuplicate(r.Url) {
            continue
        }
        e.Scheduler.Submit(r)
    }
    for {
        result := <-out // 获取Worker返回的结果
        for _, item := range result.Items {
            go func() { e.ItemChan <- item }() // 给Saver发送Item
        }
        for _, r := range result.Requests {
            // 先去重,再提交任务给Scheduler
            if IsDuplicate(r.Url) {
                continue
            }
            e.Scheduler.Submit(r)
        }
    }

}

/**
 * Worker工厂函数
 * in Scheduler向Worker发送任务的通道
 * out Worker向Engine返回结果的通道
 * s 实现了Ready接口的Scheduler 
*/
func (e *ConcurrentEngine) createWorker(in chan Request, out chan ParseResult, s Ready) {
    go func() {
        for {
            s.WorkerReady(in) // 完成任务后,通知Scheduler当前Worker可用
            request := <-in // 从Scheduler获取任务
            result, err := e.RequestWorker(request) // 执行任务
            if err != nil {
                continue
            }
            out <- result // 返回任务执行结果
        }
    }()
}

Scheduler

type QueuedScheduler struct {
    requestChan chan engine.Request // 接收Engine提交任务的通道
    workerChan  chan chan engine.Request // 派发任务给Worker的通道
}

func (s *QueuedScheduler) Submit(request engine.Request) {
    s.requestChan <- request
}

func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workerChan <- w
}

func (s *QueuedScheduler) GetWorkerChan() chan engine.Request {
    return make(chan engine.Request)
}

func (s *QueuedScheduler) Run() {
    s.requestChan = make(chan engine.Request)
    s.workerChan = make(chan chan engine.Request)

    go func() {
        var requestQ []engine.Request // 任务队列
        var workerQ []chan engine.Request // Worker队列
        for {
            var activeRequest engine.Request
            var activeWorker chan engine.Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeRequest = requestQ[0]
                activeWorker = workerQ[0]
            }
            select {
            case r := <-s.requestChan:
                // 若任务通道中有新任务,加入任务队列
                requestQ = append(requestQ, r)
            case w := <-s.workerChan:
                // 若可用Worker通道中有新Worker,加入Worker队列
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest:
                // 将任务队列中的首个元素派发给可用Worker队列中的首个元素
                requestQ = requestQ[1:]
                workerQ = workerQ[1:]
            }
        }
    }()
}

Worker

func Worker(r Request) (ParseResult, error) {
    body, err := fetcher.Fetch(r.Url)
    if err != nil {
        log.Error().Msgf("请求[%s]失败:%s", r.Url, err)
        return ParseResult{}, err
    }
    return r.Parser.Parse(body, r.Url), nil
}

每个 Worker 运行在一个独立的 Goroutine 当中,读者需要根据实际爬取的网站来编写 Fetcher 和 Parser。

ItemSaver

ItemSaver 的任务就是将 Engine 通过 Channel 传送过来的 Item 持久化存储,读者可以根据自己的需求来实现 ItemSaver,将 Item 存储到 MySQLMongoDBElasticSearch 等数据库。


分布式爬虫

并发式版本虽然解决了单任务版本爬取效率低下的问题,但是在同一机器(同一 IP)上并发请求目标网站,很容易因为短时间内网络请求流量过大而被目标网站封禁。
分布式版本将 WorkerSaver 分离部署到不同的机器上,不同机器上的 Worker 使用不同 IP,不但能够解决 IP 封禁的问题,还能进一步提升爬取效率。

分布式爬虫架构

JSON-RPC

分布式系统需要通过网络交互数据,同步系统中的状态。本系统通过 JSON-RPC 同步各个节点中的状态,交互任务与任务执行结果。Go 语言标准库中的 net/rpc 包支持 JSON-RPC,可以通过交换 JSON 格式的数据来进行 RPC。

// 创建RPC服务器
func ServeRpc(host string, service interface{}) error {
    rpc.Register(service)
    listener, err := net.Listen("tcp", host)
    if err != nil {
        return err
    }
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("accept error: %v", err)
            continue
        }
        go jsonrpc.ServeConn(conn)
    }
    return nil
}

// 创建RPC客户端
func NewClient(host string) (*rpc.Client, error) {
    conn, err := net.Dial("tcp", host)
    if err != nil {
        return nil, err
    }
    return jsonrpc.NewClient(conn), nil
}

序列化与反序列化

系统中的各个节点在进行 JSON-RPC 之前,必须对需要发送的对象和函数序列,对接收的对象和函数反序列化。

type SerializedParser struct {
    Name string
    Args interface{}
}

type Request struct {
    Url    string
    Parser SerializedParser
}

type ParseResult struct {
    Items    []engine.Item
    Requests []Request
}

// 序列化请求对象
func SerializeRequest(r engine.Request) Request {
    name, args := r.Parse.Serialize()
    return Request{
        Url: r.Url,
        Parser: SerializedParser{
            Name: name,
            Args: args,
        },
    }
}

// 序列化结果对象
func SerializeResult(r engine.ParseResult) (p ParseResult) {
    p.Items = r.Items
    for _, req := range r.Requests {
        p.Requests = append(p.Requests, SerializeRequest(req))
    }
    return p
}

// 反序列解析器
func deserializeParser(p SerializedParser) (engine.Parser, error) {
    switch p.Name {
    case "ParseCity":
        return engine.NewFuncParser(parser.ParseCity, p.Name), nil
    case "ParseCityList":
        return engine.NewFuncParser(parser.ParseCityList, p.Name), nil
    case "ProfileParser":
        if userName, ok := p.Args.(string); ok {
            return parser.NewProfileParser(userName), nil
        } else {
            return nil, errors.New("invalid args for profileParser")
        }
    case "NilParser":
        return engine.NilParse{}, nil
    default:
        return nil, errors.New("unknown parser name")
    }
}

// 反序列化请求
func DeserializeRequest(r Request) (engine.Request, error) {
    parse, err := deserializeParser(r.Parser)
    if err != nil {
        return engine.Request{}, err
    }
    req := engine.Request{
        Url:   r.Url,
        Parse: parse,
    }
    return req, nil
}

// 反序列化结果
func DeserializeResult(r ParseResult) engine.ParseResult {
    result := engine.ParseResult{
        Items: r.Items,
    }
    for _, req := range r.Requests {
        ereq, err := DeserializeRequest(req)
        if err != nil {
            log.Warn().Msgf("error deserializing request: %v", err)
            continue
        }
        result.Requests = append(result.Requests, ereq)
    }
    return result
}

连接池

分布式系统中维护着一个 Worker 连接池,Engine 通过连接池将请求任务派发到系统中的不同节点。

func createClientPool(hosts []string) chan *rpc.Client {
    var clients []*rpc.Client
    for _, h := range hosts {
        client, err := rpcsupport.NewClient(h)
        if err != nil {
            log.Warn().Msgf("error connection to %s : %s", h, err)

        } else {
            clients = append(clients, client)
            log.Warn().Msgf("connected  to %s", h)
        }
    }
    out := make(chan *rpc.Client)
    go func() {
        for {
            for _, c := range clients {
                out <- c
            }
        }
    }()
    return out
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360