RabbitMq

Rabbitmq

简介

RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为 面向消息的中间件)。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。

MQ的理解

可以将消息队列理解成一个书柜,provider-小红,consumer-小明,小红把小明要看的书,一本一本陆陆续续地放到书柜上,小明有空就去拿来看。

RabbitMq的重要概念

  1. Broker :经纪人。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。
  2. Exchange :消息交换机。指定消息按照什么规则路由到哪个队列Queue。(可以理解成一个书柜)
  3. Queue :消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。
  4. Binding :绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
  5. RoutingKey :路由关键字。Exchange根据RoutingKey进行消息投递。
  6. Vhost :虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。
  7. Producer :消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。
  8. Consumer :消息消费者。消息的接收者,一般是独立的程序。
  9. Channel :消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。

RabbitMq的消息处理流程

在rabbit mq中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式投递到相应的Queue上,Queue又将消息发送给已经在此Queue上注册的consumer。

消息队列的使用过程大概如下:

  1. 客户端连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个exchange(转发消息,不做储存),并设置相关属性。
  3. 客户端声明一个queue,并设置相关属性。
  4. 客户端使用routing key,在exchange和queue之间建立好Binding关系。
  5. 生产者客户端投递消息到exchange。
  6. exchange接收到消息后,就根据消息的RoutingKey和已经设置的binding,进行消息路由(投递),将消息投递到一个或多个队列里。
  7. 消费者客户端从对应的队列中获取并处理消息。

以上流程的理解,provider-小红,consumer-小明,书柜-Queue,channel-地点,消息-书

小红(provider)将书(msg)放到某个地方(channel)的某个书柜(Queue),并和小明(consumer)规定是哪个第几排(RoutingKey),小红陆陆续续放上去,小明根据和小红约定的具体地点+书柜+第几排去拿书

RabbitMq的l路由类型

Direct(点对点) Exchange

  1. 名称:直接交换器类型(点对点)
  2. 默认的预先定义exchange名字:空字符串或者amq.direct
  3. 作用描述:根据Binding指定的Routing Key,将符合Key的消息发送到Binding的Queue。可以构建点对点消息传输模型。


    image.png

Fanout Exchange

  1. 名称:广播式交换器类型
  2. 默认的预先定义exchange名字:amq.fanout
  3. 作用描述:将同一个message发送到所有同该Exchange 绑定的queue。不论RoutingKey是什么,这条消息都会被投递到所有与此Exchange绑定的queue中。


    image.png

Topic(订阅) Exchange

  1. 名称:主题交换器类型
  2. 默认的预先定义exchange名字:amq.topic
  3. 作用描述:根据Binding指定的RoutingKey,Exchange对key进行模式匹配后投递到相应的Queue,模式匹配时符号“#”匹配一个或多个词,符号“*”匹配正好一个词,而且单词与单词之间必须要用“.”符号进行分隔。此模式可以用来支持经典的发布/订阅消息传输模型-使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的queue。


    image.png

Headers Exchange

  1. 名称:标题交换器类型
  2. 默认的预先定义exchange名字:amq.match和amq.headers
  3. 作用描述:同direct exchange类似,不同之处是不再使用Routing Key路由,而是使用headers(Message attributes)进行匹配路由到指定Queue。
  4. Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成HashTable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。

开发与使用

rabbitMq目前与springboot已经融合的非常好了,springboot可以直接采用自动化配置连接来访问rabbitMq,同时springboot直接支持rabbitMq的集群,不用像以前一样要单独部署一个HAProxy来作为rabbitMq的负载均衡。所以在开发项目中如果需要使用到RabbitMq,那么我们建议全部用springboot框架来开发。

开发配置

RabbitMq有许多配置,大部分配置都可以采用默认的方式,这里不一一叙述,但是在我们日常开发中,为了保证消息投递的高可用,我们会对一些特殊的配置进行设置。

基本配置

基本配置中我们需要在springboot项目中添加rabbitmq的依赖与mq的访问主机。

pom.xml 中添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.properties中配置rabbitmq的主机地址、端口、用户名、密码、虚拟主机等信息
spring.rabbitmq.host=192.168.1.45
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/gzjkDev
当然,如果是集群部署的话,配置则是这样,多个IP用逗号分开:
spring.rabbitmq.addresses=192.168.1.45:5672,192.168.1.46:5672,192.168.1.47:5672,
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/gzjkDev
发送确认模式配置

发送确认模式是开发中比较重要的一个细节,在使用RabbitMQ的时候,我们会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:
  1. 方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;

使用事务的方式固然好,但是从发送到broker投递到相应的exchange并持久化消息的时候,整个流程生产者都在等待,这样就大大降低的性能,有人做过测试,rabbitmq单机不使用事务情况下投递1000条消息需要89毫秒,使用事务的话投递1000条消息需要58244毫秒!所以在日常的开发中,我们是绝对不会推荐使用事务的方式投递消息的,这里也不再对事务模式做更多的代码实例,有兴趣的同事可以自己百度一下。

  1. 通过启动消息确认模式(无性能损失);
    发送消息确认,顾名思义就是生产者发送消息到MQ后,之后MQ主动告知生产者发送结果,生产者再对结果做相应的处理。
开启rabbitMq的消息发送确认模式配置:
spring.rabbitmq.publisher-confirms=true

消费确认模式也是比较重要的一项配置,默认情况下,springboot是没有开启消息消费确认模式的,当消费者获取到从MQ推过来的一个消息时,MQ中就会将该消息从相应的队列中移除,那么就会带来一个问题:

消费者通常在获取到MQ的消息时,会针对该消息做后续的一些消费逻辑处理,如果在这个过程中消费者出现异常,中途退出了,那么这条消息就没有被成功的消费,更糟糕的是,这条消息同时也被MQ从队列里删除了,当消费者服务恢复时,却永远都无法再次重新消费这条消息了!

开启消费确认模式配置:
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

代码示例

定义消息队列:
@Configuration
public class RabbitMQConfig {
    public final static String QUEUE_NAME = "test_queue";
    // 创建队列
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME);
    }
}
生产者代码:
@RestController
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMessage")
    public Object sendMessage() {
        for (int i = 0; i < 100; i++) {
            String value = i + "消息";
            System.out.println(value);
            rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, value);
        }
        return "ok";
    }
}
生产者发送消息监听确认
@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息唯一标识:"+correlationData);
        System.out.println("确认结果:"+ack);
        System.out.println("失败原因:"+cause);
    }
}
消费者代码
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public class Consumer {
    @RabbitHandler
    public void consumeMessage(String messageChannel, Channel channel, Message message) throws IOException {
        System.out.println("HelloReceiver收到  : " + messageChannel + "收到时间" + new Date());
        try {
            //必须发送ack动作告诉RabbitMq你已成功消费消息,这么做的目的在于消费者服务在未消费完服务却宕机,重启时还可以继续消费上一个未消费完成的消息。
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("receiver success");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("receiver fail");
        }
    }
}

开发注意事项

消息幂等

消息幂等即保证生产者发送消息的数量要等于消费者消费消息的数量,这个适用于大多数的消息应用场景,尤其是对于一些与第三方系统对接消息,为了保证消息的幂等,除了配置消息发送确认模式和消费确认模式的同时,我们可以在生产者发送消息时记录日志,消费消息时也保留记录,这样可以在出现消息不匹配时有数可循。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容