RocketMQ事务消息

为了保证消息投递的可靠性和效率,RocketMQ设计了事务消息投递

原理:本地事务处理后会主动告诉broke处理状态,如果出现网络等原因broker在指定时间没有收到producer发送的的事务处理状态,broker会主动执行checkLocalTransaction(MessageExt msg)回调函数,可以在回调函数去查数据库等操作看事务是否执行成功,如果没成功再执行一遍事务操作,如果还没成功就再等待RocketMQ轮询执行checkLocalTransaction(MessageExt msg)回调函数,记录调用函数次数,如果一直反复调用超过设置的最大次数,可以记录到本地日志,然后rocketMQ回滚消息。

TransactionProducer代码:
package com.xy.rocketmqtest.transactionProducer;

import com.xy.rocketmqtest.quickstart.constants.Const;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("tansaction_producer_group");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thead = new Thread(r);
                thead.setName("tansaction_producer_group"+"callback-thread");
                return thead;
            }
        });
        producer.setNamesrvAddr(Const.NAMESRV_ADDRS);
        producer.setExecutorService(executorService);
        //这个接口有两个方法,一个是异步执行本地事务,一个是回查
        TransactionListener transactionListener =new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message message = new Message("tansaction_producer_topic","tagA","key",("hello world!").getBytes("UTF-8"));
        producer.sendMessageInTransaction(message,"回调参数");
        Thread.sleep(Integer.MAX_VALUE);
        producer.shutdown();
    }
}

TransactionListenerImpl代码:
package com.xy.rocketmqtest.transactionProducer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;

public class TransactionListenerImpl implements TransactionListener {
    private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>();
    private final static int MAX_COUNT = 3;
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("------------开始异步执行本地事务————————————————");
        String callArg = (String) arg;
        System.out.println("回调参数:"+callArg);
        System.out.println("message:"+msg);
        //数据库入库成功标志
        //先设置为false模拟事务执行失败
        boolean addDadabase = false;
        //TODO 执行数据库入库等


