背景
最近公司要上一个需求,就是部分业务数据有插入动作时,对用户进行通知
举个虚假的例子,你的下属这会儿有个成交什么的
虽然 save
的逻辑的确是在我们的微服务代码里,
我如果在 save
这里加上这些逻辑,功能没问题,但是不就增加耦合度了么?后面修改逻辑,难度上天
用切面,先不说性能,感觉把代码逻辑放在api
容器里,总觉得不太对,我觉得这个功能肯定是要放在离线计算的task
容器里的
那用定时任务
,看下这段时间内有多少新创建的数据?可以是可以,但是一是延迟高,二是实现逻辑有点傻
还好 MongoDB
提供了 Change Stream
的功能, 戳 mongodb changeStreams
原理和中文文档,网上一搜一堆,但是就是没有生产使用的具体代码例子
所以我来分享下
Spring data
我用的是 Spring Data Mongodb
, 其它持久层框架的,请自行查阅官方文档
但是这个文档写的有点简陋
- import 没写,根本不知道该导哪个包
- 例子太简单了点,消费的时候就打印一下,看不到更多的细节
所以我尽量解释的详细点,但是注意,代码本身是运行不起来的,包括mongodb
的配置和业务代码等,需要自行实现
代码
ChangeStreamService
我自己的代码,包名是 com.xixi
开头的,其它都是可以引入的包的代码
import com.xixi.SkmrActionLogsDocument;
import com.xixi.DBNameConstant;
import com.xixi.StopWatch;
import com.xixi.OnApplicationStarted;
import com.xixi.IChangeStreamStop;
import com.xixi.CommonErrorHandler;
import com.xixi.LogListener;
import com.mongodb.client.model.changestream.OperationType;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service;
/**
* @author YellowTail
* @since 2020-06-02
*/
@Service
public class ChangeStreamService implements IChangeStreamStop, OnApplicationStarted {
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamService.class);
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private LogListener logListener;
@Autowired
private ResumeTokenService resumeTokenService;
private MessageListenerContainer messageListenerContainer;
// https://docs.spring.io/spring-data/mongodb/docs/2.2.6.RELEASE/reference/html/#change-streams
public void onStarted() {
LOGGER.info("ChangeStreamService start");
// 1. 启动一个 消息监听容器
// 构造、使用了一个 spring 实现的线程池
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setConcurrencyLimit(10);
simpleAsyncTaskExecutor.setThreadNamePrefix("cs-mq-consumer-");
messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, simpleAsyncTaskExecutor);
messageListenerContainer.start();
// 2. 建 一个监听器, 当有 消息收到的时候, 会被调用, 消息的 body 会被转成 domain, 原始消息在 Document 里面
// 也就是 LogListener
// 3. 设置一些监听选项
// https://docs.mongodb.com/manual/reference/change-events/#change-stream-output
ChangeStreamRequestOptions requestOptions = new ChangeStreamRequestOptions("yourDb",
"yourCollection", genOptions());
// 4. 向容器注册一个监听请求, 返回值是一个订阅对象, 可以检查当前任务的状态, 也可以用来取消执行来释放资源
CommonErrorHandler commonErrorHandler = new CommonErrorHandler();
messageListenerContainer.register(new ChangeStreamRequest<>(logListener, requestOptions),
SkmrActionLogsDocument.class, commonErrorHandler);
}
/**
* 停止 消息监听容器
* @param
* @author YellowTail
* @since 2020-06-02
*/
public void stop() {
LOGGER.info("ChangeStreamService stop MessageListenerContainer");
StopWatch stopWatch = StopWatch.createStarted();
// 5. 停止容器
messageListenerContainer.stop();
LOGGER.info("ChangeStreamService stop container success, cost {} ms", stopWatch.stopThenRestart());
// 6. 向 redis 存入 这次最后的 token
// dev 测试发现,新的容器先起来了,老的容器后 stop的
resumeTokenService.updateToken(logListener.getLastToken());
LOGGER.info("ChangeStreamService stop success, cost {} ms", stopWatch.stop());
}
/**
* 生成 change stream 的配置
* @param
* @author YellowTail
* @since 2020-06-09
*/
private ChangeStreamOptions genOptions() {
// 使用 pipeline 来过滤
MatchOperation matchOperation = Aggregation.match(Criteria.where("operationType").is(OperationType.INSERT.getValue()));
// ChangeStreamOptions changeStreamOptions = ChangeStreamOptions.empty();
ChangeStreamOptions.ChangeStreamOptionsBuilder changeStreamOptionsBuilder = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(matchOperation));
String token = resumeTokenService.getToken();
if (StringUtils.isNotBlank(token)) {
// 重启的时候,可以接着上次的来
changeStreamOptionsBuilder.resumeToken(new BsonString(token));
}
return changeStreamOptionsBuilder.build();
}
}
现在来讲解一下代码细节
onStarted
这个方法做了几件事
- 新建一个
消息监听容器
,并启动起来 - 新建一个
监听器
,有消息来的时候,去消费 - 根据需要,设置一些监听选项
- 向
消息监听容器
注册一个监听请求,关联上监听器
新建一个消息监听容器
我偷懒,没有做什么更多的设置,基本是全默认的
设置了线程数(应该是这个作用吧,没详细了解)
设置了线程名字前缀(方便在日志文件里搜索日志)
新建一个监听器
我新建了一个 Service
来做这件事
import com.xixi.SkmrActionLogsDocument;
import com.xixi.IdTokenModel;
import com.xixi.ResumeTokenStack;
import com.xixi.VisitorNotifyMqObject;
import com.xixi.VisitorMqProducerService;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Service;
/**
* @author YellowTail
* @since 2020-06-02
*/
@Service
public class LogListener implements MessageListener<ChangeStreamDocument<Document>, SkmrActionLogsDocument> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);
@Autowired
private MqProducerService mqProducerService;
private final ResumeTokenStack resumeTokenStack = new ResumeTokenStack();
/**
* 收到消息的时候,这个方法会被调用
* @param message
* @author YellowTail
* @since 2020-06-03
*/
public void onMessage(Message<ChangeStreamDocument<Document>, SkmrActionLogsDocument> message) {
LOGGER.info("LogListener receive a message");
String token = message.getRaw().getResumeToken().get("_data").asString().getValue();
SkmrActionLogsDocument document = message.getBody();
String _id = document.get_id();
LOGGER.info("document _id is {}, targetType {}", _id, document.getTargetType());
// 1. 把 _id 和 token 存起来
resumeTokenStack.push(new IdTokenModel(_id, token));
// 2. 做一些事情
// 3. 发送 mq 消息
mqProducerService.send(xxx);
}
/**
* 得到消费的最后一个 token
* @author YellowTail
* @since 2020-06-08
*/
public String getLastToken() {
IdTokenModel pop = resumeTokenStack.pop();
if (null == pop) {
return null;
}
return pop.getToken();
}
}
错误处理
简单实现了一下
public class CommonErrorHandler implements ErrorHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CommonErrorHandler.class);
public void handleError(Throwable t) {
LOGGER.error("occur error, ", t);
}
}
优雅停机和避免重复消费
因为我们是k8s
,容器重启的时候,可能会有多个容器并存,我担心多个容器同时去 watch
,导致重复提醒,
所以加了优雅停机
, 在重启的时候,让老容器不再去watch
逻辑就是
- 写一个 controller
- controller 调用接口
IChangeStreamStop
的stop
方法
这样,k8s
在对容器关停的时候,容器就能停止容器,避免重复消费;具体代码实现就不贴了
定义一个接口
public interface IChangeStreamStop {
void stop();
}
所以可以看到前面的代码
ChangeStreamService implements IChangeStreamStop
k8s
yaml 配置
containers:
- name: change-stream
env:
- name: aliyun_logs_xxx
value: /ms/logs/*.log
image: rxxx:latest
imagePullPolicy: Always
ports:
- containerPort: 8080
#就绪检查
readinessProbe:
failureThreshold: 10
httpGet:
path: /xxx
port: 8080
scheme: HTTP
initialDelaySeconds: 20
periodSeconds: 2
successThreshold: 1
timeoutSeconds: 1
#健康检查
livenessProbe:
failureThreshold: 10
initialDelaySeconds: 20
periodSeconds: 2
successThreshold: 1
tcpSocket:
port: 8080
timeoutSeconds: 1
# 优雅停机
lifecycle:
preStop:
httpGet:
path: /xxx/stop
port: 8080
scheme: HTTP
#资源限制
resources:
limits:
memory: 2Gi
requests:
memory: 1500Mi
启动
那么 ChangeStreamService
里的消息监听容器
什么时候启动起来呢?
定义一个接口
/**
* IOC 容器启动之后会自动调用的接口
* @author YellowTail
* @since 2019-04-03
*/
public interface OnApplicationStarted {
/**
* <br>IOC 容器启动之后会调用的方法
*
* @author YellowTail
* @since 2019-04-03
*/
void onStarted();
}
对应代码
ChangeStreamService implements IChangeStreamStop, OnApplicationStarted
具体自行实现