RabbitMQ官网中文版教程:
http://rabbitmq.mr-ping.com/tutorials_with_python/[1]Hello_World.html
上述教程示例为pathon版,Java版及相应解释如下:
生产者代码
package com.xc.rabbitmq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生产者
*
* Created by xc.
*/
public class Producer {
private final static String QUEUE_NAME = "Hello";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("xc");
factory.setPassword("xc");
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个新的频道
Channel channel = connection.createChannel();
// 声明一个队列 -- 在 RabbitMQ 中, 队列的声明是幂等性的
// 一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同
// 也就是说, 如果不存在, 就创建, 如果存在, 不会对已经存在的队列产生任何影响
// 如果并不知道是生产者还是消费者程序中的哪个先运行,在程序中重复将队列重复声明一下是种值得推荐的做法
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
// 发送到消息队列中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("P [x] Sent '" + message + "'");
// 关闭频道和连接
channel.close();
connection.close();
}
}
程序执行结果如下图(rabbitmq Web管理界面):
RabbitMQ中收到了一条消息。
消费者代码
package com.xc.rabbitmq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消费者
*
* Created by xc.
*/
public class Consumer {
private static final String QUEUE_NAME = "Hello";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbit");
factory.setPassword("carrot");
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("C [*] Waiting for messages. To exit press CTRL + C");
// DefaultConsumer类 实现了 Consumer 接口, 通过传入一个频道, 告诉服务器我们需要哪个频道的消息
// 如果频道中有消息, 就会执行回调函数 handleDelivery
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C [x] Received '" + message + "'");
}
};
// 自动回复队列应答 -- RabbitMQ 中的消息确认机制, 后面章节会详细介绍
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
程序执行结果如下图(rabbitmq Web管理界面):
如图,Hello队列中的消息被消费掉了。
程序的maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.0</version>
</dependency>
API参数解释:
1) Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
@param queue : the name of the queue 队列名
@param durable :true if we are declaring a durable queue (the queue will survive a server restart) 队列是否持久化
@param exclusive : true if we are declaring an exclusive queue (restricted to this connection) 是否为当前连接的专用队列,在连接断开后,会自动删除该队列
@param autoDelete : true if we are declaring an autodelete queue (server will delete it when no longer in use) 当没有任何消费者使用时,自动删除该队列
@param arguments : other properties (construction arguments) for the queue 其他队列配置
2)void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
@param exchange the exchange to publish the message to 消息发送到哪个交换器(之后章节会讲到)。填空字符串表示指定默认交换器,MQ会根据routingKey生成一个默认的direct类型的交换器。
@param routingKey :the routing key 路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
@param mandatory :true if the 'mandatory' flag is to be set 如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
@param immediate :true if the 'immediate' flag is to be set. Note that the RabbitMQ server does not support this flag. 如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
@param props other properties for the message - routing headers etc 需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合exchange(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留。
@param body the message body 具体消息
简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
- String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
@param queue the name of the queue 队列名
@param autoAck true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements 是否自动ACK。如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答
@param callback an interface to the consumer object 消息消费的回调