Spring Cloud 基于Bus 的AB-TEST组件

一、前情提要:

因剧情需要,所以准备在基础开发平台中进行AB-TEST组件开发。目前主要使用了Spring Cloud E-SR2 版本,其中使用了kafka作为内置bus总线,另一组kafka用于监控trace推送(如zipkin、自定义监控)。AB-TEST大家都应该了解过,如不了解请参考 https://www.itcodemonkey.com/article/10398.html ,这里就不多讲了。
其实简单来说就是根据配置好的分桶模式如A 、B、C ,在进行逻辑处理时根据某一类条件(如uid)计算出当前用户分桶进行动态的逻辑处理(简单来说就是多个if else)。

二、方案选型:

如要做成组件化那必然要对上层开发透明,最好时无感知的进行逻辑切换,当然我们第一步不需要那么完美,先实现功能组件。在进行技术选型的时候参考了两种模式:
1、zookeeper
优点:技术简单,有定义好的工具
缺点:增加应用依赖的组件,当zk出现问题时造成ab-test失效
2、bus总线
优点:实现简单,耦合度低
缺点:需要详解cloud bus 机制
当然我们选择了后者,因为在基础平台中,组件的侵入性越低,对上层开发越友好,而且就越稳定。

三、Spring CLoud Bus 事件机制

因为目前使用的是Spring Cloud E SR2全家桶,所以在技术处理上也遵循其原则,尽量使用内置技术栈实现。内部主要以cloud bus机制进行了简单的扩展实现,下面我们先来简单了解下BUS EVENT机制。

Bus 机制主要建立在 stream的基础之上,在cloud的下 我们可以选择rabbitmq 或者kafka,其特点主要是针对spring event消息的订阅与发布。

Spring Event 事件驱动模型


image.png

可以看出模型由三部分构成:
事件:ApplicationEvent,继承自JDK的EventObject,所有事件将继承它,并通过source得到事件源
发布者:ApplicationEventPublisher及ApplicationEventMulticaster接口,使用这个接口,我们的Service就拥有了发布事件的能力。
订阅者:在spring bus 中可以实现 ApplicationListener(继承自JDK的EventListener),所有监听器将继承它或添加@EventListener

Bus 事件
众所周知,在bus使用中,最多的场景就是配置中心的动态配置刷新。也就是说我们通过/bus/refresh 接口调用就可以进行针对性的配置刷新了,根据这条线,我们来看下内部的源码结构。

1、通过rest 接口进行外部请求
此处cloud 借助其端点监控机制实现,主要看 RefreshBusEndpoint,当然Cloud E 和 F版本有些许不一样,但其实不难理解Cloud E SR2

@ManagedResource
public class RefreshBusEndpoint extends AbstractBusEndpoint {

    public RefreshBusEndpoint(ApplicationEventPublisher context, String id,
            BusEndpoint delegate) {
        super(context, id, delegate);
    }
    //定义对外访问接口
    @RequestMapping(value = "refresh", method = RequestMethod.POST)
    @ResponseBody
    @ManagedOperation
    public void refresh(
            @RequestParam(value = "destination", required = false) String destination) {
        publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
    }

}

Cloud F SR2

@Endpoint(id = "bus-refresh") //TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

    public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
        super(context, id);
    }

    @WriteOperation
    public void busRefreshWithDestination(@Selector String destination) { //TODO: document destination
        publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
    }

    @WriteOperation
    public void busRefresh() {
        publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
    }

}

通过上面的代码可以看到,请求进来的话都调用了 AbstractBusEndpoint 的publish进行了事件发布

public class AbstractBusEndpoint {

    private ApplicationEventPublisher context;

    private String appId;

    public AbstractBusEndpoint(ApplicationEventPublisher context, String appId) {
        this.context = context;
        this.appId = appId;
    }

    protected String getInstanceId() {
        return this.appId;
    }
    //发布事件
    protected void publish(ApplicationEvent event) {
        context.publishEvent(event);
    }

}

