一、pom引入包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
二、创建消息发送者
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生成者
*/
public class Producer {
public final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ相关信息
factory.setHost("localhost");
// factory.setUsername("guest");
// factory.setPassword("guest");
// factory.setPort(5672);
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
// 1.String queue表示队列名称
// 2.boolean durable是否持久化
// 3.exclusive是否是独占队列(创建者可以使用的私有队列,断开后自动删除)
// 4.boolean autoDelete当所有消费者客户端连接断开时是否自动删除队列
// 5.Map<String, Object> arguments队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ";
// 发送消息到队列中
// 1.String exchange交换机名称
// 2.String routingKey队列映射的路由key
// 3.BasicProperties props消息的其他属性
// 4.byte[] body发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "'");
// 关闭通道和连接
channel.close();
connection.close();
}
}
三、创建消息接收者
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 消息接收者
*/
public class Customer {
private final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址
factory.setHost("localhost");
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明要关注的队列
// channel.queueDeclare(QUEUE_NAME, false, false, true, null);
System.out.println("Customer Waiting Received messages");
// DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
// 自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
四、测试
1.运行Customer消息接收者开启服务
2.运行Producer消息发送者
3.可以看到Customer打印出Customer Received 'Hello RabbitMQ'