RocketMQ 常用消息类型

文章首发于公众号《程序员果果》
地址 : https://mp.weixin.qq.com/s/dYqGd9zi2mNelsNNLIribg

消息发送示例

导入依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

application.yml:

rocketmq:
  name-server: 172.16.250.129:9876
  producer:
    group: myGroup

普通消息

同步发送

原理:

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。


应用场景:

这种可靠性同步地发送方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

示例代码:

public void sendMsg() throws Exception {

    Message message = new Message(
            // 普通消息所属的Topic
            "Topic-Normal",
            // Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列 RocketMQ 的服务器过滤。
            "TagA",
            // Message Body可以是任何二进制形式的数据。
            "Hello MQ".getBytes()
    );
    rocketMQTemplate.getProducer().send( message );
    // 等同于上面的方式(常用)
    //rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());
}

异步发送

原理:

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。RocketMQ异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。


应用场景:

异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

示例代码:

public void sendAsyncMsg() {
    Map<String , Object> map = new HashMap<>();
    map.put( "name" , "zs" );
    map.put( "age" , 20);

    rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // 消息发送成功。
            log.info( "async send success" );
        }

        @Override
        public void onException(Throwable throwable) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
            log.info( "async send fail" );
        }
    } );
}

顺序消息

全局顺序消息

  • 概念:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
  • 适用场景:适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
  • 示例:在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。

分区顺序消息

  • 概念:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
  • 适用场景:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照FIFO原则进行消息发布和消费的场景。
  • 示例:
  • 用户注册需要发送发验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
  • 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

无序消息、全局顺序消息、分区顺序消息的对比


示例代码

public void sendOrderlyMsg() {
    //根据指定的hashKey按顺序发送
    for (int i = 0; i < 1000; i++) {
        String orderId = "biz_" + i % 10;
        // 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
        // 全局顺序消息,该字段可以设置为任意非空字符串。
        String shardingKey = String.valueOf(orderId);
        try {
            SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey );
            // 发送消息,只要不抛异常就是成功。
            if (sendResult != null) {
                System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId());
            }
        }
        catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
            System.out.println(new Date() + " Send mq message failed");
            e.printStackTrace();
        }
    }
}

延时消息

概念:

Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

适用场景:

消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。

示例代码:

public void sendDelayMsg() {
    rocketMQTemplate.syncSend( "Topic-Delay",
            MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(),
            3000,
            //设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
            //messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            3 );
}

事务消息

概念:

  • 事务消息:消息队列RocketMQ提供类似X/Open XA的分布式事务功能,通过消息队列RocketMQ事务消息能达到分布式事务的最终一致。
  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

分布式事务消息的优势:

消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

典型场景:

在电商购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅消息队列RocketMQ的交易订单消息,做相应的业务处理,即可保证最终的数据一致性。

事务消息交互流程如下图所示:

事务消息发送步骤如下:

  1. 发送方将半事务消息发送至消息队列RocketMQ服务端。
  2. 消息队列RocketMQ服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

示例代码:

发送事务消息包含以下两个步骤:

    1. 发送半事务消息(Half Message,示例代码如下
/**
 * 事务消息
 */
public void sendTransactionMsg() {
    TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
            "Topic-Tx:TagA",
            MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(),
            null );
    SendStatus sendStatus = transactionSendResult.getSendStatus();
    LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
    System.out.println( new Date() + " Send mq message status "+ sendStatus +" ,  localTransactionState "+ localTransactionState );
}
    1. 发送方开始执行本地事务逻辑
@Component
@RocketMQTransactionListener
public class TxProducerListener implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        System.out.println("TX message listener execute local transaction");
        RocketMQLocalTransactionState result;
        try {
            // 业务代码( 例如下订单 )
            result = RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            System.out.println("execute local transaction error");
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查本地事务( 例如检查下订单是否成功 )
        System.out.println("TX message listener check local transaction");
        RocketMQLocalTransactionState result;
        try {
            //业务代码( 根据检查结果,决定是COMMIT或ROLLBACK )
            result = RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 异常就回滚
            System.out.println("check local transaction error");
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }

}
    1. 发送方在本地事务执行后,若向服务端提交二次确认是Commit,RocketMQ服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;订阅方代码如下
@Component
@Slf4j
@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")
public class TxConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Receive message:{}" , message);
    }

}

源码

https://github.com/gf-huanchupk/SpringBootLearning

系列文章

RocketMQ 简介
RocketMQ 安装

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容