springboot + 消息队列

消息服务中间件可以提升系统异步通信、扩展解耦能力。

举个例子:传统注册流程和使用消息队列比较

第一种:用户注册信息写入数据库后在按照顺序先后发送注册邮件和短信,走完这三步后用户才完成注册

传统注册流程

第二种:用户注册消息写入数据后通过开启线程池的方式,同时发送邮件和注册短信,两个线程完成后返回,用户注册完成

采用多线程方式

第三步:用户注册消息写入数据后将消息写入到消息队列,此时发送邮件和发送短信通过异步读取消息队列执行具体的操作,但在写入消息队列之前已经返回给用户,用户注册完成,而发送短信和邮件是异步操作

消息队列

应用解耦

传统方式下单后调用库存系统更新商品的剩余库存。采用消息队列方式,可达到应用解耦,下单后订单系统调用mq将消息写入到消息队列,由库存系统订阅消息队列并按照业务逻辑处理对应消息

传统方式
采用消息队列方式

流量削峰

比如我们有100W用户同时抢100台手机,服务层并发请求压力至少为100W。

既然服务层知道库存只有100台手机,那完全没有必要把100W个请求都传递到数据库啊,那么可以先把这些请求都写到消息队列缓存一下,数据库层订阅消息减库存,减库存成功的请求返回秒杀成功,失败的返回秒杀结束。

当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

消息队列主要有两种形式的目的地:

  • 队列(queue):点对点消息通信

消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移除队列,此时消息只有唯一的发送者和接收者,但并不是只能有一个接收者,这种情况下可以存在多个接收者,但一个接收者接收后,其他的就不再处理

  • 主题(topic):发布(public)/订阅(subscribe)消息通信

订阅式:发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息

再说下JMS和AMOP,JAMS(Java Messge Service) JAVA消息服务是给予JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现;
AMQP是高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现,AMQP提供了五种消息模型:direct exchage、fanout exchange、topic change、headers exchange、system exchange;

RabbitMQ

  • Message
    消息,消息是不具名的,它由消息头和消息体组成。消息题是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等

  • Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序

  • Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
    Exchange有4中类型:direct(默认)、fanout、topic和headers,不同类型的Exchange转发消息的策略有所区别,direct指的是点对点,fanout、topic和headers指的是订阅

  • Queue
    消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走

  • Binding
    绑定,用于消息队列和交换器之间的关联。一个关联就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange和Queue的绑定可以是多对多的关系

  • Connection
    网络连接,比如一个TCP连接

  • Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接

  • Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序

  • Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMOP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/

  • Broker
    表示消息队列服务器实体

RabbitMQ 运行机制

  • Exchange

AMQP中消息的路由过程和Java开发者熟悉的JMS存在一些差别,AMQP中增加了Exchange和Binding的角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到哪个队列上。

  • Exchange

Exchange分发消息时类型不同分发策略不同,目前有四种类型:direct、fanout、topic、headers。headers匹配AMQP消息的header而不是路由键,headers交换器和direct交换器完全一致,但性能相差很多,目前几乎用不到了,所以直接看另外三种类型:

  • Direct
Direct Exchange

消息中的路由键如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为 “dog”,则交换器只转发“dog”的消息到此消息队列。不会转发“dog.puppy”,也不会转发“dog.guard”等等。direct是完全匹配、单播的模式。

  • Fanout
Fanout Exchange

每个发到Fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每个子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的

  • topic
Topic Exchange

topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分为单词,这些单词之间用点个靠。它同样也会识别两个匹配符:符号# 和 符号* ,#匹配0个或多个单词,匹配一个单词

1、docker 安装 RabbitMQ

$ docker ps // docker查看运行的容器
$ docker pull rabbitmq:management // 拉取镜像,加上management,表明是带web管理界面的,便于管理
Using default tag: latest
latest: Pulling from library/rabbitmq
423ae2b273f4: Pull complete 
de83a2304fa1: Pull complete 
f9a83bce3af0: Pull complete 
b6b53be908de: Pull complete 
834aeb8bfce6: Pull complete 
3407dc115970: Pull complete 
a003ac596878: Pull complete 
5664c847e128: Pull complete 
d392687f8224: Pull complete 
8b6336946e55: Pull complete 
Digest: sha256:fb0023bda1d2237418417557b212ca027180dcdf6da883891c08b78591cc8c15
Status: Downloaded newer image for rabbitmq:latest
docker.io/library/rabbitmq:latest

