1、介绍
2、MQ优势
应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
代码
工具类-RabbitmqUtils
package nk.gk.wyl.module.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author :zhangshuailing
* @date :2022/10/31 14:48
* @project_name :pt-rabbitmq
* @description:
*/
@Slf4j
public class RabbitmqUtils {
private String ip;
private Integer port;
private String username;
private String password;
private String virtual;
public RabbitmqUtils(String ip, Integer port, String username, String password,String virtual) {
this.ip = ip;
this.port = port;
this.username = username;
this.password = password;
this.virtual = virtual;
}
/**
* 获取连接
* @return
*/
@SneakyThrows
public Connection getConnection(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtual);
try {
//获取TCP长连接
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
throw new Exception("连接失败。"+e.getMessage());
} catch (TimeoutException e) {
e.printStackTrace();
throw new Exception("连接超时。"+e.getMessage());
}
}
}
工具类-RabbitUtils
package nk.gk.wyl.module.rabbitmq.utils;
import com.rabbitmq.client.Connection;
public class RabbitUtils {
private static final String ip = "192.168.116.122";
private static final int port = 5672;//5672是RabbitMQ的默认端口号
private static final String username = "zhangshuailing";
private static final String password = "123";
private static final String virtual = "zsl";
private volatile Connection connection = null;
public Connection getConnection(){
if(connection != null){
return connection;
}
try {
connection = new RabbitmqUtils(ip,port,username,password,virtual).getConnection();
return connection;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
工具类-RabbitConstant
package nk.gk.wyl.module.rabbitmq.utils;
public class RabbitConstant {
public static final String QUEUE_HELLOWORLD = "helloworld";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
2、MQ六个工作模式
1》简单模式 Hello word
一个生产者对应一个消费者!!
代码
生产者-Consumer
package nk.gk.wyl.module.rabbitmq.helloword;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author :zhangshuailing
* @date :2022/10/31 14:48
* @project_name :pt-rabbitmq
* @description:
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取TCP长连接
Connection conn = new RabbitUtils().getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
//从MQ服务器中获取数据
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new DefaultConsumer(channel){
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
消费者-Consumer
package nk.gk.wyl.module.rabbitmq.helloword;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author :zhangshuailing
* @date :2022/10/31 14:50
* @project_name :pt-rabbitmq
* @description:
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取TCP长连接
Connection conn = new RabbitUtils().getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
String message = "hello world";
//四个参数
//exchange 交换机,暂时用不到,在后面进发布订阅时才会用到
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes());
channel.close();
conn.close();
System.out.println("===发送成功===");
}
}
2》工作队列模式 work queues
一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!!
轮询分发就是将[消息队列中的消息,依次发送给所有消费者。一个消息只能被一个消费者获取。
代码
生产者-Producer
package nk.gk.wyl.module.rabbitmq.workqueue;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author zhangshuailing
* 生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = new RabbitUtils().getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
for(int i = 1 ; i <= 100 ; i++) {
String jsonSMS = "先生【"+i+"】您的车票已预订成功";
channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
}
System.out.println("发送数据成功");
channel.close();
connection.close();
}
}
消费者-多个
package nk.gk.wyl.module.rabbitmq.workqueue;
import com.rabbitmq.client.*;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
* @author zhangshuailing
* 消费者
*/
public class SMSSender1 {
public static void main(String[] args) throws IOException {
Connection connection = new RabbitUtils().getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
package nk.gk.wyl.module.rabbitmq.workqueue;
import com.rabbitmq.client.*;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
* @author zhangshuailing
* 消费者
*/
public class SMSSender2 {
public static void main(String[] args) throws IOException {
Connection connection = new RabbitUtils().getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
3》发布订阅模式 Publish/Subscribe
一个消费者将消息首先发送到[交换器],交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。
ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。
两个消费者获得了同一条消息。即就是,一个消息从交换机同时发送给了两个队列中,[监听]这两个队列的消费者消费了这个消息;
如果没有队列绑定交换机,则消息将丢失。因为交换机没有存储能力,消息只能存储在队列中。
4》路由模式 Routing
生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。
也就是让消费者有选择性的接收消息。
路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。
5》主题模式 topics
上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。
符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
与路由模式相似,但是,主题模式是一种模糊的匹配方式。