        if(!addDadabase){
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("------------rocketMQ回查————————————————");
        //获取消息唯一ID
        String key = msg.getKeys();
        System.out.println("msgkeys:"+key);

        //数据库入库成功标志
        boolean addDadabase = false;
        //由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?
        //可以查询数据库看这条数据是否已经处理,没有处理可以做回查次数记录
        if(!addDadabase){
            //记录回查次数,不能让它无限回查,超过设置的次数就回滚事务
            Integer num = countHashMap.get(key);
            if(num != null &&  ++num == MAX_COUNT) {
                System.out.println("----事务执行了"+MAX_COUNT+"次,回滚消息----");
                countHashMap.remove(key);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }if(num == null) {
                num = new Integer(1);
            }
            countHashMap.put(key, num);
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Const代码:
package com.xy.rocketmqtest.quickstart.constants;

public class Const {
    public static final String NAMESRV_ADDR="106.13.88.XXX:19876";
    public static final String NAMESRV_ADDRS="106.13.88.XXX:19876;106.13.88.XXX:29876";
}

TransationConsumer代码:
package com.xy.rocketmqtest.transactionProducer;

import com.xy.rocketmqtest.quickstart.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransationConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_tansation_comsumer_name");
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(10);
        //端口号一般是使用9876,Const.NAMESRV_ADDR=106.13.88.XXX:19876
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
        //从那个位置开始消费,可以从末尾,最前端  这里是最末尾
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //消费哪个主题的消息和标签,标签可以是表达式,如*表示消费该topic下的所有类型标签的消息
        consumer.subscribe("tansaction_producer_topic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt me = msgs.get(0);
                try{
                    String topic = me.getTopic();
                    String tags = me.getTags();
                    String keys =me.getKeys();
//                    if(keys.equals("key3")){
//                        System.out.println("模拟消费失败*********");
//                        int a = 1/0;
//                    }
                    String body = new String(me.getBody(), "UTF-8");
                    System.out.println("test_transation消费消息:topic:"+topic+"tags:"+tags+"keys:"+keys+"body:"+body);
                }catch (Exception e){
                    e.printStackTrace();
                    int reconsumerTimes = me.getReconsumeTimes();
                    System.out.println("第"+reconsumerTimes+"次消费该消息!");
                    if(reconsumerTimes==4){
                        //如果消费四次还没有消费到就做日志,然后做补偿
                    }
                    //消费失败会重新消费,时间从1s 开始然后越来越长,到最后一次重试的时间是2h 默认重试15次
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("-------------开始消费消息---");
    }

}

TransactionProducer控制台打印信息:
"C:\Program Files\Java\jdk1.8.0_111\bin\java.exe" "-javaagent:D:\IDEA\IntelliJ IDEA Free2020.1\IntelliJ IDEA Community Edition 2020.1\lib\idea_rt.jar=60281:D:\IDEA\IntelliJ IDEA Free2020.1\IntelliJ IDEA Community Edition 2020.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\rt.jar;E:\学习资料视频\rocketMQ\rocketmq-externals-master\rocketmq-test\target\classes;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.2.6.RELEASE\spring-boot-starter-web-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter\2.2.6.RELEASE\spring-boot-starter-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot\2.2.6.RELEASE\spring-boot-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.6.RELEASE\spring-boot-autoconfigure-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.2.6.RELEASE\spring-boot-starter-logging-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;C:\Users\96935\.m2\repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;C:\Users\96935\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\96935\.m2\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.2.6.RELEASE\spring-boot-starter-json-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.3\jackson-databind-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.3\jackson-annotations-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.3\jackson-core-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.3\jackson-datatype-jdk8-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.3\jackson-datatype-jsr310-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.3\jackson-module-parameter-names-2.10.3.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.6.RELEASE\spring-boot-starter-tomcat-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.33\tomcat-embed-core-9.0.33.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.33\tomcat-embed-el-9.0.33.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.33\tomcat-embed-websocket-9.0.33.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-validation\2.2.6.RELEASE\spring-boot-starter-validation-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\jakarta\validation\jakarta.validation-api\2.0.2\jakarta.validation-api-2.0.2.jar;C:\Users\96935\.m2\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;C:\Users\96935\.m2\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;C:\Users\96935\.m2\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-web\5.2.5.RELEASE\spring-web-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-beans\5.2.5.RELEASE\spring-beans-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-webmvc\5.2.5.RELEASE\spring-webmvc-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-aop\5.2.5.RELEASE\spring-aop-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-context\5.2.5.RELEASE\spring-context-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-expression\5.2.5.RELEASE\spring-expression-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.2\mybatis-spring-boot-starter-2.1.2.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.6.RELEASE\spring-boot-starter-jdbc-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\com\zaxxer\HikariCP\3.4.2\HikariCP-3.4.2.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jdbc\5.2.5.RELEASE\spring-jdbc-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-tx\5.2.5.RELEASE\spring-tx-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.2\mybatis-spring-boot-autoconfigure-2.1.2.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis\3.5.4\mybatis-3.5.4.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis-spring\2.0.4\mybatis-spring-2.0.4.jar;C:\Users\96935\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-client\4.3.0\rocketmq-client-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-common\4.3.0\rocketmq-common-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-remoting\4.3.0\rocketmq-remoting-4.3.0.jar;C:\Users\96935\.m2\repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;C:\Users\96935\.m2\repository\io\netty\netty-all\4.1.48.Final\netty-all-4.1.48.Final.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-logging\4.3.0\rocketmq-logging-4.3.0.jar;C:\Users\96935\.m2\repository\io\netty\netty-tcnative-boringssl-static\2.0.30.Final\netty-tcnative-boringssl-static-2.0.30.Final.jar;C:\Users\96935\.m2\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar;C:\Users\96935\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;C:\Users\96935\.m2\repository\org\springframework\spring-core\5.2.5.RELEASE\spring-core-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jcl\5.2.5.RELEASE\spring-jcl-5.2.5.RELEASE.jar" com.xy.rocketmqtest.transactionProducer.TransactionProducer
------------开始异步执行本地事务————————————————
回调参数:回调参数
message:Message{topic='tansaction_producer_topic', flag=0, properties={KEYS=key, TRAN_MSG=true, UNIQ_KEY=C0A80065156C18B4AAC26F7230DF0000, WAIT=true, PGROUP=tansaction_producer_group, TAGS=tagA}, body=[104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 33], transactionId='C0A80065156C18B4AAC26F7230DF0000'}
------------rocketMQ回查————————————————
msgkeys:key
------------rocketMQ回查————————————————
msgkeys:key
------------rocketMQ回查————————————————
msgkeys:key
----事务执行了3次,回滚消息----

文件目录结构
第二部分:SpringBoot结合RocketMQ事务消息
场景:支付场景将客户扣款和商家账户入账通过RocketMQ中间件拆分成两个过程
SpringBoot结合RocketMQ事务消息文件目录结构
第一步:导入rocketMQ jar包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xy</groupId>
    <artifactId>xy</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>xy</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <rocketmq.version>4.3.0</rocketmq.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.5</version>
                <dependencies>
                    <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <version>5.1.38</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>
</project>
生产者代码部分:
package com.xy.service.producer;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

@Component
public class TransactionProducer implements InitializingBean {
    //TransactionMQProducer是rocketMQ jar包里的类
    private TransactionMQProducer producer;
    //线程池
    private ExecutorService executorService;
    @Autowired
    private TransactionListenerImpl transactionListenerImpl;
    //实际业务这些ip地址和端口写进配置文件,为了安全部署的机器最好是内网
    public static final String NAMESRV_ADDRS="106.13.88.xxx:19876;106.13.88.xxx
:29876";

    public static final String PRODUCER_SHOPPING_GROUP="producer_shopping_group";
    /**
     * @desc 私有无参构造函数,让别人不能通过new关键字实例化第二个TransactionProducer()对象,
     *       容器实例化的时候手动new出executorService和producer的对象
     * */
    private TransactionProducer(){
        System.out.println("**********************TransactionProducer构造函数初始化************************");
        this.producer = new TransactionMQProducer(PRODUCER_SHOPPING_GROUP);
        this.executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thead = new Thread(r);
                thead.setName(PRODUCER_SHOPPING_GROUP+"check-thread");
                /**此线程用来执行TransactionListenerImpl.checkLocalTransaction(MessageExt msg)函数,
                 * 如果return null; 能够正常投递消息,但是如果本地事务失败,不会执行回查函数
                 * */
                return thead;
            }
        });
        this.producer.setNamesrvAddr(NAMESRV_ADDRS);
        this.producer.setExecutorService(executorService);
    }
    /**
     * @desc 实现InitializingBean的bean在完成所有bean容器注入完成后执行此方法
     * */
    @Override
    public void afterPropertiesSet()  {
        System.out.println("**********************TransactionProducera fterPropertiesSet()方法执行************************");
        //把此步骤提取到 afterPropertiesSet()方法,防止TransactionProducer()构造方法执行时transactionListenerImpl还没被注入
        this.producer.setTransactionListener(transactionListenerImpl);
        start();
    }

