带你手写基于 Spring 的可插拔式 RPC 框架(五)注册中心

注册中心代码使用 zookeeper 实现,通过图片来看看我们注册中心的架构。


5.png

首先说明, zookeeper 的实现思路和代码是参考架构探险这本书上的,另外在 github 和我前面配置文件中的 zookeeper 服务器是用的1个月免费适用的阿里云,大家也可以用它当测试用。

不多说,一次性给出注册中心全部代码。

客户端对应的注册中心接口

public interface RegisterCenter4Consumer {

    /**
     * 消费端初始化服务提供者信息本地缓存
     */
    public void initProviderMap();

    /**
     * 消费端获取服务提供者信息
     * @return
     */
    public Map<String,List<ServiceProvider>> getServiceMetaDataMap4Consumer();

    /**
     * 消费端将消费者信息注册到 zookeeper 对应的节点下
     * @param invokers
     */
    public void registerConsumer(final List<ServiceConsumer> invokers);
}

服务端对应的注册中心接口

public interface RegisterCenter4Provider {
    /**
     * 服务端将服务提供者信息注册到 zookeeper 对应的节点下
     * @param serivceList
     */
    public void registerProvider(final List<ServiceProvider> serivceList);

    /**
     * 服务端获取服务提供者信息
     * @return key:服务提供者接口 value:服务提供者服务方法列表
     */
    public Map<String, List<ServiceProvider>> getProviderService();
}

注册中心实现类:

public class ZookeeperRegisterCenter implements RegisterCenter4Provider, RegisterCenter4Consumer {

    private static ZookeeperRegisterCenter registerCenter = new ZookeeperRegisterCenter();

    private ZookeeperRegisterCenter(){};

    public static ZookeeperRegisterCenter getInstance(){
        return registerCenter;
    }
    //服务提供者列表,key:服务提供者接口,value:服务提供者服务方法列表
    private static final Map<String,List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>();

    //服务端 zookeeper 元信息,选择服务(第一次从zookeeper 拉取,后续由zookeeper监听机制主动更新)
    private static final Map<String,List<ServiceProvider>> serviceData4Consumer = new ConcurrentHashMap<>();

    //从配置文件中获取 zookeeper 服务地址列表
    private static String  ZK_SERIVCE = Configuration.getInstance().getAddress();

    //从配置文件中获取 zookeeper 会话超时时间配置
    private static int ZK_SESSION_TIME_OUT = 5000;

    //从配置文件中获取 zookeeper 连接超时事件配置
    private static int  ZK_CONNECTION_TIME_OUT = 5000;

    private static String ROOT_PATH = "/rpc_register";
    public  static String PROVIDER_TYPE = "/provider";
    public  static String CONSUMER_TYPE = "/consumer";

    private static volatile ZkClient zkClient = null;

    @Override
    public void initProviderMap() {
        if(serviceData4Consumer.isEmpty()){
            serviceData4Consumer.putAll(fetchOrUpdateServiceMetaData());
        }

    }

    @Override
    public Map<String, List<ServiceProvider>> getServiceMetaDataMap4Consumer() {
        return serviceData4Consumer;
    }