其中ApplicationEventPublisher 在哪定义的呢,这就不得不说BusAutoConfiguration 这里了,这是bus的核心加载器,在通过外部接口调用发布事件后内部对事件进行了监听和处理就在BusAutoConfiguration中,如下:

//消费事件,进行逻辑判断是否是由自己发出的事件,如果是自己内部发出的事件则通过stream(kafka)进行发布
@EventListener(classes = RemoteApplicationEvent.class)
    public void acceptLocal(RemoteApplicationEvent event) {
        if (this.serviceMatcher.isFromSelf(event)
                && !(event instanceof AckRemoteApplicationEvent)) {
            this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
        }
    }
//通过stream(kafka)进行外部订阅类型为RemoteApplicationEvent 的事件
@StreamListener(SpringCloudBusClient.INPUT)
    public void acceptRemote(RemoteApplicationEvent event) {
        //判断是否是ack类型的回执事件,是的话进行内部发布,用于bustrace 等处理
        if (event instanceof AckRemoteApplicationEvent) {
            if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                    && this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(event);
            }
            // If it's an ACK we are finished processing at this point
            return;
        }
        //判断是否是给自己的事件,在外部接口请求时可增加destination 进行选择*:*代表全部应用
        if (this.serviceMatcher.isForSelf(event)
                && this.applicationEventPublisher != null) {
            //如果不是自己发的就进行内部转发
            if (!this.serviceMatcher.isFromSelf(event)) {
                this.applicationEventPublisher.publishEvent(event);
            }
            //判断是否要进行ack处理,默认开启
            if (this.bus.getAck().isEnabled()) {
                AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                        this.serviceMatcher.getServiceId(),
                        this.bus.getAck().getDestinationService(),
                        event.getDestinationService(), event.getId(), event.getClass());
                this.cloudBusOutboundChannel
                        .send(MessageBuilder.withPayload(ack).build());
                this.applicationEventPublisher.publishEvent(ack);
            }
        }
        //判断是否要进行trace跟踪,默认关闭
        if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
            // We are set to register sent events so publish it for local consumption,
            // irrespective of the origin
            this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                    event.getOriginService(), event.getDestinationService(),
                    event.getId(), event.getClass()));
        }
    }

观看完内部事件消费,和stream消息订阅,那bus 的stream又是怎么进行初始化和工作的呢,答案依然在BusAutoConfiguration 中,如下:

@Autowired
    private BusProperties bus;

    private ApplicationEventPublisher applicationEventPublisher;

    @PostConstruct
    public void init() {
        BindingProperties inputBinding = this.bindings.getBindings()
                .get(SpringCloudBusClient.INPUT);
        if (inputBinding == null) {
            this.bindings.getBindings().put(SpringCloudBusClient.INPUT,
                    new BindingProperties());
        }
        BindingProperties input = this.bindings.getBindings()
                .get(SpringCloudBusClient.INPUT);
        if (input.getDestination() == null) {
            input.setDestination(this.bus.getDestination());
        }
        BindingProperties outputBinding = this.bindings.getBindings()
                .get(SpringCloudBusClient.OUTPUT);
        if (outputBinding == null) {
            this.bindings.getBindings().put(SpringCloudBusClient.OUTPUT,
                    new BindingProperties());
        }
        BindingProperties output = this.bindings.getBindings()
                .get(SpringCloudBusClient.OUTPUT);
        if (output.getDestination() == null) {
            output.setDestination(this.bus.getDestination());
        }
    }

对于cloud stream就不多说了,这里很简单的进行了初始化,所以对于发布和订阅消息就很清晰了。其实在BusAutoConfiguration中所以的事件消费、发布、订阅都是为了集群内部的通信,而真正的事件处理却不此处。那么对于配置的刷新行为到底在哪呢,经过查看对于刷新的操作要看RefreshListener 了。
如下图 RefreshListener 针对事件进行了监听,其事件使用的是RefreshRemoteApplicationEvent,其继承RemoteApplicationEvent。

