消息队列选型更多考虑业务的场景
- 性能,比如每秒有多少消息啊,如果你每秒有几万的消息量,那 Beanstalk、RabbitMQ 之列的就不能拿来直接用了。
- 可靠性,消息是否允许丢失?是否需要持久化?
- 高可用,是否可以容忍宕机?
- 是否需要分布式
- 运维成本,你们公司的开发(运维)是否有能力维护好这个消息队列
- 客户端支持,这个才是语言层面的东西。比如 Kafka 是个很好的消息队列,但是他的 PHP 客户端写的不太好,自己重新写也比较难,那么在选型的是否就要慎重。
安装
1、官网
https://kr.github.io/beanstalkd/
2、安装
yum install beanstalkd --enablerepo=epel
3、启动
/usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd/binlog -F
-b
开启binlog,断电后重启会自动恢复任务。
4、配置文件
/etc/sysconfig/beanstalkd
基本概念
1、Beanstalkd设计里面的核心概念:
job:一个需要异步处理的任务,是 Beanstalkd 中的基本单元,需要放在一个 tube 中。
tube:一个有名的任务队列,用来存储统一类型的 job,是 producer 和 consumer 操作的对象。
producer:Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中。
consumer:Job的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态。
2、job 的生命周期
当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete BURIED状态的job。正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。
- READY - 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
- DELAYED - 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行;
- RESERVED - 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
- BURIED - 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;
- DELETED - 消息被彻底删除。Beanstalkd 不再维持这些消息。
一些特性
优先级
任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级,默认优先级为1024。
持久化
可以通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来恢复之前的job及状态。
分布式容错
分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。
超时控制
为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行。
Client Libraries For PHP
1、Producer 示例:向队列中添加job
$pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1');
$pheanstalk ->useTube('tubeName') ->put($jobData);
2、Consumer 示例:从队列中取出job
$job = $pheanstalk ->watch('tubeName') ->ignore('default') ->reserve();
echo $job->getData();
$pheanstalk->delete($job);
3、检查服务状态
$isAlive = $pheanstalk->getConnection()->isServiceListening(); //返回 true 或 false
4、获取某一 tube 的数据
try{
$tubeStatus = $pheanstalk->statsTube('tubeName');
} catch (Exception $e){
if($e->getMessage()=='Server reported NOT_FOUND'){ //tube 不存在
$current_jobs_ready = 0;
}
}
五、参考资料
https://github.com/kr/beanstalkd/wiki/faq
http://csrd.aliapp.com/?p=1201
https://my.oschina.net/u/698121/blog/157092