vertx 实现动态 RPC

需求:替换ali lightApi 动态rpc的实现,因为api为商业版,不是开源的,是基于pandora 的EDAS平台的。那么我们如何实现开源的动态RPC呢?

定义:动态RPC指的是,可以动态的让一个服务上线和下线,换句话说就是动态的从注册中心剔除服务,而不是停止这个服务,启动这个服务。这个需求是来自链路的原路返回

简介 vertx:

vertx 是一套全异步的基于netty通信的框架,Vertx中核心组建有 verticle, eventbus, circuit breaker,service discovery and register, Router等等。后期我会一一的去探索这里面的组件的原理。而本文介绍的vertx 的动态rpc,就是使用vertx proxy service 这个功能来实现的。另外我们得知道vertx 中的线程模型是基于netty的event loop,一个verticle 是一个微服务,而且具有HA的机制的微服务。服务之间的通信是根据event bus 的netty 通信机制。但是event bus 通信不是100%的可靠的(这点很要命,后期我会写博客探索的)。

此外vertx需要依赖 分布式缓存建立集群(hazelcast, ignite等),基于gossip协议的p2p网络。

探索过程:

如果要实现需求的话,首先第一想到的还是spring cloud,dubbo,thrift 等RPC框架,hsf 是阿里pandora rpc 框架,肯定不能用。但是经一番折腾之后,没有可以让我觉得可以在代码里面直接让服务上线下线,从注册中心剔除掉或者注册到注册中心的。那怎么办呢?曾经也想过会用rabbitmq,这样的MQ 去做动态的RPC,但是发现很复杂,关键很难做scalablity,如果有上千个链路,就需要上千个topic,或者tag 这类的标记。感觉很复杂,也很难维护。

solutions: vertx proxy service 

这个方案上,我们不打算使用verticle的概念,仅仅使用vertx 代理服务的概念。首先我们要使用vertx 的服务发现和注册,其次我们要使用proxy service。

第一个坑:代理服务的自动生成。请在pom或者gradle里面加入 生成代理的依赖(对接口的代理)。然后创建 package-info.java,在root package。所谓的root package就是基package,web developer 开的springboot 应该都很清楚,需要将app.class 建在基package,以便扫描都可以扫描到

    <groupId>io.vertx</groupId>

    <artifactId>vertx-service-proxy</artifactId>

    <classifier>processor</classifier>

    <groupId>io.vertx</groupId>

    <artifactId>vertx-codegen</artifactId>

    <classifier>processor</classifier>

@ModuleGen(name ="ap-common-vertx", groupPackage ="com.xxxx.xx.xx.vertx")

package com.xxxx.xx.xx.vertx;

import io.vertx.codegen.annotations.ModuleGen;

其中groupPackage ="com.xxxx.xx.xx.vertx"), 是基package, 也可以是 你定义interface 所在的包。完成之后,使用maven clean install, 你就会发现在target 目录下面会有 ...Proxy 的class,这两个class 就是代理类,对于原理,后面的博客我会慢慢分析。

第二个坑是:服务发现publish 服务之后,需要注册服务代理,不然注册的服务,在event bus 上面是找不到的,看看代码怎么写吧:

Record record = EventBusService.createRecord(servicePublishRequestBean.getServiceName(), servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz());

