一段时间间隔类只允许执行某一个task一次
限流工具类
import com.google.common.annotations.VisibleForTesting;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
@Service
@RequiredArgsConstructor
public class TaskExecuteRateLimiter {
public static final String KEY_PREFIX = "an:itask:";
@Qualifier("redissonSingleClient")
private final RedissonClient redissonClient;
private boolean setIfNotExist(String taskKey, int expireSeconds) {
String key = KEY_PREFIX + taskKey;
RBucket<String> bucket = redissonClient.getBucket(key);
return bucket.trySet(getDefaultValue(), expireSeconds, TimeUnit.SECONDS);
}
private void set(String taskKey, int expireSeconds) {
String key = KEY_PREFIX + taskKey;
RBucket<String> bucket = redissonClient.getBucket(key);
bucket.set(getDefaultValue(), expireSeconds, TimeUnit.SECONDS);
}
@SneakyThrows
private String getDefaultValue() {
return InetAddress.getLocalHost().getHostName() + "-" + Thread.currentThread().getName();
}
private boolean exists(String taskKey) {
String key = KEY_PREFIX + taskKey;
RBucket<String> bucket = redissonClient.getBucket(key);
return bucket.isExists();
}
public boolean cannotExecute(String taskKey) {
return exists(taskKey);
}
public boolean execute(Runnable task, String taskKey, int expireSeconds, boolean forceExecute) {
if (forceExecute) {
forceExecuteTask(task, taskKey, expireSeconds);
return true;
}
return executeTaskOnlyOnceInInterval(task, taskKey, expireSeconds);
}
/**
* 在同一个时间间隔内,不能重复执行 task
*
* @param task 需要执行的任务
* @param taskKey 任务的唯一 key
* @param expireSeconds 间隔多少秒可以执行一次 task
*
* @return true: 成功执行,false:被限制执行(因为之前已经执行过了)
*/
public boolean executeTaskOnlyOnceInInterval(Runnable task, String taskKey, int expireSeconds) {
if (setIfNotExist(taskKey, expireSeconds)) {
task.run();
return true;
}
return false;
}
/**
* 强制执行 task,并执行过期时间到 taskKey 中
*/
@VisibleForTesting
void forceExecuteTask(Runnable task, String taskKey, int expireSeconds) {
set(taskKey, expireSeconds);
task.run();
}
}
单元测试
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
public class TaskExecuteRateLimiterTest extends AbstractITTest {
@Autowired
private TaskExecuteRateLimiter taskExecuteRateLimiter;
@Qualifier("redissonSingleClient")
@Autowired
private RedissonClient redissonClient;
private int count = 0;
private String taskKey = "taskKey";
@Before
public void setUp() {
count = 0;
redissonClient.getBucket(TaskExecuteRateLimiter.KEY_PREFIX + taskKey).delete();
}
@Test
public void executeTaskOnlyOnceInInterval() {
Assert.assertEquals(0, count);
int innerCount = 0;
for (int i = 0; i < 10; ++i) {
taskExecuteRateLimiter.executeTaskOnlyOnceInInterval(() -> count++, taskKey, 10);
innerCount++;
}
Assert.assertEquals(1, count);
Assert.assertEquals(10, innerCount);
}
@Test
public void forceExecuteTask() {
Assert.assertEquals(0, count);
taskExecuteRateLimiter.forceExecuteTask(() -> count++, taskKey, 10);
taskExecuteRateLimiter.forceExecuteTask(() -> count++, taskKey, 10);
Assert.assertEquals(2, count);
}
@Test
public void execute() throws InterruptedException {
Assert.assertEquals(0, count);
// forceExecute=false 需要间隔 1s(expireSeconds=1) 才能执行
taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
Assert.assertEquals(1, count);
// forceExecute=true 可以立即执行
taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, true);
taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, true);
Assert.assertEquals(3, count);
// forceExecute=false 需要间隔 1s(expireSeconds=1) 才能执行
taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
Assert.assertEquals(3, count);
Thread.sleep(1200);
taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
Assert.assertEquals(4, count);
}
@Test
public void cannotExecute() {
Assert.assertFalse(taskExecuteRateLimiter.cannotExecute(taskKey));
taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
Assert.assertTrue(taskExecuteRateLimiter.cannotExecute(taskKey));
}
}