$ docker images // 查看当前镜像
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
rabbitmq            latest              4c8cb17c3ab5        31 hours ago        151MB

$ docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 4c8cb17c3ab5 // 运行镜像 ,默认的端口为:5672,web管理的端口为:15672,
328854acf29841bb7bd1dee54b6d0c4c4b5077284e301fe97bcdbdd0494ddf17

浏览器直接访问服务器地址 ,默认用户和密码为:guest


2、添加交换器

添加交换器的步骤,选择Durable持久化的原因是,关闭服务器后交换器还在

分别添加direct、fanout、topic

添加direct
添加fanout
添加 topic
添加的交换器在列表展示

3、添加消息队列

添加消息队列的步骤
添加的消息队列在列表展示

4、交换器绑定Binding

direct交换器 绑定消息队列
fonout交换器 绑定消息队列
topic交换器 绑定消息队列

5、发送消息到交换器

发送到direct交换器,根据绑定时路由键(Routing key)发送到消息队列

发送消息的步骤
查看发送消息结果

消息队列中get message

发送到fonout交换器,fonout绑定了所有队列 不管什么路由键是什么都可以接收消息

查看发送消息结果,四个队列都接收到了这条消息
任意查看一个队列

发送到topic交换器,按照路由规则接收消息

4个消息队列都收到了
继续发送一个其他的消息测试
这次只有两个收到了

springboot 整合消息队列

使用idea 创建过程,可参考前几篇中的内容

选择注入依赖的时候,选择web和rabbitmq
# application.properties
# 不写默认是localhost
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

RabbitAutoConfiguration自动配置类提供了连接工厂ConnectionFactory,可以获得rabbitmq的连接,通过RabbitProperties获得user和password,给RabbitMQ发送和接收消息

发送点对点消息

@SpringBootTest
class SpringbootRabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabblitTemplete;

    /**
     * 单播(点对点)
     */
    @Test
    void contextLoads() {
//        rabblitTemplete.send(exchange,routeKey,message); // Message需要自己构造一个;自己构造的时候定义消息体内容和消息头。

//        rabblitTemplete.convertAndSend(exchange,routeKey,object); // object默认当成消息体,只需要传递要发送的对象,会自动序列化发送给rabbitmq
        Map<String,Object> map = new HashMap<>();
        map.put("msg","这是第一个消息");
        map.put("data", Arrays.asList("helloworld",123,true)); // 对象被默认序列化以后发送出去
//        rabblitTemplete.convertAndSend("exchange.direct","sangyu.news",map);
        rabblitTemplete.convertAndSend("exchange.direct","sangyu.news",new Book("AA","BB")); // 发送包含对象的信息

    }

接收消息

@SpringBootTest
class SpringbootRabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabblitTemplete;
    /**
     * 接收数据
     */
    @Test
    public void receive(){
        Object o = rabblitTemplete.receiveAndConvert("sangyu.news");
        System.out.println(o.getClass());
        System.out.println(o);
    }
 }

将数据自动地转为json发送出去,通过MessageConverter

@Configuration
public class MyAMQConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

发送广播消息

@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    /**
     * 广播 fanout
     */
    @Test
    public void sendMsg(){
        rabblitTemplete.convertAndSend("exchange.fanout","",new Book("EE","FF"));
    }
}

通过AmqpAdmin :创建和删除Queue、Exchange、Binging

@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){
//        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
//        System.out.println("ok");
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqpadmin.haha",null));
    }
}

@EnableRabbit + @RabbitListener 监听消息队列的内容

@Service
public class BookService {
    @RabbitListener(queues = "sangyu.news") // 可以监听多个消息队列,只要这个消息队列有消息,receive这个方法就会被调用
    public void receive(Book book){ //通过监听mq来调用的
        System.out.println("收到消息: " + book);
    }
    @RabbitListener(queues = "sangyu")
    public void receive02(Message message){
        System.out.println(message.getBody()); // 获得消息的内容
        System.out.println(message.getMessageProperties()); // 获得消息体
    }
}
@EnableRabbit //开启基于注解的RabblitMQ注解
@SpringBootApplication
public class SpringbootRabbitmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqApplication.class, args);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容