mq操作是开发 过程中经常使用到的中间件,例如redis,rabbitmq等等都可以实现mq中间件,本文主要讲述mq整合的一个思路,编写类库
列出文件关系
整合类MqService.
<?php
namespace mq;
/**
* Class MqService
* @package think
* @method sendMessage(array $msgBody,string $queueName="base-queue") static 推送消息到队列
* @method dequeue(string $queueName="base-queue",$autoAck = true) static 消费队列消息
* @method queueLen(string $queue = 'base-queue') 返回队列未校服数据数量
*/
class MqService {
//配置文件
protected static $config = [];
//消息队列组
protected static $mqService;
/**
* 切换数据库连接
* @access public
* @param array $config 连接配置
* @return array
*/
public static function config(array $config = []){
if(!empty($config)){
self::$config = array_merge(self::$config,$config);
}
return self::$config;
}
/**
* 切换数据库连接
* @access public
* @param array $config 连接配置
* @return MqService
*/
public static function connect(array $config = [],$name = false){
//self::$config = config('mq.');
//创建mq操作实例
self::$config = array_merge(self::$config,$config);
$mqServiceName = "mq\\drive\\".self::$config['type']."Drive";
$name ?: md5(serialize(self::$config));
if(isset(self::$mqService[$name])){
return self::$mqService[$name];
}
// 创建MQ对象实例
return self::$mqService[$name] = new $mqServiceName(self::$config);
}
/**
* 调用函数
* @param $method
* @param $args
* @return mixed
*/
public static function __callStatic($method, $args){
return call_user_func_array([static::connect(), $method], $args);
}
}
驱动类本文只做了两个,rabbitmq驱动如下
<?php
namespace mq\drive;
use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
* rabbitMq操作类
* Class RabbitMqDrive
* @package mq\drive
*/
class RabbitMqDrive{
//rabbitMq连接对象
protected $handler;
//队列操作对象
protected $channel;
//交换机类型
protected $consumers;
//是否自动ack应答
protected $autoAck = true;
//连接配置
protected $options = [
'host' => '127.0.0.1',
'port' => 5672,
'password' => '',
'user' => '',
'vhost' => '/',
'debug' => false,
'exchangeType' => 'direct',
'exchangeName' => 'base-exchange',
'routingKey' => 'base-routing',
'queue' => 'base-queue',
'consumers' => null
];
/**
* 初始化
* RabbitMqDrive constructor.
* @param array $options 配置参数
*/
public function __construct(array $options = []){
!empty($options) && $this->options = array_merge($this->options,$options);
try{
$this->handler = new AMQPStreamConnection(
$this->options['host'],
$this->options['port'],
$this->options['user'],
$this->options['password'],
$this->options['vhost']
);
//注册消费者
if(!empty($this->options['consumers'])){
$this->consumers = new $this->options['consumers'];
}
$this->channel = $this->handler->channel();
$this->createExchange();
}catch (Exception $e){
if($this->options['debug']){
exit('exception: '.$e->getMessage());
}
exit(" Server exception");
}
}
/**
* 将消息推入到队列
* @author jinanav 2021年1月26日11:46:23
* @return mixed
*/
private function createExchange(){
//声明初始化交换机
$this->channel->exchange_declare($this->options['exchangeName'], $this->options['exchangeType'], false, true, false);
//声明初始化一条队列
$this->channel->queue_declare($this->options['queue'], false, true, false, false);
}
/**
* 将消息推入到队列
* @author jinanav 2021年1月26日11:46:23
* @param array $msgBody 消息体
* @param string $queueName 队列名称
* @throws Exception
*/
public function sendMessage(array $msgBody,string $queueName="base-queue"){
//将队列与某个交换机进行绑定,并使用路由关键字
$this->channel->queue_bind($queueName, $this->options['exchangeName'], $this->options['routingKey']);
//生成消息并写入到队中
$msg = new AMQPMessage(json_encode($msgBody,JSON_UNESCAPED_UNICODE), ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
//推送消息到某个交换机
$this->channel->basic_publish($msg,$this->options['exchangeName'], $this->options['routingKey']);
}
/**
* 读取队列消息
* @author jinanav 2021年1月26日11:46:23
* @param string $queueName 队列名称
* @param string $exchangeName 交换机
* @param string $routingKey 路由key
* @throws Exception
*/
public function dequeue(string $queueName="base-queue",$autoAck = true){
$this->autoAck = $autoAck;
//将队列与某个交换机进行绑定,并使用路由关键字 ,消费者与生产者都启用 以免队列不存在报异常
$this->channel->queue_bind($queueName, $this->options['exchangeName'], $this->options['routingKey']);
$this->channel->basic_consume($queueName, '', false, $this->autoAck, false, false, function($msg){$this->dealMsg($msg);});
//监听消息
while(count($this->channel->callbacks)){
$this->channel->wait();
}
}
/**
* 消费消息
* @author jinanav 2021年1月26日11:46:23
* @param object $msg 消息体
* @return mixed
*/
public function dealMsg($msg){
$info = json_decode($msg->body,true);
if(empty($this->consumers)) exit("There are no consumers available");
$this->consumers->deal($info);
//开始处理
if(!$this->autoAck) {
//手动ack应答
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}
/**
* 读取队列中的数量
* @author jinanav 2021年1月26日11:46:49
* @return mixed
*/
public function len(){
return count([]);
}
/**
* 关闭连接以及队列
*/
public function close(){
$this->channel->close();
$this->handler->close();
}
/**
* 析构函数
*/
public function __destruct(){
$this->close();
}
}
redis驱动如下
<?php
namespace mq\drive;
use BadFunctionCallException;
use Exception;
use Redis;
class RedisDrive{
//队列操作对象
protected $handler;
//消费者
protected $consumers;
//连接配置
protected $options = [
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'max' => 8000,
'consumers' => null
];
/**
* 架构函数
* @access public
* @param array $options 连接参数
*/
public function __construct($options = []){
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}
if (extension_loaded('redis')) {
$this->handler = new Redis;
$this->handler->connect($this->options['host'], $this->options['port'], $this->options['timeout']);
if ($this->options['password'] != '') {
$this->handler->auth($this->options['password']);
}
if ( $this->options['select'] != 0) {
$this->handler->select($this->options['select']);
}
//注册消费者
$this->consumers = new $this->options['consumers'];
} else {
exit('not support: redis');
}
}
/**
* 将消息推入到队列
* @param array $msgBody 消息体
* @param string $queueName 队列名称
* @throws Exception
*/
public function push(array $msgBody,string $queueName="base-queue"){
return $this->handler->lPush($queueName, json_encode($msgBody));
}
/**
* 消费队列
* @author jinanav 2020年1月17日15:11:44
* @param string $queue 队列key名称
* @return array
*/
public function dequeue($queue = 'base-queue'){
while (true){
$data = $this->handler->rPop($queue);
$data = json_decode($data,true)?:[];
if(!empty($data)){
//消费消息
$this->consumers->deal($data);
}else{
sleep(1);
}
}
}
/**
* 返回队列长度
* @author jinanav 2021年1月26日17:25:52
* @param string $queue 队列key名称
* @return int
*/
public function queueLen(string $queue = 'base-queue'){
return $this->handler->lLen($queue);
}
}
使用
<?php
include "vendor/autoload.php";
use mq\MqService;
$config = [
'type' => 'RabbitMq',
'host' => '192.168.0.120',
'port' => 5672,
'password' => '123456',
'user' => 'jinanav',
'vhost' => '/',
'debug' => true,
'consumers' => Consumers::class
];
//生命一个消费者
class Consumers{
//消费信息
function deal($data){
var_dump($data);
}
}
//载入配置
MqService::config($config);
//向队列推入消息
MqService::sendMessage(['title'=>'测试']);
//消费队列信息
MqService::dequeue();
效果