一、关于@Transactional支持
在阅读Spring Data Redis V2.5.1官方文档的时候,在10.12.1. @Transactional Support
这一小节介绍了如何配置Spring Data Redis,使其能够支持@Transactional
。让我感到奇怪的是以下这句话:
Spring Data Redis distinguishes between read-only and write commands in an ongoing transaction. Read-only commands, such as KEYS, are piped to a fresh (non-thread-bound) RedisConnection to allow reads. Write commands are queued by RedisTemplate and applied upon commit.
大概是说Spring Data Redis能够区分事务中的读写操作,写命令由专门的事务连接(每个事务都会新建一个连接并且绑定线程,并且带有MULTI、EXEC
)向Redis服务发出请求,读命令由非事务连接(一个多线程共享的连接,不支持事务和blocking操作,一般操作都由它向Redis服务发送请求)向Redis服务发出请求。但是阅读源码的时候并没有发现用于区分读写操作的代码,最终通过DEBUG走单步的方式查找到了相关代码,原来它是通过代理的方式拦截了相关命令的执行,在命令发送前对连接进行了切换。
RedisConnectionUtils.class
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean transactionSupport) {
...
//在@Transaction事务模式下会运行以下代码,为connection创建一个代理
if (bindSynchronization && isActualNonReadonlyTransactionActive()) {
connection = createConnectionSplittingProxy(connection, factory);
}
...
}
private static RedisConnection createConnectionSplittingProxy(RedisConnection connection, RedisConnectionFactory factory) {
ProxyFactory proxyFactory = new ProxyFactory(connection);
proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory));
proxyFactory.addInterface(RedisConnectionProxy.class);
return RedisConnection.class.cast(proxyFactory.getProxy());
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ConnectionSplittingInterceptor.class
public Object intercept(Object obj, Method method, Object[] args) throws Throwable {
if (method.getName().equals("getTargetConnection")) {
// Handle getTargetConnection method: return underlying RedisConnection.
return obj;
}
RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName());
//这段代码会判断是否是写命令
if (isPotentiallyThreadBoundCommand(commandToExecute)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on bound connection", method.getName()));
}
return invoke(method, obj, args);
}
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on unbound connection", method.getName()));
}
//如果是读命令会获取一个新RedisConnection,这个RedisConnection 是一个相对高级的对象它包含了真正的Redis命令连接,执行读命令的时候它会根据上下文关系返回一个共享连接还是专用连接
RedisConnection connection = factory.getConnection();
try {
return invoke(method, connection, args);
} finally {
// properly close the unbound connection after executing command
if (!connection.isClosed()) {
doCloseConnection(connection);
}
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//我使用的是Lettuce客户端库,以下只是一个读命令的例子
LettuceKeyCommands.class
@Override
public Set<byte[]> keys(byte[] pattern) {
Assert.notNull(pattern, "Pattern must not be null!");
try {
if (isPipelined()) {
pipeline(connection.newLettuceResult(getAsyncConnection().keys(pattern), LettuceConverters.bytesListToBytesSet()));
return null;
}
if (isQueueing()) {
transaction(connection.newLettuceResult(getAsyncConnection().keys(pattern), LettuceConverters.bytesListToBytesSet()));
return null;
}
//最终会运行到这里,获取共享连接
return LettuceConverters.toBytesSet(getConnection().keys(pattern));
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
第二个疑惑是关于事务中读操作的,因为根据相关知识了解到,在Redis事务中的命令会先缓存在客户端,等到EXEC
命令调用后才会向Redis服务发出请求。但是根据上面的分析读命令并不会缓存而是直接向Redis服务发出请求(使用了一个共享连接),DEBUG后发现的确是如此。奇怪的是如果你用System.out.println(template.opsForValue().get("asd"));
这样的代码它的结果是NULL或者空对象,但是DEBUG的时候它明明已经获取到了数据。
//一个包含RedisConnection、DeserializingConverter、SerializingConverter等类的上层类
DefaultStringRedisConnection.class
@Override
public byte[] get(byte[] key) {
//这里的delegate就是前面的RedisConnection的代理对象
return convertAndReturn(delegate.get(key), identityConverter);
}
// DEBUG到这里value的返回值依然是正确的
@SuppressWarnings("unchecked")
@Nullable
private <T> T convertAndReturn(@Nullable Object value, Converter converter) {
//这个就是关键代码,这个条件会判断为true,最终返回null
if (isFutureConversion()) {
addResultConverter(converter);
return null;
}
if (!(converter instanceof ListConverter) && value instanceof List) {
return (T) new ListConverter<>(converter).convert((List) value);
}
return value == null ? null
: ObjectUtils.nullSafeEquals(converter, identityConverter) ? (T) value : (T) converter.convert(value);
}
//判断是否在事务中或者在流水模式中
//虽然读命令使用了非事务的连接,但是这个用于判断的依然是事务连接,也就是整个上下文依然属于事务连接中
private boolean isFutureConversion() {
return isPipelined() || isQueueing();
}
从上面的代码可以看出,最终读命令的返回值被重置为了null
,在正常事务模式下这是正常的,因为事务模式下任何操作只是缓存在客户端,这时候还没有值。既然这里的读写连接模式是分开的,而且读操作确实已经从Redis服务那读取了数据,那么为什么Spring Data Redis没有对这些读操作返回值作兼容性处理而是沿用正常的事务操作处理,所以不太明白读写分离模式具体有什么作用。
当然官方文档是不建议我们在事务中进行读取操作的,以下是一些官方说明:
// must be performed on thread-bound connection
template.opsForValue().set("thing1", "thing2");
// read operation must be run on a free (not transaction-aware) connection
template.keys("*");
// returns null as values set within a transaction are not visible
template.opsForValue().get("thing1");
个人觉得在实际开发中没必要使用@Transactional来管理Redis事务,主要是Redis的事务跟关系数据库的事务是有区别的,而且读写分离模式貌似没有作用,最后使用template.execute(new SessionCallback<Object>() {})
更符合Redis事务语义(读写不区分)。