前言:
最近在做一个SDK的重构工作,app采集到信息,传递给持久化存储模块,数据存储到数据库中,上传模块定时拿取数据上传到服务器中。我负责的是数据持久化存储模块。数据持久存储这个功能简单的来说就是拿到数据后,写入到数据库中,需要的时候从数据库中提取出来就可以了。乍一看这么简单,但如果你想好好的设计一下的话,功能其实也不少。比如说:app传递过来一个包含数据的bean类,使用完回收继续再用,类似Message.obtain()的一种缓冲池,感兴趣的朋友可以移步 Android 模拟Message.obtain(),构建自己的缓存池;app数据传到我这个模块,模块拿到数据存储到数据库中,就是典型的生产消费者模型,这就是本篇文章要介绍的内容。
准备
LinkedBlockingQueue中能使用阻塞线程的只有两个方法:
添加数据:put(Object),把Object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞
获取数据:take(),取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻塞当前线程进入等待状态直到BlockingQueue有新的数据被加入
那么使用生产消费者模型的话,生产者生产数据后使用put方法添加到BlockingQueue中,如果BlockingQueue满了,则阻塞当前线程,直到消费者消费了数据后唤醒此线程继续添加;消费者消费数据调研take()方法拿取数据,如果BlockingQueue中没有数据,则一直阻塞当前线程,直到生产者生产了数据,才会唤醒当前线程继续执行。
实践
对BlockQueue有了基本的认识之后,我们就可以根据两个阻塞方法设计我们自己的生产消费者模型了。首先构造线程池:
private ScheduledThreadPoolExecutor mPoolExecutor = new ScheduledThreadPoolExecutor(5);
核心线程具体数目这个得根据项目中实际使用来去判断。接下来app传来数据,生产者开始生产数据:
public void addAppMessage(final AppMessage message) {
Runnable runnable = new Runnable() {
@Override
public void run() {
if (!interrupted) {
try {
message.setUse(true);
mMsgQueue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
mPoolExecutor.execute(runnable);
}
interrupted是用来app退出或者主动设置为true的时候停止所有正在或者准备要执行的线程
private volatile boolean interrupted = false;
消费者开始消费数据:
private static class MessageStorageRunnable implements Runnable {
private WeakReference<MonitorCache> mReference;
private List<AppMessage> mMsgList = new ArrayList<>();
private List<AppMessage> mCacheList = new ArrayList<>();
private MessageStorageRunnable(MonitorCache cache) {
mReference = new WeakReference<>(cache);
}
@Override
public void run() {
MonitorCache cache = mReference.get();
if (cache == null) {
return;
}
while (!cache.interrupted && !cache.interruptedTake) {
//每次循环都检测弱引用是否被回收
cache = mReference.get();
if (cache == null) {
break;
}
try {
AppMessage message = cache.mMsgQueue.take();
mMsgList.add(message);
//集合数据量超过规定,进行存储
if (mMsgList.size() >= cache.msgCount) {
//进行数据迁移
date2Cache();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 数据迁移至缓存集合中,并存储数据
*/
private void date2Cache() {
mCacheList.clear();
mCacheList.addAll(mMsgList);
mMsgList.clear();
mReference.get().mManager.addMsg2Data(mCacheList);//实际发生消费行为
}
/**
* 检测集合中是否还有没有可以存储的数据
* app退出时调用
*/
private void checkMsgList() {
if (mMsgList.isEmpty()) {
return;
}
date2Cache();
}
}
数据存储无论是写入到数据库还是写入到文件中都需要消费很大的资源,那么设计的时候不可能来一条数据就写入一条,这样无论是内存开销还是性能上的问题都是非常大的,所以我们得设置一个阈值,当集合中的数量大于或者等于这个阈值的时候,进行数据存储行为。
然后在构造器或者初始化方法的时候执行消费者线程:
mCustomer1 = new MessageStorageRunnable(this);
mPoolExecutor.execute(mCustomer1);
在app退出的时候调用:
public void stop() {
mCustomer1.checkMsgList();
interrupted = true;
}
这样就构成了一个完整的生产消费者模型。
其实到这有朋友就问我,来一条数据就开一个线程put一下,来一条就put一下,那万一并发量非常大,一下子来了成千上万条数据,消费者take的速度没有put的快,就会越堆越多,阻塞的线程也是越来越多,内存开销也非常大,为什么不直接使用ArrayList直接保存数据呢?
乍一看确实是这样啊,为啥不直接使用ArrayList保存数据而却要设计这么个复杂的生产消费者模型来做这件事。其实不然,
- 首先数据写入到数据库或者文件中的行为肯定是频繁发生的,那么这个操作我们就不能放到主线程来完成,否则容易造成卡顿,那么我们肯定是要在工作线程中执行;这就涉及到一个多线程的问题,ArrayList是线程不安全的,那么又有人说了,ArrayList线程不安全,Vectory是线程安全的,CopyOnWriteArrayList也是线程安全,直接对List加锁Collections.synchronizedList也是线程安全,但这些实现线程安全的List,要么是读的效率低要么是写的效率低,甚至读写的效率都低,对我们这个读写兼具的模型下,肯定是很难能完成任务了。
-
其次,对于数据堆积的情况解决起来也非常简单,一个消费者take的速度太慢,那么我们多开几个消费者线程不就完了,这样消费起来速度杠杠的,就不存在数据堆积的问题了。
经测试:
ojbk