先上消费者代码consumer.php
<?php
//声明连接参数
cnn = new AMQPConnection(cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
//在连接内创建一个通道
cnn);
//创建一个交换机
ch);
//声明路由键
exchangeName = 'exchange_1';
//设置交换机名称
exchangeName);
//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
ex->setFlags(AMQP_DURABLE);
//声明交换机
q = new AMQPQueue(q->setName('queue_1');
//设置队列持久
q->declareQueue();
//交换机和队列通过q->bind(routingKey);
//接收消息并进行处理的回调方法
function receive(queue) {
//休眠两秒,
sleep(2);
//echo消息内容
echo queue->ack(q->consume("receive");
生产者代码publisher.php:
<?php
cnn = new AMQPConnection(cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
cnn);
ch);
//消息的路由键,一定要和消费者端一致
exchangeName = 'exchange_1';
exchangeName);
ex->setFlags(AMQP_DURABLE);
i=1;i++){
//消息内容
i,
'hello' => 'world',
);
//发送消息到交换机,并返回发送结果
//delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失
echo "Send Message:".msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
//代码执行完毕后进程会自动退出
}