RocketMQ消息过滤

消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤又分为TAG和SQL92模式,分别介绍各自的过滤机制,及代码示例内容,深入探消息过滤的原理。

1、TAG模式过滤

发送消息时我们会为每一条消息设置TAG标签,同一大类中的消息放在一个主题TOPIC下,但是如果进行分类我们则可以根据TAG进行分类,每一类消费者可能不是关系某个主题下的所有消息,我们就可以通过TAG进行过滤,订阅关注的某一类数据。

1.1、producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();
        String[] tags = {"TagA","TagB","TagC","TagD"};
        for (int i = 0; i < 40; i++) {
            try {
                String tag = tags[i % tags.length];
                //构建消息
                Message msg = new Message("GumxTest" /* Topic */,
                        tag /* Tag */,
                    ("RocketMQ消息测试,消息的TAG="+tag+" == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

主题GumxTest,标签分别是"TagA","TagB","TagC","TagD"每个分别发送10条消息

1.2、consumer

public class Consumer { 
    public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_tags");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("GumxTest", "TagA || TagC");
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
                            Date date = new Date(msg.getStoreTimestamp());
                            System.out.println("Consumer1===  存入时间 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//输出消息内容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                }
            });
            consumer.start();
            System.out.println("Consumer===启动成功!");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

查看结果:

image

消费者组订阅相同的主题不同的Tag时,如果订阅是多个Tag则通过“||” 分割

同一个消费者组订阅的主题,Tag必须相同

2、SQL表达式过滤

SQL92表达式消息过滤,是通过消息的属性运行SQL过滤表达式进行条件匹配,消息发送时需要设置用户的属性putUserProperty方法设置属性。

支持的语法

  1. 数值比较, 如>, >=, <, <=, BETWEEN, =;
  2. 字符比较, 如=, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. 逻辑连接符AND, OR, NOT;

支持的类型

  1. 数值型, 如123, 3.1415;
  2. 字符型, 如 ‘abc’, 必须用单引号;
  3. NULL, 特殊常数;
  4. 布尔值, TRUE or FALSE;

2.1、producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();
        String[] tags = {"TagA","TagB","TagC","TagD"};
        for (int i = 0; i < 40; i++) {
            try {
                String tag = tags[i % tags.length];
                //构建消息
                Message msg = new Message("GumxTest" /* Topic */,
                        tag /* Tag */,
                    ("RocketMQ消息测试,消息的TAG="+tag+  ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                msg.putUserProperty("age", i+"");
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

2.2、consumer

public class Consumer { 
    public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_sql");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("GumxTest", MessageSelector.bySql("age between 0 and 8"));
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
                            Date date = new Date(msg.getStoreTimestamp());
                            System.out.println("Consumer===  存入时间 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//输出消息内容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                }
            });
            consumer.start();
            System.out.println("Consumer===启动成功!");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

启动时:

image

启动消费者时发现启动失败,提示不支持SQL92过滤,网上也查了一些资料多说都是说版本太低,但是我使用的是RocketMQ4.2.0版本,已经是很高的版本了。

分析源码发现BrokerConfig的配置类中有一个属性 private boolean enablePropertyFilter = false;默认属性过滤没有开启,然而SQL92就是通过属性来过滤的。问题找到了,我们需要配置broker的属性在broker配置文件中添加enablePropertyFilter =true,需要依次关闭集群中的Broker、NameSrv服务,配置好后依次启动NameSrv、Broker服务

再次启动,启动成功,查看其结果:

image

3、类过滤模式(基于4.2.0版本)

RocketMQ通过定义消息过滤类的接口实现消息过滤

3.1、producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();

        try {
            for (int i = 0; i < 6000000; i++) {
                Message msg = new Message("TopicFilter",// topic
                    "TagA",// tag
                    ("Hello MetaQ age = " + i ).getBytes());// body
                msg.putUserProperty("age", String.valueOf(i));

                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
                Thread.sleep(1000);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

3.2、consumer

public class Consumer {  
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupFilter");
        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        // 使用Java代码,在服务器做消息过滤
        String filterCode = MixAll.file2String("D:\\WorkSoft\\workspace\\rocketmq-example\\src\\main\\java\\cn\\gumx\\rocketmq\\filter\\MessageFilterImpl.java");
        consumer.subscribe("TopicFilter1", "cn.gumx.rocketmq.filter.MessageFilterImpl",filterCode);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                    ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : paramList){
                        String msgbody = new String(msg.getBody(), "utf-8");
                        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgbody);
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

自定义消息的过滤类

public class MessageFilterImpl implements MessageFilter {

    public boolean match(MessageExt msg, FilterContext arg1) {
        String property = msg.getUserProperty("age");
        if (property != null) {
            int age = Integer.parseInt(property);
            if ((age % 3) == 0 && (age > 10)) {
                return true;
            }
        }
        return false;
    }
}

consumer启动后并没有出现预期的结果,查了资料也没有相关介绍,只是和我们代码一样的处理逻辑,查看源码发现,需要启动filter组件mqfiltersrv服务。

./mqfiltersrv -n 10.10.12.203:9876;10.10.12.204:9876 &

我们部署的是双主,mqfiltersrv服务都需要开启

image
image

查看服务端结果:

image

使用类消息过滤模式,需要额外需要启动filter组件mqfiltersrv服务,否则消费不了,每个broker都需要启动一个,相当于加了一层过滤层。

filtersrv 出现了。减少了 Broker 的负担,又减少了 Consumer 接收无用的消息。当然缺点也是有的,多了一层 filtersrv 网络开销

MessageFilterImpl消息过滤实现类中的代码最好不要带有中文防止错误

注意:RocketMQ4.3.1开始删除与 mqfilter 服务器相关的脚本,4.3.2删除客户端关于mqfilter 客户端代码,后面版本不支持该功能。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 230,247评论 6 543
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,520评论 3 429
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 178,362评论 0 383
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,805评论 1 317
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,541评论 6 412
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,896评论 1 328
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,887评论 3 447
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 43,062评论 0 290
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,608评论 1 336
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,356评论 3 358
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,555评论 1 374
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 39,077评论 5 364
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,769评论 3 349
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 35,175评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,489评论 1 295
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 52,289评论 3 400
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,516评论 2 379

推荐阅读更多精彩内容

  • rocketmq的消息过滤在consumer端和broker端都有过滤,更高级一点的还有通过filterServe...
    圣村的希望阅读 2,813评论 0 1
  • 一、 关键特性 1 消息发送和消费 1)消息发送者步骤分析: 创建消息生产者producer,并制定生产者组名 指...
    TiaNa_na阅读 2,047评论 0 2
  • rocketMq的部署架构模型 RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。R...
    CgySHFF阅读 865评论 0 1
  • 简介 RocketMQ 特点 RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Ap...
    预流阅读 39,242评论 7 55
  • 核心组件(4个组件+消息存储结构) 客户端消费模式 1. MQ的使用场景 昨天在写完之后,有些读者在评论中提出:到...
    楼亭樵客阅读 1,053评论 0 3