MongoDB change stream 实战

背景

最近公司要上一个需求,就是部分业务数据有插入动作时,对用户进行通知
举个虚假的例子,你的下属这会儿有个成交什么的

虽然 save 的逻辑的确是在我们的微服务代码里,
我如果在 save 这里加上这些逻辑,功能没问题,但是不就增加耦合度了么?后面修改逻辑,难度上天
用切面,先不说性能,感觉把代码逻辑放在api容器里,总觉得不太对,我觉得这个功能肯定是要放在离线计算的task容器里的

那用定时任务,看下这段时间内有多少新创建的数据?可以是可以,但是一是延迟高,二是实现逻辑有点傻

还好 MongoDB 提供了 Change Stream 的功能, 戳 mongodb changeStreams

原理和中文文档,网上一搜一堆,但是就是没有生产使用的具体代码例子

所以我来分享下

Spring data

我用的是 Spring Data Mongodb, 其它持久层框架的,请自行查阅官方文档

spring data change-streams

但是这个文档写的有点简陋

  1. import 没写,根本不知道该导哪个包
  2. 例子太简单了点,消费的时候就打印一下,看不到更多的细节

所以我尽量解释的详细点,但是注意,代码本身是运行不起来的,包括mongodb的配置和业务代码等,需要自行实现

代码

ChangeStreamService

我自己的代码,包名是 com.xixi 开头的,其它都是可以引入的包的代码


import com.xixi.SkmrActionLogsDocument;
import com.xixi.DBNameConstant;
import com.xixi.StopWatch;
import com.xixi.OnApplicationStarted;
import com.xixi.IChangeStreamStop;
import com.xixi.CommonErrorHandler;
import com.xixi.LogListener;

import com.mongodb.client.model.changestream.OperationType;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service;


/**
 * @author YellowTail
 * @since 2020-06-02
 */
@Service
public class ChangeStreamService implements IChangeStreamStop, OnApplicationStarted  {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamService.class);

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private LogListener logListener;

    @Autowired
    private ResumeTokenService resumeTokenService;

    private MessageListenerContainer messageListenerContainer;


    // https://docs.spring.io/spring-data/mongodb/docs/2.2.6.RELEASE/reference/html/#change-streams

    public void onStarted() {

        LOGGER.info("ChangeStreamService start");

        // 1. 启动一个 消息监听容器
        // 构造、使用了一个 spring 实现的线程池
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleAsyncTaskExecutor.setConcurrencyLimit(10);
        simpleAsyncTaskExecutor.setThreadNamePrefix("cs-mq-consumer-");

        messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, simpleAsyncTaskExecutor);
        messageListenerContainer.start();

        // 2. 建 一个监听器, 当有 消息收到的时候, 会被调用, 消息的 body 会被转成 domain, 原始消息在 Document 里面
        // 也就是 LogListener

        
        // 3. 设置一些监听选项
        // https://docs.mongodb.com/manual/reference/change-events/#change-stream-output

        ChangeStreamRequestOptions requestOptions = new ChangeStreamRequestOptions("yourDb",
                "yourCollection", genOptions());


        // 4. 向容器注册一个监听请求, 返回值是一个订阅对象, 可以检查当前任务的状态, 也可以用来取消执行来释放资源
        CommonErrorHandler commonErrorHandler = new CommonErrorHandler();

        messageListenerContainer.register(new ChangeStreamRequest<>(logListener, requestOptions),
                SkmrActionLogsDocument.class, commonErrorHandler);
    }

    /**
     * 停止 消息监听容器
     * @param
     * @author YellowTail
     * @since 2020-06-02
     */
    public void stop() {
        LOGGER.info("ChangeStreamService stop MessageListenerContainer");

        StopWatch stopWatch = StopWatch.createStarted();

        // 5. 停止容器
        messageListenerContainer.stop();
        LOGGER.info("ChangeStreamService stop container success, cost {} ms", stopWatch.stopThenRestart());

        // 6. 向 redis 存入 这次最后的 token
        // dev 测试发现,新的容器先起来了,老的容器后 stop的
        resumeTokenService.updateToken(logListener.getLastToken());

        LOGGER.info("ChangeStreamService stop success, cost {} ms", stopWatch.stop());
    }

    /**
     * 生成 change stream 的配置
     * @param
     * @author YellowTail
     * @since 2020-06-09
     */
    private ChangeStreamOptions genOptions() {
        // 使用 pipeline 来过滤

        MatchOperation matchOperation = Aggregation.match(Criteria.where("operationType").is(OperationType.INSERT.getValue()));

//        ChangeStreamOptions changeStreamOptions =  ChangeStreamOptions.empty();

        ChangeStreamOptions.ChangeStreamOptionsBuilder changeStreamOptionsBuilder = ChangeStreamOptions.builder()
                .filter(Aggregation.newAggregation(matchOperation));

        String token = resumeTokenService.getToken();
        if (StringUtils.isNotBlank(token)) {
            // 重启的时候,可以接着上次的来
            changeStreamOptionsBuilder.resumeToken(new BsonString(token));
        }

        return changeStreamOptionsBuilder.build();
    }
}

