1.Exclusive write / Concurrent read access 互斥读写
有时候我们会对一份数据同时进行读和写的操作
ReadWriteLock 接口还有他的实现类ReentrantReadWriteLock 可以让我们实现如下场景的功能:
- 可能有任意数量的同步读取操作。如果有至少一个读取操作获得允许,那么就不会产生写入操作。
- 最多只能有一个写操作,如果已经有一个写操作已经被允许那么就不能进行读操作。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {
// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions. protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();
// This is a typical data that needs to be protected for concurrent access protected static int data = 0;
/**
* This will write to the data, in an exclusive access
*/
public static void writeToData() {
RW_LOCK.writeLock().lock();
try {
data++;
} finally {
RW_LOCK.writeLock().unlock();
}
}
public static int readData() {
RW_LOCK.readLock().lock();
try {
return data;
} finally {
RW_LOCK.readLock().unlock();
}
}
}
备注:如上场景我们应该使用AtomicInteger,但是我们在这边只是用来举例,这个锁操作并不关心这个数据是否是一个原子类型的变量。
在读操作这一边的锁是非常有必要的,虽然这个操作看起来像是针对普通读操作的。事实上如果你不在读文件时候进加锁,那么任何操作都有可能会出错:
- 基本类型的写入操作在任何行虚拟机上都不保证是原子类型的操作。在写入一个64bits 的 long型数据最后只会有32bits。
为了更高的性能要求,还有一种更快类型的锁,叫做StampedLock ,除此之外还有一些一起继承乐观锁的类型。这个所以与ReadWriteLock工作情况区别很大。
Producer-Consumer 生产者-消费者模型
- 一个简单的 Producer-Consumer 问题解决方法。注意 JDK的 类 (AtomicBoolean and BlockingQueue) 是用来同步的,他么不能减少了创建无效方法。有兴趣的话可以看一下 BlockingQueue。通过几种不同的实现,可能会产生不同的行为。例如e DelayQueue 延迟队列 or Priority Queue 优先队列。
public class Producer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Producer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int producedCount = 0;
try {
while (true) {
producedCount++; //put throws an InterruptedException when the thread is interrupted queue.put(new ProducedData()); } } catch (InterruptedException e) { // the thread has been interrupted: cleanup and exit producedCount--; //re-interrupt the thread in case the interrupt flag is needeed higher up Thread.currentThread().interrupt(); } System.out.println("Produced " + producedCount + " objects"); } }
}
public class Consumer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Consumer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int consumedCount = 0;
try {
while (true) { //put throws an InterruptedException when the thread is interrupted ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS); // process data consumedCount++; } } catch (InterruptedException e) { // the thread has been interrupted: cleanup and exit consumedCount--; //re-interrupt the thread in case the interrupt flag is needeed higher up Thread.currentThread().interrupt(); } System.out.println("Consumed " + consumedCount + " objects"); } }
}
public class ProducerConsumerExample {
static class ProducedData { // empty data object }
public static void main(String[] args) throws InterruptedException {
BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000); // choice of queue determines the actual behavior: see various BlockingQueue implementations
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
Thread.sleep(1000);
producer.interrupt();
Thread.sleep(10);
consumer.interrupt();
}
}
}
使用synchronized / volatile 对读写操作可见性的影响
- 正如我们了解的那样,我们应该使用synchronized 关键字来进行同步方法,或者同步代码块。但是我们有些人可能会注意到 synchronized 与 volatile 关键字。提供了read / write barrier。那么问题来了什么是 read / write barrier。我们来看一下下面你的例子:
class Counter {
private Integer count = 10;
public synchronized void incrementCount() {
count++;
}
public Integer getCount() {
return count;
}
}
- 我们假设线程A 调用了incrementCount() 线程B调用了getCount。在这个场景中我不能保证数据更新对线程B可见,甚至很有可能线程B永远看不到数值更新。
- 为了理解这个行为,我们需要理解java 内存模型与硬件之间的关系。在Java中每个线程都有自己的线程stack。这个stack 里面包含:调用线程的方法还有线程中创建的本地变量在多核操作系统中,这个场景中很有可能这个线程存在于某个cpu核心或者缓存中。如果存在于某个线程中,一个对象使用 synchronized (or volatile) 关键字在synchronized代码块之后线程与主内存同步自己的本地变量。 synchronized (or volatile)关键字创建了一个读写屏障并且确保线程的最新数据可见。
- 在我们的这个案例中,既然线程B还没有使用synchronized 来同步计算,或许线程B永远看不到线程A的数据更新了。为了确保最新的数据我们需要用synchronized 来修饰getCount方法。
public synchronized Integer getCount() { return count; }
- 现在当线程A更新完数据 ,然后释放Couter 对象的锁。与此同时创建一个写屏障并且将线程A的变化更新到主内存。类似的当线程B请求一个锁,在主内存读取数进入读屏障,然后久可以看到所有的更新改动。
- image
- 使用volatile 关键字也可以代理同样的效果。volatile关键字修饰的变量的写入操作, 会将所有更新同步到主内存。volatile关键字修饰的变量的读取操作将会读取主内存中的值。
获取你的程序中的所有线程状态
代码片段 Code snippet
import java.util.Set;
public class ThreadStatus {
public static void main(String args[]) throws Exception {
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new MyThread());
t.setName("MyThread:" + i);
t.start();
}
int threadCount = 0;
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread t : threadSet) {
if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
System.out.println("Thread :" + t + ":" + "state:" + t.getState());
++threadCount;
}
}
System.out.println("Thread count started by Main thread:" + threadCount);
}
}
class MyThread implements Runnable {
public void run() {
try {
Thread.sleep(2000);
} catch (Exception err) {
err.printStackTrace();
}
}
}
解释:
Thread.getAllStackTraces().keySet()返回包含application 和 系统的所有线程。如果你只对你创建的线程的状态感兴趣,那么遍历Thread set 然后通过检查 Thread Group 来判断线程是否属于app的线程。
使用ThreadLocal
- ThreadLocal 是在Java并发编程中经常用到的工具。他允许一个变量在不同线程有不同的值。这样拿来说,即使是相同的代码在不同的线程中运行,这些操作将不贡献value,而且每个线程都有自己的本地变量。
- 例如这个在servlet中经常被发布的context 。你可能会这么做:
private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();
public static MyUserContext getContext() {
return contexts.get(); // get returns the variable unique to this thread
}
public void doGet(...) {
MyUserContext context = magicGetContextFromRequest(request);
contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
try {
// business logic
} finally {
contexts.remove(); // 'ensure' removal of thread-local variable
}
}
使用共享全局队列的多producer/consumer 案例
- 如下代码展示了 多producer/consumer 编程。生产者和消费者模型共享一个全局队列。
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
- 输出
Produced:497:by thread:1
Produced:300:by thread:2
Consumed: 497:by thread:1
Consumed: 300:by thread:2
Produced:64:by thread:2
Produced:984:by thread:1
Consumed: 64:by thread:1
Consumed: 984:by thread:2
Produced:102:by thread:2
Produced:498:by thread:1
Consumed: 102:by thread:1
Consumed: 498:by thread:2
Produced:168:by thread:2
Produced:69:by thread:1
Consumed: 69:by thread:2
Consumed: 168:by thread:1
- 说明
- sharedQueue,是一个LinkedBlockingQueue,在生产者和消费者线程之间共享
- 生产者线程每隔200ms 生产一个数字 然后持续的添加入队列
- 消费者从sharedQueue 持续消耗数字
- 这个程序实现无需 synchronized或者锁结构。 BlockingQueue 是实现这个模型的关键。
- BlockingQueue 就是为了生产/消费 模型来设计的
- BlockingQueue是线程安全的。所有对列的方法都是原子类型的操作,其使用了 内部锁或者其他类型的并发控制。
使用Threadpool 相加两个 int 类型的数组
- 一个线程池就是一个队列的任务,其中每个任务都会被其中的线程执行。
- 如下的案例展示了如何使用线程池添加两个int 类型的数组。
public static void testThreadpool() {
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };
ExecutorService pool = Executors.newCachedThreadPool();
// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
final int worker = i;
pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}
// Wait for all Workers to finish:
try {
// execute all submitted tasks
pool.shutdown();
// waits until all workers finish, or the timeout ends
pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
pool.shutdownNow(); //kill thread
}
System.out.println(Arrays.toString(result));
}
说明:
- 这个案例只是单纯展示用。在实际使用中,我们不会仅仅为了这点任务就使用线程池。
- Java7 中你将看到使用匿名内部类而不是lamda 来实现这个任务。
Pausing Execution 暂停执行处理器时间对其他线程可用。sleep 方法有两个复写方法在Thread 类制作。
- 指定sleep 时间
public static void sleep(long millis) throws InterruptedException
- 指定sleep时间
public static void sleep(long millis, int nanos)
Thread 源码介绍
这对于系统核心的调度是非常重要的。这个可能产生不可预测的结果,而且有些实现甚至不考虑nano s参数。我们建议在 用try catch 包住 Thread.sleep 操作并且catch InterruptedException. 异常。
线程中断/终止线程
- 每个Java线程都有一个interrupt flag,默认的是false。打断一个线程,最基础的就是将这个flag 设置成true。在这个线程中执行的代码会暗中观察这个标记,然后做出反应。当然代码也可以忽略这个flag。但是为啥每个线程都要树flag?毕竟在线程中有一个Boolean 变量我们可以更好的管理线程。当然了在线程里面还有一些特别的方法,他们会在线程被中断的时候运行。这些方法叫做阻塞方法。这些方法会将线程设置成WAITING或是WAITING 状态。当线程是这个状态的话,那么打断线程会抛出一个InterruptedException。而不是interrupt flag 被设置成true。然后这个线程状态有一次变成RUNNABLE。调用阻塞方法的时候会强制要求处理InterruptedException。之后在这个线程中打断的时候就会产生一个WAIT 状态。注意,不是所有方法都要响应中断行为。最终线程被设置成中断状态,然后进入一个阻塞方法,然后立刻抛出一个InterruptedException, interrupt flag 将被清除。
- 与这些原理不同,java 并没有特别的特别语义描述中断。代码非常容易描述打断。但是大多数情况下中断是用来通知一个线程应该尽快停下来。从上面的描述可以清楚的看出,这取决于线程上的代码,对中断作出适当的反应以停止运行。停止线程是一种写作。当一个线程被打断了,它在运行的代码可能会在栈空间下沉好几个level。大多数方法不调用阻塞方法,并且结束时间充足,无须延迟 关闭 线程。代码在一个loop 中执行,处理任务应该首先关注中断。Loop 应该尽可能的初始化任务,检测打断状态来推出loop。对于一个有限的loop,所有任务必须在loop终止之前被执行完毕,以防有任务没有被执行。如果在语义上是可能的,那么它可以
简单地传递InterruptedException断,并声明抛出它。那么它对于它的调用者来说的话它就是一个阻塞方法。如果不能传递异常,那么它至少应该设置打断状态,那么调用者就会知道线程被打断了。在一些案例中,需要持续等待而无视等待异常。在这种情况下,必须延迟设置打断状态,知道它不再等待。这可能调用本地变量,这个本地变量用来检查推出方法和打断方法的优先级。
案例
用中断线程来打断任务执行
class TaskHandler implements Runnable {
private final BlockingQueue<Task> queue;
TaskHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) { //
try {
Task task = queue.take(); // blocking call, responsive to interruption
handle(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private void handle(Task task) {
// actual handling
}
}
}
等待程序执行完毕,延迟设置打断flag
class MustFinishHandler implements Runnable {
private final BlockingQueue<Task> queue;
MustFinishHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
boolean shouldInterrupt = false;
while (true) {
try {
Task task = queue.take();
if (task.isEndOfTasks()) {
if (shouldInterrupt) {
Thread.currentThread().interrupt();
}
return;
}
handle(task);
} catch (InterruptedException e) {
shouldInterrupt = true; // must finish, remember to set interrupt flag when we're
done
}
}
}
private void handle(Task task) {
// actual handling
}
}
固定的任务列表不过在中断时候可能会提前退出。
class GetAsFarAsPossible implements Runnable {
private final List<Task> tasks = new ArrayList<>();
@Override
public void run() {
for (Task task : tasks) {
if (Thread.currentThread().isInterrupted()) {
return;
}
handle(task);
}
}
private void handle(Task task) {
// actual handling
}
}