手写RPC框架(2) 引入zookeeper做服务治理

本人微信公众号(jwfy)欢迎关注

上一期完成了手写一个RPC框架,看看100个线程同时调用效果如何,但还是遗留了很多问题以及可以优化的点,这次就完全重写之前的代码,演进到v2版本,使得代码逻辑更加规范的同时,引入ZooKeeper辅助完成服务治理。

在代码展示之前还是先介绍一些基本的概念以及设计思路,ZooKeeper是什么,服务治理又是什么等,最后贴了部分关键代码以说明和v1版本的区别,有哪些点的改进措施。

最后还提了个问题:线程池打满了怎么办?,你有什么好的解决方案呢?

ZooKeeper

ZooKeeper(直译为动物管理员,简称zk)是一个分布式、开源的应用协调服务,利用和Paxos类似的ZAB选举算法实现分布式一致性服务。有类似于Unix文件目录的节点信息,同时可以针对节点的变更添加watcher监听以能够即使感知到节点信息变更。可提供的功能例如域名服务、配置维护、同步以及组服务等(此功能介绍来自官网描述:It exposes common services - such as naming, configuration management, synchronization, and group services - in a simple interface)。如下图就是DUBBO存储在ZooKeeper的节点数据情况。

image

在本地启动服务后通过zk客户端连接后也可通过命令查看节点信息,如下图所示。

image

ZooKeeper包含了4种不同含义的功能节点,在每次创建节点之前都需要明确声明节点类型。

类型 定义 描述
PERSISTENT 持久化目录节点 客户端与zookeeper断开连接后,该节点依旧存在
PERSISTENT_SEQUENTIAL 持久化顺序编号目录节点 客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
EPHEMERAL 临时目录节点 客户端与zookeeper断开连接后,该节点被删除
EPHEMERAL_SEQUENTIAL 临时顺序编号目录节点 客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

ZooKeeper使用之前需要先进行安装,后开启服务端的服务,我们的服务作为客户端连接ZooKeeper以便于后续的操作。具体可参考官网文档Zookeeper3.5.5 官方文档,在实际的java项目开发中也是可以通过maven引入ZkClient或者Curator开源的客户端,在本文学习笔记中是使用的Curator,因为其已经封装了原始的节点注册、数据获取、添加watcher等功能。具体maven引入的版本如下,

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

服务治理

服务治理也就是针对服务进行管理的措施,例如服务发现服务暴露负载均衡快速上下线等都是服务治理的具体体现。

  • 服务发现:从服务管理中心获取到需要的服务相关信息,例如我们可以从zk中获取相关服务的机器信息,然后我们就可以和具体机器直连完成相关功能
  • 服务暴露:服务提供方可以提供什么样子的功能,经过服务暴露暴露出去,其他使用方就可以通过服务发现发现具体的服务提供方信息
  • 负载均衡:一般针对的是服务提供方,避免大量请求同时打到一台机器上,采用随机、轮询等措施让请求均分到各个机器上,提供服务效率,限流灰度等也都是类似的操作,通过动态路由、软负载的形式处理分发请求。
  • 快速上线下:以往需要上下线可能需要杀掉机器上的进程,现在只需要让该服务停止暴露即可,实现服务的灵活上下线。

数据处理流程

服务端:服务的提供方,接受网络传输的请求数据、通过网络把应答数据发送给客户端
客户端:服务的调用方,使用本地代理,通过网络把请求数据发送出去,接受服务端返回的应答数据

image

所有的数据传输都是按照上面图片说的流程来的,如果需要添加自定义的序列化工具,则需要在把数据提交到socket的输出流缓冲区之前按照序列化工具完成序列化操作,反序列化则进行反向操作即可。

RPC 实践 V2版本

文件夹目录如下图所示,其中:

  • balance文件夹:负载均衡有关
  • config文件夹:网络套接字传输的数据模型以及服务暴露、服务发现的数据模型
  • core文件夹:核心文件夹,包含了服务端和客户端的请求处理、代理生成等
  • demo文件夹:测试试用
  • io.protocol文件夹:目前是只有具体的请求对象和网络io的封装
  • register:服务注册使用,实现了使用zk进行服务注册和服务发现的操作
image

由于代码太长,只贴部分重要的代码操作。

服务暴露 & 服务发现

public interface ServiceRegister {

