多线程应用(一)---Http请求阻塞回调处理

多线程应用(一)---Http请求阻塞回调处理

1.需求描述

1.1场景说明:

由于,微信端的业务需求量越来越大.将业务与微信第三方事件处理耦合在一起的单一项目的结构已经逐渐暴露出,承载能力不足的缺点.所以,需要将与微信的交互从业务逻辑中分离出,单独进行管理和处理.
这样做有以下几点好处:

  1. 可以达到业务解耦分离.
  2. 可以为业务系统微服务化做准备.
  3. 可以在解耦后针对性的对不同业务系统进行优化.
  4. 减少业务系统错误的影响面.

1.2技术难点说明:

微信中的通过http调用客户配置的回调地址的方式来进行事件通知.
事件通知分为两种类型:

  1. 发送http请求和数据以后,客户服务器默认回复success字符串.然后,业务系统业务处理完成后通过指定的http地址通知微信方
  2. 发送http请求和数据的同一个请求中需要客户服务器在response中返回业务处理的结果.

由于,我们已经将事件通知从主业务系统中抽离出.所以,微信事件管理系统会在收到微信事件通知以后,通过mq的方式进行发布事件.但是,事件通知的第二种类型需要在一个http请求中将业务处理数据带回微信方.此时,就需要微信事件管理系统阻塞微信方发送来的http请求,直到业务系统处理完业务数据并返回给系统.
这样,我们就需要有一个灵活,可靠的容器对被阻塞的微信方请求进行管理.

2.理论基础

2.1Future多线程模型:

2.1.1模型介绍:

future多线程模型,是一种比较常用的多线程阻塞回调的模式.具体逻辑结构如下图所示:

future时序图.png

具体处理逻辑是这样的.当一个请求发送到一个future模式的入口以后,此时这个线程是阻塞的.这时future的线程会进行后面的回调业务,或者是直接开始等待.直到,其他线程唤醒future线程或者future等待超时.此时,future线程唤醒,然后将具体的结果返回给调用线程.

2.2线程间通信:

2.2.1wait,notify,notifyAll

线程间通信有很多种方式,这次用到的是比较简单的wait notify notifyall组合.这3个方法是object类中的3个方法.他们控制的目标是对于这个实例的控制.所以,需要线程在获取到这个对象操作的monitor以后才能控制.一般使用的方法是通过synchronized关键字获取对象锁再调用这3个方法.如果,没有在同步代码块中执行,这时候java会报IllegalMonitorStateException异常.这样主要是为了控制当同一个对象实例被多个线程占用以后的操作问题.可以避免不同步的情况产生.

2.2.1.1wait

wait方法主要是用来将这个对象实例上的当前线程进行挂起.可以输入timeout时间,超过timeout时间以后线程会自动唤醒

2.2.1.2notify

notify方法用来唤醒在对应的对象实例上休眠的线程,但是需要注意的是,这个是非公平的.具体唤醒哪一个线程由jvm自行决定

2.2.1.3notifyall

notifyall方法顾名思义,是将在这个实例对象上所有挂起的线程唤醒.

3.实现思路:

  1. 容错能力:由于需要给多个业务服务提供消息分发,消息回复.需要有业务系统超时的处理能力.所以,提供的阻塞服务都会有timeout设定.
  2. 持续服务能力:我们需要提供持续稳定的服务.在项目中.对阻塞的请求会有一个溢出的管理.如果超出某个最大值,先入的请求就会被直接返回默认值.所以,在业务服务中需要自行处理幂等的问题,避免业务处理完成后,但是由于溢出导致业务处理失败.这样就会导致业务服务数据或者业务出现问题

4.具体实现:

ThreadHolder(消息载体):

import lombok.Data;

import java.util.concurrent.Callable;

/**
 * <p>
 * Description: com.javanewb.service
 * </p>
 * date:2017/10/31
 *
 * @author Dean.Hwang
 */
@Data
public abstract class ThreadHolder<T> implements Callable<T> {
    protected abstract T proData();//TODO 正常逻辑处理,以及默认数据返回

    private T defaultData;//返回的默认数据
    private Object needProData;//接受到需要处理的数据
    private Long createTime = System.currentTimeMillis();
    private Long maxWaitTime;
    private String mdc;
    private RequestHolder<T> holder;

    @Override
    public T call() throws Exception {
        waitThread();
        System.out.println("Thread mdc:" + mdc + "  notify");
        if (needProData == null) {
            holder.removeThread(mdc, false);
            return defaultData;
        }
        return proData();
    }

    public synchronized void waitThread() throws InterruptedException {
        this.wait(maxWaitTime);
    }

    public synchronized void notifyThread(Object needProData) {
        this.needProData = needProData;
        this.notify();
    }

    public synchronized void notifyDefault() {
        this.notify();
    }
}

RequestHolder(请求管理容器):


import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * <p>
 * Description: com.javanewb.entity
 * </p>
 * date:2017/10/26
 *
 * @author Dean.Hwang
 */
public class RequestHolder<T> {
    private Integer maxSize;
    private Long waitTime;

