在 RabbitMQ 在设计的时候,特意让生产者和消费者“脱钩”,也就是消息的发布和消息的消费之间是解耦的。
在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎
么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。
接下来我们分别介绍下这几种投递机制
1. 无保障模式
通过 basicPublish 发布你的消息并使用正确的交换器和路由信息,你的消息会被接收并发送到合适的
队列中。但是如果有网络问题,或者消息不可路由,或者 RabbitMQ 自身有问题的话,这种方式就有风险。所以无保证的消息发送一般情况下不推荐。
2. 失败确认模式
在发送消息时设置 mandatory 标志,告诉 RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败。可以这样认为,开启 mandatory
是开启故障检测模式
注意:它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失
代码演示:
生产者代码:
1.发布消息时,传入mandatory为true
2.channal增加消息路由失败的回调处理
public class MandatoryProducer {
//队列名称
public static final String DIRECT_NAME = "lb_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个direct队列
channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
//增加消息发布失败的回调处理
channel.addReturnListener(
(replyCode, replyText, exchange, routingKey, properties, body) -> {
String message = new String(body);
System.out.println("返回的message:" + message);
System.out.println("返回的replycode:" + replyCode);
System.out.println("返回的replyText:" + replyText);
System.out.println("返回的exchange:" + exchange);
System.out.println("返回的routeKey:" + routingKey);
});
for (int i = 1; i < 4; i++) {
if (i != 1) {
//发布这条消息时,乱写一个路由键,使其无法匹配到相应队列,看是否会进入上面回调处理的方法中
channel.basicPublish(DIRECT_NAME, "乱打的", true, null, ("hello,world!" + i).getBytes());
} else {
//以下这种为正常发布,是可以匹配到队列的
channel.basicPublish(DIRECT_NAME, "lb", true, null, ("hello,world!" + i).getBytes());
}
}
channel.close();
connection.close();
}
}
消费者:只创建绑定'lb'这个路由键的队列,并消费该队列消息
public class MandatoryConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个队列
String queueName = "lb-queue";
channel.queueDeclare(queueName, false, false, false, null);
//队列绑定交换机"lb_direct",并指定路由键为"lb"
channel.queueBind(queueName, "lb_direct", "lb");
//消费投递至该队列的消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消费者接收到消息***:" + new String(body));
}
});
}
}
先启动消费者等待消费消息,再启动生产者生产消息
消费者端会接受到hellworld2,hellworld3两条消息
生产者因为hellworld无法路由到具体的队列,从而进入路由失败的回调处理方法中
3. 发送方确认模式
基于事务的性能问题,RabbitMQ 团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。
原理:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),由这
个 id 在生产者和 RabbitMQ 之间进行消息的确认。
不可路由的消息,当交换器发现,消息不能路由到任何队列,会进行确认操作,表示收到了消息。如果发送方设置了 mandatory 模式,则会先调用
addReturnListener 监听器。
可路由的消息,要等到消息被投递到所有匹配的队列之后,broker 会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确
到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了
确认消息的序列号。
confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最
终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产
者应用程序同样可以在回调方法中处理该 nack 消息决定下一步的处理。
Confirm 的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回 true。
代码演示:
生产者:
/**
* 生产者
*/
public class ConfirmProducer {
//交换机名称
public static final String DIRECT_NAME = "lb_direct";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个direct队列
channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
//开启发送者确认模式
channel.confirmSelect();
channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
//判断信道中刚才的消息是否发布成功
if (channel.waitForConfirms()) {
System.out.println("msg send success");
} else {
System.out.println("msg send fail");
}
channel.close();
connection.close();
}
}
执行结果:我们可以发现消息已经发布成功。
注意点:上图中我们给定的路由键是"乱打的",是匹配不到队列的。但是在这个模式下,仍然返回了发布成功的信息。因此这里需要强调一点,消息发布成功只是代表已经成功发送至交换机,并不代表成功的路由到对应队列,更不代表消息已经被消费成功。
接下来结合上面的失败确认模式,看看结果
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个direct队列
channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
//增加消息路由失败的回调处理
channel.addReturnListener(
(replyCode, replyText, exchange, routingKey, properties, body) -> {
String message = new String(body);
System.out.println("返回的message:" + message);
System.out.println("返回的replycode:" + replyCode);
System.out.println("返回的replyText:" + replyText);
System.out.println("返回的exchange:" + exchange);
System.out.println("返回的routeKey:" + routingKey);
});
//开启发送者确认模式
channel.confirmSelect();
channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
if (channel.waitForConfirms()) {
System.out.println("msg send success");
} else {
System.out.println("msg send fail");
}
channel.close();
connection.close();
}
可以发现,消息是发布成功,但是是路由失败的
方式二:channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会抛出 IOException 异常。
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个direct队列
channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
//开启发送者确认模式
channel.confirmSelect();
//发送4条消息
for (int i = 0; i < 4; i++) {
channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
}
//将上面的确认方式改成批量确认
channel.waitForConfirmsOrDie();
channel.close();
connection.close();
}
方式三:channel.addConfirmListener()异步监听发送方确认模式
public class ConfirmAsyncProducer {
//交换机名称
public static final String DIRECT_NAME = "lb_direct";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个direct队列
channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
//开启发送者确认模式
channel.confirmSelect();
//发送确认监听
channel.addConfirmListener(new ConfirmListener() {
//deliveryTag 每成功发送一条消息自增1
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("发送成功回调,deliveryTag=" + deliveryTag);
//doSomeThingAfterSuccess.....
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//doSomeThingAfterFail.....
}
});
//发送四条消息
for (int i = 0; i < 4; i++) {
channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
}
channel.close();
connection.close();
}
}
启动消费者,4条消息已经发送成功,deliveryTag每次自增
4. 备用交换机
在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器
使用备用交换器,向往常一样,声明 Queue 和备用交换器,把 Queue 绑定到备用交换器上。然后在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器。
生产者:
public class BackupProducer {
//正常的交换机
public static final String DIRECT_NAME = "lb_direct";
//备用交换机
public static final String DIRECT_BACKUP_NAME = "lb_backup_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明备用交换机
channel.exchangeDeclare(DIRECT_BACKUP_NAME, BuiltinExchangeType.FANOUT);
//声明主交换机
Map<String, Object> hashMap = new HashMap<>();
//通过键值对的方式将备用交换机的名称设置进去
hashMap.put("alternate-exchange", DIRECT_BACKUP_NAME);
//声明主交换机,最后一个参数是上面的hashMap
channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT, false, false, hashMap);
channel.basicPublish(DIRECT_NAME, "不知名路由键", false, null, "this is backup exchange test".getBytes());
channel.close();
connection.close();
}
}
消费者:
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个队列
String queueName = "lb-backup-queue";
channel.queueDeclare(queueName, false, false, false, null);
//绑定备用交换机,路由键的其实无所谓,因为备用交换机是一个fanout类型的交换机
channel.queueBind(queueName, BackupProducer.DIRECT_BACKUP_NAME, DirectProducer.ROUTE_KEY);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println(consumerTag + ":" + "receiver[" + envelope.getRoutingKey() + "] : " + new String(body));
}
});
}
启动消费者,再启动生产者发送消息
结果表明,当主交换机发送消息,无法发送至指定队列时,会传送到备用交换机,绑定备用交换机的队列会收到消息
5. 事务
开启事务后,会执行下面步骤
1.生产者发送Tx.Select
2.服务端回复Tx.Select-Ok
3.生产者发送消息
4.生产者发送Tx.Commit
5.服务端回复Tx.Commit-Ok
6.如果提交前发生异常,生产者发送Tx.Rollback
7.服务端回复Tx.Rollback-Ok
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("maomaoyu.xyz");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TRANSACTION_EXCHANGE, BuiltinExchangeType.DIRECT);
try {
//开启事务
channel.txSelect();
channel.basicPublish(TRANSACTION_EXCHANGE, "lb1", true, null, "hello world".getBytes());
//事务提交
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
//出现异常回滚
channel.txRollback();
}
}
事务是非常消耗性能的且是同步的。如果需要发送多条消息,建议一次性提交,如果每次单独提交的话,步骤4和步骤5每次都需要执行,会消耗更多的性能。