    @Override
    public void registerConsumer(List<ServiceConsumer> consumers) {
        if(consumers == null || consumers.size() == 0){
            return;
        }

        //连接 zookeeper ,注册服务
        synchronized (ZookeeperRegisterCenter.class){
            if(zkClient == null){
                zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
            }
            //创建  zookeeper 命名空间
            boolean exist = zkClient.exists(ROOT_PATH);
            if(!exist){
                zkClient.createPersistent(ROOT_PATH,true);
            }
            //创建服务提供者节点
            exist = zkClient.exists((ROOT_PATH));
            if(!exist){
                zkClient.createPersistent(ROOT_PATH);
            }

            for(int i = 0; i< consumers.size();i++) {
                ServiceConsumer consumer = consumers.get(i);
                //创建服务消费者节点
                String serviceNode = consumer.getConsumer().getName();
                String servicePath = ROOT_PATH + CONSUMER_TYPE + "/" + serviceNode;

                exist = zkClient.exists(servicePath);
                System.out.println("exist:" + exist);
                System.out.println("servicePath:" + servicePath);
                if (!exist) {
                    zkClient.createPersistent(servicePath, true);
                }

                //创建当前服务器节点
                InetAddress addr = null;
                try {
                    addr = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                String ip = addr.getHostAddress();
                String currentServiceIpNode = servicePath + "/" + ip;
                exist = zkClient.exists(currentServiceIpNode);
                if (!exist) {
                    zkClient.createEphemeral(currentServiceIpNode);
                }


            }


        }

    }

    @Override
    public void registerProvider(List<ServiceProvider> serivceList) {
        if(serivceList == null || serivceList.size() == 0){
            return;
        }
        
        //连接 zookeeper,注册服务,加锁,将所有需要注册的服务放到providerServiceMap里面
        synchronized (ZookeeperRegisterCenter.class){
            for(ServiceProvider provider:serivceList){
                //获取接口名称
                String serviceItfKey = provider.getProvider().getName();
                //先从当前服务提供者的集合里面获取
                List<ServiceProvider> providers = providerServiceMap.get(serviceItfKey);
                if(providers == null){
                    providers = new ArrayList<>();
                }
                providers.add(provider);
                providerServiceMap.put(serviceItfKey,providers);
            }

            if(zkClient == null){
                zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT,new SerializableSerializer());
            }

            //创建当前应用 zookeeper 命名空间
            boolean exist = zkClient.exists(ROOT_PATH);
            if(!exist){
                zkClient.createPersistent(ROOT_PATH,true);
            }

            //服务提供者节点
            exist = zkClient.exists((ROOT_PATH));
            if(!exist){
                zkClient.createPersistent(ROOT_PATH);
            }

            for(Map.Entry<String,List<ServiceProvider>> entry:providerServiceMap.entrySet()){
                //创建服务提供者节点
                String serviceNode = entry.getKey();
                String servicePath = ROOT_PATH +PROVIDER_TYPE +"/" + serviceNode;
                exist = zkClient.exists(servicePath);
                if(!exist){
                    zkClient.createPersistent(servicePath,true);
                }

                //创建当前服务器节点,这里是注册时使用,一个接口对应的ServiceProvider 只有一个 
                int serverPort = entry.getValue().get(0).getPort();
                InetAddress addr = null;
                try {
                    addr = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                String ip = addr.getHostAddress();
                String impl = (String)entry.getValue().get(0).getServiceObject();
                String serviceIpNode = servicePath +"/" + ip + "|" + serverPort + "|" + impl;
                System.out.println("serviceIpNode:" + serviceIpNode);
                exist = zkClient.exists(serviceIpNode);
                if(!exist){
                    //创建临时节点
                    zkClient.createEphemeral(serviceIpNode);
                }
                //监听注册服务的变化,同时更新数据到本地缓存
                zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                    @Override
                    public void handleChildChange(String s, List<String> list) throws Exception {
                        if(list  == null){
                            list = new ArrayList<>();
                        }
                        //存活的服务 IP 列表
                        List<String> activeServiceIpList = new ArrayList<>();
                        for(String input:list){
                            String ip = StringUtils.split(input, "|").get(0);
                            activeServiceIpList.add(ip);
                        }
                        refreshActivityService(activeServiceIpList);
                    }
                });

            }
        }

    }

    /**
     * 
     * 在某个服务端获取自己暴露的服务
     */
    @Override
    public Map<String, List<ServiceProvider>> getProviderService() {
        return providerServiceMap;
    }
    
    
    //利用ZK自动刷新当前存活的服务提供者列表数据
    private void refreshActivityService(List<String> serviceIpList) {
        if (serviceIpList == null||serviceIpList.isEmpty()) {
            serviceIpList = new ArrayList<>();
        }

        Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>();
        for (Map.Entry<String, List<ServiceProvider>> entry : providerServiceMap.entrySet()) {
            String key = entry.getKey();
            List<ServiceProvider> providerServices = entry.getValue();

            List<ServiceProvider> serviceMetaDataModelList = currentServiceMetaDataMap.get(key);
            if (serviceMetaDataModelList == null) {
                serviceMetaDataModelList = new ArrayList<>();
            }

            for (ServiceProvider serviceMetaData : providerServices) {
                if (serviceIpList.contains(serviceMetaData.getIp())) {
                    serviceMetaDataModelList.add(serviceMetaData);
                }
            }
            currentServiceMetaDataMap.put(key, serviceMetaDataModelList);
        }
        providerServiceMap.clear();
        providerServiceMap.putAll(currentServiceMetaDataMap);
    }


    private void refreshServiceMetaDataMap(List<String> serviceIpList) {
        if (serviceIpList == null) {
            serviceIpList = new ArrayList<>();
        }

        Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>();
        for (Map.Entry<String, List<ServiceProvider>> entry : serviceData4Consumer.entrySet()) {
            String serviceItfKey = entry.getKey();
            List<ServiceProvider> serviceList = entry.getValue();

            List<ServiceProvider> providerServiceList = currentServiceMetaDataMap.get(serviceItfKey);
            if (providerServiceList == null) {
                providerServiceList = new ArrayList<>();
            }

            for (ServiceProvider serviceMetaData : serviceList) {
                if (serviceIpList.contains(serviceMetaData.getIp())) {
                    providerServiceList.add(serviceMetaData);
                }
            }
            currentServiceMetaDataMap.put(serviceItfKey, providerServiceList);
        }

        serviceData4Consumer.clear();
        serviceData4Consumer.putAll(currentServiceMetaDataMap);
    }


    private Map<String, List<ServiceProvider>> fetchOrUpdateServiceMetaData() {
        final Map<String, List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>();
        //连接zk
        synchronized (ZookeeperRegisterCenter.class) {
            if (zkClient == null) {
                zkClient = new ZkClient(ZK_SERIVCE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
            }
        }

        //从ZK获取服务提供者列表
        String providePath = ROOT_PATH+PROVIDER_TYPE;
        System.out.println("111111:"+providePath);
        List<String> providerServices = zkClient.getChildren(providePath);
        System.out.println(providerServices.toString());
        for (String serviceName : providerServices) {
            String servicePath = providePath +"/"+ serviceName;
            System.out.println("1100:"+servicePath);
            List<String> ipPathList = zkClient.getChildren(servicePath);
            System.out.println("ipPathList:"+ipPathList.toString());
            for (String ipPath : ipPathList) {
                String serverIp = ipPath.split("\\|")[0];
                String serverPort = ipPath.split("\\|")[1];
                String impl = ipPath.split("\\|")[2];
                List<ServiceProvider> providerServiceList = providerServiceMap.get(serviceName);
                if (providerServiceList == null) {
                    providerServiceList = new ArrayList<>();
                }
                ServiceProvider providerService = new ServiceProvider();

                try {
                    Class clazz = Class.forName(serviceName);
                    providerService.setProvider(clazz);
                } catch (ClassNotFoundException e) {
                    throw new RuntimeException(e);
                }

                providerService.setIp(serverIp);
                providerService.setPort(Integer.parseInt(serverPort));
                providerService.setServiceObject(impl);
                providerService.setGroupName("");
                providerServiceList.add(providerService);

                providerServiceMap.put(serviceName, providerServiceList);
            }

            //监听注册服务的变化,同时更新数据到本地缓存
            zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    if (currentChilds == null) {
                        currentChilds = new ArrayList<>();
                    }
                    List<String> activeServiceIpList = new ArrayList<>();
                    for(String input:currentChilds){
                        String ip = StringUtils.split(input, "|").get(0);
                        activeServiceIpList.add(ip);
                    }
                    refreshServiceMetaDataMap(activeServiceIpList);
                }
            });
        }
        return providerServiceMap;
    }

}

写完这部分整个 rpc 框架也就实现了,测试的客户端和服务端在代码里也有,这里就不贴出来了。平时时间有限,只能下班和周末的时间来写,整个框架肯定有不足和错误的地方,也有可以改进的地方。希望大家能够不吝指教,互相进步。

我只是想将自己思考的过程给大家展示出来,希望大家一起讨论这些问题,看看还有哪些能够改进的地方。

需要改进的地方:

  1. 服务端的启动方式。
  2. 更高并发的改进。
  3. 服务治理。
  4. 监测中心。

这篇文章没有收费,如果您觉得对你的编程或多或少有点启发就点个赞。

最后给出源码 github 地址,源码 编码和码字不易,如果您觉得学到了东西就请在 github 上加个 star, 当然在 github 上提出问题一起改进是最好的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容