徒手撸框架--高并发环境下的请求合并

原文地址:www.xilidou.com/2018/01/22/…

在高并发系统中,我们经常遇到这样的需求:系统产生大量的请求,但是这些请求实时性要求不高。我们就可以将这些请求合并,达到一定数量我们统一提交。最大化的利用系统性IO,提升系统的吞吐性能。

所以请求合并框架需要考虑以下两个需求:

  1. 当请求收集到一定数量时提交数据
  2. 一段时间后如果请求没有达到指定的数量也进行提交

我们就聊聊一如何实现这样一个需求。

阅读这篇文章你将会了解到:

  • ScheduledThreadPoolExecutor
  • 阻塞队列
  • 线程安全的参数
  • LockSuppor的使用

设计思路和实现

我们就聊一聊实现这个东西的具体思路是什么。希望大家能够学习到分析问题,设计模块的一些套路。

  1. 底层使用什么数据结构来持有需要合并的请求?
    • 既然我们的系统是在高并发的环境下使用,那我们肯定不能使用,普通的ArrayList来持有。我们可以使用阻塞队列来持有需要合并的请求。
    • 我们的数据结构需要提供一个 add() 的方法给外部,用于提交数据。当外部add数据以后,需要检查队列里面的数据的个数是否达到我们限额?达到数量提交数据,不达到继续等待。
    • 数据结构还需要提供一个timeOut()的方法,外部有一个计时器定时调用这个timeOut方法,如果方法被调用,则直接向远程提交数据。
    • 条件满足的时候线程执行提交动作,条件不满足的时候线程应当暂停,等待队列达到提交数据的条件。所以我们可以考虑使用 LockSuppor.park()LockSuppor.unpark 来暂停和激活操作线程。

经过上面的分析,我们就有了这样一个数据结构:

private static class FlushThread<Item> implements Runnable{

        private final String name;

        //队列大小
        private final int bufferSize;
        //操作间隔
        private int flushInterval;

        //上一次提交的时间。
        private volatile long lastFlushTime;
        private volatile Thread writer;

        //持有数据的阻塞队列
        private final BlockingQueue<Item> queue;

        //达成条件后具体执行的方法
        private final Processor<Item> processor;

        //构造函数
        public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
            this.name = name;
            this.bufferSize = bufferSize;
            this.flushInterval = flushInterval;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;

            this.queue = new ArrayBlockingQueue<>(queueSize);

        }

        //外部提交数据的方法
        public boolean add(Item item){
            boolean result = queue.offer(item);
            flushOnDemand();
            return result;
        }

        //提供给外部的超时方法
        public void timeOut(){
            //超过两次提交超过时间间隔
            if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
                start();
            }
        }

        //解除线程的阻塞
        private void start(){
            LockSupport.unpark(writer);
        }

        //当前的数据是否大于提交的条件
        private void flushOnDemand(){
            if(queue.size() >= bufferSize){
                start();
            }
        }

        //执行提交数据的方法
        public void flush(){
            lastFlushTime = System.currentTimeMillis();
            List<Item> temp = new ArrayList<>(bufferSize);
            int size = queue.drainTo(temp,bufferSize);
            if(size > 0){
                try {
                    processor.process(temp);
                }catch (Throwable e){
                    log.error("process error",e);
                }
            }
        }

        //根据数据的尺寸和时间间隔判断是否提交
        private boolean canFlush(){
            return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
        }

        @Override
        public void run() {
            writer = Thread.currentThread();
            writer.setName(name);

            while (!writer.isInterrupted()){
                while (!canFlush()){
                    //如果线程没有被打断,且不达到执行的条件,则阻塞线程
                    LockSupport.park(this);
                }
                flush();
            }

        }

    }
复制代码
  1. 如何实现定时提交呢?

通常我们遇到定时相关的需求,首先想到的应该是使用 ScheduledThreadPoolExecutor定时来调用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()...那需要再努力学习,多看源码了。

  1. 怎样进一步的提升系统的吞吐量?

我们使用的FlushThread 实现了 Runnable 所以我们可以考虑使用线程池来持有多个FlushThread

所以我们就有这样的代码:


public class Flusher<Item> {

    private final FlushThread<Item>[] flushThreads;

    private AtomicInteger index;

    //防止多个线程同时执行。增加一个随机数间隔
    private static final Random r = new Random();

    private static final int delta = 50;

    private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

    private static ExecutorService POOL = Executors.newCachedThreadPool();

    public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {

        this.flushThreads = new FlushThread[threads];

        if(threads > 1){
            index = new AtomicInteger();
        }

        for (int i = 0; i < threads; i++) {
            final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
            flushThreads[i] = flushThread;
            POOL.submit(flushThread);
            //定时调用 timeOut()方法。
            TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
        }
    }

    // 对 index 取模,保证多线程都能被add
    public boolean add(Item item){
        int len = flushThreads.length;
        if(len == 1){
            return flushThreads[0].add(item);
        }

        int mod = index.incrementAndGet() % len;
        return flushThreads[mod].add(item);

    }

    //上文已经描述
    private static class FlushThread<Item> implements Runnable{
        ...省略
    }
}

复制代码
  1. 面向接口编程,提升系统扩展性:
public interface Processor<T> {
    void process(List<T> list);
}
复制代码

使用

我们写个测试方法测试一下:

//实现 Processor 将 String 全部输出
public class PrintOutProcessor implements Processor<String>{
    @Override
    public void process(List<String> list) {

        System.out.println("start flush");

        list.forEach(System.out::println);

        System.out.println("end flush");
    }
}

复制代码

public class Test {

    public static void main(String[] args) throws InterruptedException {

        Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());

        int index = 1;
        while (true){
            stringFlusher.add(String.valueOf(index++));
            Thread.sleep(1000);
        }
    }
}

复制代码

执行的结果:


start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

复制代码

我们发现并没有达到5个数字就触发了flush。因为触发了超时提交,虽然还没有达到规定的5 个数据,但还是执行了 flush。

如果我们去除 Thread.sleep(1000); 再看看结果:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush
复制代码

每5个数一次提交。完美。。。。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,916评论 2 89
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,094评论 1 32
  • 在一个方法内部定义的变量都存储在栈中,当这个函数运行结束后,其对应的栈就会被回收,此时,在其方法体中定义的变量将不...
    Y了个J阅读 4,415评论 1 14
  • 大家好,我是Amber!今天给大家推荐一部美国惊悚电影《NO ESCAPE》中文译名《无处可逃》。 一个美国工程师...
    甜不甜加点盐阅读 411评论 0 0
  • 人在世间的长河里,时代的大框架中,到底能为自己活多少?时代和个体就像一辆大马车,有时候显得多么无力。 我努力表现的...
    娃娃zy阅读 139评论 0 0