composer require maweibinguo/easyrabbitmq
RabbitMQ 交换机Exchange价有三个模式:
Direct(一对一) 要有key
Fanout(一对多)
Topic(多对对) 要有key
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"user" => "guest",
"password" => "guest",
"vhost" => "/",
"channel_max_num" => 10,
];
$instance = RabbitMq::getInstance($config);
例如Direct
$instance->pushToDirect(
$msg = time(), //消息体内容
$exchange = "easy_direct_exchange", //交换机名称
$routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致。队列上的
$delaySec = 30 //延迟秒数,推送——》接受需要30s
);
例如Fanout
$instance->pushToFanout(
$msg = time(), //消息体内容
$exchange = "easy_fanout_exchange", //交换机名称
$delaySec = 30 //延迟秒数
);
例如Topic
$instance->pushToTopic(
$msg = time(), //消息体内容
$exchange = "easy_topic_exchange", //交换机名称
$routingKey = "easy.topic.queue",
$delaySec = 30 //延迟秒数
);
订阅模式下的可靠消费:后台运行--处于一直等待
$instance->consume(
$queueName = "direct_test_queue",//订阅的队列名称
$consumerTag = "c1",//消费标记
$exchange = "easy_direct_exchange",//交换机名称
$bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机需要同routingKey保持一致
$callback = function($msg){
$body = $msg->body;
file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
//如果返回结果不绝对等于(===)true,那么将触发消息重试机制
return false;
},
//5次消费消费失败后,失败消息将会投递到的队列名称
$failedQueue = "easymq@failed"
);
拉取模式--一次性获取
$boot =$instance->get(
$queue = "get_queue",
$exchange = "easy_fanout_exchange",
$bindingKey = "",
$callback = function($msg){
$body = $msg->body;
file_put_contents("./6KW.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
//如果返回结果不绝对等于(===)true,那么将触发消息重试机制
return false;
},
//5次消费消费失败后,失败消息将会投递到的队列名称
$failedQueue = 'easymq@failed'
);