public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>
offer和put的源码解析:
/**
 * Attempts to insert the given element without waiting.
 *
 * @param e the element to add; must not be null
 * @return {@code true} if the element was enqueued, {@code false} if
 *         {@code count} already equals {@code items.length} (queue full)
 * @throws NullPointerException if {@code e} is null
 */
public boolean offer(E e) {
    Objects.requireNonNull(e);
    // Hold the lock field in a local for the whole critical section.
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        // Full queue: report failure immediately instead of waiting.
        final boolean hasRoom = count != items.length;
        if (hasRoom) {
            enqueue(e);
        }
        return hasRoom;
    } finally {
        guard.unlock();
    }
}
/**
 * Inserts the given element, waiting on {@code notFull} for as long as
 * {@code count} equals {@code items.length}.
 *
 * @param e the element to add; must not be null
 * @throws NullPointerException if {@code e} is null
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 */
public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock guard = this.lock;
    guard.lockInterruptibly();
    try {
        // Re-check the condition after every wakeup before enqueueing.
        while (items.length == count) {
            notFull.await();
        }
        enqueue(e);
    } finally {
        guard.unlock();
    }
}
LinkedBlockingQueue
单链表,有两把锁:takeLock和putLock
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notFull = putLock.newCondition();
ExecutorService
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
//result = exec.submit(aCallable).get();
<T> Future<T> submit(Runnable task, T result);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
 * Wraps a {@link Runnable} in a {@link Callable} that runs the task and
 * then returns the supplied result.
 *
 * @param task the task to run; must not be null
 * @param result the value the returned callable yields
 * @param <T> the type of the result
 * @return a new {@code RunnableAdapter} around {@code task} and {@code result}
 * @throws NullPointerException if {@code task} is null
 */
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task != null) {
        return new RunnableAdapter<T>(task, result);
    }
    throw new NullPointerException();
}
Future
Future表示异步计算的结果,可用于取消任务、查询任务是否已取消或已完成,以及获取计算结果。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
RunnableFuture
/**
 * A type that is both a {@link Runnable} and a {@link Future}: it extends
 * both interfaces and redeclares {@code run()}.
 *
 * @param <V> the type argument passed through to {@code Future}
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
 * Sets this Future to the result of its computation
 * unless it has been cancelled.
 */
void run();
}
Callable
/**
 * A task that returns a result and may throw an exception.
 *
 * <p>The interface declares exactly one abstract method, so it is marked
 * {@code @FunctionalInterface}: the compiler then rejects any later change
 * that would stop lambdas and method references from targeting it.
 *
 * @param <V> the result type of {@code call}
 */
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
ScheduledFuture
/**
 * A {@link Future} that is also {@link Delayed}. This interface only
 * combines the two parent types and declares no methods of its own.
 *
 * @param <V> the type argument passed through to {@code Future}
 */
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
FutureTask
RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值
public class FutureTask<V> implements RunnableFuture<V> {
ConcurrentSkipListMap CopyOnWriteArrayList
RunnableAdapter
Callable有返回值,Runnable没有返回值,所以RunnableAdapter封装了Runnable并实现了Callable接口,使Runnable也能有返回值。
// Non-public classes supporting the public methods
/**
 * Adapts a {@link Runnable} to the {@link Callable} interface:
 * {@code call()} runs the wrapped task and then returns a result that was
 * fixed at construction time.
 */
private static final class RunnableAdapter<T> implements Callable<T> {
    /** Task executed by every {@code call()}. */
    private final Runnable task;
    /** Value returned by every {@code call()} after the task has run. */
    private final T result;

    /**
     * @param task the task to run
     * @param result the value {@code call()} returns
     */
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }

    /** Runs the wrapped task, then returns the stored result. */
    @Override
    public T call() {
        task.run();
        return result;
    }

    /** Appends a description of the wrapped task to the default string form. */
    @Override
    public String toString() {
        final String identity = super.toString();
        return identity + "[Wrapped task = " + task + "]";
    }
}
lock的示例用法
* Lock lock = ...;
* if (lock.tryLock()) {
* try {
* // manipulate protected state
* } finally {
* lock.unlock();
* }
* } else {
* // perform alternative actions
* }
ReentrantLock
private final Sync sync;//AQS
/**
 * Creates a lock whose synchronizer is chosen by the fairness flag.
 *
 * @param fair {@code true} to back the lock with a {@code FairSync},
 *             {@code false} to back it with a {@code NonfairSync}
 */
public ReentrantLock(boolean fair) {
    if (fair) {
        sync = new FairSync();
    } else {
        sync = new NonfairSync();
    }
}
/**
 * Delegates to {@code sync.lock()}.
 */
public void lock() {
sync.lock();
}
/**
 * Returns the {@code Condition} produced by {@code sync.newCondition()}.
 *
 * @return the condition object created by the underlying synchronizer
 */
public Condition newCondition() {
return sync.newCondition();
}
/**
 * Reports whether this lock was built with the fair synchronizer.
 *
 * @return {@code true} if {@code sync} is an instance of {@code FairSync}
 */
public final boolean isFair() {
return sync instanceof FairSync;
}
ReadWriteLock
读写锁,读,写,各一把锁
/**
 * A pair of locks: one exposed for reading via {@link #readLock()} and one
 * exposed for writing via {@link #writeLock()}.
 */
public interface ReadWriteLock {
/**
 * Returns the lock used for reading.
 *
 * @return the lock used for reading
 */
Lock readLock();
/**
 * Returns the lock used for writing.
 *
 * @return the lock used for writing
 */
Lock writeLock();
}
ExecutorCompletionService
CompletionService的实现目标是任务先完成可优先获取到,即结果按照完成先后顺序排序。
public interface CompletionService<V> {
// 提交
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
//获取
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
DelayQueue
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
/**
 * Inserts the given element into the backing priority queue {@code q}.
 *
 * <p>If the inserted element is the one {@code q.peek()} returns afterwards,
 * {@code leader} is cleared and {@code available} is signalled.
 *
 * @param e the element to add
 * @return always {@code true}
 */
public boolean offer(E e) {
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        q.offer(e);
        // Identity check: is the element just added now at the head?
        final boolean isNewHead = (e == q.peek());
        if (isNewHead) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        guard.unlock();
    }
}
Flow
public final class Flow {
/**
 * Publisher: responsible for publishing messages.
 *
 * @param <T> the item type accepted by subscribers of this publisher
 */
@FunctionalInterface
public static interface Publisher<T> {
/** Registers the given subscriber with this publisher. */
public void subscribe(Subscriber<? super T> subscriber);
}
/**
 * Subscriber: responsible for subscribing to and handling messages.
 *
 * @param <T> the item type passed to {@code onNext}
 */
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
/**
 * Subscription: the control object used for communication between a
 * publisher and a subscriber.
 */
public static interface Subscription {
public void request(long n);
public void cancel();
}
/**
 * Processor: acts as both a {@code Subscriber} and a {@code Publisher}.
 *
 * @param <T> the item type consumed as a subscriber
 * @param <R> the item type published as a publisher
 */
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
Semaphore
public void acquire(); //获取信号量,获取不到则到等待区等待
public void acquireUninterruptibly(); //不支持中断的获取信号量
public boolean tryAcquire(); //尝试获取信号量,类似tryLock();
public boolean tryAcquire(long timeout,TimeUnit unit); //尝试获取信号量,获取不到则在指定的超时时间内等待
public void release(); //释放信号量