springboot2 整合rocketMQ4.3

2019.5.27日更新
目前springcloud alibaba的项目已经快要更新到第一个relase版本了,如果想要整合rocketmq建议移步springcloud alibaba的github哪里有更加方便的操作,下面仅供参考

前言
前段时间更新技术架构的时候,看到rocketmq出了4的版本,而且本身这个mq有事务消息,在分布式的场景中有很好的启发性,和作用,而且本身它也是阿里开源到apache的一个项目,从出身还是实力来说都很不错,所以今天就抱着支持国产的心态开看一看这个mq

1.项目代码
maven

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
 </dependency>

producer
单从分类producer的官网doc来看主要分成3种,
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
本文主要说的是,DefaultMQProducer,TransactionMQProducer
DefaultMQProducer
默认的producer,从官方的文档来看,前四个都是对这个producer的运用只是set的值不同而已,而且是很细微的变化而已


1B48.tmp.png

那么从最简单的开始
application.yml文件

rocketmq: 
  # 生产者配置
  producer: 
    groupName: ${spring.application.name}
    namesrvAddr: 192.168.40.133:9876
    default: false

yml文件配置读取类

@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
@ToString
public class ProducerConfig {
    private String namesrvAddr;
    
    private String groupName;
}

producer类的创建类,需要注意的是这个producer一个程序里面只能出现一个,当重复创建时就会报错

@Log4j2
@Configuration
public class ProducerConfigure {

    @Autowired
    private ProducerConfig producerConfigure;

    /**
     * 创建普通消息发送者实例
     * 
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        log.info(producerConfigure.toString());
        log.info("defaultProducer 正在创建---------------------------------------");
        DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName());
        producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
        producer.setVipChannelEnabled(false);
        producer.setRetryTimesWhenSendAsyncFailed(10);
        producer.start();
        log.info("rocketmq producer server开启成功---------------------------------.");
        return producer;
    }
}

当producer创建完毕之后就是consumer的公用设置
首先也是yml和配置类的定义

rocketmq: 
  # 消费者配置
  consumer: 
    groupName: ${spring.application.name}
    namesrvAddr: 192.168.40.133:9876
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
@ToString
public class ConsumerConfig {
    private String groupName;
    
    private String namesrvAddr;
}
@Configuration
@Log4j2
public abstract class DefaultConsumerConfigure {

    @Autowired
    private ConsumerConfig consumerConfig;

    // 开启消费者监听服务
    public void listener(String topic, String tag) throws MQClientException {
        log.info("开启" + topic + ":" + tag + "消费者-------------------");
        log.info(consumerConfig.toString());

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());

        consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());

        consumer.subscribe(topic, tag);

        // 开启内部类实现监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                return DefaultConsumerConfigure.this.dealBody(msgs);
            }
        });

        consumer.start();

        log.info("rocketmq启动成功---------------------------------------");

    }
    
    // 处理body的业务
    public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);

}

值得注意的是,这里DefaultConsumerConfigure没有定义在什么时候运行,还有对body的操作也抽象出来了,提供给实现类做处理,方便业务抽取

@Log4j2
@Configuration
public class TestConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent>{

    @Override
    public void onApplicationEvent(ContextRefreshedEvent arg0) {
        try {
            super.listener("t_TopicTest", "Tag1");
        } catch (MQClientException e) {
            log.error("消费者监听器启动失败", e);
        }
        
    }

    @Override
    public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs)  {
        int num = 1;
        log.info("进入");
        for(MessageExt msg : msgs) {
            log.info("第" + num + "次消息");
            try {
                String msgStr = new String(msg.getBody(), "utf-8");
                log.info(msgStr);
            } catch (UnsupportedEncodingException e) {
                log.error("body转字符串解析失败");
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

那么这些写完之后基本就做测试类的编写了
这个地实现了ApplicationListener,让他在启动的时候就开始执行这个consumer,相信有些同学会喜欢用@PostConstruct,但是不要这么做,因为他会在init之前执行,那么有些类会加载不完全,会导致无法开机启动的

然后再controller里面引入producer,然后直接调用即可

@RestController
@RequestMapping("/test")
@Log4j2
public class TestController {

    @Autowired
    private DefaultMQProducer defaultMQProducer;
    
//  @Autowired
//  private TransactionMQProducer producer;
    
    @Autowired
    private TestTransactionListener testTransactionListener;

    @GetMapping("/test")
    public void test(String info) throws Exception {
        Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq测试成功".getBytes());
                // 这里用到了这个mq的异步处理,类似ajax,可以得到发送到mq的情况,并做相应的处理
               //不过要注意的是这个是异步的
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("传输成功");
                log.info(GsonUtil.GSON.toJson(sendResult));
            }
            @Override
            public void onException(Throwable e) {
                log.error("传输失败", e);
            }
        });
    }
}

跑一跑,就可以看到结果了

说完了DefaultMQProducer那么就来说transactionMQProducer
同样参考官网的例子来做整合
在原来的ProducerConfigure类的基础上加上即可

需要注意的是ConditionalOnProperty这个必须得有,而且配置文件中
transaction和default中只能有一个是true,不然就会同时创建两个producer,那么就会报错

    /**
     * 创建事务消息发送者实例
     * 
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "transaction", havingValue = "true")
    public TransactionMQProducer transactionMQProducer() throws MQClientException {
        log.info(producerConfigure.toString());
        log.info("TransactionMQProducer 正在创建---------------------------------------");
        TransactionMQProducer producer = new TransactionMQProducer(producerConfigure.getGroupName());

        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                });
        producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
        producer.setExecutorService(executorService);
        producer.start();
        log.info("TransactionMQProducer server开启成功---------------------------------.");
        return producer;
    }

因为transaction的流程下,rocketmq会先发送一个consumer不可见的消息,然后在调用
TransactionListener这个接口中的executeLocalTransaction,中的方法执行事务,然后方法内部需要返回
一个LocalTransactionState的枚举信息,分别为

public enum LocalTransactionState {
    COMMIT_MESSAGE, // 提交
    ROLLBACK_MESSAGE, // 回滚
    UNKNOW, // 未知
}

相应的当我们返回的是COMMIT_MESSAGE时,那么producer会把消息提交到mq上,
如果是ROLLBACK_MESSAGE那么producer就会结束,并且不提交到mq,

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

需要注意的是checkLocalTransaction是用作mq长时间没有收到producer的executeLocalTransaction响应的时候调用的,这个类在3.0之后的版本就被阉割了,只有接口,却没有实现,那么直接写一个空实现即可,在我这边的代码上,我做了一个抽象,把需要实现的executeLocalTransaction抽象出来

@Configuration
public abstract class AbstractTransactionListener implements TransactionListener {

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

这个是executeLocalTransaction的实现类,简单的做了些业务,然后返回了一个commit

@Configuration
@Log4j2
public class TestTransactionListener extends AbstractTransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                log.info(new String(msg.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

consumer是没有变化的,基本相同,那么就直接贴上controller的测试代码

    
    @GetMapping("t_test")
    public void Ttest(String info) throws Exception {
        Message message = new Message("t_TopicTest", "Tag1", "12345", "rocketmq测试成功".getBytes());
        producer.setTransactionListener(testTransactionListener);
        producer.setSendMsgTimeout(10000);
        producer.sendMessageInTransaction(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("传输成功");
                log.info(GsonUtil.GSON.toJson(sendResult));
            }
            @Override
            public void onException(Throwable e) {
                log.error("传输失败", e);
            }
        });
    }

跑一跑即可看到结果参数

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容