采用了锁细化,双缓冲的做法,让【磁盘写】转化为【内存写】,如下图
伪代码
public class Demo1 {
private volatile boolean isWaitingSync = false;/*是否等待刷盘*/
private volatile boolean isSyncRunning = false;/*是否在写入磁盘*/
private volatile long syncMaxTxId = 0L;/*已刷盘最大ID*/
private DoubleBuffer doubleBuffer = new DoubleBuffer();
private Long txId = 0L;/*记录唯一ID,初始化为0*/
private ThreadLocal<Long> threadLocalTxId = new ThreadLocal<>();/*当前线程处理的记录id*/
public static final long bufferCount = 50000;/*达到一定阈值进行缓冲交换*/
/*
* 写数据
*/
public void writeEditLog(String content){
synchronized (this){
txId++;
threadLocalTxId.set(txId);
EditLog editLog = new EditLog(txId,content);
doubleBuffer.write(editLog);
}
syncEditLog();
}
private void syncEditLog(){
synchronized (this){
if (isSyncRunning){
Long txId = threadLocalTxId.get();
if (syncMaxTxId >= txId){
return;
}
if (isWaitingSync){
return;
}
isWaitingSync = true;
while (isSyncRunning){
try {
wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isWaitingSync = false;
}
/*设定阈值,达到阈值后交换内存,防止频繁交换内存引起性能问题
(注:这里未考虑buffer小于bufferCount的情况,认为一直有数据进入)*/
if (doubleBuffer.memoryBuffer.size() >= bufferCount){
doubleBuffer.change();
}
if (doubleBuffer.syncBuffer.size() > 0){
syncMaxTxId = doubleBuffer.getSyncMaxTxId();
}
isSyncRunning = true;
}
doubleBuffer.flush();
synchronized (this){
isSyncRunning = false;
notify();
}
}
/**
* 数据传输对象
*/
class EditLog{
long txId;
String content;
public EditLog(long txId, String content){
this.txId = txId;
this.content = content;
}
@Override
public String toString() {
return "EditLog:{txId:"+this.txId+" content:"+this.content+"}";
}
}
class DoubleBuffer{
LinkedList<EditLog> memoryBuffer = new LinkedList<>();
LinkedList<EditLog> syncBuffer = new LinkedList<>();
public void write(EditLog editLog){
memoryBuffer.add(editLog);
}
public void change(){
LinkedList<EditLog> temp = memoryBuffer;
memoryBuffer = syncBuffer;
syncBuffer = temp;
}
/**
* 刷新缓冲到磁盘
*/
public void flush(){
if (syncBuffer.size()>0){
for (EditLog editLog : syncBuffer){
//TODO 写入磁盘
System.out.println(Thread.currentThread().getName()+"-"+editLog);
}
syncBuffer.clear();
}
}
public long getSyncMaxTxId(){
return syncBuffer.getLast().txId;
}
}
}
测试代码
public class test {
public static void main(String[] args){
long begin = System.currentTimeMillis();
final Demo1 demo1 = new Demo1();
/**
* 启动100个线程,每个线程写入10000条数据
*/
for (int i=0; i<100; i++){
Thread t = new Thread(() -> {
for (int j=0; j<10000; j++){
demo1.writeEditLog("content:"+j);
}
});
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
long cost = end-begin;
System.out.println("执行时间:"+cost);
}
}
总结:只用了常规的synchronized锁、notify、ThreadLocal。整理的目的是为了熟悉这种提高系统能力的做法,以便应用到其他场景。