项目上在使用异步处理时,总会想到用RabbitMQ的生产者和消费者模式,下文演示如何使用RabbitMQ Java客户端生产和消费消息。
maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.1</version>
</dependency>
生产者
package com.nightmare.study2023.study0103;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: WuChang
* @Description:
* @Date: Created in 2023-01-03 5:17 PM
* @Modified By:
*/
public class RabbitProducer {
private static final String EXCHANGE_NAME ="exchange_nightmare";
private static final String ROUTING_KEY="routing_nightmare";
private static final String QUEUE_NAME="queue_nightmare";
private static final String IP_ADDRESS="**********";
private static final int PORT = 5672;
private static final String USERNAME="guest";
private static final String PASSWORD ="*****";
public static void main(String[] args){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("/");
try {
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 创建一个类型为direct、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
// 创建一个持久话、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
// 将交换器与对垒通过路由键绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
// 发送一条持久化的消息
String message ="Hello World!";
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 关闭资源
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
生产者代码中 首先和RabbitMQ服务器建立连接,根据连接创建信道,创建交换器和队列,并通过路由键进行绑定;然后发送一条消息;最后关闭信道,关闭连接
消费者
package com.nightmare.study2023.study0103;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @Author: WuChang
* @Description:
* @Date: Created in 2023-01-04 2:28 PM
* @Modified By:
*/
public class RabbitConsumer {
private static final String QUEUE_NAME="queue_nightmare";
private static final String IP_ADDRESS="******";
private static final int PORT = 5672;
private static final String USERNAME="guest";
private static final String PASSWORD ="*******";
public static void main(String[] args){
Address[] addresses = new Address[]{
new Address(IP_ADDRESS,PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("/");
try{
Connection connection = factory.newConnection(addresses); // 创建连接
Channel channel = connection.createChannel();// 创建信道
channel.basicQos(64); // 设置客户端最多接收未被ack的消息的个数
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){
System.out.println("message" + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (InterruptedException e){
e.printStackTrace();
}catch (IOException e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,consumer);
// 等待毁掉函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}