虽然过去一年中的工作以爬虫为主,但个人认为最大的收获在于参考已有的系统、从零开始把整个爬虫任务调度系统实现了一遍,因而有了一些架构上的理解,它具有非常强的可扩展性,或许这就是RPC(Remote procedure call)的优点吧,因此在这里做一些归纳总结。
受限于本人薄弱的阐述能力,如果对RabbitMQ有所了解再来阅读此文是比较合适的,如果能有所斧正就更好了。(参考RabbitMq python tutorial--语言可选)
1.架构概要
这个系统是基于RabbitMQ来实现的,因而它是跨平台、跨语言的、分布式的、可平行扩展的,较适合处理时效性不高的任务,在系统内部,任务处理当然是异步的,但对外的表现形式,则是可同步可异步的。
也因此,它的架构概要图与RabbitMQ的生产者消费者模型在本质上是一样的,只不过生产者和消费者的结构更丰富具体。


从概要设计图可以看出,整个系统对外只暴露了
Frontend,其余部分对Requester都是不可见的,他也不需要知道这些,他只要能够通过Frontend发送请求(任务)、得到响应、查询结果就ok了。而对于RabbitMQ来说,图中的
mq_Client和mq_Server则相当于生产者和消费者了。
系统分为5个部分:
- RabbitMQ负责传递消息(Message),一个消息对应一个任务,通过不同的Queue分发不同种类的任务,支持集群
- DB负责存储任务详情或者结果,不论是mysql、MongoDB都支持集群
- Redis负责缓存临时信息,也是支持集群(也可以使用memcached)
- Frontend负责对外提供RESTFul API,响应任务请求创建任务,并通过worker_manager_proxy将任务放进MQ。可以将整个任务序列化放进MQ,也可以只把任务标识(如task_id)放进MQ,同时把信息放进Redis,或者创建后存入DB,只要记得处理任务时存在哪里就在哪里取。不过为了节省MQ的空间,不建议将整个任务信息放入MQ。
- worker_manager负责从MQ里取任务,并为每个任务创建一个worker线程进行工作,期间会通过Redis、DB进行必要的读写或者增删改查操作。
部署时就可以相应的分开部署。
2.特性分析
这里以上图为例来解释一下
1) 跨平台、跨语言、分布式的
-
rabbitMQ可以部署在linux、windows等操作系统上,因此跨平台 -
rabbitMQ教程代码就给出了多达十来种语言的版本,当然是跨语言的,对应此系统,就是你可以使用Python和Java版本的Frontend来接收请求,而同时使用PHP、JavaScript写的Worker来完成工作,只要它们的内在逻辑是一致的。 - 我现在对于消息队列(MQ)最大的理解就是:通过网络实现了原来一个系统内部多个进程间的通信,如此一来各个进程(比如
Frontend和Worker)的运行就突破了空间限制,这当然是分布式的。
2)可扩展性
扩展的目的是为了应对高并发,为了同时处理更多的任务。
假设系统出现性能瓶颈,rabbitMQ、redis、db都可以通过集群来扩展,而Frontend、worker_manager则直接加机器起服务就行。
众所周知,分布式系统通过加机器并发4台1核1G的低配机器,性能可视同于1台4核4G的机器。
3.流程简介
这里以最简单的只处理一种任务(可能不只一个Frontend一个worker_manager)的处理过程来介绍一下系统的工作流程:
- 先定义任务的字段
# python or node
task = {
'task_id': uuid(), # 唯一标识
'status': 'new' or `done` ..., # 任务状态
'type': 'crawler', # 任务类型,假设为爬虫类型
'callback_url': '', # 回调地址,根据需要提供
'data': {}, # 更详细的任务信息
'correlation_id':uuid() # 用于保持request和response的一致性
}
启动
RabbitMQ、Redis、DB服务,以及Frontend和work_manager进程,后两者在启动时通过mq_client或者mq_serverdeclare了两个queue:crawler_task_queue、notify_queue,并立刻开始监听。Requester通过Frontend提供的API发送任务请求,请求附带必要的参数-
Frontend根据请求创建task并存入DB(也可以是Redis),然后将task_tid、correlation_id作为MSG的内容发送至crawler_task_queue中,并将correlation_id记录在outstandingRequests列表中,此时:- 一般异步处理时,此处
Frontend向Requester返回response即可 - 想要同步化处理,则等到
task完成后,再返回response,不过这样Requester就出于阻塞状态,一般不会这么做。
- 一般异步处理时,此处
worker_manager监听到crawler_task_queue有新的MSG,取出来,根据task_id从DB中读取完整的task信息,由于type=='crawler',创建相应的Crawler_Worker进行工作。Crawler_Worker完成工作后,将结果数据存入DB,更新task.status='done'。worker_manager将MSG发送至notify_queue-
Frontend监听notify_queue,读取到MSG信息后,检查correlation_id能否在outstandingRequests列表中匹配到:- 如果未匹配,则不予理会,
- 如果匹配,则进行下一步处理,
ack(MSG),并从outstandingRequests列表中删除该correlation_id。
如果
task['callback_url']的值不为空,则Frontend向该callback_url发送任务结果(简单的task或者根据type='crawler'查询到的详细的crawlerResultData)。对于没有提供
callback_url的Requester而言,则需要在一定时间后,调用Frontend的查询接口进行查询。
如上,一个任务的流程就算走完了。
如果理解了上述流程,就能明白通过type字段以及对应的work_manger可以扩展到多种任务的分布式异步处理。
4.进阶
1)prefetch的使用
一个worker_manager只处理一个task当然不划算,但如果来者不拒为每一个task都创建Crawler_Worker,在进行批跑时,很容把机器挂掉。这时就需要用到RabbitMQ的prefetch和ack机制了,怎么实现就不说了。需要提醒的是,worker_manager在通过ack机制限制当前机器上任务并发数在prefetch之下时,需要先缓存MSG待task完成后再进行ack操作。
2)关于correlation_id的用途
加入任务并发性要求较高,启动了frontA和frontB两个Frontend和多台Crawler_Worker来处理任务。那么correlation_id就可以保证frontA受理的任务请求所需要的异步callback操作以及同步化response依然可以通过frontA完成。
3)异步任务的交互
系统处理任务是异步的,上述任务流程中没有涉及到交互问题,但众所周知,爬虫工作过程中有一部分是需要人的交互的,这也是一种简单的反爬措施,比如模拟登录中需要提交短信验证码,那怎么办呢?
- 提示一下,从
status入手,此外,Frontend的一致性在上一点说过了,当然还要考虑到爬虫工作的连续性,就以补充短信验证码来说,必然要延用之前的cookie和session,这里就不细说了,再给个截图作为提示。
不要被queue的名字所迷惑
5.代码示例
这里给出简单的rabbitMQ_rpc_node_sample,只有rabbitmq的封装、rabbitmq_client和rabbitmq_server的实现以及调用示例invoking_instance。
示例效果是进行简单计算,对GET请求中的参数a进行求根操作,对POST请求中的参数a进行求3次方。
提示一下,示例代码中有一个小小的不太影响使用的坑,不算bug,有心者可以发现,我也是最近才发现的。
6.致谢
感谢魏总在这一年间给予的指导,让我收获颇丰。
