LinkedBlockingQueue在生产消费者模式下的具体使用

前言:

最近在做一个SDK的重构工作,app采集到信息,传递给持久化存储模块,数据存储到数据库中,上传模块定时拿取数据上传到服务器中。我负责的是数据持久化存储模块。数据持久存储这个功能简单的来说就是拿到数据后,写入到数据库中,需要的时候从数据库中提取出来就可以了。乍一看这么简单,但如果你想好好的设计一下的话,功能其实也不少。比如说:app传递过来一个包含数据的bean类,使用完回收继续再用,类似Message.obtain()的一种缓冲池,感兴趣的朋友可以移步 Android 模拟Message.obtain(),构建自己的缓存池;app数据传到我这个模块,模块拿到数据存储到数据库中,就是典型的生产消费者模型,这就是本篇文章要介绍的内容。

准备

LinkedBlockingQueue中能使用阻塞线程的只有两个方法:

添加数据:put(Object),把Object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞
获取数据:take(),取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻塞当前线程进入等待状态直到BlockingQueue有新的数据被加入

那么使用生产消费者模型的话,生产者生产数据后使用put方法添加到BlockingQueue中,如果BlockingQueue满了,则阻塞当前线程,直到消费者消费了数据后唤醒此线程继续添加;消费者消费数据调研take()方法拿取数据,如果BlockingQueue中没有数据,则一直阻塞当前线程,直到生产者生产了数据,才会唤醒当前线程继续执行。

实践

对BlockQueue有了基本的认识之后,我们就可以根据两个阻塞方法设计我们自己的生产消费者模型了。首先构造线程池:

    private ScheduledThreadPoolExecutor mPoolExecutor = new ScheduledThreadPoolExecutor(5);

核心线程具体数目这个得根据项目中实际使用来去判断。接下来app传来数据,生产者开始生产数据:

 public void addAppMessage(final AppMessage message) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                if (!interrupted) {
                    try {
                        message.setUse(true);
                        mMsgQueue.put(message);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        mPoolExecutor.execute(runnable);
    }

interrupted是用来app退出或者主动设置为true的时候停止所有正在或者准备要执行的线程

private volatile boolean interrupted = false;

消费者开始消费数据:

private static class MessageStorageRunnable implements Runnable {

        private WeakReference<MonitorCache> mReference;

        private List<AppMessage> mMsgList = new ArrayList<>();

        private List<AppMessage> mCacheList = new ArrayList<>();

        private MessageStorageRunnable(MonitorCache cache) {
            mReference = new WeakReference<>(cache);
        }

        @Override
        public void run() {
            MonitorCache cache = mReference.get();
            if (cache == null) {
                return;
            }
            while (!cache.interrupted && !cache.interruptedTake) {
                //每次循环都检测弱引用是否被回收
                cache = mReference.get();
                if (cache == null) {
                    break;
                }
                try {
                    AppMessage message = cache.mMsgQueue.take();
                    mMsgList.add(message);
                    //集合数据量超过规定,进行存储
                    if (mMsgList.size() >= cache.msgCount) {
                        //进行数据迁移
                        date2Cache();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * 数据迁移至缓存集合中,并存储数据
         */
        private void date2Cache() {
            mCacheList.clear();
            mCacheList.addAll(mMsgList);
            mMsgList.clear();
            mReference.get().mManager.addMsg2Data(mCacheList);//实际发生消费行为
        }

        /**
         * 检测集合中是否还有没有可以存储的数据
         * app退出时调用
         */
        private void checkMsgList() {
            if (mMsgList.isEmpty()) {
                return;
            }
            date2Cache();
        }
    }

数据存储无论是写入到数据库还是写入到文件中都需要消费很大的资源,那么设计的时候不可能来一条数据就写入一条,这样无论是内存开销还是性能上的问题都是非常大的,所以我们得设置一个阈值,当集合中的数量大于或者等于这个阈值的时候,进行数据存储行为。
然后在构造器或者初始化方法的时候执行消费者线程:

mCustomer1 = new MessageStorageRunnable(this);
mPoolExecutor.execute(mCustomer1);

在app退出的时候调用:

public void stop() {
        mCustomer1.checkMsgList();
        interrupted = true;
    }

这样就构成了一个完整的生产消费者模型。
其实到这有朋友就问我,来一条数据就开一个线程put一下,来一条就put一下,那万一并发量非常大,一下子来了成千上万条数据,消费者take的速度没有put的快,就会越堆越多,阻塞的线程也是越来越多,内存开销也非常大,为什么不直接使用ArrayList直接保存数据呢?
乍一看确实是这样啊,为啥不直接使用ArrayList保存数据而却要设计这么个复杂的生产消费者模型来做这件事。其实不然,

  • 首先数据写入到数据库或者文件中的行为肯定是频繁发生的,那么这个操作我们就不能放到主线程来完成,否则容易造成卡顿,那么我们肯定是要在工作线程中执行;这就涉及到一个多线程的问题,ArrayList是线程不安全的,那么又有人说了,ArrayList线程不安全,Vectory是线程安全的,CopyOnWriteArrayList也是线程安全,直接对List加锁Collections.synchronizedList也是线程安全,但这些实现线程安全的List,要么是读的效率低要么是写的效率低,甚至读写的效率都低,对我们这个读写兼具的模型下,肯定是很难能完成任务了。
  • 其次,对于数据堆积的情况解决起来也非常简单,一个消费者take的速度太慢,那么我们多开几个消费者线程不就完了,这样消费起来速度杠杠的,就不存在数据堆积的问题了。

    经测试:
    ojbk
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356

推荐阅读更多精彩内容