    private void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public void shutdown(){
        this.producer.shutdown();
    }

    public TransactionSendResult sendMessage(Message message, Object argument){
        TransactionSendResult sendResult = null;
        try {
            sendResult =   this.producer.sendMessageInTransaction(message,argument);
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return sendResult;
    }
}
package com.xy.service.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Component
public class TransactionListenerImpl implements TransactionListener {
    //countHashMap broker迟迟没有收到后台发来的本地事务执行完的记录,记录回查消息的key及回查函数调用次数
    private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>();
    //设置回查函数最大调用次数
    private final static int MAX_COUNT = 3;
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        CountDownLatch countDownLatch=null;
        try {
            Map<String,Object> params = (Map<String,Object>) arg;
            String openId = (String)params.get("openId");
            String orderId = (String)params.get("orderId");
            //金额使用BigDecimal
            double money = (double)params.get("money");
            countDownLatch = (CountDownLatch) params.get("countDownLatch");
            //TODO 做当前账号数据库表得金额数据更新(本地事务)
            //模拟数据库操作成功失败标志
            boolean updateLog = false;
            if(updateLog){
                //释放本地阻塞
                countDownLatch.countDown();
                return LocalTransactionState.COMMIT_MESSAGE;
            }else{
                log.info("*****本地事务执行失败*******");
                countDownLatch.countDown();
                //本地事务操作失败,ocketMQ需要执行回查函数再次执行本地事务
                return LocalTransactionState.UNKNOW;
            }

        } catch (Exception e) {
            e.printStackTrace();
            countDownLatch.countDown();
            //本地事务操作失败,rocketMQ需要执行回查函数再次执行本地事务
            return LocalTransactionState.UNKNOW;
        }
    }
    /**
     * @desc 有可能本地事务执行成功,但是因为网络波动,本地事务执行成功的消息没有发送到broker,
     *       RocketMQ迟迟没有收到消息的确认消息,可以做查库操作,发现本地事务的数据还没入库过段时间
     *       再查,查了设定的最大次数还没有数据,可以认定本地事务执行失败,做日志记录等操作,如果本地数据
     *       入库成功,就设置投递消息对消费者可见,让消费者正常消费
     * */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        log.info("------------rocketMQ回查————————————————");
        //获取消息唯一ID
        String key = msg.getKeys();
        //数据库入库成功标志
        boolean addDadabase = false;
        //由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?
        //可以先查询数据库看这条数据是否已经处理,没有处理可以做回查次数记录
        if(!addDadabase){
            //记录回查次数,不能让它无限回查,超过设置的次数就回滚事务
            Integer num = countHashMap.get(key);
            if(num != null &&  ++num == MAX_COUNT) {
                log.info("----事务执行了{}次,已经达到设置的阈值,不再重复回查,rocketMQ回滚取消这条消息投递",MAX_COUNT);
                countHashMap.remove(key);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }if(num == null) {
                num = new Integer(1);
            }
            countHashMap.put(key, num);
            //本地事务操作失败,rocketMQ需要执行回查函数再次执行本地事务
            return LocalTransactionState.UNKNOW;
        }
        //本地事务操作失败,rocketMQ取消投递这条消息
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Service代码部分:
package com.xy.service;

public interface ShoppingService {
    String shopping(String openId, String orderId, double money) throws Exception;
}
package com.xy.serviceImpl;

import com.alibaba.fastjson.JSON;
import com.xy.service.ShoppingService;
import com.xy.service.producer.TransactionProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Service
public class ShoppingServiceImpl implements ShoppingService {
    public static String SHOPPING_TOPIC = "shooping_topic_pay";
    public static String SHOPPING_TAGS = "shooping_tags_pay";
    @Autowired
    private TransactionProducer transactionProducer;
    @Override
    public String shopping(String openId, String orderId, double money) throws Exception {
        //TODO 查询账号余额足不足,如果余额不足 return "余额不足,购物失败";
        String keys = UUID.randomUUID().toString()+"&"+System.currentTimeMillis();
        Map<String,Object> params = new HashMap<>();
        params.put("openId",openId);
        params.put("orderId",orderId);
        params.put("money",money);
        Message message = new Message(SHOPPING_TOPIC,SHOPPING_TAGS,keys, JSON.toJSONString(params).getBytes());
        //同步阻塞
        CountDownLatch countDownLatch = new CountDownLatch(1);
        params.put("countDownLatch",countDownLatch);
        //消息发送且本地事务执行
        TransactionSendResult sendResult = transactionProducer.sendMessage(message,params);
        //等待异步执行得本地事务完成
        countDownLatch.await();
        //如果消息发送成功,并且本地事务执行成功(消息发送和本地事务执行时异步执行)
        if(sendResult.getSendStatus()== SendStatus.SEND_OK
            &&sendResult.getLocalTransactionState()== LocalTransactionState.COMMIT_MESSAGE){
            log.info("*******消息投递成功,本地事务执行完毕,支付完成!!!******");
            return "支付完成,购物成功";
        }else{
            log.info("*******消息投递失败,支付失败!!!******");
            return "购物失败";
        }
    }
}
Controller代码:
package com.xy.controller;

import com.xy.bean.Userinfo;
import com.xy.service.DemoService;
import com.xy.service.ShoppingService;
import com.xy.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.Optional;

@Controller
@RequestMapping("/demo")
public class UserInfoController {
    @Autowired
    private ShoppingService shoppingService;
    @RequestMapping("/shopping")
    @ResponseBody
    public String shopping(@RequestParam String openId,
                           @RequestParam String orderId,
                           @RequestParam Double money){
        String resultStr = "";
        try {
            //Double.valueOf(String)实际业务中可能会丢失精度,建议用BigDecimal
           resultStr = shoppingService.shopping(openId,orderId,money);
        } catch (Exception e) {
            e.printStackTrace();
            resultStr ="购物失败!";
        }
        return resultStr;
    }
}
consumer代码(注意:分布式系统中consumer代码和producer代码可能不在一个项目下,这里演示放在一个项目下):
package com.xy.service.consumer;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class ShoppingConsumer {
    private DefaultMQPushConsumer consumer;
  //实际业务ip写在配置文件
    public static final String NAMESRV_ADDRS="106.13.88.xxx:19876;106.13.88.xxx:29876";

    public static final String CONSUMER_SHOPPING_GROUP="consumer_shopping_group";
    public static String SHOPPING_TOPIC = "shooping_topic_pay";
    public static String SHOPPING_TAGS = "shooping_tags_pay";
    /**
     * @desc 设置为私有,不想让别人new一个新的对象出来
     * */
    private ShoppingConsumer(){
        try {
            this.consumer = new DefaultMQPushConsumer(CONSUMER_SHOPPING_GROUP);
            this.consumer.setConsumeThreadMin(10);
            this.consumer.setConsumeThreadMax(10);
            //端口号一般是使用9876,Const.NAMESRV_ADDR=106.13.88.XXX:19876
            this.consumer.setNamesrvAddr(NAMESRV_ADDRS);
            //从那个位置开始消费,可以从末尾,最前端  这里是最末尾
            this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            //消费哪个主题的消息和标签,标签可以是表达式,如*表示消费该topic下的所有类型标签的消息
            this.consumer.subscribe(SHOPPING_TOPIC,SHOPPING_TAGS);
            this.consumer.registerMessageListener(new MessageListenerConcurrentlyShopping());
            this.consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    class MessageListenerConcurrentlyShopping implements MessageListenerConcurrently{

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt me = msgs.get(0);
            try{
                String topic = me.getTopic();
                String tags = me.getTags();
                String keys =me.getKeys();
                //发消息的时候时通过fastJson序列化的,所有需要转换成String
                String body = new String(me.getBody(), "UTF-8");
                log.info("消费端消费消息:topic:{},tags:{},keys:{},body:{}",topic,tags,keys,body);
                //System.out.println("SHopping_transation消费消息:topic:"+topic+"tags:"+tags+"keys:"+keys+"body:"+body);
                //TODO 消费端需要去重,网络原因可能导致重复消费同一消息,rocketMQ可能基于性能考虑没有自己做去重,需要业务自己去重
                //可以使用redis主键去重,或者如果并发要求不是太高可以用mysql主键去重,如使用inset  如果之前消费过了就肯定inser失败、
                Map<String,Object> map = JSONObject.parseObject(body);
                String openId = (String)map.get("openId");
                String orderId = (String)map.get("orderId");
                BigDecimal money = (BigDecimal) map.get("money");
                //TODO 商户数据数据库更新操作,发送短信通知操作等等

            }catch (Exception e){
                e.printStackTrace();
                int reconsumerTimes = me.getReconsumeTimes();
                System.out.println("第"+reconsumerTimes+"次消费该消息!");
                if(reconsumerTimes==4){
                    //如果消费四次还没有消费到就做日志,然后做补偿
                }
                //消费失败会重新消费,时间从1s 开始然后越来越长,到最后一次重试的时间是2h 默认重试15次
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}

测试场景一:TransactionListenerImpl.executeLocalTransaction(Message msg, Object arg)的addDadabase设置为false,启动项目:浏览器访问http://localhost:8082/demo/shopping?openId=xxxxxxyyyy&orderId=1&money=98,模拟消息发送成功,本地事务执行失败
"C:\Program Files\Java\jdk1.8.0_111\bin\java.exe" -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true "-javaagent:D:\IDEA\IntelliJ IDEA 2019.3.1\lib\idea_rt.jar=64824:D:\IDEA\IntelliJ IDEA 2019.3.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\rt.jar;D:\svn\myproject\zjhy\code\xy\target\classes;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.2.2.RELEASE\spring-boot-starter-web-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter\2.2.2.RELEASE\spring-boot-starter-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot\2.2.2.RELEASE\spring-boot-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.2.RELEASE\spring-boot-autoconfigure-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.2.2.RELEASE\spring-boot-starter-logging-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;C:\Users\96935\.m2\repository\org\slf4j\jul-to-slf4j\1.7.29\jul-to-slf4j-1.7.29.jar;C:\Users\96935\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\96935\.m2\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.2.2.RELEASE\spring-boot-starter-json-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.1\jackson-databind-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.1\jackson-annotations-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.1\jackson-core-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.1\jackson-datatype-jdk8-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.1\jackson-datatype-jsr310-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.1\jackson-module-parameter-names-2.10.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.2.RELEASE\spring-boot-starter-tomcat-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.29\tomcat-embed-core-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.29\tomcat-embed-el-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.29\tomcat-embed-websocket-9.0.29.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-validation\2.2.2.RELEASE\spring-boot-starter-validation-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\jakarta\validation\jakarta.validation-api\2.0.1\jakarta.validation-api-2.0.1.jar;C:\Users\96935\.m2\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;C:\Users\96935\.m2\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;C:\Users\96935\.m2\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-web\5.2.2.RELEASE\spring-web-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-beans\5.2.2.RELEASE\spring-beans-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-webmvc\5.2.2.RELEASE\spring-webmvc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-aop\5.2.2.RELEASE\spring-aop-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-context\5.2.2.RELEASE\spring-context-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-expression\5.2.2.RELEASE\spring-expression-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.1\mybatis-spring-boot-starter-2.1.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.2.RELEASE\spring-boot-starter-jdbc-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\zaxxer\HikariCP\3.4.1\HikariCP-3.4.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jdbc\5.2.2.RELEASE\spring-jdbc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-tx\5.2.2.RELEASE\spring-tx-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.1\mybatis-spring-boot-autoconfigure-2.1.1.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis\3.5.3\mybatis-3.5.3.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;C:\Users\96935\.m2\repository\org\projectlombok\lombok\1.16.18\lombok-1.16.18.jar;C:\Users\96935\.m2\repository\mysql\mysql-connector-java\5.1.39\mysql-connector-java-5.1.39.jar;C:\Users\96935\.m2\repository\org\slf4j\slf4j-api\1.7.29\slf4j-api-1.7.29.jar;C:\Users\96935\.m2\repository\org\springframework\spring-core\5.2.2.RELEASE\spring-core-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jcl\5.2.2.RELEASE\spring-jcl-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-client\4.3.0\rocketmq-client-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-common\4.3.0\rocketmq-common-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-remoting\4.3.0\rocketmq-remoting-4.3.0.jar;C:\Users\96935\.m2\repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;C:\Users\96935\.m2\repository\io\netty\netty-all\4.1.43.Final\netty-all-4.1.43.Final.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-logging\4.3.0\rocketmq-logging-4.3.0.jar;C:\Users\96935\.m2\repository\io\netty\netty-tcnative-boringssl-static\2.0.28.Final\netty-tcnative-boringssl-static-2.0.28.Final.jar;C:\Users\96935\.m2\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar" com.xy.XyApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.2.RELEASE)

Run thirdInitializer
FirstInitializer
[main] INFO  com.xy.XyApplication - Starting XyApplication on DESKTOP-5TGBJTN with PID 10292 (D:\svn\myproject\zjhy\code\xy\target\classes started by 96935 in D:\svn\myproject\zjhy\code\xy)
[main] INFO  com.xy.XyApplication - No active profile set, falling back to default profiles: default
hello fourthListener!
[main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8082 (http)
[main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
[main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
[main] INFO  o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
[main] INFO  org.springframework.web.context.ContextLoader - Root WebApplicationContext: initialization completed in 952 ms
test setApplicationContext......
**********************TransactionProducer构造函数初始化************************
**********************TransactionProducera fterPropertiesSet()方法执行************************
[main] INFO  o.s.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
[main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8082 (http) with context path ''
[main] INFO  com.xy.XyApplication - Started XyApplication in 4.771 seconds (JVM running for 5.929)
hello ThirdListener!
hello fourthListener!
[http-nio-8082-exec-1] INFO  org.apache.tomcat.util.http.parser.Cookie - A cookie header was received [1586131383; NG_TRANSLATE_LANG_KEY=%22en%22] that contained an invalid cookie. That cookie will be ignored.
 Note: further occurrences of this error will be logged at DEBUG level.
[http-nio-8082-exec-1] INFO  o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - Initializing Servlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - Completed initialization in 7 ms
[http-nio-8082-exec-1] INFO  com.xy.service.producer.TransactionListenerImpl - *****本地事务执行失败*******
[http-nio-8082-exec-1] INFO  com.xy.serviceImpl.ShoppingServiceImpl - *******消息投递失败,支付失败!!!******
[producer_shopping_groupcheck-thread] INFO  com.xy.service.producer.TransactionListenerImpl - ------------rocketMQ回查————————————————
[producer_shopping_groupcheck-thread] INFO  com.xy.service.producer.TransactionListenerImpl - ------------rocketMQ回查————————————————
[producer_shopping_groupcheck-thread] INFO  com.xy.service.producer.TransactionListenerImpl - ------------rocketMQ回查————————————————
[producer_shopping_groupcheck-thread] INFO  com.xy.service.producer.TransactionListenerImpl - ----事务执行了3次,已经达到设置的阈值,不再重复回查,rocketMQ回滚取消这条消息投递
测试场景二:TransactionListenerImpl.executeLocalTransaction(Message msg, Object arg)的addDadabase设置为true,启动项目:浏览器访问http://localhost:8082/demo/shopping?openId=xxxxxxyyyy&orderId=1&money=98

消息投递成功,本地事务执行成功

"C:\Program Files\Java\jdk1.8.0_111\bin\java.exe" -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true "-javaagent:D:\IDEA\IntelliJ IDEA 2019.3.1\lib\idea_rt.jar=65500:D:\IDEA\IntelliJ IDEA 2019.3.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\rt.jar;D:\svn\myproject\zjhy\code\xy\target\classes;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.2.2.RELEASE\spring-boot-starter-web-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter\2.2.2.RELEASE\spring-boot-starter-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot\2.2.2.RELEASE\spring-boot-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.2.RELEASE\spring-boot-autoconfigure-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.2.2.RELEASE\spring-boot-starter-logging-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;C:\Users\96935\.m2\repository\org\slf4j\jul-to-slf4j\1.7.29\jul-to-slf4j-1.7.29.jar;C:\Users\96935\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\96935\.m2\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.2.2.RELEASE\spring-boot-starter-json-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.1\jackson-databind-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.1\jackson-annotations-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.1\jackson-core-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.1\jackson-datatype-jdk8-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.1\jackson-datatype-jsr310-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.1\jackson-module-parameter-names-2.10.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.2.RELEASE\spring-boot-starter-tomcat-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.29\tomcat-embed-core-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.29\tomcat-embed-el-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.29\tomcat-embed-websocket-9.0.29.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-validation\2.2.2.RELEASE\spring-boot-starter-validation-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\jakarta\validation\jakarta.validation-api\2.0.1\jakarta.validation-api-2.0.1.jar;C:\Users\96935\.m2\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;C:\Users\96935\.m2\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;C:\Users\96935\.m2\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-web\5.2.2.RELEASE\spring-web-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-beans\5.2.2.RELEASE\spring-beans-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-webmvc\5.2.2.RELEASE\spring-webmvc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-aop\5.2.2.RELEASE\spring-aop-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-context\5.2.2.RELEASE\spring-context-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-expression\5.2.2.RELEASE\spring-expression-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.1\mybatis-spring-boot-starter-2.1.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.2.RELEASE\spring-boot-starter-jdbc-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\zaxxer\HikariCP\3.4.1\HikariCP-3.4.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jdbc\5.2.2.RELEASE\spring-jdbc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-tx\5.2.2.RELEASE\spring-tx-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.1\mybatis-spring-boot-autoconfigure-2.1.1.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis\3.5.3\mybatis-3.5.3.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;C:\Users\96935\.m2\repository\org\projectlombok\lombok\1.16.18\lombok-1.16.18.jar;C:\Users\96935\.m2\repository\mysql\mysql-connector-java\5.1.39\mysql-connector-java-5.1.39.jar;C:\Users\96935\.m2\repository\org\slf4j\slf4j-api\1.7.29\slf4j-api-1.7.29.jar;C:\Users\96935\.m2\repository\org\springframework\spring-core\5.2.2.RELEASE\spring-core-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jcl\5.2.2.RELEASE\spring-jcl-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-client\4.3.0\rocketmq-client-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-common\4.3.0\rocketmq-common-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-remoting\4.3.0\rocketmq-remoting-4.3.0.jar;C:\Users\96935\.m2\repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;C:\Users\96935\.m2\repository\io\netty\netty-all\4.1.43.Final\netty-all-4.1.43.Final.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-logging\4.3.0\rocketmq-logging-4.3.0.jar;C:\Users\96935\.m2\repository\io\netty\netty-tcnative-boringssl-static\2.0.28.Final\netty-tcnative-boringssl-static-2.0.28.Final.jar;C:\Users\96935\.m2\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar" com.xy.XyApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.2.RELEASE)

Run thirdInitializer
FirstInitializer
[main] INFO  com.xy.XyApplication - Starting XyApplication on DESKTOP-5TGBJTN with PID 704 (D:\svn\myproject\zjhy\code\xy\target\classes started by 96935 in D:\svn\myproject\zjhy\code\xy)
[main] INFO  com.xy.XyApplication - No active profile set, falling back to default profiles: default
hello fourthListener!
[main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8082 (http)
[main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
[main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
[main] INFO  o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
[main] INFO  org.springframework.web.context.ContextLoader - Root WebApplicationContext: initialization completed in 969 ms
test setApplicationContext......
**********************TransactionProducer构造函数初始化************************
**********************TransactionProducera fterPropertiesSet()方法执行************************
[main] INFO  o.s.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
[main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8082 (http) with context path ''
[main] INFO  com.xy.XyApplication - Started XyApplication in 4.763 seconds (JVM running for 5.914)
hello ThirdListener!
hello fourthListener!
[http-nio-8082-exec-1] INFO  org.apache.tomcat.util.http.parser.Cookie - A cookie header was received [1586131383; NG_TRANSLATE_LANG_KEY=%22en%22] that contained an invalid cookie. That cookie will be ignored.
 Note: further occurrences of this error will be logged at DEBUG level.
[http-nio-8082-exec-1] INFO  o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - Initializing Servlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - Completed initialization in 7 ms
[http-nio-8082-exec-1] INFO  com.xy.serviceImpl.ShoppingServiceImpl - *******消息投递成功,本地事务执行完毕,支付完成!!!******
[ConsumeMessageThread_1] INFO  com.xy.service.consumer.ShoppingConsumer - 消费端消费消息:topic:shooping_topic_pay,tags:shooping_tags_pay,keys:6a0108b5-1213-483a-bbf0-42b458b65194&1587967678182,body:{"money":98.0,"orderId":"1","openId":"xxxxxxyyyy"}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 前言 得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席...
    零点145阅读 4,149评论 0 0
  • 这是我的第一篇博客,思来想去,决定以RocketMQ(版本4.3.2) 源码分析开始写写,不定期更新,也可能随时停...
    __TiAmo阅读 4,108评论 0 3
  • 概述 事务消息解决的问题是:Provider本地事务 + 消息投递 一起执行。解决应用端 和 MQ端两个独立的应用...
    黄靠谱阅读 12,936评论 1 12
  • 引出分布式事务相关内容 这里主要是想说明下,是什么背景下面产生了此类问题。 首先我们来说说事务,说道事务,首先让我...
    Java大生阅读 3,456评论 0 4
  • 有没有这样一样情况,把一个集群中的某个表导到另一个群集中,或者hbase的表结构发生了更改,但是数据还要,比如预分...
    kikiki5阅读 1,033评论 0 4