    /**
     * 服务注册
     * @param config
     */
    void register(BasicConfig config);

    /**
     * 服务发现,从注册中心获取可用的服务提供方信息
     * @param request
     * @return
     */
    InetSocketAddress discovery(RpcRequest request, ServiceType nodeType);
}

默认使用了CuratorFramework客户端完成zk数据的操作

public class ZkServiceRegister implements ServiceRegister {

    private CuratorFramework client;

    private static final String ROOT_PATH = "jwfy/simple-rpc";

    private LoadBalance loadBalance = new DefaultLoadBalance();

    public ZkServiceRegister() {
        RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);

        this.client = CuratorFrameworkFactory
                .builder()
                .connectString("127.0.0.1:2182")
                .sessionTimeoutMs(50000)
                .retryPolicy(policy)
                .namespace(ROOT_PATH)
                .build();
        // 业务的根路径是 /jwfy/simple-rpc ,其他的都会默认挂载在这里

        this.client.start();
        System.out.println("zk启动正常");
    }

    @Override
    public void register(BasicConfig config) {
        String interfacePath = "/" + config.getInterfaceName();
        try {
            if (this.client.checkExists().forPath(interfacePath) == null) {
                // 创建 服务的永久节点
                this.client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(interfacePath);
            }

            config.getMethods().forEach(method -> {
                try {
                    String methodPath = null;
                    ServiceType serviceType = config.getType();
                    if (serviceType == ServiceType.PROVIDER) {
                        // 服务提供方,需要暴露自身的ip、port信息,而消费端则不需要
                        String address = getServiceAddress(config);
                        methodPath = String.format("%s/%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName(), address);
                    } else {
                        methodPath = String.format("%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName());
                    }
                    System.out.println("zk path: [" + methodPath + "]");
                    this.client.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .forPath(methodPath, "0".getBytes());
                    // 创建临时节点,节点包含了服务提供段的信息
                } catch (Exception e) {
                    e.getMessage();
                }
            });
        } catch (Exception e) {
            e.getMessage();
        }
    }

    @Override
    public InetSocketAddress discovery(RpcRequest request, ServiceType nodeType) {
        String path = String.format("/%s/%s/%s", request.getClassName(), nodeType.getType(), request.getMethodName());
        try {
            List<String> addressList = this.client.getChildren().forPath(path);
            // 采用负载均衡的方式获取服务提供方信息,不过并没有添加watcher监听模式
            String address = loadBalance.balance(addressList);
            if (address == null) {
                return null;
            }
            return parseAddress(address);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private String getServiceAddress(BasicConfig config) {
        String hostInfo = new StringBuilder()
                .append(config.getHost())
                .append(":")
                .append(config.getPort())
                .toString();
        return hostInfo;
    }

    private InetSocketAddress parseAddress(String address) {
        String[] result = address.split(":");
        return new InetSocketAddress(result[0], Integer.valueOf(result[1]));
    }

    public void setLoadBalance(LoadBalance loadBalance) {
        // 可以重新设置负载均衡的策略
        this.loadBalance = loadBalance;
    }
}
image

服务启动后利用zkclient查询到在zk中包含的节点信息,其中默认的命名空间是jwfy/simple-rpc

负载均衡

public interface LoadBalance {
    String balance(List<String> addressList);
}
public abstract class AbstractLoadBalance implements LoadBalance {

    @Override
    public String balance(List<String> addressList) {
        if (addressList == null || addressList.isEmpty()) {
            return null;
        }
        if (addressList.size() == 1) {
            return addressList.get(0);
        }
        return doLoad(addressList);
    }

    abstract String doLoad(List<String> addressList);
}
public class DefaultLoadBalance extends AbstractLoadBalance {

    @Override
    String doLoad(List<String> addressList) {
        Random random = new Random();
        // 利用随机函数选择一个,其中random.nextIn生成的数据是在[0, size) 之间
        return addressList.get(random.nextInt(addressList.size()));
    }
}

上面的负载均衡代码其实很简单,就是从一个机器列表addressList中选择一个,如果列表为空或者不存在则直接返回null,如果机器只有1台则直接获取返回即可,当列表记录超过1条后利用随机函数生成一个列表偏移量以获取对应数据。也可以按照类似完成更多负载均衡的策略,然后调用setLoadBalance方法就可以了。

IO 处理

public interface MessageProtocol {
    /**
     * 服务端解析从网络传输的数据,转变成request
     * @param inputStream
     * @return
     */
    void serviceToRequest(RpcRequest request, InputStream inputStream);

    /**
     * 服务端把计算机的结果包装好,通过outputStream 返回给客户端
     * @param response
     * @param outputStream
     * @param <T>
     */
     <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);

    /**
     * 客户端把请求拼接好,通过outputStream发送到服务端
     * @param request
     * @param outputStream
     */
     void clientToRequest(RpcRequest request, OutputStream outputStream);

    /**
     * 客户端接收到服务端响应的结果,转变成response
     * @param response
     * @param inputStream
     */
     void clientGetResponse(RpcResponse response, InputStream inputStream);
}

实现类DefaultMessageProtocol

public class DefaultMessageProtocol implements MessageProtocol {

    @Override
    public void serviceToRequest(RpcRequest request, InputStream inputStream) {
        try {
            ObjectInputStream input = new ObjectInputStream(inputStream);

            request.setClassName(input.readUTF());
            request.setMethodName(input.readUTF());
            request.setParameterTypes((Class<?>[])input.readObject());
            request.setArguments((Object[])input.readObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
        try {
            ObjectOutputStream output = new ObjectOutputStream(outputStream);

            output.writeBoolean(response.getError());
            output.writeObject(response.getResult());
            output.writeObject(response.getErrorMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void clientToRequest(RpcRequest request, OutputStream outputStream) {
        try {
            ObjectOutputStream ouput = new ObjectOutputStream(outputStream);

            ouput.writeUTF(request.getClassName());
            ouput.writeUTF(request.getMethodName());
            ouput.writeObject(request.getParameterTypes());
            ouput.writeObject(request.getArguments());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void clientGetResponse(RpcResponse response, InputStream inputStream) {
        try {
            ObjectInputStream input = new ObjectInputStream(inputStream);

            response.setError(input.readBoolean());
            response.setResult(input.readObject());
            response.setErrorMessage((String) input.readObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

服务端请求处理

public class ServiceHandler {

    private ThreadPoolExecutor executor = null;
    private RpcService rpcService;
    private MessageProtocol messageProtocol;

    public ServiceHandler(RpcService rpcService) {
        this.rpcService = rpcService;

        ThreadFactory commonThreadName = new ThreadFactoryBuilder()
                .setNameFormat("Parse-Task-%d")
                .build();

        this.executor = new ThreadPoolExecutor(
                10,
                10,
                2,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(200),
                commonThreadName, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                SocketTask socketTask = (SocketTask) r;
                Socket socket = socketTask.getSocket();
                if (socket != null) {
                    try {
                        socket.close();
                        System.out.println("reject socket:" + socketTask + ", and closed");
                        // 无法及时处理和响应的就快速拒绝掉
                    } catch (IOException e) {
                    }
                }
            }
        });
    }

    public RpcService getRpcService() {
        return rpcService;
    }

    public void setRpcService(RpcService rpcService) {
        this.rpcService = rpcService;
    }

    public MessageProtocol getMessageProtocol() {
        return messageProtocol;
    }

    public void setMessageProtocol(MessageProtocol messageProtocol) {
        this.messageProtocol = messageProtocol;
    }

    public void handler(Socket socket) {
        // 接收到新的套接字,包装成为一个runnable提交给线程去执行
        this.executor.execute(new SocketTask(socket));
    }

    class SocketTask implements Runnable {

        private Socket socket;

        public SocketTask(Socket socket) {
            this.socket = socket;
        }

        public Socket getSocket() {
            return socket;
        }

        @Override
        public void run() {
            try {
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();

                RpcRequest request = new RpcRequest();
                messageProtocol.serviceToRequest(request, inputStream);
                // 获取客户端请求数据,统一包装成RpcRequest
                RpcResponse response = rpcService.invoke(request);
                // 反射调用,得到具体的返回值
                System.out.println("request:[" + request + "], response:[" + response + "]");
                messageProtocol.serviceGetResponse(response, outputStream);
                // 再返回给客户端
            } catch (Exception e) {
                // error
            } finally {
                if (socket != null) {
                    // SOCKET 关闭一定要加上,要不然会出各种事情
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

客户端 代理对象

public class ProxyInstance implements InvocationHandler {

    private RpcClient rpcClient;
    private Class clazz;

    public ProxyInstance(RpcClient client, Class clazz) {
        this.rpcClient = client;
        this.clazz = clazz;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(clazz.getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setArguments(args);

        // 获取服务提供方信息,这里既是服务发现的入口,找到一个合适的可用的服务提供方信息
        InetSocketAddress address = rpcClient.discovery(request);
        System.out.println("[" + Thread.currentThread().getName() + "]discover service:" + address);

        // 发起网络请求,得到请求数据
        RpcResponse response = rpcClient.invoke(request, address);
        return response.getResult();
    }
}

上面的InetSocketAddress address = rpcClient.discovery(request)是相比v1多了一个最重要的东西,每次获取请求后都实时从zk中获取对应的服务提供方信息,这就是服务发现

实践

public class Client {

    public static void main(String[] args) {
        RpcClient rpcClient = new RpcClient();

        rpcClient.subscribe(Calculate.class);
        rpcClient.start();

        Calculate<Integer> calculateProxy = rpcClient.getInstance(Calculate.class);

        for(int i=0; i< 200; i++) {
            new Thread(() -> {
                long start = System.currentTimeMillis();
                int s1 = new Random().nextInt(100);
                int s2 = new Random().nextInt(100);
                int s3 = calculateProxy.add(s1, s2);
                System.out.println("[" + Thread.currentThread().getName() + "]a: " + s1 + ", b:" + s2 + ", c=" + s3 + ", 耗时:" + (System.currentTimeMillis() - start));
            }).start();
        }
    }
}

客户端开启200个线程后,执行结果是顺利执行的,在服务端开启的接受请求被添加到线程池中,而代码中线程池的任务队列长度是200,可以完全的存储200个线程,但是如果我们把客户端请求量从200个改成500个呢,又会出现什么情况?

服务端

image

客户端

image

image

如上述的图片显示,当请求量打满线程池之后,线程池的拒绝策略就开始生效了,在本代码中是直接调用了close操作,而客户端感知到关闭后也会出现io错误,而正常的请求则顺利执行。其中还有输出discover服务发现了服务提供方的机器信息,这也是符合起初的想法的。

这里一定要加上一些策略以及时关闭无法处理的socket,否则就会出现服务提供方无任何可执行,但是服务调用方却还在等待中,因为socket并没有关闭,从而出现资源被占用了,还不执行相关任务。

提个问题:线程池打满了怎么办?

在本demo中采取了非常粗暴的策略,直接丢弃了无法处理的任务,在实际的线上业务中,可以先加机器以能再最短的时间内恢复线上情况,后期结合业务特点提出针对性的解决方案。如果业务接受一定的延迟,可以考虑接入kafka类似的消息队列(削峰是mq的一大特点);如果对时间要求很高,要么加机器,要么压榨机器性能,可能之前设置的线程池的数量太小,那就需要调节线程池的各个核心数据,修改线程池的任务队列类型也是可以考虑的;此外也有可能是业务耗时太多,无法及时处理完全造成请求堆积导致的,那么就需要考虑业务的同步改异步化。最后线上告警也需要及时完善。

没有绝对的解决方案,只有最合适当下场景的方案,没有银弹,任何不具体结合业务的方案都是扯淡。

总结思考

v2版本相比v1版本修改了整个代码结构,使得结构能够更加明确,引入zookeeper作为服务治理功能,大致介绍了zookeeper的特点以及功能,给服务注册、服务发现、序列化协议等均留下了口子,以便实现自定义的协议,v1的io模型是BIO,v2并没有变化,只是由单线程改造成多线程。

整体而言符合一个简单的rpc框架,依旧还是有很多点可以完善、优化的点,如:

  • io模型还是没有替换,后面考虑直接整体接入netty;
  • 也不应该每次实时从zk获取节点信息,应该先设置一个本地缓存,再利用zookeeper的watcher功能,开启一个异步线程去监听更新本地缓存,降低和zk交互带来的性能损耗;
  • 也没有快速失败、重试的功能,客观情况下存在网络抖动的问题,重试就可以了
  • 整体的各种协议约定并没有明确规范,比较混乱

本人微信公众号(搜索jwfy)欢迎关注

微信公众号
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容