先编写如下4个类。因为在TCC中用到多出地方,建议写成jar
-
JedisClusterCallback接口类
public interface JedisClusterCallback<T> { public T doInJedisCluster(JedisCluster jedisCluster); }
jedis扩展类
public class JedisClusterExtend{
private String redisClusterIp;
private String pass;
private GenericObjectPoolConfig jedisPoolConfig;
public JedisClusterExtend(String redisClusterIp,String pass){
this(pass,redisClusterIp, new GenericObjectPoolConfig());
}
public JedisClusterExtend(String redisClusterIp,String pass,GenericObjectPoolConfig genericObjectPoolConfig){
this.pass = pass;
this.jedisPoolConfig = genericObjectPoolConfig;
}
/**
* 获取JedisCluster
* @Method_Name :getJedisCluster
*
* @return redis.clients.jedis.JedisCluster
* @Creation Date :2018/6/13
* @Author :zc.ding@foxmail.com
*/
public JedisCluster getJedisCluster(){
Set<HostAndPort> set = new HashSet<HostAndPort>();
String[] arr = redisClusterIp.split(",");
for(String host : arr){
String[] ipPort = host.trim().split(":");
if(ipPort.length < 2){
throw new RuntimeException(ipPort + " is Invalid.");
}
set.add(new HostAndPort(ipPort[0], Integer.parseInt(ipPort[1])));
}
return new JedisCluster(set, 5000, 2000, 2,pass,jedisPoolConfig);
}
}
- RedisClusterTransactionRepository 类
public class RedisClusterTransactionRepository extends CachableTransactionRepository {
private JedisCluster jedisCluster;
private JedisClusterExtend jedisClusterExtend;
private String keyPrefix = "TCC:";
public void setKeyPrefix(String keyPrefix) {
this.keyPrefix = keyPrefix;
}
private ObjectSerializer serializer = new JdkSerializationSerializer();
public void setSerializer(ObjectSerializer serializer) {
this.serializer = serializer;
}
public void setJedisClusterExtend(JedisClusterExtend jedisClusterExtend) {
this.jedisClusterExtend = jedisClusterExtend;
this.jedisCluster = jedisClusterExtend.getJedisCluster();
}
public void setJedisCluster(JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
}
@Override
protected int doCreate(final Transaction transaction) {
try {
Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
@Override
public Long doInJedisCluster(JedisCluster jedisCluster) {
List<byte[]> params = new ArrayList<byte[]>();
for (Map.Entry<byte[], byte[]> entry : ExpandTransactionSerializer.serialize(serializer, transaction). ()) {
params.add(entry.getKey());
params.add(entry.getValue());
}
Object result = jedisCluster.eval("if redis.call('exists', KEYS[1]) == 0 then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;".getBytes(),
Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())), params);
return (Long) result;
}
});
return statusCode.intValue();
} catch (Exception e) {
throw new TransactionIOException(e);
}
}
@Override
protected int doUpdate(final Transaction transaction) {
try {
Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
@Override
public Long doInJedisCluster(JedisCluster jedisCluster) {
transaction.updateTime();
transaction.updateVersion();
List<byte[]> params = new ArrayList<byte[]>();
for (Map.Entry<byte[], byte[]> entry : ExpandTransactionSerializer.serialize(serializer, transaction).entrySet()) {
params.add(entry.getKey());
params.add(entry.getValue());
}
Object result = jedisCluster.eval(String.format("if redis.call('hget',KEYS[1],'VERSION') == '%s' then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;",
transaction.getVersion() - 1).getBytes(),
Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())), params);
return (Long) result;
}
});
return statusCode.intValue();
} catch (Exception e) {
throw new TransactionIOException(e);
}
}
@Override
protected int doDelete(final Transaction transaction) {
try {
Long result = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
@Override
public Long doInJedisCluster(JedisCluster jedisCluster) {
return jedisCluster.del(RedisHelper.getRedisKey(keyPrefix, transaction.getXid()));
}
});
return result.intValue();
} catch (Exception e) {
throw new TransactionIOException(e);
}
}
@Override
protected Transaction doFindOne(final Xid xid) {
try {
Long startTime = System.currentTimeMillis();
Map<byte[], byte[]> content = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Map<byte[], byte[]>>() {
@Override
public Map<byte[], byte[]> doInJedisCluster(JedisCluster jedisCluster) {
return jedisCluster.hgetAll(RedisHelper.getRedisKey(keyPrefix, xid));
}
});
if (content != null && content.size() > 0) {
return ExpandTransactionSerializer.deserialize(serializer, content);
}
return null;
} catch (Exception e) {
throw new TransactionIOException(e);
}
}
@Override
protected List<Transaction> doFindAllUnmodifiedSince(Date date) {
List<Transaction> allTransactions = doFindAll();
List<Transaction> allUnmodifiedSince = new ArrayList<Transaction>();
for (Transaction transaction : allTransactions) {
if (transaction.getLastUpdateTime().compareTo(date) < 0) {
allUnmodifiedSince.add(transaction);
}
}
return allUnmodifiedSince;
}
protected List<Transaction> doFindAll() {
List<Transaction> list = new ArrayList<Transaction>();
try {
Set<byte[]> allKeys = new HashSet<byte[]>();
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
String pattern = keyPrefix + "*";
for(String k : clusterNodes.keySet()){
JedisPool jp = clusterNodes.get(k);
Jedis jedis = jp.getResource();
try {
allKeys.addAll(jedis.keys(pattern.getBytes()));
} catch(Exception e){
} finally{
jedis.close();
}
}
for (final byte[] key : allKeys) {
Map<byte[], byte[]> map = jedisCluster.hgetAll(key);
list.add(ExpandTransactionSerializer.deserialize(serializer, map));
}
} catch (Exception e) {
throw new TransactionIOException(e);
}
return list;
}
}
- RedisHelper 类
public class RedisHelper extends org.mengyun.tcctransaction.repository.helper.RedisHelper{
public static <T> T execute(JedisCluster jedisCluster,JedisClusterCallback<T> callback){
try{
return callback.doInJedisCluster(jedisCluster);
}finally {
if(jedisCluster != null){
}
}
}
}
在各应用中添加进入如上面的jar
在各个应用中添加tcc-xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="transactionRepository" class="com.seata.common.tccredis.RedisClusterTransactionRepository">
<property name="keyPrefix" value="tcc_ut_"/>
<property name="jedisClusterExtend" ref="jedisClusterExtend"/>
</bean>
<bean id="jedisClusterExtend" class="com.seata.common.tccredis.JedisClusterExtend">
<constructor-arg index="0" value="${redis.cluster.ip}" type="java.lang.String"/>
<constructor-arg index="1" value="${redis.cluster.pass}" type="java.lang.String"/>
<constructor-arg index="2" ref="jedisPoolConfig"/>
</bean>
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="1000"/>
<property name="maxWaitMillis" value="1000"/>
</bean>
<bean class="org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig">
<property name="maxRetryCount" value="30"/>
<property name="recoverDuration" value="120"/>
<property name="cronExpression" value="0 */1 * * * ?"/>
<property name="delayCancelExceptions">
<util:set>
<value>com.alibaba.dubbo.remoting.TimeoutException</value>
</util:set>
</property>
</bean>
</beans>