rabbitmq入门

1、介绍

2、MQ优势

应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性

代码
工具类-RabbitmqUtils

package nk.gk.wyl.module.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author :zhangshuailing
 * @date :2022/10/31 14:48
 * @project_name :pt-rabbitmq
 * @description:
 */
@Slf4j
public class RabbitmqUtils {
    private String ip;
    private Integer port;
    private String username;
    private String password;
    private String virtual;

    public RabbitmqUtils(String ip, Integer port, String username, String password,String virtual) {
        this.ip = ip;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtual = virtual;
    }

    /**
     * 获取连接
     * @return
     */
    @SneakyThrows
    public Connection getConnection(){
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtual);
        try {
            //获取TCP长连接
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
            throw new Exception("连接失败。"+e.getMessage());
        } catch (TimeoutException e) {
            e.printStackTrace();
            throw new Exception("连接超时。"+e.getMessage());
        }
    }
}

工具类-RabbitUtils

package nk.gk.wyl.module.rabbitmq.utils;

import com.rabbitmq.client.Connection;


public class RabbitUtils {
    private static final String ip = "192.168.116.122";
    private static final int port = 5672;//5672是RabbitMQ的默认端口号
    private static final String username = "zhangshuailing";
    private static final String password = "123";
    private static final String virtual = "zsl";

    private volatile Connection connection = null;

    public  Connection getConnection(){
        if(connection != null){
            return connection;
        }
        try {
            connection = new RabbitmqUtils(ip,port,username,password,virtual).getConnection();
            return connection;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

工具类-RabbitConstant

package nk.gk.wyl.module.rabbitmq.utils;

public class RabbitConstant {
    public static final String QUEUE_HELLOWORLD = "helloworld";
    public static final String QUEUE_SMS = "sms";
    public static final String EXCHANGE_WEATHER = "weather";
    public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_SINA = "sina";
    public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}

2、MQ六个工作模式

1》简单模式 Hello word

image.png

一个生产者对应一个消费者!!

代码
生产者-Consumer

package nk.gk.wyl.module.rabbitmq.helloword;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author :zhangshuailing
 * @date :2022/10/31 14:48
 * @project_name :pt-rabbitmq
 * @description:
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取TCP长连接
         Connection conn = new RabbitUtils().getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();
        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
       channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
        //从MQ服务器中获取数据
        //创建一个消息消费者
        //第一个参数:队列名
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类
       channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new DefaultConsumer(channel){
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("消费者接收到的消息:"+message);

                System.out.println("消息的TagId:"+envelope.getDeliveryTag());
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

消费者-Consumer

package nk.gk.wyl.module.rabbitmq.helloword;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author :zhangshuailing
 * @date :2022/10/31 14:50
 * @project_name :pt-rabbitmq
 * @description:
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取TCP长连接
        Connection conn = new RabbitUtils().getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();
        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
        String message = "hello world";
        //四个参数
        //exchange 交换机,暂时用不到,在后面进发布订阅时才会用到
        //队列名称
        //额外的设置属性
        //最后一个参数是要传递的消息字节数组
        channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes());
        channel.close();
        conn.close();
        System.out.println("===发送成功===");
    }
}

2》工作队列模式 work queues

image.png

一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!!
轮询分发就是将[消息队列中的消息,依次发送给所有消费者。一个消息只能被一个消费者获取。

代码
生产者-Producer

package nk.gk.wyl.module.rabbitmq.workqueue;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author zhangshuailing
 * 生产者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitUtils().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        for(int i = 1 ; i <= 100 ; i++) {
            String jsonSMS = "先生【"+i+"】您的车票已预订成功";
            channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
        }
        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    }
}

消费者-多个

package nk.gk.wyl.module.rabbitmq.workqueue;
import com.rabbitmq.client.*;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
 * @author zhangshuailing
 * 消费者
 */
public class SMSSender1 {
    public static void main(String[] args) throws IOException {
        Connection connection = new RabbitUtils().getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender1-短信发送成功:" + jsonSMS);

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}
package nk.gk.wyl.module.rabbitmq.workqueue;

import com.rabbitmq.client.*;
import nk.gk.wyl.module.rabbitmq.utils.RabbitConstant;
import nk.gk.wyl.module.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

/**
 * @author zhangshuailing
 * 消费者
 */
public class SMSSender2 {

    public static void main(String[] args) throws IOException {
        Connection connection = new RabbitUtils().getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

3》发布订阅模式 Publish/Subscribe

image.png

一个消费者将消息首先发送到[交换器],交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。
ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。
  两个消费者获得了同一条消息。即就是,一个消息从交换机同时发送给了两个队列中,[监听]这两个队列的消费者消费了这个消息;
如果没有队列绑定交换机,则消息将丢失。因为交换机没有存储能力,消息只能存储在队列中。

4》路由模式 Routing

image.png

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。
也就是让消费者有选择性的接收消息。
路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。

5》主题模式 topics

image.png

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。
符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
与路由模式相似,但是,主题模式是一种模糊的匹配方式。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容