今天讨论分布式应用中的限流问题,这里通过redis的pipeline实现个简单的限流。
这里先简单说一下pipeline。
redis服务器处理命令的时间是微秒级别的,所以时间消耗主要在网络传输过程中,为了解决这个问题产生了pipeline,就是批量将命令打包发送到redis服务器,然后服务器按顺序执行命令,并将结果按顺序返回到客户端。这样已降低网络交互。
但是pipeline打包的命令不是原子性的,有可能被打散加入等待队列中,至于原理就不在这里展开了。
现在要实现一个60秒内,只允许某用户执行某种行为5次的限流。需求比较简单,需要有个时间滑块的概念,只统计在60内时间范围的请求,其他的数据可以丢弃。这里使用的是zset结构,直接上代码。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class DemoTest {
@Autowired
RedisTemplate redisTemplate;
@Test
public void PipelineTest() {
//用户id
String userId = "zhangsan";
//行为key
String actionKey = "good";
//时间范围
Integer period = 60;
//次数上限
Integer maxCount = 5;
for (int i = 0; i < 20; i++) {
List<Object> list = redisTemplate.executePipelined(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.openPipeline();
String key = String.format("hist:%s:%s", userId, actionKey);
long nowTs = System.currentTimeMillis();
//添加行为
connection.zAdd(key.getBytes(), nowTs, (nowTs + "").getBytes());
//清理无用的数据
connection.zRemRangeByScore(key.getBytes(), 0, nowTs - period * 1000);
//设置key有效时间 对冷数据自动清理
connection.expire(key.getBytes(), period + 1);
//获取次数
Long card = connection.zCard(key.getBytes());
return card;
}
});
System.out.println(list);
if((Long)list.get(list.size() - 1) >= maxCount) {
System.out.println("已经超出最大点赞量");
break;
}
}
}
}
这里使用了redisTemplate作为客户端,通过zset中的key的命名来确定用户行为。通过时间戳设置value和score,便于删除和范围获取,在将对象设置有效时长,来对无行为的用户数据进行自动清理,这样就完成了简单的限流功能。也算是zset的一种应用场景吧。
参考《Redis深度历险》