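This article shows how to integrate Redis with Spring Boot to build a queue store. A queue store is usually offered as a REST microservice, which makes Spring Boot + Redis a natural choice.
A typical use case is the task list of a web crawler: each crawler worker process (or thread) independently pulls URLs from the queue on its own initiative, and can fetch them in batches.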
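To make that access pattern concrete, here is a minimal stdlib-only sketch of several crawler workers draining one shared queue in batches. It is an illustration under stated assumptions, not part of the service: a ConcurrentLinkedQueue stands in for the Redis list (its poll() plays the role of LPOP), and the class name CrawlerWorkersSketch, the batch size and the worker count are hypothetical. Because each poll removes an element atomically, every URL is handed to exactly one worker, which is the property the Redis-backed queue gets from LPOP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical in-memory illustration; the real queue lives in Redis
public class CrawlerWorkersSketch {

    // Pops up to n URLs; returns fewer when the queue runs dry
    // (the analogue of n LPOPs with the null replies dropped)
    static List<String> fetchN(Queue<String> queue, int n) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            String url = queue.poll();  // atomic remove-from-head, standing in for LPOP
            if (url == null) {
                break;
            }
            batch.add(url);
        }
        return batch;
    }

    // Starts `workers` threads that each pull batches until the queue is empty,
    // and returns every URL handled across all workers
    static List<String> runWorkers(Queue<String> queue, int workers, int batchSize) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int w = 0; w < workers; w++) {
                futures.add(pool.submit(() -> {
                    List<String> handled = new ArrayList<>();
                    List<String> batch;
                    while (!(batch = fetchN(queue, batchSize)).isEmpty()) {
                        handled.addAll(batch);  // a real worker would crawl these URLs here
                    }
                    return handled;
                }));
            }
            List<String> all = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                all.addAll(f.get());
            }
            return all;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 100; i++) {
            queue.add("http://example.com/page/" + i);
        }
        List<String> handled = runWorkers(queue, 4, 10);
        System.out.println(handled.size() + " handled, " + handled.stream().distinct().count() + " distinct");
    }
}
```

However the batches interleave between workers, every URL ends up in exactly one worker's list, so the handled count and the distinct count always match.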
- Step 1:
Add the following dependency to the Maven pom.xml of the Spring Boot project:
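<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
This article uses Spring Boot 1.5.2.RELEASE. Since Spring Boot 1.4 the Redis starter is spring-boot-starter-data-redis, which replaces the deprecated spring-boot-starter-redis; in Spring Boot 1.x it pulls in the Jedis client that the code below uses.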
- Step 2:
Define the required beans in the Application.java entry point:
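import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.Banner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

@SpringBootApplication(scanBasePackages = {
        "", "" })
public class Application implements CommandLineRunner {

    // Auto-configured by Spring Boot from the spring.redis.* properties
    @Autowired
    private JedisConnectionFactory jedisConnFactory;

    // String keys and string values; objects are stored as JSON strings
    @Bean
    public StringRedisTemplate redisTemplate() {
        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(jedisConnFactory);
        return redisTemplate;
    }

    @Bean
    public QueueService queueService() {
        // Inside a @Configuration class this call returns the redisTemplate bean, not a new instance
        return new QueueServiceSDRImpl(redisTemplate());
    }

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(Application.class);
        app.setBannerMode(Banner.Mode.CONSOLE);
        app.setWebEnvironment(true);  // Spring Boot 1.x API
        app.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Project: running...");
    }
}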
Only a StringRedisTemplate is defined here; if you need to store objects, convert them to JSON strings manually before saving them.
- Step 3: Define the QueueService interface:
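import java.util.List;

// BasicWebURL is the crawler's own URL model class (not shown in this article)
public interface QueueService {
    /**
     * Pop up to N entries from the URL queue.
     * @param fullTaskName task name; identifies the queue
     * @param numbersOfURL maximum number of URLs to pop
     * @return the popped URLs (fewer than numbersOfURL if the queue runs out)
     */
    List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL);

    /**
     * Append URLs to the tail of the queue.
     * @param fullTaskName task name; identifies the queue
     * @param webURLJSONStringArray URLs, each serialized as a JSON string
     * @return the queue length after the push, or -1 on error
     */
    Long enQueue(String fullTaskName, String... webURLJSONStringArray);

    /**
     * Length of the URL queue.
     * @param fullTaskName task name; identifies the queue
     * @return number of URLs still waiting in the queue
     */
    Long queueSize(String fullTaskName);

    /**
     * Clear the URL queue and the visited-link history of a task.
     * @param fullTaskName task name
     */
    void queueDump(String fullTaskName);

    /**
     * Whether a URL has already been visited.
     * @param fullTaskName task name; identifies the visited-link set
     * @param url the URL to check
     * @return true if url is in the visited-link set
     */
    Boolean hasVisit(String fullTaskName, String url);

    /**
     * Record links as visited.
     * @param fullTaskName task name; identifies the visited-link set
     * @param visitedLinkArray links to add
     * @return number of links that were newly added, or -1 on error
     */
    Long saveURL(String fullTaskName, String... visitedLinkArray);
}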
- Step 4: The implementation in QueueServiceSDRImpl.java:
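import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;

import com.alibaba.fastjson.JSONObject;

public class QueueServiceSDRImpl implements QueueService {
    // Key prefixes: "QUEUE:" + task -> Redis List (URL queue), "HIST:" + task -> Redis Set (visited links)
    private static final String HEAD_HISTORY = "HIST:";
    private static final String HEAD_QUEUE = "QUEUE:";

    private final StringRedisTemplate redisTemplate;

    public QueueServiceSDRImpl(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL) {
        // Send numbersOfURL LPOP commands in one pipeline (a single round trip).
        // Each LPOP is atomic, so concurrent workers never receive the same URL;
        // pops issued after the list is empty return null.
        List<Object> results = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
                for (long i = 0; i < numbersOfURL; i++) {
                    stringRedisConn.lPop(HEAD_QUEUE.concat(fullTaskName));
                }
                // The callback must return null; the replies come back from executePipelined
                return null;
            }
        });
        // Drop the nulls and parse each JSON string into a BasicWebURL
        return results.stream()
                .filter(obj -> obj != null)
                .map(obj -> JSONObject.parseObject(obj.toString(), BasicWebURL.class))
                .collect(Collectors.toList());
    }

    @Override
    public Long enQueue(String fullTaskName, String... webURLJSONStringArray) {
        Long result = -1L;
        try {
            // RPUSH appends to the tail and returns the new list length
            result = redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName))
                    .rightPushAll(webURLJSONStringArray);
        } catch (DataAccessException e) {
            // Spring Data Redis rethrows Jedis errors as DataAccessException subclasses
            e.printStackTrace();
        }
        return result;
    }

    @Override
    public Long queueSize(String fullTaskName) {
        // LLEN
        return redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName)).size();
    }

    @Override
    public void queueDump(String fullTaskName) {
        // DEL both the queue and the visited-link set of this task
        redisTemplate.delete(Arrays.asList(HEAD_QUEUE.concat(fullTaskName), HEAD_HISTORY.concat(fullTaskName)));
    }

    @Override
    public Boolean hasVisit(String fullTaskName, String url) {
        Boolean hasVisit = false;
        try {
            // SISMEMBER
            hasVisit = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).isMember(url);
        } catch (DataAccessException e) {
            e.printStackTrace();
        }
        return hasVisit;
    }

    @Override
    public Long saveURL(String fullTaskName, String... visitedLinkArray) {
        Long result = -1L;
        try {
            // SADD returns how many links were not already in the set
            result = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).add(visitedLinkArray);
        } catch (DataAccessException e) {
            e.printStackTrace();
        }
        return result;
    }
}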
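QueueServiceSDRImpl keeps two Redis structures per task. The URL queue is a Redis List stored under the key "QUEUE:" + fullTaskName: it is ordered like a Java List, URLs are appended with RPUSH and taken from the head with LPOP, so it behaves as a FIFO queue. The visit history is a Redis Set stored under "HIST:" + fullTaskName: like a Java Set it holds each member only once, which makes it suitable for de-duplication. fullTaskName is therefore the task identifier; combined with one of the two prefixes it forms the actual Redis key.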
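The semantics of the two structures can be checked without a Redis server. The following is a hypothetical in-memory model (the class RedisQueueModel is not part of the project, and it returns plain strings instead of BasicWebURL objects): a map from key to ArrayDeque mimics RPUSH/LPOP/LLEN on a Redis List, a map from key to HashSet mimics SADD/SISMEMBER on a Redis Set, and keys are built as prefix + fullTaskName in the same way as QueueServiceSDRImpl. It is single-threaded and meant only to illustrate the return values.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-threaded model of the Redis keys used by QueueServiceSDRImpl
public class RedisQueueModel {
    private static final String HEAD_HISTORY = "HIST:";
    private static final String HEAD_QUEUE = "QUEUE:";

    private final Map<String, Deque<String>> lists = new HashMap<>();  // Redis Lists
    private final Map<String, Set<String>> sets = new HashMap<>();     // Redis Sets

    // RPUSH key v1 v2 ...: append to the tail, return the new length
    public long enQueue(String fullTaskName, String... items) {
        Deque<String> list = lists.computeIfAbsent(HEAD_QUEUE + fullTaskName, k -> new ArrayDeque<>());
        list.addAll(Arrays.asList(items));
        return list.size();
    }

    // n x LPOP: take from the head, dropping the nulls returned once the list is empty
    public List<String> fetchN(String fullTaskName, long n) {
        Deque<String> list = lists.getOrDefault(HEAD_QUEUE + fullTaskName, new ArrayDeque<>());
        List<String> out = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            String item = list.pollFirst();
            if (item != null) {
                out.add(item);
            }
        }
        return out;
    }

    // LLEN
    public long queueSize(String fullTaskName) {
        Deque<String> list = lists.get(HEAD_QUEUE + fullTaskName);
        return list == null ? 0 : list.size();
    }

    // SADD key m1 m2 ...: return how many members were not already present
    public long saveURL(String fullTaskName, String... links) {
        Set<String> set = sets.computeIfAbsent(HEAD_HISTORY + fullTaskName, k -> new HashSet<>());
        long added = 0;
        for (String link : links) {
            if (set.add(link)) {
                added++;
            }
        }
        return added;
    }

    // SISMEMBER
    public boolean hasVisit(String fullTaskName, String url) {
        Set<String> set = sets.get(HEAD_HISTORY + fullTaskName);
        return set != null && set.contains(url);
    }

    // DEL on both keys of the task
    public void queueDump(String fullTaskName) {
        lists.remove(HEAD_QUEUE + fullTaskName);
        sets.remove(HEAD_HISTORY + fullTaskName);
    }
}
```

A model like this can also serve as a test double for code that depends on the queue's ordering and de-duplication behaviour.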
- Step 5: Finally, here is how QueueController exposes the service as REST endpoints:
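import java.util.Collections;
import java.util.List;

import javax.servlet.http.HttpServletRequest;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

@RestController
public class QueueController {
    @Autowired
    QueueService queueService;

    /**
     * Fetch n BasicWebURLs.
     * @param request the HTTP request (unused)
     * @param fullTaskName task name; identifies the queue
     * @param numbersOfURL how many URLs to pop (default 10)
     * @return {"popLength": number popped, "data": [...], "stillHas": remaining queue length}
     */
    @GetMapping("/queue/{fullTaskName}")
    public JSONObject webURL(HttpServletRequest request,
                             @PathVariable String fullTaskName,
                             @RequestParam(defaultValue = "10", required = false) Long numbersOfURL) {
        List<BasicWebURL> data = Collections.emptyList();
        if (numbersOfURL > 0) {
            data = queueService.fetchN(fullTaskName, numbersOfURL);
        }
        JSONObject jo = new JSONObject();
        // Report the number actually popped, which may be less than requested
        jo.put("popLength", data.size());
        jo.put("data", data);
        jo.put("stillHas", queueService.queueSize(fullTaskName));
        return jo;
    }

    /**
     * Enqueue URLs.
     * @param request the HTTP request (unused)
     * @param fullTaskName task name; identifies the queue
     * @param body JSON of the form {"webURLs": [ {...}, {...} ]}
     * @return the queue length after the push, or -1 if nothing was enqueued
     */
    @PostMapping("/queue/{fullTaskName}")
    public Long enQueue(HttpServletRequest request, @PathVariable String fullTaskName, @RequestBody String body) {
        JSONObject jo = JSONObject.parseObject(body);
        if (jo != null) {
            JSONArray webURLList = jo.getJSONArray("webURLs");
            // Guard against a missing "webURLs" field as well as an empty array
            if (webURLList != null && !webURLList.isEmpty()) {
                String[] jsonArray = webURLList.stream().map(Object::toString).toArray(String[]::new);
                return queueService.enQueue(fullTaskName, jsonArray);
            }
        }
        return -1L;
    }

    /**
     * Clear the task's URL queue and visited-link history.
     * @param request the HTTP request (unused)
     * @param fullTaskName task name
     * @return always 1
     */
    @DeleteMapping("/queue/{fullTaskName}")
    public Integer queueDump(HttpServletRequest request, @PathVariable String fullTaskName) {
        queueService.queueDump(fullTaskName);
        return 1;
    }
}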
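A crawler process talks to these endpoints over plain HTTP. The client-side sketch below only builds the three requests with the JDK 11+ java.net.http API and does not send them; the base URL http://localhost:8080, the task name and the helper class QueueClientSketch are assumptions. The POST body follows the shape that QueueController.enQueue parses, an object whose webURLs array holds one JSON object per URL; the fields inside each element depend on BasicWebURL, so "url" in the test is only a placeholder.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client helper; assumes the service runs at BASE and task names are URL-safe
public class QueueClientSketch {
    static final String BASE = "http://localhost:8080";

    // GET /queue/{fullTaskName}?numbersOfURL=n : pop up to n URLs
    static HttpRequest fetch(String task, long n) {
        return HttpRequest.newBuilder(URI.create(BASE + "/queue/" + task + "?numbersOfURL=" + n))
                .GET()
                .build();
    }

    // POST /queue/{fullTaskName} with {"webURLs": [...]} : enqueue
    static HttpRequest enqueue(String task, String webURLsJsonArray) {
        String body = "{\"webURLs\":" + webURLsJsonArray + "}";
        return HttpRequest.newBuilder(URI.create(BASE + "/queue/" + task))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // DELETE /queue/{fullTaskName} : clear the queue and the visited-link set
    static HttpRequest dump(String task) {
        return HttpRequest.newBuilder(URI.create(BASE + "/queue/" + task))
                .DELETE()
                .build();
    }
}
```

To actually send one of these, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).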
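Also, for the crawler scenario described at the beginning, here is the controller that serves de-duplication (checking and recording visited links):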
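import javax.servlet.http.HttpServletRequest;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

@RestController
@RequestMapping("/link")
public class VisitedLinkController {
    @Autowired
    QueueService queueService;

    /**
     * Check whether a link has already been visited.
     * @return "y" if visited, otherwise "n"
     */
    @GetMapping("/{fullTaskName}")
    public String webURL(HttpServletRequest request,
                         @PathVariable String fullTaskName,
                         @RequestParam(defaultValue = "", required = false) String link) {
        // Boolean.TRUE.equals(...) avoids an NPE if the template ever returns null
        return Boolean.TRUE.equals(queueService.hasVisit(fullTaskName, link)) ? "y" : "n";
    }

    /**
     * Add links to the visit history.
     * @param request the HTTP request (unused)
     * @param fullTaskName task name; identifies the visited-link set
     * @param body JSON of the form {"visitedLinks": ["...", "..."]}
     * @return always true
     */
    @PostMapping("/{fullTaskName}")
    public Boolean visitLinks(HttpServletRequest request,
                              @PathVariable String fullTaskName, @RequestBody String body) {
        JSONObject jo = JSONObject.parseObject(body);
        if (jo != null) {
            JSONArray linkList = jo.getJSONArray("visitedLinks");
            // Guard against a missing "visitedLinks" field as well as an empty array
            if (linkList != null && !linkList.isEmpty()) {
                String[] links = linkList.stream().map(Object::toString).toArray(String[]::new);
                queueService.saveURL(fullTaskName, links);
            }
        }
        return true;
    }
}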
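One design point is worth noting: calling hasVisit and then saveURL costs two round trips, and two workers that check the same link at the same time can both get "n" and both crawl it. Because SADD reports how many members were newly added, the add itself can act as an atomic check-and-mark. The stdlib sketch below shows the idea with a concurrent set standing in for the Redis Set; the class VisitDedupSketch and the method markIfNew are hypothetical. In the Redis-backed service the equivalent would be to call QueueService.saveURL(fullTaskName, link) first and crawl only if it returned 1; exposing that over HTTP would need a change to VisitedLinkController that the original does not include.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of check-and-mark in a single atomic step
public class VisitDedupSketch {
    // Stand-in for the Redis Set "HIST:" + fullTaskName
    private final Set<String> visited = ConcurrentHashMap.newKeySet();

    // Like a single-member SADD returning 1: true only for the caller that actually inserted the link
    public boolean markIfNew(String link) {
        return visited.add(link);
    }

    // `threads` workers all try to claim the same `links` URLs; returns how many claims succeeded
    public static int claimsUnderContention(int threads, int links) {
        VisitDedupSketch dedup = new VisitDedupSketch();
        AtomicInteger wins = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < links; i++) {
                    if (dedup.markIfNew("http://example.com/" + i)) {
                        wins.incrementAndGet();  // only this worker would go on to crawl the link
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wins.get();
    }
}
```

However many workers race, each link is claimed exactly once, so the number of successful claims equals the number of distinct links.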
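The JSONObject and JSONArray classes used in the code above (including JSONObject.parseObject in QueueServiceSDRImpl) come from Alibaba fastjson: com.alibaba.fastjson.JSONArray and com.alibaba.fastjson.JSONObject.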