Redis+PHP实现的一个优先级去重队列

主要思路是用一个set做前端去重缓冲, 若干个list做后端的多优先级消息队列, 用一个进程来进行分发, 即从set中分发消息到队列.

set缓冲的设计为当天有效, 所以有个零点问题,有可能在零点前set中刚放进去的消息没有分发即失效, 这一点可以用另一个进程弥补处理前一天的遗留消息和删除前一天的缓冲


<?php

class MsgQuery {

 // TODO - Insert your code here

const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息缓冲key前缀

const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key

const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已处理缓冲key前缀

const SCORE_NUM = 5; // 优先级划分数目

const MIN_SCORE = 1; // 最小优先级

static $MAX_SCORE;

static $instance = null;

private $redis;

public static function getInstance($redis) {

if (null == self::$instance) {

self::$instance = new MsgQuery ( $redis );

}

return self::$instance;

}

/**

* 添加消息到消息缓冲区

* @param int $score 优先级(1-5)

* @param string $msg 消息

*/

public function add($score, $msg) {

// 添加到消息缓冲

$socre = intval ( $score );

if ($socre < self::MIN_SCORE) {

$score = self::MIN_SCORE;

}

if ($score > self::$MAX_SCORE) {

$score = self::$MAX_SCORE;

}

$cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' );

$cacheData = array (

'score' => $score,

'msg' => $msg

);

$this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );

}

/**

* 将消息从缓冲区移动到相应的优先级队列中

*/

public function moveToQuery() {

// 获取当前缓冲区没有入队列的消息

$dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd');

$cacheKey = self::KEY_CACHE_PREFIX.date('Ymd');

$msgs = $this->redis->sDiff($cacheKey, $dealKey);

foreach ($msgs as $cachedData){

// 放入已处理集合

$this->redis->sAdd ( $dealKey, $cachedData );

// 压入相应的优先级队列

$cachedData = unserialize($cachedData);

$score = $cachedData['score'];

$msg = $cachedData['msg'];

$queryKey = self::KEY_QUERY_PREFIX.$score;

$this->redis->rPush($queryKey, $msg);

}

unset($cachedData);

}

/**

* 从队列阻塞式出栈一个最高优先级消息

* @return string msg

*/

public function bPop(){

$queryKeys = array();

for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){

$queryKeys[] = self::KEY_QUERY_PREFIX.$score;

}

$msg = $this->redis->blPop($queryKeys, 0);

return $msg[1];

}

private function __construct($redis) {

$this->redis = $redis;

$this->redis->connect ();

self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;

}

private function __destruct() {

$this->redis->close ();

}

}


?>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,179评论 19 139
  • 转载地址:http://gnucto.blog.51cto.com/3391516/998509 Redis与Me...
    Ddaidai阅读 21,518评论 0 82
  • 1.1 资料 ,最好的入门小册子,可以先于一切文档之前看,免费。 作者Antirez的博客,Antirez维护的R...
    JefferyLcm阅读 17,242评论 1 51
  • 转至元数据结尾创建: 董潇伟,最新修改于: 十二月 23, 2016 转至元数据起始第一章:isa和Class一....
    40c0490e5268阅读 5,890评论 0 9
  • 非原创,记录。 1、连接操作相关的命令 quit:关闭连接(connection) auth:简单密码认证 2、对...
    FocusOnMyself阅读 3,051评论 0 0

友情链接更多精彩内容