cerely
1.cerely本身为分布式任务队列,是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
2.任务队列
任务队列是一种在线程或机器间分发任务的机制。
3.消息队列
消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。如下图所示:
Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。
Celery的架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等。
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,mysql等。
rabbitMQ
MQ特点:MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息.
需要的软件
rabbitmq安装
安装路径
http://www.rabbitmq.com/install-windows.html
安装后,激活 RabbitMQ's Management Plugin
使用RabbitMQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态。
打开命令窗口:
输入命令:
"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
这样,就安装好插件了,是不是能使用了呢?别急,需要重启服务才行,使用命令:
net stop RabbitMQ && net start RabbitMQ
这时候的,也许会出现这种结果:
“发生错误:发生系统错误 5。 拒绝访问。”
这是什么鬼?查了下,原来,5代表的是:不是系统管理员权限。
问题解决方案:使用管理员打开cmd再执行此命令:
这样就结束了吗?当然没有。
创建用户,密码,绑定角色
使用rabbitmqctl控制台命令(位于C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin>)来创建用户,密码,绑定权限等。
注意:安装路径不同的请看仔细啊。
rabbitmq的用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。
查看已有用户及用户的角色:
rabbitmqctl.bat list_users
新增一个用户:
rabbitmqctl.bat add_user username password
此时来看下我们当前用户哈:
-
给用户赋予权限
rabbitmqctl set_user_tags {用户名} {权限} 例:创建一个超级用户 rabbitmqctl add_user admin1 admin1 rabbitmqctl set_user_tags admin1 administrator
通过百度都可以找到,直接按步骤安装即可。
安装完成后,在页面输入url地址
http://127.0.0.1:15672/
如果顺利跳出页面,说明安装完成,否则检查是否有纰漏。
可以自己设置自己的username和password。
# coding=utf-8
# 导入Exchange, Queue,连接rabbitmq的交换机和队列
from kombu import Exchange, Queue
# 需求请求时导入requests包
import requests
requests.packages.urllib3.disable_warnings()
# 启动命令
# 即worker,cmd命令后,转入到此文件所在目录,然后运行此命令。
# celery -A ss worker -P eventlet -c 3 --loglevel=info
# 其中 ss为文件名,3为协程数,这两处可以自己定义
# 初始化celery
app = Celery()
# 声明连接到rabbitmq中的队列为ss
QueueName = "ss"
# 参数设置celery
app.conf.update(
# 中间人设置
BROKER_URL="amqp://spider:spider_123@127.0.0.1:5672//spider",
# 配置序列化任务载荷的默认的序列化方式
CELERY_TASK_SERIALIZER='json',
# 忽略接收其他内容
CELERY_ACCEPT_CONTENT=['json'],
# 结果序列号
CELERY_RESULT_SERIALIZER='json',
# 设置时区
CELERY_TIMEZONE='Asia/Shanghai',
# 使用UTC的方式,UTC的时间、时区、时差
CELERY_ENABLE_UTC=True,
# 配置队列
CELERY_QUEUES=(
Queue(QueueName, Exchange(QueueName), routing_key=QueueName),
),
# 默认队列
CELERY_DEFAULT_QUEUE=QueueName,
# 连接方式
CELERY_DEFAULT_EXCHANGE_TYPE='direct',
# 路由队列
CELERY_DEFAULT_ROUTING_KEY=QueueName,
# 任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES=1800,
# worker 每次取任务的数量
CELERYD_PREFETCH_MULTIPLIER=1,
# 每个worker最多执行完10个任务就会被销毁,可防止内存泄露
CELERYD_MAX_TASKS_PER_CHILD=10,
# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV=True,
# 可以让Celery更加可靠,只有当worker执行完任务后,才会告诉MQ,消息被消费
CELERY_ACKS_LATE=True,
# 单个任务的运行时间不超过此值,否则会被SIGKILL 信号杀死
CELERYD_TASK_TIME_LIMIT=600,
# 任务发出后,经过一段时间还未收到acknowledge , 就将任务重新交给其他worker执行
CELERY_DISABLE_RATE_LIMITS=True
)
@app.task
def add(x, y):
return x + y
项目中需要的代码已经详细给出,可直接使用,爬虫代码,直接写就行了。