参考dubbo实现异步转同步方案

1. 需求背景

在支付系统中,因为上游通道的差异化,有时会出现这种情形:

  1. 上游提供的接口是异步的,即上游接收请求后,立即响应处理中(中间状态),最终处理结果以异步通知的形式告知;

  2. 平台对接多家上游机构,有同步响应也有异步响应,为了标准统一,平台对下游统一封装了同步响应接口,方便下游处理业务。

此时,对于异步响应的通道,平台需要通过异步转同步功能,让下游无感知。

2. 方案简介

请求与响应流程

具体流程见上图:

    1. 客户端同步请求服务端(集群);
  • 2.1 将orderNo/uniqueId作为key/value 放入redis缓存,其中uniqueId用于异步转同步识别请求线程;

  • 2.2 服务端请求第三方服务;

(中间忽略了同步响应的中间状态响应)

    1. 第三方发起异步响应到服务端集群;

(此时,不能保证异步响应路由到源请求的节点上)

  • 4.1 按照orderNo查询redis,获取请求的uniqueId;

  • 4.2 按约定组装结果放入MQ中(此处使用RabbitMq演示);

    1. MQ向消费者广播;
  • 5.1 节点接收广播后,识别是该节点发出的请求,则进行响应处理;

  • 5.2 节点接收广播后,识别不是该节点发出的请求,则丢弃该通知;

    1. 在未超时的情况下,同步响应客户端。

3. 技术要点

参考Dubbo 2.5.x com.alibaba.dubbo.remoting.exchange.support. DefaultFuture类

主要使用 ReentrantLock解决互斥问题,使用 Condition 实现超时等待功能。对DefaultFuture做适当的简化之后,示例如下:

public class DefaultFuture {

    private static final Map<String, DefaultFuture> FUTURES  = new ConcurrentHashMap<>();

    private final String id;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private final long start = System.currentTimeMillis();
    private volatile Response response;
    private final Request request;

    public DefaultFuture(Request request, int timeout){
        this.id = request.getUniqueId();
        this.timeout = timeout > 0 ? timeout : Constants.DEFAULT_TIMEOUT;
        this.request = request;
        // put into waiting map.
        FUTURES.put(id, this);
    }

    /**
     * 阻塞获取响应
     * @return
     * @throws TimeoutException
     */
    public Object get() throws Exception {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 超时等待
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果有返回结果了,或者,超时了,就退出循环
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 如果是超时了,就抛出异常
            if (!isDone()) {
                throw new TimeoutException(request.getUniqueId());
            }
        }
        // 远程服务正常返回结果,则返回给调用方
        return returnFromResponse();

    }

    private Object returnFromResponse() throws Exception {
        Response res = response;
        if (res.getStatus() == Response.OK) {
            return response.getResult();
        }
        if (res.getStatus() == Response.TIMEOUT) {
            throw new TimeoutException(res.getErrorMessage());
        }
        FUTURES.remove(id);
        throw new Exception(res.getErrorMessage());
    }

    /**
     * 是否响应
     * @return
     */
    private boolean isDone(){
        return this.response != null;
    }

    public static void received(Response response) {
        try {
            // 根据请求id从FUTURES中获取DefaultFuture,并删除
            DefaultFuture future = FUTURES.remove(response.getUniqueId());
            if (future != null) {
                future.doReceived(response);
            } else {
                log.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response);
            }
        } finally {
        }
    }

    private void doReceived(Response response) {
        lock.lock();
        try {
            this.response = response;
            this.response.setStatus(Response.OK);
            if (done != null) {
                // 唤醒阻塞的线程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 是否是正确的响应
     * @param id
     * @return
     */
    public static boolean isCorrectResponse(String id){
        return FUTURES.containsKey(id);
    }
    private int getTimeout() {
        return timeout;
    }

    private long getStartTimestamp() {
        return start;
    }

    public String getId() {
        return id;
    }

    private static class RemotingInvocationTimeoutScan implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setErrorMessage("响应超时");
                            // handle response.
                            DefaultFuture.received(timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    log.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

    static {
        // 自动清除超时任务
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "remoting-invocation-timeout-scan");
        th.setDaemon(true);
        th.start();
    }

}

4. 测试

  1. 测试类接收请求后,直接组装响应,将响应放入mq,然后调用DefaultFuture.get()阻塞等待响应;
  2. mq发送广播,测试项目接收后,按流程通过DefaultFuture.received()处理响应;
  3. DefaultFuture.doReceived()方法中,唤醒等待的请求线程,测试完成。

测试环境: Macbook Pro 8G + Spring boot(单实例)+ jmeter(共100个线程)

线程组
TPS
RabbitMQ
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354