    public RequestHolder(Integer maxSize, Long maxWait, ExecutorService executorService) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than max size num");
        }
        this.maxSize = maxSize;
        this.waitTime = maxWait;
        if (executorService != null) {
            this.executorService = executorService;
        } else {
            this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
        }
    }

    public RequestHolder(Integer maxSize, Long maxWait) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than  max size num");
        }
        this.waitTime = maxWait;
        this.maxSize = maxSize;
        this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
    }

    private ExecutorService executorService;
    private final Map<String, ThreadHolder<T>> holderMap = new ConcurrentHashMap<>();
    private List<String> mdcOrderList = new CopyOnWriteArrayList<>();
    private AtomicBoolean isCleaning = new AtomicBoolean(false);

    public ThreadHolder<T> removeThread(String mdc, boolean needNotifyDefault) {
        mdcOrderList.remove(mdc);
        ThreadHolder<T> holder;
        synchronized (holderMap) {
            holder = holderMap.get(mdc);
            holderMap.remove(mdc);
        }
        if (holder != null && needNotifyDefault) {
            holder.notifyDefault();
        }
        return holder;
    }

    public void notifyThread(String mdc, Object data) {
        ThreadHolder<T> holder = removeThread(mdc, false);
        if (holder != null) {
            holder.notifyThread(data);
        }
    }


    public Future<T> getFuture(String mdcStr, Class<? extends ThreadHolder<T>> holder) {
        if (StringUtil.isEmpty(mdcStr) || holder == null) {
            throw new BusinessException(1020, "Mdc target missing!!!");
        }
        Future<T> future;
        try {
            ThreadHolder<T> thread = holder.newInstance();
            holderMap.put(mdcStr, thread);
            mdcOrderList.add(mdcStr);
            thread.setMaxWaitTime(waitTime);
            thread.setMdc(mdcStr);
            thread.setHolder(this);
            future = executorService.submit(thread);
            cleanThreadPool();
        } catch (InstantiationException | IllegalAccessException e) {
            holderMap.remove(mdcStr);
            mdcOrderList.remove(mdcStr);
            throw new BusinessException(1021, "Thread Holder initialized failed");
        }
        return future;
    }

    private void cleanThreadPool() {
        if (mdcOrderList.size() >= maxSize && isCleaning.compareAndSet(false, true)) {

            try {
                mdcOrderList.subList(0, mdcOrderList.size() - maxSize).forEach(//看测试效率,看是否用并行stream处理
                        mdc -> removeThread(mdc, true)
                );
            } finally {
                isCleaning.set(false);
            }
        }
    }
}

TestController(测试入口):

import com.javanewb.entity.TestThreadHolder;
import com.javanewb.thread.tools.RequestHolder;
import com.keruyun.portal.common.filter.LoggerMDCFilter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * <p>
 * Description: com.javanewb.controller
 * </p>
 * <p>
 * Copyright: Copyright (c) 2015
 * </p>
 * <p>
 
 * </p>
 * date:2017/10/25
 *
 * @author Dean.Hwang
 */
@Api
@RestController
@Slf4j
public class TestController {
    private RequestHolder<String> holder = new RequestHolder<>(100, 500000L);
    private List<String> mdcList = new ArrayList<>();

    @ApiOperation(value = "请求同步测试", notes = "请求同步测试")
    @RequestMapping(value = "/async", method = RequestMethod.GET)
    public void async(HttpServletRequest request, HttpServletResponse response, String id) {
        Long startTime = System.currentTimeMillis();
        String mdc = MDC.get(LoggerMDCFilter.IDENTIFIER);
        mdcList.add(mdc);
        Future<String> future = holder.getFuture(id, TestThreadHolder.class);
        log.info(Thread.currentThread().getName());
        try {
            System.out.println(mdc + " Thread Wait");
            String result = future.get();
            response.getOutputStream().print(result);
            System.out.println(" time: " + (System.currentTimeMillis() - startTime));
        } catch (IOException | ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @ApiOperation(value = "释放list第一个", notes = "请求同步测试")
    @RequestMapping(value = "/notify", method = RequestMethod.GET)
    public String notifyFirst() {
        String mdc = mdcList.get(0);
        mdcList.remove(0);
        holder.notifyThread(mdc, "");
        return mdc;
    }

    @ApiOperation(value = "释放list第一个", notes = "请求同步测试")
    @RequestMapping(value = "/notifyThis", method = RequestMethod.GET)
    public String notifyThis(String mdc) {
        int idx = 0;
        for (int i = 0; i < mdcList.size(); i++) {
            if (mdcList.get(i).equals(mdc)) {
                idx = i;
                break;
            }
        }
        mdcList.remove(idx);
        holder.notifyThread(mdc, "");
        return mdc;
    }
}
```#5.后话:
本项目会放在github上,如果有兴趣,或者发现有bug需要处理的可以直接从博客联系我,也可以直接去github
地址:https://github.com/crowhyc/ThreadSynchronizer.git
邮箱:crowhyc@163.com
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容