Java7并发编程实战手册
线程管理
- Thread/Runnable/Thread.State
- 线程的信息获取和设置
- 线程的中断
- sleep/yield
- join
- daemon
- UncaughtExceptionHandler
- Thread#setDefaultUncaughtExceptionHandler
- ThreadLocal/InheritableThreadLocal
- ThreadGroup
- uncaughtException
- ThreadFactory
线程同步基础
- synchronized 同步方法 this
- synchronized 属性对象 object
- 同步代码块中使用条件
- Object#wait/notify/notifyAll
- 虚假唤醒(while)
- Lock
- ReentrantLock
- try/finally
- ReadWriteLock
- 公平性 fair
- Condition
- while
- lock/unlock之间
- ReentrantLock
线程同步辅助类
- Semaphore
- 内部计数器
- acquire/release
- acquireUninterruptibly
- 忽略线程中断且不会抛出任何异常
- CountDownLatch
- 内部计数器被初始化之后就不能被再次初始化,唯一能改值的是countDown
- CylicBarrier
- BrokenBarrierException
- reset
- Phaser
- 在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步
- Phaser(3) 指定参与阶段同步的线程数是3个
- 被Phaser类置于休眠的线程不会响应中断事件
- awaitAdvanceInterruptibly(int phaser),被中断会抛出InterruptedException
- onAdavance,阶段改变时被自动执行
- Exchanger
- 只能同步两个线程
- exchange调用后将休眠直到其他的线程到达
线程执行器
- ThreadPoolExecutor/Executors
- shutdown,当执行完所有待运行的任务后,它将结束执行,调用完毕立即返回,结合awaitTermination判断是否线程池是否关闭
- awaitTermination(long timeout, TimeUnit unit)
- 提供很多方法获取自身状态的信息
- newCachedThreadPool/newFixedThreadPool
- Future/Callbale
- submit
- call() throws Exception
- ThreadPoolExecutor#invokeAny
- 返回第一个完成任务且没有抛出异常的任务的执行结果
- ThreadPoolExecutor#invokeAll
- 等待所有任务的完成
- landon:是否可以用这个方法做场景心跳
while(!isDone) { - long startTime = getCurrent() - 场景调度器提交场景任务,等待所有场景任务执行完毕(invokeAll),这样亦可以保证潜在的顺序问题,因为每次都是将当前的tick执行完毕 - 而非之前submit,如果一次tick执行超过50ms则下次循环又会向线程池提交任务,会出现同一个任务多个线程执行的潜在情况 - long endTime = getCurrent() - long sleepTime = startTime + TickInterval - endTime - if(sleepTime > 0) sleep(sleepTime) - else println("busy,oneTick executeTime:" + sleepTime) }
- ScheduledThreadPoolExecutor
- schedule
- scheduleAtFixedRate
- scheduleWithFixDelay
- Future
- cancel
- get
- FutureTask
- done,允许在执行器中的任务执行结束之后还可以执行一些代码
- FutureTask<V> implements RunnableFuture<V>
- FutureTask(Callable<V> callable)
- CompletionService
- submit,提交任务
- poll/take,获取任务已经完成的Future对象
- 即任务完成后将Future对象放到一个完成的阻塞队列中
- RejectedExecutionHandler
Fork/Join框架
-
分治
- fork-将一个任务拆分成更小的多个任务
- join-等待子任务的完成执行
- 工作窃取算法-work-stealing algorithm
-
ForkJoinPool
- ForkJoinTask
- RecursiveAction 任务无返回结果
- RecursiveTask 任务有返回结果
- 递归
- ForkJoinTask
if(problem size > default size)
{
tasks = divide(task)
execute(tasks)
}
else {resolve problem using another algorithm}
- Task extends RecursiveAction // 无返回结果
- compute
if(...) // divide { Task t1 = new Task(...) Task t2 = new Task(...) invokeAll(t1,t2) // 同步调用,执行创建的多个子任务 } else {...}
- ForkJoinPool#execute(task) --默认创建一个线程数等于计算机cpu数目的线程池
- 合并任务的结果
if(problem size > default size) tasks = divide(task) execute(tasks) groupResults return result else resolve problem return result
- ForkJoinTask#get 等待返回任务计算结果
- 异步运行任务
- 同步方式如invokeAll,任务被挂起,直到任务被发送到fork/join线程池中执行。该方式允许ForkJoinPool采用工作窃取算法
- 异步方式如fork时(立即返回),无法使用该算法
- ForkJoinTask#V join()
- get和join有区别
- 任务中抛出异常
- ForkJoinTask#isCompletedAbnormally 检查主任务或者子任务是否抛出了异常
- getException 获取异常信息
- 任务抛出运行时异常,会影响其父任务...父任务..
- 取消任务
- 在任务开始前可以取消
- 例:在数字数组中寻找一组数字,拆分为更小的问题,但仅关心数字的一次出现。当我们找到他时,就会取消其他子任务
- 可存储发送到线程池中的所有任务,当发现当前任务找到数字后,取消非当前任务的所有任务
- 如果任务正在运行或者已经执行结束,则不能取消,cancel返回false。因此可以尝试去取消所有的任务而不用担心可能带来的间接影响
并发集合
- ConcurrentLinkedDeque 非阻塞
- getFirst.../peekFirst.../removeFirst.../pollFirst... - 双端队列
- LinkedBlockingDeque 阻塞
- put/take/poll... 双端队列
- PriorityBlockingQueue
- 队列的元素必须实现Comparable接口
- 按照排序结果决定插入元素的位置
- DelayQueue
- 元素必须实现Delayed接口
- public interface Delayed extends Comparable<Delayed>
- 两个待实现方法
- compareTo(Delayed o)
- getDelay(Timeunit unit)
- 从队列取元素时,到期的元素会返回(未来的元素等待到期_延迟时间)
- landon:是否可以用于游戏服务器中的计时器如果有多个timer按照到期时间排队_
- ConcurrentSkipListMap
- 根据键值排序所有元素
- 内部机制-Skip List
- firstEntry/lastEntry/subMap/...
- landon:可以和TreeMap做比较,一个线程不安全,一个线程安全
- ThreadLocalRandom
- current,该方法是一个静态方法,如果调用线程还未关联随机数对象,就会初始化一个(localInit)
- Atomic Variable
- compareAndSet,这是是最主要的方法
- 判断内存变量的值是否是expect,如果是说明未被其他线程改过,可以直接用update新值更新
- 否则说明被其他线程改过,进而可以选择下一步处理方式
- AtomicReference#public final boolean compareAndSet(V expect, V update)
- CAS
- AtomicIntegerArray -原子数组
- compareAndSet,这是是最主要的方法
- LinkedTransferQueue
- AtomicReference--实现单例
public class Singleton {
private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();
private Singleton (){}
public static Singleton getInstance() {
for (;;) {
Singleton current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Singleton();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}
}
定制并发类
- 定制ThreadPoolExecutor
- 继承该类
- 覆写_记得调用super
- shutdown
- shutdownNow
- 输出如等待执行的任务数目 getQueue().size...
- getCompletedTaskCount,获得已执行过的任务数
- getActiveCount,获得正在执行的任务数
- beforeExecute
- afterExecute
- 实现基于优先级的Executor类
- ThreadPoolExector参数传入PriorityBlockingQueue
- 如果是fixed两个线程,那么前2个任务是被2个线程执行的;后面的排队任务按照优先级顺序执行
- 实现ThreadFactory接口生成定制线程
- 覆写newThread方法
- 返回的Thread可以是定制的Thread对象(MyThread extends Thread)
- 可外部直接调用Threadfactory#newThread返回线程
- 在Executor对象中使用ThreadFactory
- 线程池参数中指定线程工厂
- Executors内部有一个DefaultThreadFactory
* Executors$DefaultThreadFactory
- 定制运行在定时线程池中的任务
- 继承ScheduledThreadPoolExecutor
- 覆写protected <V> RunnableScheduledFuture<V> decorateTask(
* Runnable runnable, RunnableScheduledFuture<V> task) - 可自定义一个调度类extends FutureTask<V> implements RunnableScheduledFuture<V>
- 参考ScheduledThreadPoolExecutor$ScheduledFutureTask
- 通过实现ThreadFactory接口为Fork/Join框架生成定制线程
- ForkJoinPool内部实现
- 一个任务队列,存放等待被执行的任务
- 一个执行这些任务的线程池
- ForkJoinWorkerThread持有一个ForkJoinPool.WorkQueue workQueue
* work-stealing mechanics
- MyWorkerThread extends ForkJoinWorkerThread
- onStart
- onTermination
- 调用super
- MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory
- newThread
- ForkJoinPool内部实现
- 定制运行在Fork/Join框架中的任务
- MyWorkTask extends ForkJoinTask(Void)
- getRawResult
- exec
- 调用compute抽象方法
- MyWorkTask extends ForkJoinTask(Void)
- 实现定制Lock类
- ReentrantLock内部有一个很重要的类Sync(AQS)
- abstract static class Sync extends AbstractQueuedSynchronizer
- static final class FairSync extends Sync
- static final class NonfairSync extends Sync
- 自定义实现一个MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
- tryActuire
- tryRelease
- 自定义实现MyLock implements Lock
- lock
- unlock
- tryLock
- newCondition
- 实现基于优先级的传输队列
- MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E>
- tryTransfer
- transfer
- hasWaitingConsumer
- getWaitingConsumerCount
- take
- MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E>
- 实现自己的原子对象
- MyCounter extends AtomicInteger
for(;;)
{
int value = get();
if(value == 10) return false;
else
{
int newValue = value + 1;
boolean changed = compareAndSet(value,newValue);
if(changed) return true;
}
}
测试并发应用程序
- 监控Lock接口
- ReentrantLock内部方法都是protected的,所以可以继承
- MyLock extends ReentrantLock
- 调用Thread getOwner()
- 调用Collection<Thread> getQueuedThreads()
- ...
- 监控Phaser类
- getPhase
- getRegisteredParties
- getArrivedParties
- ...
- 监控执行器框架
- ThreadPoolExecutor
- getPoolSize
- getCorePoolSize
- getActiveCount
- getTaskCount
- ...
- ThreadPoolExecutor
- 监控Fork/Join池
- getPoolSize
- getParallelism
- getActiveThreadCount
- getStealCount
- ...
- 输出高效的日志信息
- 输出必要的信息
- 为消息设定恰当的级别
- 使用FindBugs分析并发代码
- 配置Eclipse调试并发代码
- 可选择Default suspend policy for new breakpoints的值为Suspend VM
- 默认为Suspend Thread
- 配置NetBeans调试并发代码
- 使用MultithreadedTC测试并发代码
- MultithreadedTestCase
- waitForTick