rabbitmq消息标记拦截处理

链路压测是一种常见的压测手段,可以测试出系统,链路的性能瓶颈在哪。大公司基本都有根据自己的业务开发的整套链路压测的产品。但是基本没有开源出来,技术细节都是没有的,只是有文章介绍他们的场景和解决方案。本人最近也参与了一个链路压测的项目,把这个项目中的遇到的一些问题和解决写出来,希望给到有需要的人,技术方案并不复杂。

链路压测的操作方式有2中

  • 可以部署一套跟生产一样的服务,然后录制线上流量到模拟环境回放
  • 另一种就是直接对线上服务进行压测,但是不能污染到线上的数据

这里只讨论第二种方式涉及到的问题

  • 既然是链路,就有上下游依赖的服务,就需要把压测请求一直传递下去,这叫透传
  • 识别到压测请求后数据插到影子库,影子mongo(mongodb影子库动态切换),影子redis,mq 消息也要标记识别

说到mq,消息的标记识别需要在发送端跟消费端做处理,也分2种方式

  • 要么让压测请求跟正常请求都进入到生产的服务器的队列,压测消息发送端加上标记,接收端加上识别。
  • 另一个就是压测的消息发送放到另一个影子队列里面,跟生产的队列完全隔离开,消费端监听生产对列跟影子队列,接收的拦截里面判断是哪个队列发过来的消息,做对应处理
这里针对的都是注解方式使用的rabbitmq,不包括那些硬编码使用mq发送接收的
消息接收
@RabbitListener(queues = MqConstant.QUEUE)
public final void onMessage(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        System.out.println("是否是压测消息: " + HeaderThreadLocal.isTestRequest());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
消息发送
public void sendMq(){
        rabbitTemplate.convertAndSend(MqConstant.EXCHANGE, MqConstant.ROUT_KEY, "message-body-" + UUID.randomUUID().toString());
}

1 先来说说第一种

消息的标记识别实际上就是对发送接收做一个拦截处理,配置2个bean,在bean 的方法里面做拦截的逻辑就可以了

    @Bean(name = "rabbitListenerContainerFactory")
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                                     ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //消息接收之前加拦截处理,每次接收消息都会调用,是有压测消息标记的,先存到副本变量,后续的操作数据库根据这个变量进行切换影子库
        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                Map header = message.getMessageProperties().getHeaders();
                //判断是哪个队列的消息,影子队列的话要动态切换影子库跟后续操作
                String queue = message.getMessageProperties().getConsumerQueue();
                if (header.containsKey("test")){
                    HeaderThreadLocal.setIsTestRequest(true);
                }
                return message;
            }
        });
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        //发送之前加一个拦截器,每次发送都会调用这个方法,方法名称已经说明了一切了
        rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                if (HeaderThreadLocal.isTestRequest()) {
                    //拦截逻辑就是如果是压测请求就加个header标记
                    message.getMessageProperties().getHeaders().put("test", true);
                }
                return message;
            }
        });
        return rabbitTemplate;
    }

RabbitTemplate 也有 setAfterReceivePostProcessors方法,但是这个方法对注解方式的接收消息是没用的,源码注释有说明只适用哪种接收消息的方式

2 接下来说说第二种

第二种也很简单,操作如下

  • 1 在生产的mq服务器上面建好影子队列


    image.png
  • 2 用一个对应的影子 routing-key 做好影子队列与交换器的绑定


    image.png
  • 3 在发送消息的时候,如果是正常请求,就直接发送,压测请求就把发送的routing-key 加上一个后缀对应到影子routing-key
使用一个自定义的RabbitTemplate,重写里面的convertAndSend方法,每次调用都判断,是否要切换routing-key
public class CustomRabbitTemplate extends RabbitTemplate {
    public CustomRabbitTemplate(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }
    @Override
    public void convertAndSend(String exchange,
                               String routingKey,
                               final Object object) throws AmqpException {
        if (HeaderThreadLocal.isTestRequest()){
            routingKey = routingKey +"-shadow";
        }
        super.convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }
}

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
        CustomRabbitTemplate rabbitTemplate = new CustomRabbitTemplate(factory);
        return rabbitTemplate;
    }
  • 4 消息消费的时候加上拦截,判断是否是影子队列的标记放一个到 ThreadLocal里面,后面的操作根据这个标记来写数据库或者其他
这一步参考这个bean 的配置 SimpleRabbitListenerContainerFactory
  • 5 自动配置里面解析@RabbitListener(queues = MqConstant.QUEUE)这一段的逻辑自动加上影子队列的监听

找到这个bean RabbitListenerAnnotationBeanPostProcessor 里面的这个方法 resolveQueues

private String[] resolveQueues(RabbitListener rabbitListener) {
        String[] queues = rabbitListener.queues();
        //修改这里面的逻辑,加上这2行代码
        String oldQueue = queues[0];
        queues = new String[]{oldQueue, oldQueue+"-shadow"};
        QueueBinding[] bindings = rabbitListener.bindings();
        return result.toArray(new String[result.size()]);
}

那个bean 是spring的代码,这个方法是私有的,很多方法都是私有的,怎么改? AOP ? 我直接简单粗暴的把
SimpleRabbitListenerContainerFactory 的所有代码弄出来到一个新的子类里面,修改这个方法的逻辑,然后注入这个bean 就行了。

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        //这个类 CustomRabbitListenerAnnotationBeanPostProcessor 的代码时全部复制
        //RabbitListenerAnnotationBeanPostProcessor的,只是加了2行代码
        return new CustomRabbitListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

这样就可以自动加上影子队列的监听了,一顿操作下来,可以进行测试了,消息是可以动态指定发到对应的队列的

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容