文章参考:Rabbit实战指南
发送消息
如果要发送一个消息,可以使用Channel类的basicPublish方法,比如发送一条内容为“Hello World”的消息,参考如下:
byte[] messageBodyBytes = "Hello,world!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);
为了更好地可控制发送,可以使用mandatory这个参数,或者可以发送一些特定属性的信息:
channel.basicPublish(exchangeName,routingKey,mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
下面这行代码发送了一条消息,这条消息的投递模式(delivery mode)设置为2,即消息会被持久化(存入磁盘中)在服务器中。同时这条消息的优先级(priority)设置为1,content-type为“text/plain”。可以自己设定消息的属性:
channel.basicPublish(exchangeName,routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("hidden")
.build(),
messageBodyBytes
);
也可以发送一条带有headers的消息:
Map<String,Object> headers = new HashMap<>();
headers.put("localtion","here");
headers.put("time","today");
channel.basicPublish(exchangeName,routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
messageBodyBytes
);
也可以发送一条带有过期时间(expiration)的消息
channel.basicPublish(exchangeName,routingKey,
new AMQP.BasicProperties.Builder()
.expiration("6000")
.build(),
messageBodyBytes
);
basicPublish的重载方法:
void basicPublish(String exchange,String routingKey,
BasicProperties props,
byte[] body) throws IOException;
void basicPublish(String exchange,String routingKey,
boolean mandatory,
BasicProperties props,
byte[] body) throws IOException;
void basicPublish(String exchange,String routingKey,
boolean mandatory,boolean imediate,
BasicProperties props,
byte[] body) throws IOException;
具体参数解释如下:
- exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会发送到RabbitMQ默认的交换器中。
- routingKey:路由键,交换器根据路由键将消息存储到相应的队列。
- props:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、header(Map<Stirng,Object>)、deliveryMode、priority。correlationId、replyTo、expiration、messageId、timestamp、type、urserId、appId、clusterId。
- byte[] body:消息体(payload),真正需要发送的消息。
- mandatory和immediate
消费消息
RabbitMQ的消费模式分为两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则调用Basic.Get进行消费。
推模式
在推模式中,可以通过持续订阅的方式来消费信息,使用到的相关类有:
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeafultConsumer;
接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现。当调用与Consumer类来实现。当调用与Consumer相关的API方法,不同的订阅采用不同的消费者标签来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区别。代码如下:
boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",
new DefaultConsumer(channel){
@Override
public void handleDevlivery(String consumerTag,
Envelop envelope,
AMQP.BasicProperties properties,
byte[] bode
)throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//(process the message components here ...)
channel.basicAck(deliveryTag,false);
}
}
)
上面代码显式地设置autoAck为false,然后在接受到消息之后进行显式ack操作(channel.basicAck),对应消费者来说这个设置是非常必要的,可以防止消息不必要的丢失。
Channel类中basicConsume方法有如下几种形式:
String basicConsume(String queue,Consumer callback) throws IOException
String basicConsume(String queue,boolean autoAck,Consumer callback) throws IOException
String basicConsume(String queue,boolean autoAck,Map<String,Object> arguments,Consumer callback) throws IOException
String basicConsume(String queue,boolean autoAck,String consumerTag,Consumer callback) throws IOException
-
String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) throws IOException
对应参数说明:
- queue:队列的名称
- autoAck:设置是否自动确认。建议设置为false
- consumerTag:消费者标签,用来区分多个消费者;
- noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
- exclusive:设置是否排他;
- arguments:设置消费者的其他参数
- callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写其中的方法。
对于消费者客户端来说,重写handleDelivery方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下:
void handleConsumeOk(String consumerTag); void handleCancelOk(String consumerTag); void handleCancel(String consumerTag) throws IOException; void handleShutdownSignal(String consumerTag,ShutdowSignalException sig); void handleRecoverOk(String consumerTag);
比如handleShutdownSignal方法,当Channel或者Connection关闭的时候回调用。再者,handleConsumeOk方法会在其他方法之前调用,返回消费者标签。
重写handleCancelOk和handleCancel方法,这样消费端可以再显示地或者隐式地取消订阅的时候调用。也可以通过channel.basicCancel方法来显式地取消一个消费者的订阅:
channel.basicCancel(consumerTag)
注意上面这行代码会首先触发handleConsumerOk方法,之后触发handleDelivery方法,最后才触发handleCanceOk方法。
和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,如channe.queueDeclare、channel.basicCancel等。
每个Channel都拥有自己独立的线程。最常用的做法是一个Channel对应一个消费者。意味着消费者彼此之间没有任何关联。也可以在一个Channel中维持多个消费者,但是要注意一个问题,如果Channel中的一个消费者一直在运行,那么其他消费者的callback会被“耽搁”。
拉模式
通过channel.basicGet方法可以单条的获取消息,其返回值是GetResponse。Channel类的basicGet方法没有其他重载方法,只有:
GetResponse basicGet(String queue,boolean autoAck) throws IOException;
其中queue代表队列名称,如果设置autoAck为false,那么同样需要调用channel.basicAck来确认消息被成功接收。
拉模式的关键代码如下所示:
GetResponse response = channel.basicGet(QUEUE_NAME,false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
注意:
Basic.Consume将信道(Channel)置为接收模式,知道取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制。
如果只想从队列获取单挑消息而不是持续订阅,建议使用Basic.Get进行消费。但不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ的性能。消费者理应使用Basic.Consume方法。