RocketMQ应用——消息过滤

消费者再进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型

对于指定Topic消息的过滤方式有两种:Tag过滤与SQL过滤

Tag过滤

通过Consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(||)链接

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); 
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

SQL过滤

SQL过滤是一种通过特定表达式对事先买入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。

SQL过滤表达式中支持多种常量类型与运算符

支持的常量类型:

  • 数值
  • 字符
  • 布尔
  • NULL

支持的运算符:

  • 数字比较:>,>=,<,<=,between,=
  • 字符比较:=,<>,IN
  • 逻辑运算:AND,OR,NOT
  • NULL判断:IS NULL或IS NOT NULL

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,

enablePropertyFilter = true

consumer.subscribe("myTopic", MessageSelector.bySql("age between 0 and 6"));
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容