任务队列的特点
任务队列:就是“传递消息的队列”。与任务队列进行交互的实体有两类,一类是生产者(producer),另一类则是消费者(consumer)。生产者将需要处理的任务放入任务队列中,而消费者则不断地从任务独立中读入任务信息并执行。
任务队列的好处:
松耦合。生产者和消费者只需按照约定的任务描述格式,进行编写代码。
易于扩展。多消费者模式下,消费者可以分布在多个不同的服务器中,由此降低单台服务器的负载。
redis消息队列
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。发布者将消息发送到某个的Channel,订阅了这个Channel的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
订阅者可以订阅一个或多个频道(channel)(可以用通配符,监听多个频道),而发布者可以向指定的频道发送消息,所有订阅次频道的订阅者都会收到次消息。
发布者端代码
目录结构
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml
server:
port: 18081
spring:
redis:
host: 192.168.0.102
password:
port: 6379
jedis:
pool:
max-idle: 100
max-wait: 1000s
max-active: -1
min-idle: 1
Const.java
public class Const {
//通道名称
public static final String CHANNEL = "test_channel";
}
redis 操作类 RedisService.java
@Component
public class RedisService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private CountDownLatch latch;
/**
* 向通道发送消息的方法
* @param channel
* @param message
*/
public void sendChannelMess(String channel, String message) {
try {
stringRedisTemplate.convertAndSend(channel, message);
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
发布消息service层 PublisherServiceImpl
@Slf4j
@Service
public class PublisherServiceImpl implements PublisherService {
@Autowired
private RedisService redisService;
@Override
public String pushMsg(String params) {
log.info(" 又开始发布消息 .......... ");
//直接使用convertAndSend方法即可向指定的通道发布消息
redisService.sendChannelMess(Const.CHANNEL,"我又开始发布消息了,你那边有没有收到呢?");
return "success";
}
}
controller层 PublisherController
@RestController
public class PublisherController {
@Autowired
private PublisherService publisherService;
/**
* push 消息
* @param params
* @return
*/
@PostMapping("pushMsg")
public String pushMsg(String params){
return publisherService.pushMsg(params);
}
}
订阅者端代码
目录结构
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml
server:
port: 18081
spring:
redis:
host: 192.168.0.102
password:
port: 6379
jedis:
pool:
max-idle: 100
max-wait: 1000s
max-active: -1
min-idle: 1
Const.java
public class Const {
//通道名称
public static final String CHANNEL = "test_channel";
}
创建 消息接收者(订阅者)RedisReceiver.java
/***
* 消息接收者(订阅者) 需要注入到springboot中
*/
@Slf4j
@Component
public class RedisReceiver {
private CountDownLatch latch;
@Autowired
public RedisReceiver(CountDownLatch latch) {
this.latch = latch;
}
/**
* 收到通道的消息之后执行的方法
* @param message
*/
public void receiveMessage(String message) {
//这里是收到通道的消息之后执行的方法
log.info("我收到通道里你发的的消息了....." + message);
latch.countDown();
}
}
创建 监听配置 RedisSubListenerConfig.java
/***
* redis 监听配置
*/
@Component
public class RedisSubListenerConfig {
/**
* 初始化监听器
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic(Const.CHANNEL)); // new PatternTopic("这里是监听的通道的名字") 通道要和发布者发布消息的通道一致
return container;
}
/**
* 绑定消息监听者和接收监听的方法
* @param redisReceiver
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
// redisReceiver 消息接收者
// receiveMessage 消息接收后的方法
return new MessageListenerAdapter(redisReceiver, "receiveMessage");
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
/**
* 注册订阅者
* @param latch
* @return
*/
@Bean
RedisReceiver receiver(CountDownLatch latch) {
return new RedisReceiver(latch);
}
/**
* 计数器,用来控制线程
* @return
*/
@Bean
CountDownLatch latch() {
return new CountDownLatch(1); //指定了计数的次数 1
}
}