为了保证消息投递的可靠性和效率,RocketMQ设计了事务消息投递
原理:本地事务处理后会主动告诉broke处理状态,如果出现网络等原因broker在指定时间没有收到producer发送的的事务处理状态,broker会主动执行checkLocalTransaction(MessageExt msg)回调函数,可以在回调函数去查数据库等操作看事务是否执行成功,如果没成功再执行一遍事务操作,如果还没成功就再等待RocketMQ轮询执行checkLocalTransaction(MessageExt msg)回调函数,记录调用函数次数,如果一直反复调用超过设置的最大次数,可以记录到本地日志,然后rocketMQ回滚消息。
TransactionProducer代码:
package com.xy.rocketmqtest.transactionProducer;
import com.xy.rocketmqtest.quickstart.constants.Const;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("tansaction_producer_group");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thead = new Thread(r);
thead.setName("tansaction_producer_group"+"callback-thread");
return thead;
}
});
producer.setNamesrvAddr(Const.NAMESRV_ADDRS);
producer.setExecutorService(executorService);
//这个接口有两个方法,一个是异步执行本地事务,一个是回查
TransactionListener transactionListener =new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
producer.start();
Message message = new Message("tansaction_producer_topic","tagA","key",("hello world!").getBytes("UTF-8"));
producer.sendMessageInTransaction(message,"回调参数");
Thread.sleep(Integer.MAX_VALUE);
producer.shutdown();
}
}
TransactionListenerImpl代码:
package com.xy.rocketmqtest.transactionProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
public class TransactionListenerImpl implements TransactionListener {
private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>();
private final static int MAX_COUNT = 3;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("------------开始异步执行本地事务————————————————");
String callArg = (String) arg;
System.out.println("回调参数:"+callArg);
System.out.println("message:"+msg);
//数据库入库成功标志
//先设置为false模拟事务执行失败
boolean addDadabase = false;
//TODO 执行数据库入库等
if(!addDadabase){
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("------------rocketMQ回查————————————————");
//获取消息唯一ID
String key = msg.getKeys();
System.out.println("msgkeys:"+key);
//数据库入库成功标志
boolean addDadabase = false;
//由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?
//可以查询数据库看这条数据是否已经处理,没有处理可以做回查次数记录
if(!addDadabase){
//记录回查次数,不能让它无限回查,超过设置的次数就回滚事务
Integer num = countHashMap.get(key);
if(num != null && ++num == MAX_COUNT) {
System.out.println("----事务执行了"+MAX_COUNT+"次,回滚消息----");
countHashMap.remove(key);
return LocalTransactionState.ROLLBACK_MESSAGE;
}if(num == null) {
num = new Integer(1);
}
countHashMap.put(key, num);
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
Const代码:
package com.xy.rocketmqtest.quickstart.constants;
public class Const {
public static final String NAMESRV_ADDR="106.13.88.XXX:19876";
public static final String NAMESRV_ADDRS="106.13.88.XXX:19876;106.13.88.XXX:29876";
}
TransationConsumer代码:
package com.xy.rocketmqtest.transactionProducer;
import com.xy.rocketmqtest.quickstart.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransationConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_tansation_comsumer_name");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(10);
//端口号一般是使用9876,Const.NAMESRV_ADDR=106.13.88.XXX:19876
consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
//从那个位置开始消费,可以从末尾,最前端 这里是最末尾
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//消费哪个主题的消息和标签,标签可以是表达式,如*表示消费该topic下的所有类型标签的消息
consumer.subscribe("tansaction_producer_topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt me = msgs.get(0);
try{
String topic = me.getTopic();
String tags = me.getTags();
String keys =me.getKeys();
// if(keys.equals("key3")){
// System.out.println("模拟消费失败*********");
// int a = 1/0;
// }
String body = new String(me.getBody(), "UTF-8");
System.out.println("test_transation消费消息:topic:"+topic+"tags:"+tags+"keys:"+keys+"body:"+body);
}catch (Exception e){
e.printStackTrace();
int reconsumerTimes = me.getReconsumeTimes();
System.out.println("第"+reconsumerTimes+"次消费该消息!");
if(reconsumerTimes==4){
//如果消费四次还没有消费到就做日志,然后做补偿
}
//消费失败会重新消费,时间从1s 开始然后越来越长,到最后一次重试的时间是2h 默认重试15次
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("-------------开始消费消息---");
}
}
TransactionProducer控制台打印信息:
"C:\Program Files\Java\jdk1.8.0_111\bin\java.exe" "-javaagent:D:\IDEA\IntelliJ IDEA Free2020.1\IntelliJ IDEA Community Edition 2020.1\lib\idea_rt.jar=60281:D:\IDEA\IntelliJ IDEA Free2020.1\IntelliJ IDEA Community Edition 2020.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\rt.jar;E:\学习资料视频\rocketMQ\rocketmq-externals-master\rocketmq-test\target\classes;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.2.6.RELEASE\spring-boot-starter-web-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter\2.2.6.RELEASE\spring-boot-starter-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot\2.2.6.RELEASE\spring-boot-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.6.RELEASE\spring-boot-autoconfigure-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.2.6.RELEASE\spring-boot-starter-logging-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;C:\Users\96935\.m2\repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;C:\Users\96935\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\96935\.m2\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.2.6.RELEASE\spring-boot-starter-json-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.3\jackson-databind-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.3\jackson-annotations-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.3\jackson-core-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.3\jackson-datatype-jdk8-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.3\jackson-datatype-jsr310-2.10.3.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.3\jackson-module-parameter-names-2.10.3.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.6.RELEASE\spring-boot-starter-tomcat-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.33\tomcat-embed-core-9.0.33.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.33\tomcat-embed-el-9.0.33.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.33\tomcat-embed-websocket-9.0.33.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-validation\2.2.6.RELEASE\spring-boot-starter-validation-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\jakarta\validation\jakarta.validation-api\2.0.2\jakarta.validation-api-2.0.2.jar;C:\Users\96935\.m2\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;C:\Users\96935\.m2\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;C:\Users\96935\.m2\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-web\5.2.5.RELEASE\spring-web-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-beans\5.2.5.RELEASE\spring-beans-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-webmvc\5.2.5.RELEASE\spring-webmvc-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-aop\5.2.5.RELEASE\spring-aop-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-context\5.2.5.RELEASE\spring-context-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-expression\5.2.5.RELEASE\spring-expression-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.2\mybatis-spring-boot-starter-2.1.2.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.6.RELEASE\spring-boot-starter-jdbc-2.2.6.RELEASE.jar;C:\Users\96935\.m2\repository\com\zaxxer\HikariCP\3.4.2\HikariCP-3.4.2.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jdbc\5.2.5.RELEASE\spring-jdbc-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-tx\5.2.5.RELEASE\spring-tx-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.2\mybatis-spring-boot-autoconfigure-2.1.2.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis\3.5.4\mybatis-3.5.4.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis-spring\2.0.4\mybatis-spring-2.0.4.jar;C:\Users\96935\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-client\4.3.0\rocketmq-client-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-common\4.3.0\rocketmq-common-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-remoting\4.3.0\rocketmq-remoting-4.3.0.jar;C:\Users\96935\.m2\repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;C:\Users\96935\.m2\repository\io\netty\netty-all\4.1.48.Final\netty-all-4.1.48.Final.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-logging\4.3.0\rocketmq-logging-4.3.0.jar;C:\Users\96935\.m2\repository\io\netty\netty-tcnative-boringssl-static\2.0.30.Final\netty-tcnative-boringssl-static-2.0.30.Final.jar;C:\Users\96935\.m2\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar;C:\Users\96935\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;C:\Users\96935\.m2\repository\org\springframework\spring-core\5.2.5.RELEASE\spring-core-5.2.5.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jcl\5.2.5.RELEASE\spring-jcl-5.2.5.RELEASE.jar" com.xy.rocketmqtest.transactionProducer.TransactionProducer
------------开始异步执行本地事务————————————————
回调参数:回调参数
message:Message{topic='tansaction_producer_topic', flag=0, properties={KEYS=key, TRAN_MSG=true, UNIQ_KEY=C0A80065156C18B4AAC26F7230DF0000, WAIT=true, PGROUP=tansaction_producer_group, TAGS=tagA}, body=[104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 33], transactionId='C0A80065156C18B4AAC26F7230DF0000'}
------------rocketMQ回查————————————————
msgkeys:key
------------rocketMQ回查————————————————
msgkeys:key
------------rocketMQ回查————————————————
msgkeys:key
----事务执行了3次,回滚消息----

文件目录结构
第二部分:SpringBoot结合RocketMQ事务消息
场景:支付场景将客户扣款和商家账户入账通过RocketMQ中间件拆分成两个过程

SpringBoot结合RocketMQ事务消息文件目录结构
第一步:导入rocketMQ jar包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xy</groupId>
<artifactId>xy</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>xy</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<rocketmq.version>4.3.0</rocketmq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.5</version>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
生产者代码部分:
package com.xy.service.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@Component
public class TransactionProducer implements InitializingBean {
//TransactionMQProducer是rocketMQ jar包里的类
private TransactionMQProducer producer;
//线程池
private ExecutorService executorService;
@Autowired
private TransactionListenerImpl transactionListenerImpl;
//实际业务这些ip地址和端口写进配置文件,为了安全部署的机器最好是内网
public static final String NAMESRV_ADDRS="106.13.88.xxx:19876;106.13.88.xxx
:29876";
public static final String PRODUCER_SHOPPING_GROUP="producer_shopping_group";
/**
* @desc 私有无参构造函数,让别人不能通过new关键字实例化第二个TransactionProducer()对象,
* 容器实例化的时候手动new出executorService和producer的对象
* */
private TransactionProducer(){
System.out.println("**********************TransactionProducer构造函数初始化************************");
this.producer = new TransactionMQProducer(PRODUCER_SHOPPING_GROUP);
this.executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thead = new Thread(r);
thead.setName(PRODUCER_SHOPPING_GROUP+"check-thread");
/**此线程用来执行TransactionListenerImpl.checkLocalTransaction(MessageExt msg)函数,
* 如果return null; 能够正常投递消息,但是如果本地事务失败,不会执行回查函数
* */
return thead;
}
});
this.producer.setNamesrvAddr(NAMESRV_ADDRS);
this.producer.setExecutorService(executorService);
}
/**
* @desc 实现InitializingBean的bean在完成所有bean容器注入完成后执行此方法
* */
@Override
public void afterPropertiesSet() {
System.out.println("**********************TransactionProducera fterPropertiesSet()方法执行************************");
//把此步骤提取到 afterPropertiesSet()方法,防止TransactionProducer()构造方法执行时transactionListenerImpl还没被注入
this.producer.setTransactionListener(transactionListenerImpl);
start();
}
private void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void shutdown(){
this.producer.shutdown();
}
public TransactionSendResult sendMessage(Message message, Object argument){
TransactionSendResult sendResult = null;
try {
sendResult = this.producer.sendMessageInTransaction(message,argument);
} catch (MQClientException e) {
e.printStackTrace();
}
return sendResult;
}
}
package com.xy.service.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Component
public class TransactionListenerImpl implements TransactionListener {
//countHashMap broker迟迟没有收到后台发来的本地事务执行完的记录,记录回查消息的key及回查函数调用次数
private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>();
//设置回查函数最大调用次数
private final static int MAX_COUNT = 3;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
CountDownLatch countDownLatch=null;
try {
Map<String,Object> params = (Map<String,Object>) arg;
String openId = (String)params.get("openId");
String orderId = (String)params.get("orderId");
//金额使用BigDecimal
double money = (double)params.get("money");
countDownLatch = (CountDownLatch) params.get("countDownLatch");
//TODO 做当前账号数据库表得金额数据更新(本地事务)
//模拟数据库操作成功失败标志
boolean updateLog = false;
if(updateLog){
//释放本地阻塞
countDownLatch.countDown();
return LocalTransactionState.COMMIT_MESSAGE;
}else{
log.info("*****本地事务执行失败*******");
countDownLatch.countDown();
//本地事务操作失败,ocketMQ需要执行回查函数再次执行本地事务
return LocalTransactionState.UNKNOW;
}
} catch (Exception e) {
e.printStackTrace();
countDownLatch.countDown();
//本地事务操作失败,rocketMQ需要执行回查函数再次执行本地事务
return LocalTransactionState.UNKNOW;
}
}
/**
* @desc 有可能本地事务执行成功,但是因为网络波动,本地事务执行成功的消息没有发送到broker,
* RocketMQ迟迟没有收到消息的确认消息,可以做查库操作,发现本地事务的数据还没入库过段时间
* 再查,查了设定的最大次数还没有数据,可以认定本地事务执行失败,做日志记录等操作,如果本地数据
* 入库成功,就设置投递消息对消费者可见,让消费者正常消费
* */
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("------------rocketMQ回查————————————————");
//获取消息唯一ID
String key = msg.getKeys();
//数据库入库成功标志
boolean addDadabase = false;
//由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?
//可以先查询数据库看这条数据是否已经处理,没有处理可以做回查次数记录
if(!addDadabase){
//记录回查次数,不能让它无限回查,超过设置的次数就回滚事务
Integer num = countHashMap.get(key);
if(num != null && ++num == MAX_COUNT) {
log.info("----事务执行了{}次,已经达到设置的阈值,不再重复回查,rocketMQ回滚取消这条消息投递",MAX_COUNT);
countHashMap.remove(key);
return LocalTransactionState.ROLLBACK_MESSAGE;
}if(num == null) {
num = new Integer(1);
}
countHashMap.put(key, num);
//本地事务操作失败,rocketMQ需要执行回查函数再次执行本地事务
return LocalTransactionState.UNKNOW;
}
//本地事务操作失败,rocketMQ取消投递这条消息
return LocalTransactionState.COMMIT_MESSAGE;
}
}
Service代码部分:
package com.xy.service;
public interface ShoppingService {
String shopping(String openId, String orderId, double money) throws Exception;
}
package com.xy.serviceImpl;
import com.alibaba.fastjson.JSON;
import com.xy.service.ShoppingService;
import com.xy.service.producer.TransactionProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Service
public class ShoppingServiceImpl implements ShoppingService {
public static String SHOPPING_TOPIC = "shooping_topic_pay";
public static String SHOPPING_TAGS = "shooping_tags_pay";
@Autowired
private TransactionProducer transactionProducer;
@Override
public String shopping(String openId, String orderId, double money) throws Exception {
//TODO 查询账号余额足不足,如果余额不足 return "余额不足,购物失败";
String keys = UUID.randomUUID().toString()+"&"+System.currentTimeMillis();
Map<String,Object> params = new HashMap<>();
params.put("openId",openId);
params.put("orderId",orderId);
params.put("money",money);
Message message = new Message(SHOPPING_TOPIC,SHOPPING_TAGS,keys, JSON.toJSONString(params).getBytes());
//同步阻塞
CountDownLatch countDownLatch = new CountDownLatch(1);
params.put("countDownLatch",countDownLatch);
//消息发送且本地事务执行
TransactionSendResult sendResult = transactionProducer.sendMessage(message,params);
//等待异步执行得本地事务完成
countDownLatch.await();
//如果消息发送成功,并且本地事务执行成功(消息发送和本地事务执行时异步执行)
if(sendResult.getSendStatus()== SendStatus.SEND_OK
&&sendResult.getLocalTransactionState()== LocalTransactionState.COMMIT_MESSAGE){
log.info("*******消息投递成功,本地事务执行完毕,支付完成!!!******");
return "支付完成,购物成功";
}else{
log.info("*******消息投递失败,支付失败!!!******");
return "购物失败";
}
}
}
Controller代码:
package com.xy.controller;
import com.xy.bean.Userinfo;
import com.xy.service.DemoService;
import com.xy.service.ShoppingService;
import com.xy.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Optional;
@Controller
@RequestMapping("/demo")
public class UserInfoController {
@Autowired
private ShoppingService shoppingService;
@RequestMapping("/shopping")
@ResponseBody
public String shopping(@RequestParam String openId,
@RequestParam String orderId,
@RequestParam Double money){
String resultStr = "";
try {
//Double.valueOf(String)实际业务中可能会丢失精度,建议用BigDecimal
resultStr = shoppingService.shopping(openId,orderId,money);
} catch (Exception e) {
e.printStackTrace();
resultStr ="购物失败!";
}
return resultStr;
}
}
consumer代码(注意:分布式系统中consumer代码和producer代码可能不在一个项目下,这里演示放在一个项目下):
package com.xy.service.consumer;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class ShoppingConsumer {
private DefaultMQPushConsumer consumer;
//实际业务ip写在配置文件
public static final String NAMESRV_ADDRS="106.13.88.xxx:19876;106.13.88.xxx:29876";
public static final String CONSUMER_SHOPPING_GROUP="consumer_shopping_group";
public static String SHOPPING_TOPIC = "shooping_topic_pay";
public static String SHOPPING_TAGS = "shooping_tags_pay";
/**
* @desc 设置为私有,不想让别人new一个新的对象出来
* */
private ShoppingConsumer(){
try {
this.consumer = new DefaultMQPushConsumer(CONSUMER_SHOPPING_GROUP);
this.consumer.setConsumeThreadMin(10);
this.consumer.setConsumeThreadMax(10);
//端口号一般是使用9876,Const.NAMESRV_ADDR=106.13.88.XXX:19876
this.consumer.setNamesrvAddr(NAMESRV_ADDRS);
//从那个位置开始消费,可以从末尾,最前端 这里是最末尾
this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//消费哪个主题的消息和标签,标签可以是表达式,如*表示消费该topic下的所有类型标签的消息
this.consumer.subscribe(SHOPPING_TOPIC,SHOPPING_TAGS);
this.consumer.registerMessageListener(new MessageListenerConcurrentlyShopping());
this.consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
class MessageListenerConcurrentlyShopping implements MessageListenerConcurrently{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt me = msgs.get(0);
try{
String topic = me.getTopic();
String tags = me.getTags();
String keys =me.getKeys();
//发消息的时候时通过fastJson序列化的,所有需要转换成String
String body = new String(me.getBody(), "UTF-8");
log.info("消费端消费消息:topic:{},tags:{},keys:{},body:{}",topic,tags,keys,body);
//System.out.println("SHopping_transation消费消息:topic:"+topic+"tags:"+tags+"keys:"+keys+"body:"+body);
//TODO 消费端需要去重,网络原因可能导致重复消费同一消息,rocketMQ可能基于性能考虑没有自己做去重,需要业务自己去重
//可以使用redis主键去重,或者如果并发要求不是太高可以用mysql主键去重,如使用inset 如果之前消费过了就肯定inser失败、
Map<String,Object> map = JSONObject.parseObject(body);
String openId = (String)map.get("openId");
String orderId = (String)map.get("orderId");
BigDecimal money = (BigDecimal) map.get("money");
//TODO 商户数据数据库更新操作,发送短信通知操作等等
}catch (Exception e){
e.printStackTrace();
int reconsumerTimes = me.getReconsumeTimes();
System.out.println("第"+reconsumerTimes+"次消费该消息!");
if(reconsumerTimes==4){
//如果消费四次还没有消费到就做日志,然后做补偿
}
//消费失败会重新消费,时间从1s 开始然后越来越长,到最后一次重试的时间是2h 默认重试15次
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
测试场景一:TransactionListenerImpl.executeLocalTransaction(Message msg, Object arg)的addDadabase设置为false,启动项目:浏览器访问http://localhost:8082/demo/shopping?openId=xxxxxxyyyy&orderId=1&money=98,模拟消息发送成功,本地事务执行失败
"C:\Program Files\Java\jdk1.8.0_111\bin\java.exe" -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true "-javaagent:D:\IDEA\IntelliJ IDEA 2019.3.1\lib\idea_rt.jar=64824:D:\IDEA\IntelliJ IDEA 2019.3.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\rt.jar;D:\svn\myproject\zjhy\code\xy\target\classes;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.2.2.RELEASE\spring-boot-starter-web-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter\2.2.2.RELEASE\spring-boot-starter-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot\2.2.2.RELEASE\spring-boot-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.2.RELEASE\spring-boot-autoconfigure-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.2.2.RELEASE\spring-boot-starter-logging-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;C:\Users\96935\.m2\repository\org\slf4j\jul-to-slf4j\1.7.29\jul-to-slf4j-1.7.29.jar;C:\Users\96935\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\96935\.m2\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.2.2.RELEASE\spring-boot-starter-json-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.1\jackson-databind-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.1\jackson-annotations-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.1\jackson-core-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.1\jackson-datatype-jdk8-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.1\jackson-datatype-jsr310-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.1\jackson-module-parameter-names-2.10.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.2.RELEASE\spring-boot-starter-tomcat-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.29\tomcat-embed-core-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.29\tomcat-embed-el-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.29\tomcat-embed-websocket-9.0.29.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-validation\2.2.2.RELEASE\spring-boot-starter-validation-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\jakarta\validation\jakarta.validation-api\2.0.1\jakarta.validation-api-2.0.1.jar;C:\Users\96935\.m2\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;C:\Users\96935\.m2\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;C:\Users\96935\.m2\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-web\5.2.2.RELEASE\spring-web-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-beans\5.2.2.RELEASE\spring-beans-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-webmvc\5.2.2.RELEASE\spring-webmvc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-aop\5.2.2.RELEASE\spring-aop-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-context\5.2.2.RELEASE\spring-context-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-expression\5.2.2.RELEASE\spring-expression-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.1\mybatis-spring-boot-starter-2.1.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.2.RELEASE\spring-boot-starter-jdbc-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\zaxxer\HikariCP\3.4.1\HikariCP-3.4.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jdbc\5.2.2.RELEASE\spring-jdbc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-tx\5.2.2.RELEASE\spring-tx-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.1\mybatis-spring-boot-autoconfigure-2.1.1.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis\3.5.3\mybatis-3.5.3.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;C:\Users\96935\.m2\repository\org\projectlombok\lombok\1.16.18\lombok-1.16.18.jar;C:\Users\96935\.m2\repository\mysql\mysql-connector-java\5.1.39\mysql-connector-java-5.1.39.jar;C:\Users\96935\.m2\repository\org\slf4j\slf4j-api\1.7.29\slf4j-api-1.7.29.jar;C:\Users\96935\.m2\repository\org\springframework\spring-core\5.2.2.RELEASE\spring-core-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jcl\5.2.2.RELEASE\spring-jcl-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-client\4.3.0\rocketmq-client-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-common\4.3.0\rocketmq-common-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-remoting\4.3.0\rocketmq-remoting-4.3.0.jar;C:\Users\96935\.m2\repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;C:\Users\96935\.m2\repository\io\netty\netty-all\4.1.43.Final\netty-all-4.1.43.Final.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-logging\4.3.0\rocketmq-logging-4.3.0.jar;C:\Users\96935\.m2\repository\io\netty\netty-tcnative-boringssl-static\2.0.28.Final\netty-tcnative-boringssl-static-2.0.28.Final.jar;C:\Users\96935\.m2\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar" com.xy.XyApplication
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.2.RELEASE)
Run thirdInitializer
FirstInitializer
[main] INFO com.xy.XyApplication - Starting XyApplication on DESKTOP-5TGBJTN with PID 10292 (D:\svn\myproject\zjhy\code\xy\target\classes started by 96935 in D:\svn\myproject\zjhy\code\xy)
[main] INFO com.xy.XyApplication - No active profile set, falling back to default profiles: default
hello fourthListener!
[main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8082 (http)
[main] INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
[main] INFO org.apache.catalina.core.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
[main] INFO o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
[main] INFO org.springframework.web.context.ContextLoader - Root WebApplicationContext: initialization completed in 952 ms
test setApplicationContext......
**********************TransactionProducer构造函数初始化************************
**********************TransactionProducera fterPropertiesSet()方法执行************************
[main] INFO o.s.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
[main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8082 (http) with context path ''
[main] INFO com.xy.XyApplication - Started XyApplication in 4.771 seconds (JVM running for 5.929)
hello ThirdListener!
hello fourthListener!
[http-nio-8082-exec-1] INFO org.apache.tomcat.util.http.parser.Cookie - A cookie header was received [1586131383; NG_TRANSLATE_LANG_KEY=%22en%22] that contained an invalid cookie. That cookie will be ignored.
Note: further occurrences of this error will be logged at DEBUG level.
[http-nio-8082-exec-1] INFO o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - Initializing Servlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - Completed initialization in 7 ms
[http-nio-8082-exec-1] INFO com.xy.service.producer.TransactionListenerImpl - *****本地事务执行失败*******
[http-nio-8082-exec-1] INFO com.xy.serviceImpl.ShoppingServiceImpl - *******消息投递失败,支付失败!!!******
[producer_shopping_groupcheck-thread] INFO com.xy.service.producer.TransactionListenerImpl - ------------rocketMQ回查————————————————
[producer_shopping_groupcheck-thread] INFO com.xy.service.producer.TransactionListenerImpl - ------------rocketMQ回查————————————————
[producer_shopping_groupcheck-thread] INFO com.xy.service.producer.TransactionListenerImpl - ------------rocketMQ回查————————————————
[producer_shopping_groupcheck-thread] INFO com.xy.service.producer.TransactionListenerImpl - ----事务执行了3次,已经达到设置的阈值,不再重复回查,rocketMQ回滚取消这条消息投递
测试场景二:TransactionListenerImpl.executeLocalTransaction(Message msg, Object arg)的addDadabase设置为true,启动项目:浏览器访问http://localhost:8082/demo/shopping?openId=xxxxxxyyyy&orderId=1&money=98,
消息投递成功,本地事务执行成功
"C:\Program Files\Java\jdk1.8.0_111\bin\java.exe" -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true "-javaagent:D:\IDEA\IntelliJ IDEA 2019.3.1\lib\idea_rt.jar=65500:D:\IDEA\IntelliJ IDEA 2019.3.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\rt.jar;D:\svn\myproject\zjhy\code\xy\target\classes;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.2.2.RELEASE\spring-boot-starter-web-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter\2.2.2.RELEASE\spring-boot-starter-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot\2.2.2.RELEASE\spring-boot-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.2.RELEASE\spring-boot-autoconfigure-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.2.2.RELEASE\spring-boot-starter-logging-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\96935\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;C:\Users\96935\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;C:\Users\96935\.m2\repository\org\slf4j\jul-to-slf4j\1.7.29\jul-to-slf4j-1.7.29.jar;C:\Users\96935\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\96935\.m2\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.2.2.RELEASE\spring-boot-starter-json-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.1\jackson-databind-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.1\jackson-annotations-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.1\jackson-core-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.1\jackson-datatype-jdk8-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.1\jackson-datatype-jsr310-2.10.1.jar;C:\Users\96935\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.1\jackson-module-parameter-names-2.10.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.2.RELEASE\spring-boot-starter-tomcat-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.29\tomcat-embed-core-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.29\tomcat-embed-el-9.0.29.jar;C:\Users\96935\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.29\tomcat-embed-websocket-9.0.29.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-validation\2.2.2.RELEASE\spring-boot-starter-validation-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\jakarta\validation\jakarta.validation-api\2.0.1\jakarta.validation-api-2.0.1.jar;C:\Users\96935\.m2\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;C:\Users\96935\.m2\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;C:\Users\96935\.m2\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-web\5.2.2.RELEASE\spring-web-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-beans\5.2.2.RELEASE\spring-beans-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-webmvc\5.2.2.RELEASE\spring-webmvc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-aop\5.2.2.RELEASE\spring-aop-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-context\5.2.2.RELEASE\spring-context-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-expression\5.2.2.RELEASE\spring-expression-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.1\mybatis-spring-boot-starter-2.1.1.jar;C:\Users\96935\.m2\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.2.RELEASE\spring-boot-starter-jdbc-2.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\com\zaxxer\HikariCP\3.4.1\HikariCP-3.4.1.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jdbc\5.2.2.RELEASE\spring-jdbc-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-tx\5.2.2.RELEASE\spring-tx-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.1\mybatis-spring-boot-autoconfigure-2.1.1.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis\3.5.3\mybatis-3.5.3.jar;C:\Users\96935\.m2\repository\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;C:\Users\96935\.m2\repository\org\projectlombok\lombok\1.16.18\lombok-1.16.18.jar;C:\Users\96935\.m2\repository\mysql\mysql-connector-java\5.1.39\mysql-connector-java-5.1.39.jar;C:\Users\96935\.m2\repository\org\slf4j\slf4j-api\1.7.29\slf4j-api-1.7.29.jar;C:\Users\96935\.m2\repository\org\springframework\spring-core\5.2.2.RELEASE\spring-core-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\org\springframework\spring-jcl\5.2.2.RELEASE\spring-jcl-5.2.2.RELEASE.jar;C:\Users\96935\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-client\4.3.0\rocketmq-client-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-common\4.3.0\rocketmq-common-4.3.0.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-remoting\4.3.0\rocketmq-remoting-4.3.0.jar;C:\Users\96935\.m2\repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;C:\Users\96935\.m2\repository\io\netty\netty-all\4.1.43.Final\netty-all-4.1.43.Final.jar;C:\Users\96935\.m2\repository\org\apache\rocketmq\rocketmq-logging\4.3.0\rocketmq-logging-4.3.0.jar;C:\Users\96935\.m2\repository\io\netty\netty-tcnative-boringssl-static\2.0.28.Final\netty-tcnative-boringssl-static-2.0.28.Final.jar;C:\Users\96935\.m2\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar" com.xy.XyApplication
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.2.RELEASE)
Run thirdInitializer
FirstInitializer
[main] INFO com.xy.XyApplication - Starting XyApplication on DESKTOP-5TGBJTN with PID 704 (D:\svn\myproject\zjhy\code\xy\target\classes started by 96935 in D:\svn\myproject\zjhy\code\xy)
[main] INFO com.xy.XyApplication - No active profile set, falling back to default profiles: default
hello fourthListener!
[main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8082 (http)
[main] INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
[main] INFO org.apache.catalina.core.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
[main] INFO o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
[main] INFO org.springframework.web.context.ContextLoader - Root WebApplicationContext: initialization completed in 969 ms
test setApplicationContext......
**********************TransactionProducer构造函数初始化************************
**********************TransactionProducera fterPropertiesSet()方法执行************************
[main] INFO o.s.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
[main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8082 (http) with context path ''
[main] INFO com.xy.XyApplication - Started XyApplication in 4.763 seconds (JVM running for 5.914)
hello ThirdListener!
hello fourthListener!
[http-nio-8082-exec-1] INFO org.apache.tomcat.util.http.parser.Cookie - A cookie header was received [1586131383; NG_TRANSLATE_LANG_KEY=%22en%22] that contained an invalid cookie. That cookie will be ignored.
Note: further occurrences of this error will be logged at DEBUG level.
[http-nio-8082-exec-1] INFO o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - Initializing Servlet 'dispatcherServlet'
[http-nio-8082-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - Completed initialization in 7 ms
[http-nio-8082-exec-1] INFO com.xy.serviceImpl.ShoppingServiceImpl - *******消息投递成功,本地事务执行完毕,支付完成!!!******
[ConsumeMessageThread_1] INFO com.xy.service.consumer.ShoppingConsumer - 消费端消费消息:topic:shooping_topic_pay,tags:shooping_tags_pay,keys:6a0108b5-1213-483a-bbf0-42b458b65194&1587967678182,body:{"money":98.0,"orderId":"1","openId":"xxxxxxyyyy"}