image.png
public class RefreshListener
        implements ApplicationListener<RefreshRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(RefreshListener.class);

    private ContextRefresher contextRefresher;

    public RefreshListener(ContextRefresher contextRefresher) {
        this.contextRefresher = contextRefresher;
    }

    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        Set<String> keys = contextRefresher.refresh();
        log.info("Received remote refresh request. Keys refreshed " + keys);
    }
}

通过源码可以看出,事件的刷新行为 Set<String> keys = contextRefresher.refresh(); (当然contextRefresher 定义也是在BusAutoConfiguration 有兴趣可以查看下),对于刷新到底怎么实现的,也就接近了bus config刷新的核心:ContextRefresher

public synchronized Set<String> refresh() {
        //加载当前内存中的配置
        Map<String, Object> before = extract(
                this.context.getEnvironment().getPropertySources());
        //单独启动一个内部容器进行远程配置加载,
        addConfigFilesToEnvironment();
        //交叉对比配置信息,其实就俩Map进行,如果新配置不包含这个key了就result.put(key, null);,如果俩val不一样,就            覆盖新值
        Set<String> keys = changes(before,
                extract(this.context.getEnvironment().getPropertySources())).keySet();
        //发布内部变更事件
        this.context.publishEvent(new EnvironmentChangeEvent(context, keys));
        //刷新配置,主要是加了注解@Refresh的方法 参数等,这里的坑在于会造成服务状态变更 UP-》DOWN-》UP 
        //大面积波动的话很可怕
        this.scope.refreshAll();
        return keys;
    }

经过以上的源码,脉络就很清晰了:
1、外部POST /bus/refresh 进行了刷新行为,发布内部事件RefreshRemoteApplicationEvent
2、通过@EventListener 进行内部事件消费,如果是自己内部发布的事件就通过stream进行广播
3、通过@StreamListener 对stream进行监听,如果是给自己的事件,就进行内部转发,具体需不需要ack trace则根据配置进行。
4、内部通过RefreshListener 消费事件,通过ContextRefresher.refresh 进行配置刷新
这下一个完整的bus机制就展现在我们眼前,我们只需要简单的进行改造,就能实现自己的动态事件推送了。

四、AB-TEST机制实现:
经过上面bus event的铺垫,现在我们来说下实现AB-TEST中分为了几个目标阶段:

1、应用启动load分桶数据
看过了bus的模式,这里就简单咯。在这里我们通过继承 PropertyResourceConfigurer 来实现配置的初始化,而配置的来源就是cloud 配置中心的 application.properties,因为使用了配置中心后此配置会自动加载不要额外的处理。然后在对加载的配置进行归类即可(因我为test配置定义了前缀,所以只需过滤其即可),模仿bus配置筛选即可。

public static Map<String, Object> extract(MutablePropertySources propertySources) {
        Map<String, Object> result = new HashMap<>(16);
        List<PropertySource<?>> sources = new ArrayList<PropertySource<?>>();
        for (PropertySource<?> source : propertySources) {
            sources.add(0, source);
        }
        for (PropertySource<?> source : sources) {
            if (!standardSources.contains(source.getName())) {
                extract(source, result);
            }
        }
        return result;
    }

    public static void extract(PropertySource<?> parent, Map<String, Object> result) {
        if (parent instanceof CompositePropertySource) {
            try {
                List<PropertySource<?>> sources = new ArrayList<PropertySource<?>>();
                for (PropertySource<?> source : ((CompositePropertySource) parent)
                        .getPropertySources()) {
                    sources.add(0, source);
                }
                for (PropertySource<?> source : sources) {
                    extract(source, result);
                }
            } catch (Exception e) {
                return;
            }
        } else if (parent instanceof EnumerablePropertySource) {
            for (String key : ((EnumerablePropertySource<?>) parent).getPropertyNames()) {
                result.put(key, parent.getProperty(key));
                log.debug("PropertyConfigure load K[{}] V[{}]", key, parent.getProperty(key));
            }
        }
    }

