1.新增依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.配置redis
-
配置连接
spring: redis: host: localhost port: 6379 password: root
-
发布者配置
@Configuration public class PublisherConfig { @Bean public RedisTemplate<String, Object> redisMessageTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); template.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class)); return template; } }
-
订阅者配置
@Configuration public class ReceiverConfig { @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter userListenerAdapter,MessageListenerAdapter goodsListenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(userListenerAdapter, new PatternTopic("user")); container.addMessageListener(goodsListenerAdapter, new PatternTopic("goods")); return container; } @Bean public MessageListenerAdapter userListenerAdapter(UserReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean public MessageListenerAdapter goodsListenerAdapter(GoodsReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean public UserReceiver userReceiver() { return new UserReceiver(); } @Bean public GoodsReceiver goodsReceiver() { return new GoodsReceiver(); } }
3.消息模型
@Data
public class RedisMessage implements Serializable {
public String msgId;
public long createStamp;
}
@EqualsAndHashCode(callSuper = true)
@Data
public class GoodsMessage extends RedisMessage {
private String goodsType;
private String number;
}
@EqualsAndHashCode(callSuper = true)
@Data
public class UserMessage extends RedisMessage {
private String userId;
private String username;
private String password;
}
4.消息发布者
@Service
public class Publisher {
private final RedisTemplate<String, Object> redisMessageTemplate;
@Autowired
public Publisher(RedisTemplate<String, Object> redisMessageTemplate) {
this.redisMessageTemplate = redisMessageTemplate;
}
public void pushMessage(String topic, RedisMessage message) {
redisMessageTemplate.convertAndSend(topic,message);
}
}
5.消息订阅者
public abstract class AbstractReceiver {
public abstract void receiveMessage(Object message);
}
@Slf4j
public class GoodsReceiver extends AbstractReceiver {
@Override
public void receiveMessage(Object message) {
log.info("接收到商品消息:{}", JSON.toJSONString(message));
}
}
@Slf4j
public class UserReceiver extends AbstractReceiver {
@Override
public void receiveMessage(Object message) {
log.info("接收到用户消息:{}", JSON.toJSONString(message));
}
}
6.测试
@Test
public void pushMessage() {
UserMessage userMessage = new UserMessage();
userMessage.setMsgId(UUID.randomUUID().toString().replace("-",""));
userMessage.setUserId("1");
userMessage.setUsername("admin");
userMessage.setUsername("root");
userMessage.setCreateStamp(System.currentTimeMillis());
publisher.pushMessage("user",userMessage);
GoodsMessage goodsMessage = new GoodsMessage();
goodsMessage.setMsgId(UUID.randomUUID().toString().replace("-",""));
goodsMessage.setGoodsType("苹果");
goodsMessage.setNumber("十箱");
goodsMessage.setCreateStamp(System.currentTimeMillis());
publisher.pushMessage("goods",goodsMessage);
}
测试结果如下:
2018-09-28 12:02:16.281 INFO 25244 --- [ main] org.qiqiang.redismq.core.PublisherTest : Started PublisherTest in 2.889 seconds (JVM running for 3.723)
2018-09-28 12:02:16.544 INFO 25244 --- [ container-2] org.qiqiang.redismq.core.UserReceiver : 接收到用户消息:"{\"createStamp\":1538107336525,\"msgId\":\"62c60489199545209f036d5eeffc353e\",\"userId\":\"1\",\"username\":\"root\"}"
2018-09-28 12:02:16.547 INFO 25244 --- [ container-3] org.qiqiang.redismq.core.GoodsReceiver : 接收到商品消息:"{\"createStamp\":1538107336543,\"goodsType\":\"苹果\",\"msgId\":\"b0ee90c6635444c490afc691a3c5cf74\",\"number\":\"十箱\"}"
2018-09-28 12:02:16.551 INFO 25244 --- [ Thread-2] o.s.w.c.s.GenericWebApplicationContext : Closing org.springframework.web.context.support.GenericWebApplicationContext@1a4927d6: startup date [Fri Sep 28 12:02:13 CST 2018]; root of context hierarchy
2018-09-28 12:02:16.552 INFO 25244 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647