文章参考:Rabbit实战指南
交换机的类型
-
fanout
- 它会把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中。
-
direct
- 它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
-
topic
- 将消息路由到BindingKey和RoutingKey相匹配的队列中,它约定:
- RoutingKey作为一个点号"."分割的字符串(被点号"."分隔开的每一段独立的字符串称为一个单词)
- BindingKey和RoutingKey一样也是点号"."分割的字符串。
- BindingKey中可以存在两种特殊字符串"*"和"#",用于做模糊匹配,其中"#"用来匹配一个单词,"*"用于匹配多规格单词。
- 路由键为“com.rabbitmq.client”的消息会同时路由到Queue1和Queue2;
- 路由键为“com.hidden.client”的消息只会路由到Queue2中;
- 路由键为java.util.concurrent的消息会被丢弃或者返回给生产者(需要设置)
- 将消息路由到BindingKey和RoutingKey相匹配的队列中,它约定:
-
headers
- headers类型的交换机不依赖路由键的匹配规则来路由信息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送信息到交换器时,RabbitMQ会获取到该信息的headers,对比其中键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列。
RabbitMQ运转流程
- 生产者发送消息:
- 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
- 生产者声明一个交换器,并设置相关属性,比如交互器类型,是否持久化等
- 生产者声明一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至RabbitMQ Broker,其中包含路由键,交换器等信息
- 相应的交换器根据接受到的路由键查找相匹配的队列
- 如果找到,则将从生产者发送过来的信息存入相应的队列中
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道
- 关闭连接
- 消费者接受消息:
- 消费者连接到RabbitMQ Broker,建立一个连接,开启一个信道
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
- 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接受消息
- 消费者确认接收到的消息
- RabbitMQ从队列中删除相应已经被确认的消息
- 关闭信道
- 关闭连接
连接RabbitMQ
下面代码用来在给定的参数(IP地址、端口号、用户名、密码等)下连接RabbitMQ:
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
Connection conn = factory.newConnection();
也可以选择使用URI的方式来实现,
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();
创建之后,Channel可以用来发送或者接收消息了。
Channel channel = conn.createChannel();
Channel或者Connection中有个isOpen方法可以用来检测其是否处于开启状态,不推荐在生产环境上使用isOpen方法,这个方法的返回值依赖于shutdownCause的存在,有可能会产生竞争。
isOpen方法的源码
public boolean isOpen(){
synchronized(this.monitor){
return this.shutdownCause == null;
}
}
错误的使用isOpen方法
public void brokenMethod(Channel channel){
if(channel.isOpen()){
//The following code depends on the channel being in open state.
//However there is a possibility of the change in the channel state
//between isOpen() and basicQos(1) call
...
channel.baseicQos(1);
}
}
通常情况下,在调用createXXX或者newXXX方法之后,我们可以简单地认为Connection或者Channel已经成功地处于开启状态,而并不会在代码中使用isOpen这个检测方法。如果在使用Channel的时候其已经处于关闭状态,那么程序会抛出个 com. rabbitmq. client. ShutdownSignalException,我们只需捕获这个异常即可。当然同时也要试着捕获I0Exception 或者SocketException, 以防Connection意外关闭。示例代码如下:
public void validMethod(Channel channel){
try{
...
channel.basicQos(1);
} catch (ShutdownSignalException sse){
//possibly check if channel was closed
//by the time we started action and reasons for
//closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}