现在来讲解一下代码细节

onStarted 这个方法做了几件事

  1. 新建一个消息监听容器,并启动起来
  2. 新建一个监听器,有消息来的时候,去消费
  3. 根据需要,设置一些监听选项
  4. 消息监听容器注册一个监听请求,关联上监听器

新建一个消息监听容器

我偷懒,没有做什么更多的设置,基本是全默认的
设置了线程数(应该是这个作用吧,没详细了解)
设置了线程名字前缀(方便在日志文件里搜索日志)

新建一个监听器

我新建了一个 Service 来做这件事


import com.xixi.SkmrActionLogsDocument;
import com.xixi.IdTokenModel;
import com.xixi.ResumeTokenStack;
import com.xixi.VisitorNotifyMqObject;
import com.xixi.VisitorMqProducerService;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Service;

/**
 * @author YellowTail
 * @since 2020-06-02
 */
@Service
public class LogListener implements MessageListener<ChangeStreamDocument<Document>, SkmrActionLogsDocument> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);

    @Autowired
    private MqProducerService mqProducerService;

    private final ResumeTokenStack resumeTokenStack = new ResumeTokenStack();

    /**
     * 收到消息的时候,这个方法会被调用
     * @param message
     * @author YellowTail
     * @since 2020-06-03
     */
    public void onMessage(Message<ChangeStreamDocument<Document>, SkmrActionLogsDocument> message) {
        LOGGER.info("LogListener receive a message");

        String token = message.getRaw().getResumeToken().get("_data").asString().getValue();

        SkmrActionLogsDocument document = message.getBody();
        String _id = document.get_id();

        LOGGER.info("document _id is {}, targetType {}", _id, document.getTargetType());

        // 1. 把 _id 和 token 存起来
        resumeTokenStack.push(new IdTokenModel(_id, token));

        // 2. 做一些事情
      

        // 3. 发送 mq 消息
        mqProducerService.send(xxx);

    }

    /**
     * 得到消费的最后一个 token
     * @author YellowTail
     * @since 2020-06-08
     */
    public String getLastToken() {
        IdTokenModel pop = resumeTokenStack.pop();

        if (null == pop) {
            return null;
        }

        return pop.getToken();
    }

}

错误处理

简单实现了一下

public class CommonErrorHandler implements ErrorHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommonErrorHandler.class);

    public void handleError(Throwable t) {
        LOGGER.error("occur error, ", t);
    }
}

优雅停机和避免重复消费

因为我们是k8s,容器重启的时候,可能会有多个容器并存,我担心多个容器同时去 watch,导致重复提醒,
所以加了优雅停机, 在重启的时候,让老容器不再去watch

逻辑就是

  1. 写一个 controller
  2. controller 调用接口IChangeStreamStopstop 方法

这样,k8s在对容器关停的时候,容器就能停止容器,避免重复消费;具体代码实现就不贴了

定义一个接口

public interface IChangeStreamStop {

    void stop();
}

所以可以看到前面的代码

ChangeStreamService implements IChangeStreamStop

k8s yaml 配置

containers:
- name: change-stream
  env:
    - name: aliyun_logs_xxx
      value: /ms/logs/*.log
  image: rxxx:latest
  imagePullPolicy: Always   
  ports:
    - containerPort: 8080
  #就绪检查
  readinessProbe:
    failureThreshold: 10
    httpGet:
      path: /xxx
      port: 8080
      scheme: HTTP
    initialDelaySeconds: 20
    periodSeconds: 2
    successThreshold: 1
    timeoutSeconds: 1
  #健康检查
  livenessProbe:
    failureThreshold: 10
    initialDelaySeconds: 20
    periodSeconds: 2
    successThreshold: 1
    tcpSocket:
      port: 8080
    timeoutSeconds: 1
  # 优雅停机
  lifecycle:
    preStop:
        httpGet:
            path: /xxx/stop
            port: 8080
            scheme: HTTP
  #资源限制
  resources:
    limits:
      memory: 2Gi
    requests:
      memory: 1500Mi

启动

那么 ChangeStreamService里的消息监听容器 什么时候启动起来呢?

定义一个接口

/**
 * IOC 容器启动之后会自动调用的接口
 * @author YellowTail
 * @since 2019-04-03
 */
public interface OnApplicationStarted {

    /**
     * <br>IOC 容器启动之后会调用的方法
     *
     * @author YellowTail
     * @since 2019-04-03
     */
    void onStarted();
}

对应代码

ChangeStreamService implements IChangeStreamStop, OnApplicationStarted 

具体自行实现

参考

mongodb changeStreams

spring data change-streams

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353