阻塞队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
而非阻塞的实现方式则可以使用循环CAS的方式来实现。因为采用CAS操作,允许多个线程并发执行,并且不会因为加锁而阻塞线程,使得并发性能更好。
示例:
package yxxy.queuue;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.sun.corba.se.impl.protocol.JIDLLocalCRDImpl;
public class TEST1 {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
public static void main(String[] args) throws InterruptedException {
Thread[] ths = new Thread[6];
for(int i=1;i<ths.length;i++){
Runnable task = new Runnable() {
@Override
public void run() {
try {
while (!queue.isEmpty()) {
int i = queue.poll();
}
} catch (Exception e) {
System.out.println();
}
}
};
ths[i] = new Thread(task);
}
Runnable task = new Runnable() {
@Override
public void run() {
for(int i=0;i<99999;i++){
try {
queue.offer(i);
} catch (Exception e) {
}
}
}
};
ths[0] = new Thread(task);
long timeStart = System.currentTimeMillis();
runAndComputeTime(ths);
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
}
static void runAndComputeTime(Thread[] ths) {
Arrays.asList(ths).forEach(t->t.start());
Arrays.asList(ths).forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
当消费者速度大于生产者并且生产者速度不变的情况下,处理瓶颈就处在了消费者线程分别取队列里元素的速度上,所以此时使用不加锁的方式来取队列的元素会大大提高处理速度。