servicePublishRequestBean.getDiscovery().publish(record, ar -> {

if (ar.succeeded() && ar.result() !=null) {

publishedRecords.add(record);

        recordMessageConsumerMap.putIfAbsent(record, ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

    }

handler.handle(ar.map(ar.result()));

});

servicePublishRequestBean就是我封装的bean,里面de属性有:

private StringserviceName;

private StringeventBusAddress;

private ServiceDiscoverydiscovery;

private Classclazz;

private T service;

此外我们一定要注册这样的proxy service才能够生效,不然是没有用的:

ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

看看里面怎么写的

public static MessageConsumerregisterProxyService(String eventBusAddress, Class clazz, T service) {

return ServiceBinderUtil.getBinderInstance()

.setAddress(eventBusAddress)

.register(clazz, service);

}

第三个坑是: 和第二个坑一样的,需要注销服务,注销服务的话,我们也要调用proxyService 去把服务注销了,而不能单纯的调用 service discovery and register unpublish 方法,这个是很恶心的:看看我怎么写的把,一些很细节的东西需要自己探索,我不会贴出所有的东西的。比如下面的discover record 有许多的状态,这里一不小心就会找不到你所发布的服务,另外注销代理服务,需要个奇怪的参数,这个参数我存储在map里面:recordMessageConsumerMap, 在服务publish时候,就会生成的。

discovery.getRecord(info -> info.getName().equals(serviceName), res -> {

if (res.succeeded() && res.result() !=null && res.result().getStatus() == Status.UP) {

Record record = res.result();

                discovery.unpublish(record.getRegistration(), ar -> {

if(ar.succeeded()) {

List records =recordMessageConsumerMap.keySet().stream().filter(map-> map.getName().equals(serviceName)).collect(Collectors.toList());

                        offlineRecord(records);

                    }

handler.handle(ar.map((Void)null));

                });

            }else {

handler.handle(res.map((Void)null));

            }

}

);

OfflineRecord 主要是调用ProxyServiceUtil.unregisterProxyService(recordMessageConsumerMap.get(records.get(0)));,其中unregisterProxyService 方法的实现如下,是不是比较简单?

ServiceBinderUtil.getBinderInstance().unregister(consumer);

然后基本上做到这里,vertx 动态的rpc 就可以实现了,主要是用了vertx 服务发现组件的,publish, unpublish方式,和proxy service 的 register和unregister方法。但是里面的坑比较多。此外除了上面所描述的坑之外,我还想告诉小伙伴们,event bus 通信exception 会出现一些奇怪的错误,所有event bus 通信也是个坑,其实proxy sevice 本质就是 event bus,所以event bus 网络连接不通的话,确实很头疼,经过我这边的测试,不管ecs 集群的部署,还是ecs 和docker 的混合部署,网络都是可以通的(我使用的是ignite分布式缓存)。暂时我先贴出来关于网络的代码,如果你们遇到了问题,先暂时按照我这个来,不会让你很恼火。

@Bean

public IgniteConfigurationgetIgniteSelfConfiguration()throws Exception{

IgniteConfiguration igniteConfiguration =new IgniteConfiguration();

    igniteConfiguration.setClientMode(false);

    igniteConfiguration.setPeerClassLoadingEnabled(true);

    igniteConfiguration.setDeploymentMode(DeploymentMode.CONTINUOUS);

    igniteConfiguration.setPeerClassLoadingMissedResourcesCacheSize(0);

    igniteConfiguration.setDiscoverySpi(getTcpDiscoverySpi());

    igniteConfiguration.setCacheConfiguration(getCacheConfiguration());

  // igniteConfiguration.setLocalHost(IPUtil.getLocalIp());

    return igniteConfiguration;

}

@Bean

public TcpDiscoverySpigetTcpDiscoverySpi()throws Exception{

TcpDiscoverySpi tcpDiscoverySpi =new TcpDiscoverySpi();

    tcpDiscoverySpi.setIpFinder(getTcpDiscoveryMulticastIpFinder());

    tcpDiscoverySpi.setNetworkTimeout(10000);

    System.out.println("setting success for host");

    tcpDiscoverySpi.setLocalAddress(IPUtil.getLocalIp());

    return tcpDiscoverySpi;

}

@Bean

public TcpDiscoveryMulticastIpFindergetTcpDiscoveryMulticastIpFinder(){

TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder =new TcpDiscoveryMulticastIpFinder();

    tcpDiscoveryMulticastIpFinder.setMulticastGroup("224.0.0.100");

    return tcpDiscoveryMulticastIpFinder;

}

@Bean

public CacheConfigurationgetCacheConfiguration(){

CacheConfiguration cacheConfiguration =new CacheConfiguration();

    cacheConfiguration.setName("myCache");

    cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);

    cacheConfiguration.setBackups(1);

    return cacheConfiguration;

}

@Bean

public VertxClusterStartervertxClusterStarter() {

VertxClusterStarter vertxClusterStarter =new VertxClusterStarter();

    return vertxClusterStarter;

}

ClusterManager clusterManager =new IgniteClusterManager(igniteSelfConfiguration);

TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)igniteSelfConfiguration.getDiscoverySpi();

TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder = (TcpDiscoveryMulticastIpFinder) discoverySpi.getIpFinder();

tcpDiscoveryMulticastIpFinder.setAddresses(Arrays.asList(propertiesHolderUtils.getVertxClusterIps().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)));

VertxOptions vertxOptions =new VertxOptions().setClustered(true).setClusterHost(IPUtil.getLocalIp()).setClusterPort(Integer.valueOf(propertiesHolderUtils.getVertxClusterPorts().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)[0])).setClusterManager(clusterManager);

ServiceDiscoveryOptions discoveryOptions =new ServiceDiscoveryOptions();

谢谢,希望对大家有所帮助,后面我会尝试探索event bus 通信的原理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容