1.发送消息
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
exchange:交换器的名称
routingKey:路由键
props:消息的基本属性集:messageProperties.MINIMAL_BASIC
messageProperties.MINIMAL_PERSISTENT_BASIC
messageProperties.BASIC
messageProperties.TEXT_PLAIN
messageProperties.PERSISTENT_TEXT_PLAIN
byte[] body:消息体,需要发送的内容
示例:
String message = "hello!";
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.TEXT_PLAIN, message.getBytes());
上行代码向EXCHANGE_NAME交换器发送了一条非持久化的String类型的消息。
2.消费消息
RabbitMQ的消费模式分两种:推模式和拉模式。推模式采用basicConsume进行消费,拉模式通过basicGet进行消费
2.1.推模式
在推模式中,可以通过持续订阅的方式获得消息。
Channel类中basicConsume重载形式:
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
参数说明:
queue:队列名称
autoAck:设置是否自动确认。建议设成false。
consumerTag:消费者标签,用来区分多个消费者
noLocal:设置为true,则表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者。
exlusive:设置是否排他。
arguments:设置消费者的其他参数
callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息。比如DefaultConsumer,使用时,需要客户端重写其中的方法。对于消费者客户端来说,重写handleDelivery方法是十分方便的,更复杂的消费者需要重写更多的方法。
使用示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
首先我们声明了一个DefaultConsumer的子类,并且重写了它的handleDelivery方法。
2.2.拉模式
通过channel.baiscGet方法可以单条地获取消息,其返回值是GetResponse。没有方法重载,只有:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
queue:队列名称
autoAck:是否自动确认,如果是false,则同样需要channel.basicAck来自动确认。
使用实例:
GetResponse response=channel.basicGet(QUEUE_NAME,true);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);