添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
看看spring-boot-starter-data-redis的依赖
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.2.RELEASE</version>
<scope>compile</scope>
</dependency>
springboot2.x 之后 jedis 改为 lettuce
jedis:采用直连,在多线程环境下是非线程安全的,这个时候只有使用连接池,为每个Jedis实例增加物理连接
lettuce:采用netty,实例可以在多个线程中共享,线程安全,可以减少线程数量
附上jedis和lettuce的对比
配置链接
application.properties文件添加
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=1000
- 为什么配置的前缀是就是 spring.redis 而不是 spring.xxx呢 ?
简单看下源码,springBoot所有的配置都会生成一个自动配置类,自动配置类都会绑定一个 Properties
redis的自动配置类 org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration 指定了自己的配置
@EnableConfigurationProperties({RedisProperties.class})
RedisProperties里面指定了自己的配置前缀 及可配置项
//获取前缀为 spring.redis 的配置项
@ConfigurationProperties(
prefix = "spring.redis"
)
public class RedisProperties {
//这些是我们可以添加的配置属性
private int database = 0;
private String url;
private String host = "localhost";
private String password;
private int port = 6379;
private boolean ssl;
private int timeout;
private RedisProperties.Pool pool;
private RedisProperties.Sentinel sentinel;
private RedisProperties.Cluster cluster;
.......
}
开始使用
接下来就可以注入RedisTemplate直接使用了
RedisTemplate 提供了各种redis数据类型的操作方法,API和redis命令基本相同
opsForValue()
opsForList()
opsForSet()
opsForHash()
opsForZSet()
opsForGeo()
opsForHyperLogLog()
测试一下
public void redisTest(){
redisTemplate.opsForValue().set("mykey1", 123);
System.out.println(redisTemplate.opsForValue().get("mykey1"));
}
能正常的set和get,实际开发中我们往往需要set 一个对象,直接储存对象会报错
org.springframework.data.redis.serializer.SerializationException: Cannot serialize;
对象的保存都需要序列化,对象要实现Serializable接口或者使用json序列化
public void redisTest() throws JsonProcessingException {
User user = new User("企迈","123");
//String jsonUser = new ObjectMapper().writeValueAsString(user);
redisTemplate.opsForValue().set("myKey2", user);
System.out.println(redisTemplate.opsForValue().get("myKey2"));
}
每次都操作序列化比较麻烦,而且我们看RedisTemplate源码
RedisTemplate默认使用JdkSerializationRedisSerializer序列化,会使我们的部分字符转义,使用stringRedisTemplate会存在同样的问题,需要手动将实体类转成json字符串;
Spring配置的两个RedisTemplate都不太方便使用,这个时候我们可以编写自己RedisTemplate代替spring提供的模板
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
// 自定义 String Object
RedisTemplate<String, Object> template = new RedisTemplate();
// 配置连接工厂
template.setConnectionFactory(redisConnectionFactory);
// Json 序列化配置
Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
// ObjectMapper 转译
ObjectMapper objectMapper = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会报异
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
objectJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// String 的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key 采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash 的key也采用 String 的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value 序列化方式采用 jackson
template.setValueSerializer(objectJackson2JsonRedisSerializer);
// hash 的 value 采用 jackson
template.setHashValueSerializer(objectJackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
自定义的RedisTemplate<String,Object> 的Bean,key使用String(包括Redis Hash 的key),value存取Redis时默认使用Json格式转换。
- 为什么自己实现一个RedisTemplate就可以替代spring提供的 ?
spring提供的 RedisTemplate 这个bean有个注解 @ConditionalOnMissingBean(name = {"redisTemplate"}) 当name里面的bean不存在才会生效,意味着我们可以自己实现
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}
@Bean
@ConditionalOnMissingBean(name = {"redisTemplate"})
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
//如果使用StringRedisTemplate key和value都是String。当需要存储实体类时,需要先转为String,再存入Redis
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
通常我们项目中开发一般不会使用原生方法写代码,提供一个RedisUtils封装一些常用操作供参考
package cn.com.codingce.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author mxz
*/
@Component
public final class RedisUtils {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
* @return
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据 key 获取过期时间
*
* @param key 键(不能为 Null)
* @return 时间(秒) 返回0代表永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断 key 是否存在
*
* @param key 键(不能为 Null)
* @return true 存在 false 不存在
*/
public boolean hashKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
public void del(String... key) {
if (key != null && key.length > 0) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
//==================================String====================================
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true 成功 false 失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time > 0 若 time <= 0 将设置无限期
* @return true 成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
*
* @param key 键
* @param delta 要增加几(大于0)
* @return
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
*
* @param key 键
* @param delta 要减少几(小于0)
* @return
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().decrement(key, delta);
}
// ================================Map=================================
/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
*
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================HyperLogLog=================================
public long pfadd(String key, String value) {
return redisTemplate.opsForHyperLogLog().add(key, value);
}
public long pfcount(String key) {
return redisTemplate.opsForHyperLogLog().size(key);
}
public void pfremove(String key) {
redisTemplate.opsForHyperLogLog().delete(key);
}
public void pfmerge(String key1, String key2) {
redisTemplate.opsForHyperLogLog().union(key1, key2);
}
}
利用setnx机制实现简单的分布式锁
只有当锁定资源不存在的时候才能 SET 成功。利用 Redis 的原子性,Redis保证在同一时间,多个请求写入相同key的数据,只有一个请求可以成功写入,而之后的所有线程在锁定资源被释放之前都不能获得锁。
@Autowired
private RedisTemplate redisTemplate;
private long timeout = 3000;
/**
* 上锁
* @param key 锁标识
* @param value 线程标识
* @return 上锁状态
*/
public boolean lock(String key, String value) {
long start = System.currentTimeMillis();
while (true) {
//检测是否超时
if (System.currentTimeMillis() - start > timeout) {
return false;
}
//执行set命令
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
//是否成功获取锁
if (absent) {
return true;
}
return false;
}
}
/**
* 解锁
* @param target
* @param timeStamp
*/
public void unlock(String target,String timeStamp){
try {
String currentValue = redisTemplate.opsForValue().get(target);
if(!Strings.isNullOrEmpty(currentValue) && currentValue.equals(timeStamp) ){
// 删除锁状态
redisTemplate.opsForValue().getOperations().delete(target);
}
} catch (Exception e) {
log.error("解锁异常{}",e);
}
}
核心就一行代码:
redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
redis2.0版本之后setnx和expire两个命令合并为一个原子操作避免了一些极端情况其他节点无法获取锁的情况,当然用redis做分布式锁还有一些其他场景和问题需要解决,比如:
- 如果在设定的超时时间之内,拿到锁服务A未处理完毕,Redis在超时后自动释放锁,其他进程B就可以拿到锁,在进程B拿到锁后,进程A执行完毕,调用del会造成误删锁。所以在执行del的时候,还需要判断是不是自己的锁;
- A服务没有执行完毕,B服务就获得了锁,这不符合分布式锁的逻辑,解决这个问题,那么客户端就需要开启一个守护线程,在超时之前如果服务还未处理完毕,要定时给锁续命。
- ...
这些问题后续可以再补充探讨