在学习Celery之前,我们先要了解一下什么是生产者消费者模式。
生产者消费者模式
简而言之,在实际开发过程中,负责生产数据的模块称为生产者,而处理数据的模块,称为消费者。
可能有人就要问了:xxx,为什么生产者消费者之间还得有个缓存区?生产者生产数据之后直接给消费者不是更好,更直接吗?
但是,我们程序员的终极奥义是什么?“高内聚低耦合”!
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过消息队列(缓冲区)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给消息队列,消费者不找生产者要数据,而是直接从消息队列里取,消息队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个消息队列就是用来给生产者和消费者解耦的。------------->这里又有一个问题,什么叫做解耦。
解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
因为太抽象,看过网上的说明之后,通过我的理解,我举了个例子:吃包子。
假如你非常喜欢吃包子(吃起来根本停不下来),今天,你妈妈(生产者)在蒸包子,厨房有张桌子(缓冲区),你妈妈将蒸熟的包子盛在盘子(消息)里,然后放到桌子上,你正在看巴西奥运会,看到蒸熟的包子放在厨房桌子上的盘子里,你就把盘子取走,一边吃包子一边看奥运。在这个过程中,你和你妈妈使用同一个桌子放置盘子和取走盘子,这里桌子就是一个共享对象。生产者添加食物,消费者取走食物。桌子的好处是,你妈妈不用直接把盘子给你,只是负责把包子装在盘子里放到桌子上,如果桌子满了,就不再放了,等待。而且生产者还有其他事情要做,消费者吃包子比较慢,生产者不能一直等消费者吃完包子把盘子放回去再去生产,因为吃包子的人有很多,如果这期间你好朋友来了,和你一起吃包子,生产者不用关注是哪个消费者去桌子上拿盘子,而消费者只去关注桌子上有没有放盘子,如果有,就端过来吃盘子中的包子,没有的话就等待。对应关系如下图:
理解之后,我们开始系统学习Celery.
Celery系统学习
Celery的定义
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
Celery支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。
学到这里,我们又有疑问了,什么是任务队列呢?
任务队列
任务队列是一种在线程或机器间分发任务的机制。
消息队列
消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
Celery通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。如下图所示:
Celery的架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。如下图,
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等。
这里我先去了解Redis。
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等,这里我先不去看它是如何存储的,就先选用Redis来存储任务执行结果。
安装Celery
pip install celery==3.1.25
安装redis
pip install redis
配置Celery
CELERY_BROKER_URL = ‘redis://47.103.12.202:6379/0‘
CELERY_RESULT_BACKEND = "redis://47.103.12.202:6379/0"
为了测试Celery能否工作,我运行了一个最简单的任务,编写tasks.py,如下图所示:
该命令中-A参数表示的是Celery APP的名称,这个实例中指的就是tasks.py,后面的tasks就是APP的名称,worker是一个执行任务角色,后面的loglevel=info记录日志类型默认是info,这个命令启动了一个worker,用来执行程序中add这个加法任务(task)。