base方法
<?php
namespace core\utils;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
class BaseRabbitmqService
{
//死信队列和交换机
public static $dlxQueue = 'dlx.queue';
public static $dlxExchange = 'dlx.exchange';
//死信之后的队列和交换机
public static $normalQueue = 'normal.queue';
public static $normalExchange = 'normal.exchange';
//消息发布者的routing_key
public static $msgKey = 'msgkey';
private static function getConfig()
{
return [
'host' => '127.0.0.1',
'port' => 5672,
'name' => 'guest',
'password' => 'guest',
];
}
public static function getConnection()
{
$config = self::getConfig();
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']);
self::init($connection);
return $connection;
}
//初始化一些队列信息
private static function init(&$connection)
{
$channel = $connection->channel();
//定义交换机
$channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true);
//定义队列,在正常队列超时之后就送去死信队列
$args = new AMQPTable();
// 消息过期方式:设置 queue.normal 队列中的消息5s之后过期,毫秒单位
$args->set('x-message-ttl', 5000);
// 设置队列最大长度方式: x-max-length
//$args->set('x-max-length', 1);
$args->set('x-dead-letter-exchange', self::$dlxExchange);
$args->set('x-dead-letter-routing-key', self::$msgKey);
$channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args);
$channel->queue_declare(self::$dlxQueue, false, true, false, false);
$channel->queue_bind(self::$normalQueue, self::$normalExchange);
$channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey);
}
}
生产者
<?php
namespace app\api\controller;
use core\utils\BaseRabbitmqService;
use PhpAmqpLib\Message\AMQPMessage;
class ProducerController extends BaseRabbitmqService
{
public static function doTask()
{
$connection = self::getConnection();
$channel = $connection->channel();
$data = [];
//生成5条数数据
for ($i = 0; $i < 5; $i++) {
$data['user_id'] = mt_rand(1, 100);
$data['order_amount'] = mt_rand(10000, 99999);
$data['order_number'] = mt_rand(100, 999);
// $msg = new AMQPMessage(json_encode($data),
// array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) # 使消息持久化
// );
$msg = new AMQPMessage(json_encode($data));
echo " [x] Send ", date('Y-m-d H:i:s') . '--' . json_encode($data), "\n";
$channel->basic_publish($msg, self::$normalExchange);
}
$channel->close();
$connection->close();
}
}
消费者
<?php
namespace app\api\controller;
use core\utils\BaseRabbitmqService;
class ConsumerController extends BaseRabbitmqService
{
public static function doTask()
{
$connection = self::getConnection();
$channel = $connection->channel();
$callback = function ($msg) {
echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n";
//主动确认信息处理完
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//没有确认就手动丢给死信队列
sleep(10);
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
};
//发送一个未处理完就不发送下一个
// $channel->basic_qos(null, 1, null);
$channel->basic_consume(self::$normalQueue, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
死信消费者
<?php
namespace app\api\controller;
use core\utils\BaseRabbitmqService;
class ConsumerDeadController extends BaseRabbitmqService {
public static function doTask() {
$connection = self::getConnection();
$channel = $connection->channel();
$callback = function($msg) {
echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n";
//主动确认信息处理完
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//发送一个未处理完就不发送下一个
// $channel->basic_qos(null, 1, null);
$channel->basic_consume(self::$dlxQueue, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}