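What does synchronization do?
- Mutual exclusion: prevents a thread from observing an object in an inconsistent state while another thread is modifying it, which matters whenever shared mutable data is accessed
- Reliable communication between threads: ensures that a thread entering a synchronized region sees the effects of all earlier modifications guarded by the same lock
Understanding atomicity
- Reading or writing a variable is atomic unless the variable is a (non-volatile) long or double
Small examples of using synchronization, volatile, and AtomicLong correctly
// Broken! - How long would you expect this program to run?
// Reads and writes of a boolean are atomic, but atomicity alone does not guarantee reliable communication between threads: without synchronization, the background thread may never see the write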
import java.util.concurrent.TimeUnit;
public class StopThread {
    private static boolean stopRequested;
    public static void main(String[] args)
            throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
            int i = 0;
            while (!stopRequested)
                i++;
        });
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        stopRequested = true;
    }
}
// Fix 1
// Properly synchronized cooperative thread termination
// Synchronization is not guaranteed to work unless both read and write operations are synchronized
import java.util.concurrent.TimeUnit;
public class StopThread {
    private static boolean stopRequested;
    private static synchronized void requestStop() {
        stopRequested = true;
    }
    private static synchronized boolean stopRequested() {
        return stopRequested;
    }
    public static void main(String[] args)
            throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
            int i = 0;
            while (!stopRequested())
                i++;
        });
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}
// Fix 2
// Cooperative thread termination with a volatile field
// volatile guarantees reliable communication between threads, but it does not provide mutual exclusion
import java.util.concurrent.TimeUnit;
public class StopThread {
    private static volatile boolean stopRequested;
    public static void main(String[] args)
            throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
            int i = 0;
            while (!stopRequested)
                i++;
        });
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        stopRequested = true;
    }
}
// Broken - requires synchronization!
// The increment operator (++) is not atomic: first it reads the value, and then it writes back a new value, equal to the old value plus one
private static volatile int nextSerialNumber = 0;
public static int generateSerialNumber() {
    return nextSerialNumber++;
}
// Lock-free synchronization with java.util.concurrent.atomic
// java.util.concurrent.atomic: This package provides primitives for lock-free, thread-safe programming on single variables
private static final AtomicLong nextSerialNum = new AtomicLong();
public static long generateSerialNumber() {
    return nextSerialNum.getAndIncrement();
}
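One way to convince yourself that the AtomicLong version hands out each number exactly once is a small stress run. The following is only a sketch: the class name SerialNumberDemo, the method countDistinct, and the thread counts are made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; names and sizes are illustrative only.
class SerialNumberDemo {
    private static final AtomicLong nextSerialNum = new AtomicLong();

    static long generateSerialNumber() {
        return nextSerialNum.getAndIncrement();
    }

    // Runs `threads` tasks that each draw `perThread` serial numbers and
    // returns how many distinct numbers were handed out in total.
    static int countDistinct(int threads, int perThread) throws InterruptedException {
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            exec.execute(() -> {
                for (int i = 0; i < perThread; i++)
                    seen.add(generateSerialNumber());
            });
        }
        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        return seen.size();
    }
}
```

If every call returns a unique value, the number of distinct values equals threads * perThread. With the volatile int version, some increments can be lost and the count may come out lower.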
What should you keep in mind when using synchronization?
- Do as little work as possible inside synchronized regions
Obtain the lock, examine the shared data, transform it as necessary, and drop the lock.
If you must perform some time-consuming activity, find a way to move it out of the synchronized region
- Excessive synchronization can cause exceptions or deadlock, for example when an alien method is invoked from inside a synchronized region
// Broken - invokes alien method from synchronized block!
public class ObservableSet<E> extends ForwardingSet<E> {
    public ObservableSet(Set<E> set) { super(set); }
    private final List<SetObserver<E>> observers
            = new ArrayList<>();
    public void addObserver(SetObserver<E> observer) {
        synchronized(observers) {
            observers.add(observer);
        }
    }
    public boolean removeObserver(SetObserver<E> observer) {
        synchronized(observers) {
            return observers.remove(observer);
        }
    }
    private void notifyElementAdded(E element) {
        synchronized(observers) {
            for (SetObserver<E> observer : observers)
                observer.added(this, element); // alien method: the class has no control over what it does
        }
    }
    @Override public boolean add(E element) {
        boolean added = super.add(element);
        if (added)
            notifyElementAdded(element);
        return added;
    }
    @Override public boolean addAll(Collection<? extends E> c) {
        boolean result = false;
        for (E element : c)
            result |= add(element); // Calls notifyElementAdded
        return result;
    }
}
@FunctionalInterface public interface SetObserver<E> {
    // Invoked when an element is added to the observable set
    void added(ObservableSet<E> set, E element);
}
public static void main(String[] args) {
    ObservableSet<Integer> set = new ObservableSet<>(new HashSet<>());
    // Works fine
    set.addObserver((s, e) -> System.out.println(e));
    for (int i = 0; i < 100; i++)
        set.add(i);
}
// Replacing the addObserver call above with this one throws ConcurrentModificationException:
// the observer removes itself while notifyElementAdded is still iterating over observers
set.addObserver(new SetObserver<>() {
    public void added(ObservableSet<Integer> s, Integer e) {
        System.out.println(e);
        if (e == 23)
            s.removeObserver(this);
    }
});
// Uses a background thread needlessly and deadlocks:
// the background thread blocks on the lock that the main thread already holds
set.addObserver(new SetObserver<>() {
    public void added(ObservableSet<Integer> s, Integer e) {
        System.out.println(e);
        if (e == 23) {
            ExecutorService exec =
                    Executors.newSingleThreadExecutor();
            try {
                exec.submit(() -> s.removeObserver(this)).get();
            } catch (ExecutionException | InterruptedException ex) {
                throw new AssertionError(ex);
            } finally {
                exec.shutdown();
            }
        }
    }
});
// Approach 1: Alien method moved outside of synchronized block - open calls
private void notifyElementAdded(E element) {
    List<SetObserver<E>> snapshot = null;
    synchronized(observers) {
        snapshot = new ArrayList<>(observers);
    }
    for (SetObserver<E> observer : snapshot)
        observer.added(this, element);
}
// Approach 2: Thread-safe observable set with CopyOnWriteArrayList
// The observer list is rarely modified and often traversed, which is the case CopyOnWriteArrayList is suited to
private final List<SetObserver<E>> observers = new CopyOnWriteArrayList<>();
public void addObserver(SetObserver<E> observer) {
    observers.add(observer);
}
public boolean removeObserver(SetObserver<E> observer) {
    return observers.remove(observer);
}
private void notifyElementAdded(E element) {
    for (SetObserver<E> observer : observers)
        observer.added(this, element);
}
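The reason the CopyOnWriteArrayList version tolerates an observer removing itself is that iteration runs over a snapshot of the backing array. The sketch below isolates that behavior without ObservableSet; the class name SnapshotIterationDemo and the use of IntConsumer as a stand-in observer type are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.IntConsumer;

// Hypothetical demo; IntConsumer stands in for SetObserver.
class SnapshotIterationDemo {
    // Registers a callback that removes itself when it sees 23, then
    // feeds 0..99 through the callbacks; returns the values it saw.
    static List<Integer> run() {
        List<IntConsumer> observers = new CopyOnWriteArrayList<>();
        List<Integer> seen = new ArrayList<>();
        observers.add(new IntConsumer() {
            @Override public void accept(int e) {
                seen.add(e);
                if (e == 23)
                    observers.remove(this); // safe: the loop iterates over a snapshot
            }
        });
        for (int i = 0; i < 100; i++)
            for (IntConsumer observer : observers)
                observer.accept(i);
        return seen;
    }
}
```

The callback sees 0 through 23 and then stops being notified, with no ConcurrentModificationException. With a plain ArrayList the same self-removal during iteration is what triggers the exception.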
- Prefer StringBuilder to StringBuffer, and prefer java.util.concurrent.ThreadLocalRandom to java.util.Random
What should you keep in mind when using concurrency?
- Don't program against threads; program against executors, tasks, and streams
// Create a work queue
ExecutorService exec = Executors.newSingleThreadExecutor();
// Submit a runnable for execution
exec.execute(runnable);
// Tell the executor to terminate gracefully
exec.shutdown();
// To wait for a particular task to complete, use the get method
// To wait for any or all of a collection of tasks to complete, use the invokeAny or invokeAll methods
// To wait for the executor service to terminate, use the awaitTermination method
// To retrieve the results of tasks one by one as they complete, use an ExecutorCompletionService
// To schedule tasks to run at a particular time or to run periodically, use a ScheduledThreadPoolExecutor
// For a small program or a lightly loaded server, use Executors.newCachedThreadPool
// For a heavily loaded production server, use Executors.newFixedThreadPool
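A minimal sketch of several of these calls working together (invokeAll, Future.get, shutdown, awaitTermination). The class name ExecutorBasicsDemo, the method sumOfSquares, and the pool size of 4 are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and pool size are illustrative only.
class ExecutorBasicsDemo {
    // Squares each input on a fixed-size pool and returns the sum of the results.
    static int sumOfSquares(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs)
                tasks.add(() -> n * n);
            int sum = 0;
            // invokeAll blocks until every task has completed
            for (Future<Integer> result : exec.invokeAll(tasks))
                sum += result.get(); // get returns the task's value or rethrows its failure
            return sum;
        } finally {
            exec.shutdown();                              // stop accepting new tasks
            exec.awaitTermination(10, TimeUnit.SECONDS);  // wait for the pool to wind down
        }
    }
}
```

Calling ExecutorBasicsDemo.sumOfSquares(Arrays.asList(1, 2, 3)) adds up 1, 4, and 9.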
Describe your understanding of Thread and the executor framework
- A Thread plays two roles at once: the unit of work and the execution mechanism
- In the executor framework, the unit of work and the execution mechanism are separate. The unit of work is abstracted as a task, and the mechanism that executes tasks is the executor service. There are two kinds of task: Runnable and Callable (which returns a value and can throw arbitrary exceptions)
Describe your understanding of fork-join
- Since Java 7, the Executor Framework has supported fork-join tasks
- A fork-join task is represented by a ForkJoinTask instance and may be split up into smaller subtasks. The threads that make up a ForkJoinPool not only process these tasks but also “steal” tasks from one another to ensure that all threads remain busy, resulting in higher CPU utilization, higher throughput, and lower latency
Given how difficult it is to use wait and notify correctly, prefer the higher-level concurrency utilities
What categories do the higher-level concurrency utilities fall into?
- the Executor Framework
- concurrent collections
- synchronizers
A small example with concurrent collections
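As a rough illustration of splitting a task into subtasks, here is a sketch that sums an array with RecursiveTask. The class name SumTask, the helper sum, and the threshold of 1,000 elements are assumptions chosen for the example, not tuned values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task; the threshold is illustrative, not tuned.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override protected Long compute() {
        if (to - from <= THRESHOLD) {          // small enough: sum sequentially
            long sum = 0;
            for (int i = from; i < to; i++)
                sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                           // schedule the left half; idle workers may steal it
        long rightSum = right.compute();       // work on the right half in this thread
        return left.join() + rightSum;         // wait for the left half and combine
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }
}
```

For simple reductions like this one, a parallel stream would usually be the shorter route; the explicit task is shown only to make the fork and join steps visible.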
- Map’s putIfAbsent(key, value): inserts a mapping for a key if none was present and returns the previous value associated with the key, or null if there was none
// This method simulates the behavior of String.intern
// Concurrent canonicalizing map atop ConcurrentMap - not optimal
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
public static String intern(String s) {
    String previousValue = map.putIfAbsent(s, s);
    return previousValue == null ? s : previousValue;
}
// Concurrent canonicalizing map atop ConcurrentMap - faster!
public static String intern(String s) {
    String result = map.get(s);
    if (result == null) {
        result = map.putIfAbsent(s, s);
        if (result == null)
            result = s;
    }
    return result;
}
- Prefer concurrent collections to externally synchronized collections. For example, use ConcurrentHashMap rather than Collections.synchronizedMap
- Some collection interfaces were extended with blocking operations, for example BlockingQueue
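A minimal producer-consumer sketch built on BlockingQueue. The class name ProducerConsumerDemo, the capacity of 8, and the POISON end-of-stream marker are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; capacity and marker value are illustrative only.
class ProducerConsumerDemo {
    private static final int POISON = -1; // end-of-stream marker

    // Produces 1..count on a background task and returns the sum the consumer saw.
    static long produceAndConsume(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.execute(() -> {
            try {
                for (int i = 1; i <= count; i++)
                    queue.put(i);      // blocks while the queue is full
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        exec.shutdown();               // no new tasks; the submitted one still runs
        long sum = 0;
        // take blocks while the queue is empty
        for (int v = queue.take(); v != POISON; v = queue.take())
            sum += v;
        return sum;
    }
}
```

Neither side polls or sleeps: put and take do the waiting, which is the point of the blocking operations.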
Describe your understanding of synchronizers
- Synchronizers are objects that enable threads to wait for one another, allowing them to coordinate their activities
- Commonly used synchronizers: CountDownLatch, Semaphore
- Less commonly used synchronizers: CyclicBarrier, Exchanger
- The most powerful synchronizer: Phaser
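As one illustration of a synchronizer other than CountDownLatch, the sketch below uses a Semaphore to cap how many tasks are inside a guarded section at once. The class name SemaphoreDemo, the method peakConcurrency, and the 10 ms sleep that stands in for real work are all assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the sleep stands in for real work.
class SemaphoreDemo {
    // Runs `tasks` jobs, allowing at most `permits` of them inside the
    // guarded section at once; returns the peak concurrency observed.
    static int peakConcurrency(int tasks, int permits) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService exec = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            exec.execute(() -> {
                try {
                    semaphore.acquire();               // waits until a permit is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();               // hand the permit back
                }
            });
        }
        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        return peak.get();
    }
}
```

The returned peak never exceeds the number of permits, however many tasks are submitted.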
Describe your understanding of CountDownLatch
- It is a single-use barrier that allows one or more threads to wait for one or more other threads to do something
- The sole constructor for CountDownLatch takes an int that is the number of times the countDown method must be invoked on the latch before all waiting threads are allowed to proceed
An example of using CountDownLatch
- suppose you want to build a simple framework for timing the concurrent execution of an action. This framework consists of a single method that takes an executor to execute the action, a concurrency level representing the number of actions to be executed concurrently, and a runnable representing the action. All of the worker threads ready themselves to run the action before the timer thread starts the clock. When the last worker thread is ready to run the action, the timer thread “fires the starting gun,” allowing the worker threads to perform the action. As soon as the last worker thread finishes performing the action, the timer thread stops the clock
// Simple framework for timing concurrent execution
public static long time(Executor executor, int concurrency,
        Runnable action) throws InterruptedException {
    CountDownLatch ready = new CountDownLatch(concurrency);
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(concurrency);
    for (int i = 0; i < concurrency; i++) {
        executor.execute(() -> {
            ready.countDown(); // Tell timer we're ready
            try {
                start.await(); // Wait till peers are ready
                action.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown(); // Tell timer we're done
            }
        });
    }
    ready.await(); // Wait for all workers to be ready
    long startNanos = System.nanoTime();
    start.countDown(); // And they're off!
    done.await(); // Wait for all workers to finish
    return System.nanoTime() - startNanos;
}
- For interval timing, always use System.nanoTime rather than System.currentTimeMillis
When maintaining legacy code that uses wait and notify, what should you keep in mind?
- Always use the wait loop idiom to invoke the wait method
// The standard idiom for using the wait method
synchronized (obj) {
    while (<condition does not hold>)
        obj.wait(); // (Releases lock, and reacquires on wakeup)
    ... // Perform action appropriate to condition
}
- Prefer the notifyAll method to the notify method
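To make the wait loop idiom and notifyAll concrete, here is a sketch of a one-slot handoff between threads. The class Mailbox and its put and take methods are hypothetical names; in new code a BlockingQueue would normally be used instead.

```java
import java.util.Objects;

// Hypothetical one-slot handoff; shown only to illustrate the wait loop idiom.
class Mailbox<T> {
    private T item; // null means the slot is empty; guarded by this

    public synchronized void put(T value) throws InterruptedException {
        Objects.requireNonNull(value);
        while (item != null)   // condition does not hold: slot still occupied
            wait();            // releases the lock, reacquires it on wakeup
        item = value;
        notifyAll();           // wake every waiter; each one rechecks its own condition
    }

    public synchronized T take() throws InterruptedException {
        while (item == null)   // condition does not hold: nothing to take
            wait();
        T result = item;
        item = null;
        notifyAll();
        return result;
    }
}
```

Because putters and takers wait on the same monitor, notify could wake a thread whose condition is still false and leave the right one asleep. notifyAll combined with the loop avoids that at the cost of a few extra wakeups.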
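What are the levels of thread safety?
- Immutable: instances of the class appear constant, so no external synchronization is needed, e.g., String, Long, BigInteger
- Unconditionally thread-safe: instances are mutable, but the class has sufficient internal synchronization that they can be used concurrently without any external synchronization, e.g., AtomicLong, ConcurrentHashMap. For flexible, fine-grained, and safe concurrency control, prefer a private final lock object to synchronized methods in unconditionally thread-safe classes: because the private lock object is inaccessible outside the class, clients cannot interfere with the object's synchronization, e.g.:
private final Object lock = new Object();
public void foo() {
    synchronized(lock) {
        ...
    }
}
- Conditionally thread-safe: like unconditionally thread-safe, except that some methods require external synchronization for safe concurrent use, e.g., the collections returned by the Collections.synchronized wrappers
- Not thread-safe: instances are mutable; to use them concurrently, clients must surround each method invocation with external synchronization, e.g., ArrayList, HashMap
- Thread-hostile: unsafe for concurrent use even if every method invocation is surrounded by external synchronization; this usually results from modifying static data without synchronization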
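The conditionally thread-safe case is the one that most often trips clients up. As a sketch: individual calls on a Collections.synchronizedMap are safe on their own, but its documentation requires the client to synchronize on the map while traversing any of its collection views. The class name SynchronizedMapIteration and its methods are illustrative.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; names are illustrative only.
class SynchronizedMapIteration {
    static Map<String, Integer> newCounts() {
        return Collections.synchronizedMap(new HashMap<>());
    }

    // Sums all values. Single calls such as put and get need no extra locking,
    // but traversal must be wrapped in a client-side lock on the map itself.
    static int sumValues(Map<String, Integer> syncMap) {
        int sum = 0;
        synchronized (syncMap) {
            for (int v : syncMap.values())
                sum += v;
        }
        return sum;
    }
}
```

Leaving out the synchronized block can produce non-deterministic behavior if another thread modifies the map during the traversal.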
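What is lazy initialization?
- Lazy initialization is the act of delaying the initialization of a field until its value is needed
What should you keep in mind when using lazy initialization?
- For thread safety, normal initialization is preferable to lazy initialization in most cases
- Consider lazy initialization of a field when you need it for performance or to break a harmful initialization circularity
What are good techniques for lazy initialization?
- For lazy initialization of instance fields, use the double-check idiom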
// Double-check idiom for lazy initialization of instance fields
// The local variable (result) ensures that field is read only once in the common case where it is already initialized; this is reportedly about 1.4 times as fast as the version without the local variable
private volatile FieldType field;
private FieldType getField() {
    FieldType result = field;
    if (result == null) { // First check (no locking)
        synchronized(this) {
            result = field; // Reread under the lock; another thread may have initialized it
            if (result == null) // Second check (with locking)
                field = result = computeFieldValue();
        }
    }
    return result;
}
- For lazy initialization of static fields, use the holder class idiom
private static class FieldHolder {
    static final FieldType field = computeFieldValue();
}
private static FieldType getField() { return FieldHolder.field; }
- For instance fields that can tolerate repeated initialization, use the single-check idiom
// Single-check idiom - can cause repeated initialization!
private volatile FieldType field;
private FieldType getField() {
    FieldType result = field;
    if (result == null)
        field = result = computeFieldValue();
    return result;
}
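The holder class idiom relies on the JVM initializing a class only when it is first used. The sketch below makes that observable with a counter; the class LazyConfig, the counter initCount, and the placeholder value "expensive" are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the counter exists only to observe initialization.
class LazyConfig {
    static final AtomicInteger initCount = new AtomicInteger();

    private static class Holder {
        // Runs once, when Holder is first initialized (on the first call to get)
        static final String VALUE = compute();
    }

    private static String compute() {
        initCount.incrementAndGet();
        return "expensive";
    }

    static String get() {
        return Holder.VALUE; // first access triggers Holder's class initialization
    }
}
```

Before the first call to get the counter stays at 0, and after any number of calls it is 1. Class initialization is performed under a JVM-managed lock, so no explicit synchronization appears in the code.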
Don't depend on the thread scheduler
- Don't rely on Thread.yield or thread priorities
- Threads should not busy-wait, repeatedly checking a shared object while waiting for its state to change
// Awful CountDownLatch implementation - busy-waits incessantly!
public class SlowCountDownLatch {
    private int count;
    public SlowCountDownLatch(int count) {
        if (count < 0)
            throw new IllegalArgumentException(count + " < 0");
        this.count = count;
    }
    public void await() {
        while (true) {
            synchronized(this) {
                if (count == 0)
                    return;
            }
        }
    }
    public synchronized void countDown() {
        if (count != 0)
            count--;
    }
}
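For contrast, a sketch of the same latch that blocks instead of spinning, using the wait loop idiom and notifyAll. The class name WaitingCountDownLatch and the getCount accessor are hypothetical; in real code java.util.concurrent.CountDownLatch is the one to use.

```java
// Hypothetical blocking variant of the latch above; for illustration only.
class WaitingCountDownLatch {
    private int count; // guarded by this

    WaitingCountDownLatch(int count) {
        if (count < 0)
            throw new IllegalArgumentException(count + " < 0");
        this.count = count;
    }

    public synchronized void await() throws InterruptedException {
        while (count > 0)  // wait loop idiom: recheck the condition after every wakeup
            wait();        // sleeps without consuming CPU until notified
    }

    public synchronized void countDown() {
        if (count > 0 && --count == 0)
            notifyAll();   // release every waiting thread once the count reaches zero
    }

    public synchronized int getCount() {
        return count;
    }
}
```

Waiting threads here are parked by the JVM rather than repeatedly acquiring the lock, so they do not compete with the threads that still have work to do.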