使用redis stream实现队列服务

1. stream类型

Redis5.0引入了Stream类型。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理

关于stream的具体介绍可以参见:

2. 队列接口简介

我们基于redis stream实现了一个基础的,类似beanstalk的队列服务。用于多个无差别的消费者从一个队列消费任务的情况。如果您对stream有所了解,那么我们其实是使用了stream+group当作了beanstalk的tube。

提供最基础的功能:

  • addTask:添加任务。
/*                                                         
 * 向流中添加任务                                          
 * $data: 数组形式的任务数据                                            
 * return: 任务id                                          
 */    
addTask(array $data){}
  • getTask:获取任务。
 /*                                                         
  * 获取任务                                       
  * $block:阻塞时间,毫秒. null不阻塞                       
  * $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block。
  * $start:'>'表示接受最新数据. 若设置id,则读取大于该id,且未被确认(ack)的历史任务
  * 普通使用时,只要设置$block即可。
  * 
  * return [
  *      'id1' => taskdata1,                                
  *      'id2' => taskdata2,                                
  *      ... ...                                            
  * ]
  *
  * 无数据返回[]                                            
  */    
getTask($block=null, $count = 1, $start = '>'){}
  • delTask:删除任务
/*
 * 根据id确认任务完成并从stream中删除该任务
 * $ids: 可以是单条taskid,也可以是数组形式的多条id
 *
 * 
 * 该方法其实完成了两个动作
 * ack:确认任务完成
 * del:stream中删除任务
 * 所以返回值中包括两个值,第一个为ack是否成功,第二个为del是否成功
 */
delTask($ids){}

3. 代码实现

<?php
/*
 * 需要redis-server5.0以上 
 * php-redis扩展版本要适配redis-5.0
 * 
 * 使用redis stream仿照beanstalk封装的队列服务
 */
class RedisQueue{
    protected $_mRedis = null;
    protected $_mStream = '';
    protected $_mGroup = '';
    protected $_mConsumer = '';

    //默认0 不限制长度
    protected $_mMaxLength = 0;

    /* 
     * 创建队列, stream+group确认唯一队列
     * $config必须包括:
     * stream: stream名
     * server: 格式ip:port[:auth]
     * 
     * 可选参数:
     * maxLength:队列最大长度
     * group:分组名, 默认与stream相同. stream+group相当于beanstalk的tube
     * consumer:消费者名, 默认与stream相同. 
     * */
    public function __construct(array $config){
        if(!isset($config['stream'])){
            throw new Exception("you must config the stream");
        }

        $this->_mStream = $config['stream'];

        if(!isset($config['server'])){
            throw new Exception("you must config the server");
        }

        $tmp = explode(':', $config['server']);
        $host = $tmp[0];
        $port = $tmp[1];
        $auth = $tmp[2] ?? null;

        if ($host && $port){
            $this->_mRedis = new Redis();
            $this->_mRedis->connect($host,$port,1);
            if($auth){
                $this->_mRedis->auth($auth);
            }
        }
        else{
            throw new Exception("can not get redis server conf");
        }

        if(isset($config['maxLength'])){
            $this->_mMaxLength = $config['maxLength'];
        }

        $this->_mGroup = $config['group'] ?? $config['stream'];       
        $this->_mConsumer = $config['consumer'] ?? $config['stream'];

        $this->creatGroup();
    }

    /*
     * 删除当前流(队列)
     * */
    public function destoryStream(){
        $this->_mRedis->del($this->_mStream);
    }

    /*
     * 向流中添加任务
     * $data: array
     * return: taskid
     * */
    public function addTask(array $data){
        return $this->_mRedis->xAdd($this->_mStream, "*", $data , $this->_mMaxLength);
    }

    /*
     * 从group中获取任务
     * $block:阻塞时间,毫秒. null不阻塞
     * $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block
     * $start:'>'接受最新数据. 若设置id,则读取大于该id,且未被ack的历史任务
     *
     * return [
     *      'id1' => taskdata1,
     *      'id2' => taskdata2,
     *      ... ...
     * ]
     *
     * 无数据返回[]
     * */
    public function getTask($block=null, $count = 1, $start = '>'){
        $d = $this->_mRedis->xReadGroup($this->_mGroup, $this->_mConsumer, [$this->_mStream => $start], $count, $block);

        if (is_array($d) && count($d) > 0){
            return $d[$this->_mStream];
        }

        return $d;
    }
    
    /*
     * ack任务--从pending中删除
     * 同时从stream中删除
     */
    public function delTask($ids){
        if(!is_array($ids)){
            $ids = array($ids);
        }
        $multi = $this->_mRedis->multi(Redis::PIPELINE);

        $multi->xAck($this->_mStream, $this->_mGroup, $ids);
        $multi->xDel($this->_mStream, $ids);        
        $res = $this->_mRedis->exec();        
        return $res;
    }

    protected function creatGroup($startID = 0){
        return $this->_mRedis->xGroup('CREATE', $this->_mStream, $this->_mGroup, $startID, true);
    }
}

git代码库
https://github.com/qmhball/redisQueue

  • RedisQueue.php 队列实现
  • RedisQueueTest.php 对应测试

4. 使用示例

$config = [
        'server' => '10.10.10.1:6379:auth',
        'stream' => 'balltube', 
        'consumer' => 'normalprocessor'//可以不设置
    ];

//创建队列
$q = new RedisQueue($config);

//添加任务
$task = ['task'=>1];
$q->addTask($task);

//获取
$timeout = 1000;
$task = $q->getTask($timeout);

//确认并删除
$taskid = key($task);
$q->delTask($taskid);

5. 对于pending任务的处理

当任务被取出且未被确认时,该任务处理pending状态。beanstalk中,对于这种任务可以设置一个超时时间timeout,当任务超过timeout未被确认,该任务会被还回队列中。对于stream,应该如何处理这种任务呢?请参见:

《关于redis stream中pending数据的处理》

6. beanstalk与redis的stream队列性能对比

6.1 测试环境

  • 队列所在机器配置:4CPU, 6G内存。redis开启aof,每一秒钟持久化一次。
  • 压测机:8CPU,24G。

6.2 测试结果

在任务大小为1k和10k的时候,开启不同个数的进程进行10000次读/写操作,测试结果如下:

任务大小为1k

进程数 10 20 50
redis万次读 1.64928s 0.864051s 0.542352s
beanstalk万次读 1.702436s 0.915132s 0.503198s
redis万次写 3.328083s 1.714555s 0.837429s
beanstalk次写 3.402431s 1.702654s 0.9317s

任务大小为10k

进程数 10 20 50
redis万次读 1.962591s 1.569581s 1.001159s
beanstalk万次读 3.30333s 1.72248s 0.940097s
redis万次写 3.360724s 1.77125s 0.921126s
beanstalk次写 3.418932s 1.766198s 0.823796s

7. redis stream队列与beanstalk队列整体比较

stream beanstalk
主从 支持 不支持
性能 相当 相当
任务持久化 支持 支持
任务优先级 不支持 支持
任务延迟 不支持 支持
超时任务 额外处理 自动
批量任务读写 支持 不支持
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355

推荐阅读更多精彩内容