异步处理http请求同步返回结果

如何设计一个接口,使用消息队列异步请求,但是客户端同步收到结果

  异步处理,同步返回?为什么会有这样一个需求?既然接口要求同步返回,那么直接阻塞就好了,要什么异步消息同步返回?高并发保护系统的手段是缓存、限流、降级。限流有许多的手段,想令牌桶、漏桶算法按数量限流,也有使用消息队列,排队限流的。至于使用消息队列的好处就不多说了,这里主要将如何实现这个需求,有一个系统比较的不稳定,但是没人维护,又不能替换它,只能在他的上层加一层来保护她,可以限流处理,也可以用mq让它以他的最大处理能力处理。说白了这东西就是一个缓冲系统,可替代性高,存粹的技术型应用,由于新鲜所以我觉得可以一试;

选型

  首先我们来选型,分析需求:使用消息队列异步请求,那么选型消息队列: zeromq、rabbitmq、activemq、kafka、rocketmq等等,消息队列很多如果没有什么要求,那么都可以选,但是首先我们需要考虑实现问题呢,使用的mq是否支持。我们需要可以排队,那么zeromq就不能选了,activemq有较小概率丢失消息,一般我不太爱用这个。好了我们实现这个需求不需要什么复杂的功能,那么剩下的都是可以选的,接下来就是考虑架设成本和易用性的问题。rabbitmq的时效性非常的好,但是吞吐量不及kafka和rocketmq,而且隔热你用的较少;所以一般来说我习惯在kafka和rocketmq中选择。rocketmq综合性能比较好,而且有很多的功能(消息提交重新消费、延时消费等),做支付金融首选rocketmq,但是我们这里不需要用到这些,所以这里用了kafka。
  有了异步处理消息的mq,我们还需要一个保存mq处理完的返回值队列,能让阻塞的线程获取到。因为要分布式的,所以这个队列不能是java中的数据,所以这里使用redis保存mq处理完的数据。

架构

  接下来我们先构造系统,首先我们有一个web服务,用来接收http的请求,接受请求后发送mq处理,然后阻塞当前处理的线程,等待mq处理完成,从redis的队列中取出数据,现在还差一个mq的接收方,实现一个server服务,接受mq消息并处理,然后将数据放入redis,并且通知web的这个线程消息已经处理完毕,让web这个阻塞的线程取出redis中处理完成的数据。至于通知需要广播通知,因为分布式的话这个处理请求的线程会在任意一台web服务中,至于这个通知我们可以用redis的发布订阅功能来实现;
  整体我们就有2个服务,一个web,一个server,之间通过mq通信,redis共享数据,redis发布订阅同步状态唤醒线程。

实现

  首先是web端的实现,简单的springboot项目加上web依赖,这里不赘述,这里我们模拟场景:我们需要去一个三方系统获取用户信息,通过后台http调用获取他的用户信息,用户要么输入手机,用户名或邮箱和密码(加密的);

@RestController
@RequestMapping("/async")
public class AsyncController {

    @Autowired
    private AsyncRequestExecutorService asyncService;

    /**
     * json处理工具 这里用的gson 也可以fasejson或其他
     */
    private Gson gson = new Gson();

    @RequestMapping(value = "/userInfo", method = RequstMethod.POST)
    public String getUserInfo(UserInfoQueryDTO userInfoQuery) {
        //正常的话我们调用一个service就得到返回值就同步返回了,客户端就能获取到相关信息;但是这里要异步处理,同步返回,我们准备一个异步处理的线程池来处理;先看AsyncRequestExecutorService;
        try{
            Future<String> result asyncService.doRequest(UUID.randomUUID().toString(), gson.toJson(userInfoQuery));
            // 这里需要设置超时时间, 保证能给客户端一个反应(这里就实现了阻塞)
            return result.get(5, TimeUnit.SECONDS);
        } catch(Exception e) {
            //这里要么超时要么失败处理
        }

    }


}

@Data
public class UserInfoQueryDTO {

    private String mobile;

    private Strig userName;

    private String email;

    private String password;
}

@Service
public class AsyncRequestExecutorService {

    /**
     * 线程名称方便定位问题
     */
    private String final threadName = "ASYNC-THREAD-";

    /**
     * 这就是kafkaspringboot的简单集成使用kafkaTemplate的发送消息这里不详述
     */
    @Autowired
    private KafkaMessagePublisher messagePublisher;

    /**
     * redis的集成,保存mq处理完成后的结果集,这里用的是set数据结构,因为这里请求时一次性的 返回就移除pop()方法正好满足, 而且请求不重复
     */
    @Autowired;
    private ResponseRedisCache responseCache;

    /**
     * 线程池
     */
    private ExecutorService executorService;

    public AsyncRequestExecutorService() {
        this.executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 8,
                200,
                3000,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadFactory() {
                    private AtomicInteger count = new AtomicInteger(0);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, THREAD_NAME + count.incrementAndGet());
                    }
        });
    }

    /**
     * 实现异步处理的关键
     * @param requestId 当前请求的id 可以用uuid
     * @param message 当前参数(json格式)
     * @return 返回一个Future用来阻塞
     *
     */
    public Future<String> doRequest(String requestId, String message) {
        // 我们收到消息后发送mq处理(这里一定要将requestId一起处理,方便server处理完放入redis后的存取)
        messagePublisher.send(requestId, message);
        // 返回一个线程处理
        return executorService.submit(() -> {
            //一个静态map保存当前线程(注意使用ConcurrentHashMap)
            GlobalThreadMap.parkThreadMap.put(requestId, Thread.currentThread());
            // 立即阻塞,应为它不会马上处理完成的(最多阻塞5s)
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            // 之后是线程被唤醒的处理
            // 首先从静态map中删除当前请求的线程
            GlobalThreadMap.parkThreadMap.remove(requestId);
            // 返回从redis中取出的结果,是null也直接返回(因为5s的阻塞时间,过了5s还没处理完就需要响应客户端了,但是这时redis还没有数据);
            return responseCache.pop(requestId);
        })    
    }

}

以上是web端的处理,是关键部分,接下来是server端的处理,比较简单,就简单叙述下:
在web中向kafka中推送了一条获取用户信息的消息,接下来就只要处理一下步骤:
srver端消费消息
反序列化
http调用第三方,同步获取返回结果(这里注意配置http调用的超时时间和异常处理)
将http的返回结果用消息中的requestId作为key写入redis
最后通过发布订阅返回requestId处理完成的消息

到这里这条请求的处理又回到了web端:
web端收到了redis的发布订阅消息,从GlobalThreadMap中用发布订阅的requestId(也就是一开始的UUID生成的id)取出被park的线程执行unpark唤醒,之后result.get(5, TimeUnit.SECONDS)就能获取从redis中取出的数据完成一次请求处理;

总结

  1. 使用kafka异步处理请求,注意消费端消费请求的时候用多线程处理。
  2. 使用Future阻塞处理的线程达到同步返回的目的。
  3. 使用redis保存处理结果按请求id取出;发布订阅通知阻塞的线程处理完成(否则就要轮询队列来判断是否完成,浪费cpu)。
  4. 其中使用到的kafka和redis都是springboot的整合,这个又大量教程。
  5. 其中的线程数等优化还需结合实际,也许不会比直接限流强多少。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容