2、在请求来临时进行动态计算分桶
定义自定义注解@NoveTest用于标注需要进行测试的方法,定义NoveParamInterceptor 对入参进行解析,定义NoveTestInterceptor内部拦截器进行注解切入 ,为增加了@NoveTest进行动态分桶计算。

@Pointcut("@annotation(NoveTest)")
    public void anyMethod() {

    } 
@Before(value = "anyMethod()")
    public void doBefore(JoinPoint jp) throws Exception {

        try {
            MethodSignature methodSig = (MethodSignature) jp.getSignature();
            Annotation[] annotations = methodSig.getMethod().getDeclaredAnnotations();
            for (Annotation annotation : annotations) {
                String name = annotation.annotationType().getName();
                String novetest = NoveTest.class.getName();
                if (novetest.equals(name)) {
                    NoveTest test = (NoveTest) annotation;
                    Map<String, String> buckets = RandomBucketUtils.getBuckets(test.name());
                    RandomBucketUtils.loadBuckets(buckets);
                }
            }
        } catch (Exception e) { //防御性容错

        }

    }

其中分桶计算的策略很简单,通过uid + 因子进行hash 计算index 获取数据

int hash = (((sole + factor).hashCode() % 100) + 100) % 100;
 //获取参数百分比值
      config.entrySet().stream().sorted(Map.Entry.comparingByKey(Comparator.reverseOrder())).forEach(entry -> {
                if (entry.getKey().contains(percent)) {
                    IntStream.range(0, Integer.valueOf((String) entry.getValue())).forEach(i -> bucket.add(entry.getKey()));
                }

            });

好了,定义完成,上层应用开发使用方式:

@RequestMapping("/test")
    @NoveTest(name = {"312", "feed"})
    public Map<String, String> test(@RequestParam(defaultValue = "", required = false) String ma) {

        String type = RandomBucketUtils.getProperties("type");
        //TODO 通过type 分枝处理 if else
        
         return null;

    }

3、修改配置进行分桶策略的动态刷新
分桶策略不可能一直不变,而且变化的时候也不应该重新发版,那真是太恶心人了。所以动态配置推送就至关重要了,当然大家也可以使用自带的bus总线进行数据推送,但是其destroy的问题真是恶心到家了,而且有可能造成服务大面积瘫痪,慎用。
基于种种问题,就需要自定义bus event,结合其现有的bus进行数据推送。

其实很简单,分为几步:
自定义端点,进行自定义事件发送。(事件广播不需要特殊处理)
自定义Listener进行本地事件消费,进行数据刷新
注意几点,在Cloud E 和F 版本有一些不同之处。主要在于端点的暴露的策略上,在F版本上使用如下

@WebEndpoint(id = "nove-refresh")
public class NoveRefreshBusEndpoint extends AbstractBusEndpoint {

    public NoveRefreshBusEndpoint(ApplicationEventPublisher context,  String id) {
        super(context, id);
    }



    @WriteOperation
    public void busNoveRefresh() {
        publish(new NoveRefreshRemoteApplicationEvent(this, getInstanceId(),null));
    }

    //此处原来的destination 参数必须写成arg0 不然就不生效,恶心人,内部机制,这样处理最简单
    @WriteOperation
    public void busRefreshWithDestination(@Selector String arg0) {
        publish(new NoveRefreshRemoteApplicationEvent(this, getInstanceId(), arg0));
    }
}

SO 到目前为止一个可用于生产的AB-TEST 机制就实现了,有的同学就会说了,你这根据数据在进行if else 的逻辑判断调用真是恶心到人了。确实,因为第一版目前是最简单的实现,在第二版将进行动态调用的优化。其实动态调用说来也很简单,无非通过定义的接口实现各个不同的逻辑,然后针对其进行简单的动态代理即可。后期源码会同步上传。

总结:
自定义AB-TEST组件的过程
1、自定义内部端点暴露动态配置刷新,发送刷新事件
2、接收事件,定义刷新策略,只刷新需要的配置即可
3、定义启动初始化方式
4、通过动态代理实现动态逻辑调用(待完成)

qrcode_for_gh_13314ac27929_258.jpg
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容