MQ 入门实践

MQ

Message Queue,消息队列,FIFO 结构。

image

例如电商平台,在用户支付订单后执行对应的操作;

image

优点:

  • 异步
  • 削峰
  • 解耦

缺点

  • 增加系统复杂性
  • 数据一致性
  • 可用性

JMS

Java Message Service,Java消息服务,类似 JDBC 提供了访问数据库的标准,JMS 也制定了一套系统间消息通信的规范;

区别于 JDBC,JDK 原生包中并未定义 JMS 相关接口。

  1. ConnectionFactory

  2. Connection

  3. Destination

  4. Session

  5. MessageConsumer

  6. MessageProducer

  7. Message

协作方式图示为;

image

业界产品

ActiveMQ RabbitMQ RocketMQ kafka
单机吞吐量 万级 万级 10 万级 10 万级
可用性 非常高 非常高
可靠性 较低概率丢失消息 基本不丢 可以做到 0 丢失 可以做到 0 丢失
功能支持 较为完善 基于 erlang,并发强,性能好,延时低 分布式,拓展性好,支持分布式事务 较为简单,主要应用与大数据实时计算,日志采集等
社区活跃度

ActiveMQ

作为 Apache 下的开源项目,完全支持 JMS 规范。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适合不过。

快速开始

添加依赖;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

消息发送;

// 1. 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工厂创建连接
Connection connection = factory.createConnection();
// 3. 启动连接
connection.start();
// 4. 创建连接会话session,第一个参数为是否在事务中处理,第二个参数为应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 根据session创建消息队列目的地
Destination queue = session.createQueue("test-queue");
// 6. 根据session和目的地queue创建生产者
MessageProducer producer = session.createProducer(queue);
// 7. 根据session创建消息实体
Message message = session.createTextMessage("hello world!");
// 8. 通过生产者producer发送消息实体
producer.send(message);
// 9. 关闭连接
connection.close();

Spring Boot 集成

自动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

添加依赖;

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

添加 yaml 配置;

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    #消息模式 true:广播(Topic),false:队列(Queue),默认时false
    pub-sub-domain: true

收发消息;

@Autowired
private JmsTemplate jmsTemplate;

// 接收消息
@JmsListener(destination = "test")
public void receiveMsg(String msg) {
    System.out.println(msg);
}

// 发送消息
public void sendMsg(String destination, String msg) {
    jmsTemplate.convertAndSend(destination, msg);
}

高可用

基于 zookeeper 实现主从架构,修改 activemq.xml 节点 persistenceAdapter 配置;

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/levelDB"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
        zkPath="/activemq/leveldb-stores"
        hostname="localhost"
    />
</persistenceAdapter>

broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

负载均衡

在高可用集群节点 activemq.xml 添加节点 networkConnectors;

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>

更多详细信息可参考:https://blog.csdn.net/haoyuyang/article/details/53931710

集群消费

由于发布订阅模式,所有订阅者都会接收到消息,在生产环境,消费者集群会产生消息重复消费问题。

ActiveMQ 提供 VirtualTopic 功能,解决多消费端接收同一条消息的问题。于生产者而言,VirtualTopic 就是一个 topic,对消费而言则是 queue。

image

在 activemq.xml 添加节点 destinationInterceptors;

<destinationInterceptors> 
    <virtualDestinationInterceptor> 
        <virtualDestinations> 
            <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    
        </virtualDestinations>
    </virtualDestinationInterceptor> 
</destinationInterceptors>

生产者正常往 testTopic 中发送消息,订阅者可修改订阅主题为类似 consumer.A.testTopic 这样来消费。

更多详细信息可参考:https://blog.csdn.net/java_collect/article/details/82154829

RocketMQ

是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

架构图示

image
  1. Name Server

    名称服务器,类似于 Zookeeper 注册中心,提供 Broker 发现;

  2. Broker

    RocketMQ 的核心组件,绝大部分工作都在 Broker 中完成,接收请求,处理消费,消息持久化等;

  3. Producer

    消息生产方;

  4. Consumer

    消息消费方;

快速开始

安装后,依次启动 nameserver 和 broker,可以用 mqadmin 管理主题、集群和 broker 等信息;

https://segmentfault.com/a/1190000017841402

添加依赖;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

消息发送;

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
    "producer-topic",
    "msg",
    "hello world".getBytes()
);
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();

delayLevel 从 1 开始默认依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

消息接收;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
    for (MessageExt msg : list) {
        System.out.println(new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876

Spring Boot 集成

添加依赖;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

添加 yaml 配置;

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer

发送消息;

@Autowired
private RocketMQTemplate mqTemplate;

public void sendMessage(String topic, String tag, String message) {
    SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
    System.out.println(JSON.toJSONString(result));
}

接收消息;

@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

Console 控制台

RocketMQ 拓展包提供了管理控制台;

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

image

重复消费

产生原因:

  1. 生产者重复投递;
  2. 消息队列异常;
  3. 消费者异常消费;

怎么解决重复消费的问题,换句话怎么保证消息消费的幂等性

通常基于本地消息表的方案实现,消息处理过便不再处理。

顺序消息

消息错乱的原因:

  1. 一个消息队列 queue,多个 consumer 消费;
  2. 一个 queue 对应一个 consumer,但是 consumer 多线程消费;

要保证消息的顺序消费,有三个关键点:

  1. 消息顺序发送
  2. 消息顺序存储
  3. 消息顺序消费
image

参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。

分布式事务

在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。

image

通过 broker 的 HA 高可用,和定时回查 prepare 消息的状态,来保证最终一致性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351