rpc系列4-处理超时场景.及提供hook

问题:客户端发起远程调用,如果服务端长时间不返回怎么办?

这就涉及到一个调用超时的问题,平时我们应用中很多场景都会规定超时时间,比如:sql查询超时,http请求超时等。那么如果服务端方法执行的时间超过规定的timeout时间,那么客户端就需要调出当前调用,抛出TimeoutException。

好了,下面开始对RpcBuidler进行改造了,让其支持超时情况的处理。同样,先给出预期的测试方案和结果:

// 业务类UserService在之前的基础上增加超时调用的方法:
public interface UserService {
    
    // other method
    
    /**
     * 超时测试
     */
    public boolean timeoutTest();
    
}
//实现类
public class UserServiceImpl implements UserService {
    
     // other method
    
    @Override
    public boolean timeoutTest() {
        try {
            //模拟长时间执行
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {}
        return true;
    }
}

ClientTest中测试代码:

    @Test
    public void timeoutTest(){
        long beginTime = System.currentTimeMillis();
        try {
            boolean result = userService.timeoutTest(); 
        } catch (Exception e) {
            long period = System.currentTimeMillis() - beginTime;
            System.out.println("period:" + period);
            Assert.assertTrue(period < 3100);
        }
    }

有了异步方法的实现经验,其实这个超时处理过程和异步非常类似,都是利用Future机制来实现的,下面对doInvoke方法进行重构,返回一个异步任务:

    private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{

        //构造并提交FutureTask异步任务
        Future<RpcResponse> retVal = (Future<RpcResponse>) handlerPool.submit(new Callable<RpcResponse>(){
            @Override
            public RpcResponse call() throws Exception {
                Object res = null;
                try{
                    //创建连接,获取输入输出流
                    Socket socket = new Socket(host,port);
                    try{
                        ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                        ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                        try{
                            //发送
                            out.writeObject(request);
                            //接受server端的返回信息---阻塞
                            res = in.readObject();
                        }finally{
                            out.close();
                            in.close();
                        }
                    }finally{
                        socket.close();
                    }
                }catch(Exception e){
                    throw e;
                }
                return (RpcResponse)res;
            }
        });
        return retVal;
    }

回调方法invoke修改如下:

    @Override
    public Object invoke(Object proxy, Method method,
            Object[] args) throws Throwable {
        //如果是异步方法,立即返回null
        if(asyncMethods.get().contains(method.getName())) return null;
        Object retVal = null;

        RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
        RpcResponse rpcResp  = null;
        try{
            Future<RpcResponse> response = doInvoke(request);
            //获取异步结果
            rpcResp  = (RpcResponse)response.get(TIMEOUT,TimeUnit.MILLISECONDS);
        }catch(TimeoutException e){
            throw e;
        }catch(Exception e){}
        
        if(!rpcResp.isError()){
            retVal = rpcResp.getResponseBody();
        }else{
            throw new RpcException(rpcResp.getErrorMsg());
        }
        return retVal;
    }

可见,经过这样改造后,所有的方法调用都是通过Future获取结果。

提供Hook,让开发人员进行RPC层面的AOP。

首先看下题目提供的Hook接口:

public interface ConsumerHook {
    public void before(RpcRequest request);
    public void after(RpcRequest request);
}
//实现类
public class UserConsumerHook implements ConsumerHook{
    @Override
    public void before(RpcRequest request) {
        RpcContext.addAttribute("hook key","this is pass by hook");
    }

    @Override
    public void after(RpcRequest request) {
        System.out.println("I have finished Rpc calling.");
    }
}

hook实现的功能很简单,即在客户端进行远程调用的前后执行before和after方法。

public final class RpcConsumer implements InvocationHandler{

    //。。。
    
    //钩子
    private ConsumerHook hook;

    public RpcConsumer hook(ConsumerHook hook){
        this.hook = hook;
        return this;
    }

static{
        userService = (UserService)consumer.targetHostPort(host, port)
                            .interfaceClass(UserService.class)
                            .timeout(TIMEOUT)
                            .hook(new UserConsumerHook())//新增钩子
                            .newProxy();
    }
//。。。
}

//UserServiceImpl中的测试方法
public Map<String, Object> getMap() {
        Map<String,Object> newMap = new HashMap<String,Object>();
        newMap.put("name","getMap");
        newMap.putAll(RpcContext.getAttributes());
        return newMap;
}

我们只需要在doInvoke方法开始出添加钩子函数的执行逻辑即可。如下:

    private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
        //插入钩子
        hook.before(request);
        //。。。
}

同时在asyncCall和invoke方法的结束添加after的执行逻辑。具体实现可以看源码。

github附上源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 静(娴静)女其姝,俟我于城隅(城角)。 爱(隐藏)而不见,搔首踟蹰。 静女其娈,贻我彤管(红管草)。 彤管有炜(红...
    ananli阅读 593评论 0 0
  • 花开的季节,很是喜欢去看花,特别是成片成片的花海。置身于花丛中,近距离接触花儿们,或娇艳或素雅,或热烈或清...
    温暖的简阅读 424评论 2 4