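LinkedBlockingQueue is a solid blocking-queue implementation; some sources report that it can reach roughly 500,000 QPS. It uses two separate locks, one for reads and one for writes, but both are ReentrantLock instances. ReentrantLock is itself built on CAS, yet under multithreaded access threads still inevitably spend time waiting for the lock. So I started thinking about how to push its throughput and QPS further. Drawing on segmented locks (lock striping) and Kafka partitions, I figured that increasing the number of queues would reduce the probability of lock waits. For example, when 10 threads try to acquire the write lock of a single queue at the same time, 9 of them wait. If the data is spread across 10 queues instead, each lock may see no contention at all, or only two or three threads competing for it, which can greatly reduce lock wait time.
Plain enqueue implementation: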
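import java.util.concurrent.LinkedBlockingQueue;

// Baseline: a single bounded LinkedBlockingQueue shared by all producers.
public class BlockQueue {
    private LinkedBlockingQueue<String> queue = null;

    public void init(int size) {
        // The diamond operator avoids the raw-type warning.
        queue = new LinkedBlockingQueue<>(size);
    }

    // offer() does not block; it returns false (ignored here) when the queue is full.
    public void offer(String data) {
        queue.offer(data);
    }
}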
Partitioned enqueue implementation:
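import java.util.concurrent.LinkedBlockingQueue;

// Partitioned variant: each element is routed to one of several smaller queues.
public class PartitionBlockQueue {
    private LinkedBlockingQueue<String>[] queue = null;

    @SuppressWarnings("unchecked") // generic array creation
    public void init(int size, int partition) {
        queue = new LinkedBlockingQueue[partition];
        for (int i = 0; i < partition; i++) {
            // The total capacity is split evenly; size must be at least partition.
            queue[i] = new LinkedBlockingQueue<>(size / partition);
        }
    }

    public void offer(String data) {
        // floorMod keeps the index non-negative even when hashCode() is
        // Integer.MIN_VALUE, where Math.abs would still return a negative value.
        int idx = Math.floorMod(data.hashCode(), queue.length);
        queue[idx].offer(data);
    }
}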
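When mapping a hash code to a partition index, note that `Math.abs(Integer.MIN_VALUE)` is still negative in Java, whereas `Math.floorMod` always yields an index in `[0, n)` for a positive `n`. The sketch below illustrates that, and also adds a dequeue side, which the classes above do not show. The class name `PartitionedQueueSketch`, the `indexFor` helper, the `poll` method, and its round-robin scan are all assumptions made for illustration, not part of the original design.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a partitioned queue with both an enqueue and a dequeue side.
class PartitionedQueueSketch {
    private final LinkedBlockingQueue<String>[] partitions;
    // Rotating start position so consumers do not always drain partition 0 first.
    private final AtomicInteger cursor = new AtomicInteger();

    @SuppressWarnings("unchecked") // generic array creation
    PartitionedQueueSketch(int totalCapacity, int partitionCount) {
        partitions = new LinkedBlockingQueue[partitionCount];
        // Guard against a zero capacity, which LinkedBlockingQueue rejects.
        int perPartition = Math.max(1, totalCapacity / partitionCount);
        for (int i = 0; i < partitionCount; i++) {
            partitions[i] = new LinkedBlockingQueue<>(perPartition);
        }
    }

    // floorMod never returns a negative value for a positive modulus.
    static int indexFor(int hash, int partitionCount) {
        return Math.floorMod(hash, partitionCount);
    }

    // Non-blocking: returns false if the target partition is full.
    boolean offer(String data) {
        return partitions[indexFor(data.hashCode(), partitions.length)].offer(data);
    }

    // Non-blocking: scans every partition once, returns null if all are empty.
    String poll() {
        int start = Math.floorMod(cursor.getAndIncrement(), partitions.length);
        for (int i = 0; i < partitions.length; i++) {
            String item = partitions[(start + i) % partitions.length].poll();
            if (item != null) {
                return item;
            }
        }
        return null;
    }
}
```

One consequence of this layout is that ordering is only preserved within a partition, not across the whole queue, much like ordering across Kafka partitions.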
Test code: multiple threads enqueue 10 million elements in total.
import java.util.concurrent.CountDownLatch; // goes at the top of the enclosing test class

// Baseline test: `thread` producers together offer about queueSize elements to one queue.
public static void testNormalBlock(int queueSize, int thread) throws InterruptedException {
    long start = System.currentTimeMillis();
    BlockQueue b = new BlockQueue();
    b.init(queueSize);
    CountDownLatch countDownLatch = new CountDownLatch(thread);
    for (int i = 0; i < thread; i++) {
        Thread t = new Thread(() -> {
            System.out.println(Thread.currentThread().getId() + " Thread run");
            // Each thread offers its share of the total element count.
            for (int ix = 0; ix < queueSize / thread; ix++) {
                b.offer(String.valueOf(ix));
            }
            System.out.println(Thread.currentThread().getId() + " Thread end");
            countDownLatch.countDown();
        });
        t.start();
    }
    // Wait for every producer to finish before stopping the clock.
    countDownLatch.await();
    long end = System.currentTimeMillis();
    System.out.println((end - start) + " ms");
}
// Partitioned test: same workload, spread over `partition` queues.
// Needs java.util.concurrent.CountDownLatch imported in the enclosing test class.
public static void testPartitionBlock(int queueSize, int partition, int thread) throws InterruptedException {
    long start = System.currentTimeMillis();
    PartitionBlockQueue pb = new PartitionBlockQueue();
    pb.init(queueSize, partition);
    CountDownLatch countDownLatch = new CountDownLatch(thread);
    for (int i = 0; i < thread; i++) {
        Thread t = new Thread(() -> {
            System.out.println(Thread.currentThread().getId() + " Thread run");
            // Each thread offers its share of the total element count.
            for (int ix = 0; ix < queueSize / thread; ix++) {
                pb.offer(String.valueOf(ix));
            }
            System.out.println(Thread.currentThread().getId() + " Thread end");
            countDownLatch.countDown();
        });
        t.start();
    }
    // Wait for every producer to finish before stopping the clock.
    countDownLatch.await();
    long end = System.currentTimeMillis();
    System.out.println((end - start) + " ms");
}
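`LinkedBlockingQueue.offer` returns `false` without blocking when the queue is full, and the tests above discard that return value. To check that both variants really enqueue the same number of elements, a harness could count accepted and rejected offers. The following is a minimal sketch under that assumption; the class `OfferCountSketch` and the method `runProducers` are hypothetical names, and the enqueue operation is passed in as a `Predicate<String>` so that either queue variant could be plugged in (a partitioned queue would need an `offer` that returns `boolean`).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Predicate;

// Hypothetical harness: counts how many offers succeed and how many are rejected.
class OfferCountSketch {

    // Returns {accepted, rejected} after all producer threads have finished.
    static long[] runProducers(Predicate<String> offer, int threads, int perThread) {
        LongAdder accepted = new LongAdder();
        LongAdder rejected = new LongAdder();
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    if (offer.test(String.valueOf(i))) {
                        accepted.increment();
                    } else {
                        rejected.increment();
                    }
                }
                done.countDown();
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and surface the failure to the caller.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return new long[] {accepted.sum(), rejected.sum()};
    }
}
```

For the single-queue case the call would look like `OfferCountSketch.runProducers(queue::offer, 10, 1_000_000)`. If the rejected count differs between the two variants, the timings are not comparing equal work.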
Results
testNormalBlock
/*
threads 10 -> 4121 ms 5472 ms 5229 ms 5218 ms
threads 20 -> 3841 ms 5058 ms 4967 ms 5155 ms
threads 50 -> 5359 ms 5590 ms 5308 ms
threads 100 -> 5168 ms 5188 ms 5551 ms
*/
testPartitionBlock
/*
threads 10 partition 10 -> 2171 ms 2210 ms 2114 ms 2150 ms
threads 20 partition 10 -> 2051 ms 1914 ms 1957 ms 1966 ms
threads 50 partition 10 -> 2290 ms 2326 ms 2373 ms 2435 ms
threads 100 partition 10 -> 1854 ms 2415 ms 2636 ms 2417 ms
threads 10 partition 20 -> 868 ms 839 ms 905 ms
threads 20 partition 20 -> 904 ms 1745 ms 1745 ms
*/
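To read these timings as throughput, divide the number of elements by the elapsed time. The sketch below does that arithmetic, assuming each run offers the 10 million elements mentioned in the test description and that every offer is accepted; `ThroughputSketch`, `mean`, and `opsPerSecond` are hypothetical helper names.

```java
// Hypothetical helper: converts measured run times into offers per second.
class ThroughputSketch {

    // Arithmetic mean of several run times in milliseconds.
    static double mean(long... millis) {
        long sum = 0;
        for (long m : millis) {
            sum += m;
        }
        return (double) sum / millis.length;
    }

    // Elements processed per second for a run that took `millis` milliseconds.
    static double opsPerSecond(long elements, double millis) {
        return elements / (millis / 1000.0);
    }
}
```

With the four 10-thread single-queue runs, `mean(4121, 5472, 5229, 5218)` is 5010 ms, or about 2.0 million offers per second. The four 10-thread, 10-partition runs average 2161.25 ms, or about 4.6 million offers per second, a speedup of roughly 2.3x.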
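Conclusion: partitioning improves performance substantially. In these runs, 10 partitions cut the total time from roughly 4 to 5.5 s down to roughly 2 to 2.6 s, and 20 partitions with 10 threads brought it below 1 s, a several-fold gain in throughput. Moreover, as the number of partitions increases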