八、Callable
package chapter04;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
 * Demonstrates running a {@link Callable} on a plain thread by wrapping it in a
 * {@link FutureTask} and then reading the result back.
 *
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest08 {
    /**
     * Starts the task on a new thread, waits for its result and prints it.
     *
     * @param args unused
     * @throws ExecutionException   if the task itself threw an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task = new FutureTask<String>(new MyThread());
        Thread worker = new Thread(task);
        worker.start();
        // When the result takes a long time to produce, obtain it asynchronously
        // rather than waiting on get() right away.
        String result = task.get();
        System.out.println(result);
    }
}
/**
 * Task that produces a fixed string, used to demonstrate {@link Callable}.
 *
 * <p>Declared as {@code Callable<String>} (not the raw type) so that callers such as
 * {@code FutureTask<String>} are type-checked by the compiler.
 */
class MyThread implements Callable<String> {
    /**
     * Prints a marker line and returns the result.
     *
     * @return the string {@code "3.1415926"}
     * @throws Exception declared to match {@link Callable#call()}; this body throws nothing itself
     */
    @Override
    public String call() throws Exception {
        System.out.println("call()");
        return "3.1415926";
    }
}

1614445280651.png
Callable 接口可以抛异常,可以有返回值
/**
 * Mirrors the declaration of {@code java.util.concurrent.Callable}: a task with a single
 * method that returns a value and is allowed to throw a checked exception.
 *
 * @param <V> the result type of {@link #call()}
 */
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
FutureTask.get() 会阻塞当前线程,直到 call() 执行完毕;同一个 FutureTask 只会执行一次 call(),其结果会被缓存
以下例子说明有缓存

1614446199391.png
九、并发辅助类
1、CountDownLatch

1614476070266.png
package chapter04;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
 * Demonstrates {@link CountDownLatch}: the main thread waits until eight worker
 * threads have each counted the latch down once.
 *
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest09 {
    /**
     * Starts the workers, waits on the latch, then prints the closing message.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        final int workers = 8;
        CountDownLatch latch = new CountDownLatch(workers);

        // Every worker runs the same action: announce itself, then count down once.
        Runnable leave = () -> {
            System.out.println(Thread.currentThread().getName() + " out");
            latch.countDown();
        };

        int started = 0;
        while (started < workers) {
            new Thread(leave).start();
            started++;
        }

        // Returns only after the count has reached zero.
        latch.await();
        System.out.println("close the door!");
    }
}
2、CyclicBarrier

1614477504381.png
package chapter04;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
 * Demonstrates {@link CyclicBarrier}: five threads each report in and then wait at
 * the barrier; the barrier action prints a message once all five have arrived.
 *
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest09 {
    /**
     * Creates the barrier and starts the five participating threads.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature; nothing in this body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        final int parties = 5;
        Runnable onAllArrived = () -> System.out.println("五福到");
        CyclicBarrier barrier = new CyclicBarrier(parties, onAllArrived);

        for (int card = 0; card < parties; card++) {
            // Lambdas can only capture effectively final locals, so copy the loop index.
            final int collected = card;
            Thread collector = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "收集福卡" + collected + "张");
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            collector.start();
        }
    }
}
3、Semaphore

1614479009671.png
package chapter04;
import java.util.concurrent.*;
/**
 * Demonstrates {@link Semaphore}: five threads compete for three permits
 * ("parking spaces"); each holder keeps its permit for two seconds.
 *
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest09 {
    /**
     * Starts five named threads that each acquire a permit, hold it briefly and release it.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature; nothing in this body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(3); // typically used for rate limiting
        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    // No permit was obtained, so releasing here would wrongly add one.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }
                // From here on the permit is held and must always be returned.
                try {
                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + "离开车位");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}
十、ReadWriteLock

1614481159436.png
package chapter04;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Demonstrates a read/write lock by running five writer threads and five reader
 * threads against one shared {@code MyCache}.
 *
 * @Author Noperx
 * @Create 2021-02-28 10:42
 */
public class JUCTest10 {
    /**
     * Starts the writer threads first and then the reader threads; each thread is
     * named after the index it writes or reads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyCache cache = new MyCache();

        // Writers: thread n stores the entry "n" -> "n".
        for (int w = 0; w < 5; w++) {
            String entry = String.valueOf(w);
            Thread writer = new Thread(() -> cache.write(entry, entry), entry);
            writer.start();
        }

        // Readers: thread n looks up key "n".
        for (int r = 0; r < 5; r++) {
            String key = String.valueOf(r);
            Thread reader = new Thread(() -> cache.read(key), key);
            reader.start();
        }
    }
}
/**
 * Minimal string cache whose map is guarded by a {@link ReadWriteLock}: writes take
 * the write lock and reads take the read lock.
 *
 * <p>The map reference is never reassigned, so it is {@code final} rather than
 * {@code volatile}; the lock is what protects access to the map.
 */
class MyCache {
    private final Map<String, String> myMap = new HashMap<>();
    final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * Stores a value while holding the write lock, printing a line before and after.
     *
     * @param k the key to store under
     * @param v the value to store
     */
    public void write(String k, String v) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始存入" + v);
            myMap.put(k, v);
            System.out.println(Thread.currentThread().getName() + "完成存入" + v);
        } finally {
            // Always unlock, even if the body above fails.
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Looks up a key while holding the read lock, printing a line first.
     * The value that is found is not returned or printed.
     *
     * @param k the key to look up
     */
    public void read(String k) {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始读取" + k);
            myMap.get(k);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
十一、阻塞队列

1614484125864.png
package chapter04;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Demonstrates the exception-throwing {@link BlockingQueue} operations
 * ({@code add}, {@code element}, {@code remove}) on a queue of capacity three.
 *
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Fills the queue, prints its head, then removes and prints every element.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int capacity = 3;
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);

        for (int value = 1; value <= capacity; value++) {
            queue.add(value);
        }
        // One more add(...) here would fail with
        // java.lang.IllegalStateException: Queue full

        System.out.println(queue.element());

        for (int removed = 0; removed < capacity; removed++) {
            System.out.println(queue.remove());
        }
        // One more remove() here would fail with java.util.NoSuchElementException
    }
}
package chapter04;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Demonstrates the non-throwing {@link BlockingQueue} operations
 * ({@code offer}, {@code peek}, {@code poll}) on a queue of capacity three.
 *
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Offers four elements to a three-slot queue, prints its head, then polls four times.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

        // Four offers against three slots; the return value of each offer is ignored.
        for (int value = 1; value <= 4; value++) {
            queue.offer(value);
        }

        System.out.println(queue.peek());

        // Four polls against three stored elements; the last one prints null.
        for (int polled = 0; polled < 4; polled++) {
            System.out.println(queue.poll());
        }
    }
}
package chapter04;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Demonstrates the blocking {@link BlockingQueue} operations ({@code put},
 * {@code take}) on a queue of capacity three.
 *
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Puts three elements and takes them back, printing each one taken.
     *
     * @param args unused
     * @throws InterruptedException if interrupted during {@code put} or {@code take}
     */
    public static void main(String[] args) throws InterruptedException {
        final int capacity = 3;
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);

        for (int value = 1; value <= capacity; value++) {
            queue.put(value);
        }
        // A fourth put(...) here would block forever.

        for (int taken = 0; taken < capacity; taken++) {
            System.out.println(queue.take());
        }
        // A fourth take() here would block forever.
    }
}
SynchronousQueue
package chapter04;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates {@link SynchronousQueue}: thread A puts three elements one at a
 * time while thread B takes them one at a time with a one-second pause before each take.
 *
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Starts producer thread "A" and consumer thread "B".
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature; nothing in this body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        // A SynchronousQueue stores no elements: each put must be matched by a take.
        BlockingQueue<Object> handoff = new SynchronousQueue<>();

        Runnable producer = () -> {
            try {
                for (String item : new String[] {"1", "2", "3"}) {
                    System.out.println(Thread.currentThread().getName() + "存" + item);
                    handoff.put(item);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable consumer = () -> {
            try {
                for (int round = 0; round < 3; round++) {
                    // The pause gives the producer time to offer an element first.
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + "取" + handoff.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        new Thread(producer, "A").start();
        new Thread(consumer, "B").start();
    }
}
十二、线程池
线程池:优化资源使用,提高效率

1614495637892.png
1、创建线程池的三种方法,不建议使用
package chapter04;
import java.util.concurrent.*;
/**
 * Demonstrates thread pools created through the {@link Executors} factory methods.
 *
 * @Author Noperx
 * @Create 2021-02-28 14:49
 */
public class JUCTest12 {
    /**
     * Submits ten tasks that print the executing thread's name, then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Three kinds of pool; swap the active line to try the others:
        //   Executors.newSingleThreadExecutor()
        //   Executors.newCachedThreadPool()
        ExecutorService pool = Executors.newFixedThreadPool(3);

        Runnable printWorkerName = () -> System.out.println(Thread.currentThread().getName());

        try {
            int submitted = 0;
            while (submitted < 10) {
                pool.execute(printWorkerName);
                submitted++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The pool must always be shut down to release its threads.
            pool.shutdown();
        }
    }
}
2、ThreadPoolExecutor 构造方法,建议使用
Executors 的三种工厂方法底层都是通过 ThreadPoolExecutor 的构造方法创建线程池,该构造方法有 7 个参数,建议直接使用它来创建线程池
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// Reject inconsistent pool sizes and a negative keep-alive before storing anything.
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// The queue, thread factory and rejection handler are all mandatory.
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// The keep-alive is converted to nanoseconds, whatever unit the caller supplied.
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3、自定义线程池
package chapter04;
import java.util.concurrent.*;
/**
 * Demonstrates building a thread pool directly with the seven-argument
 * {@link ThreadPoolExecutor} constructor.
 *
 * @Author Noperx
 * @Create 2021-02-28 14:49
 */
public class JUCTest12 {
    /**
     * Creates a bounded pool, submits nine tasks that print the executing thread's
     * name, and shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Custom thread pool.
        /*
        Rejection policies:
        AbortPolicy          throws java.util.concurrent.RejectedExecutionException
        CallerRunsPolicy     runs the rejected task on the submitting thread (prints "main work" here)
        DiscardPolicy        drops the rejected task without throwing
        DiscardOldestPolicy  drops the oldest queued task and retries the submission, without throwing
        */
        final int corePoolSize = 2;
        // The constructor rejects maximumPoolSize < corePoolSize, which would otherwise
        // happen on a single-processor host; never go below the core size.
        int maximumPoolSize = Math.max(corePoolSize, Runtime.getRuntime().availableProcessors());
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );
        try {
            for (int i = 0; i < 9; i++) {
                threadPoolExecutor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " work");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always shut the pool down so its threads are released.
            threadPoolExecutor.shutdown();
        }
    }
}
4、CPU密集型 IO密集型
CPU密集型:最大线程数设置为 CPU 核数,有多少个核就用多少个线程,核数通过 Runtime.getRuntime().availableProcessors() 获取
IO密集型:最大线程数应大于程序中执行耗时 IO 操作的任务数
十三、函数式接口
package chapter04;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
 * Demonstrates the four core functional interfaces of {@code java.util.function}:
 * {@link Function}, {@link Predicate}, {@link Supplier} and {@link Consumer}.
 *
 * @Author Noperx
 * @Create 2021-02-28 16:51
 */
public class JUCTest13 {
    /**
     * Builds one lambda per interface and prints the result of invoking each.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Function: takes an argument and returns a result (here, the argument itself).
        Function<String, String> identity = text -> text;
        // Predicate: takes an argument and returns a boolean.
        Predicate<String> isEmpty = String::isEmpty;
        // Supplier: takes nothing and returns a value.
        Supplier<String> greeter = () -> "hello";
        // Consumer: takes an argument and returns nothing; this one ignores its input.
        Consumer<String> printer = ignored -> System.out.println("hello");

        System.out.println(identity.apply("hello"));
        System.out.println(isEmpty.test(""));
        System.out.println(greeter.get());
        printer.accept("hello");
    }
}
/**
 * Mirrors the declaration of {@code java.util.function.Function}: a function that
 * accepts one argument and produces a result.
 *
 * <p>Annotated with {@code @FunctionalInterface}, like the other interfaces shown
 * alongside it, so the compiler enforces the single-abstract-method shape.
 *
 * @param <T> the type of the input to the function
 * @param <R> the type of the result of the function
 */
@FunctionalInterface
public interface Function<T, R> {
    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);
}
/**
 * Mirrors the declaration of {@code java.util.function.Predicate}: a boolean-valued
 * test of one argument.
 *
 * @param <T> the type of the input to the predicate
 */
@FunctionalInterface
public interface Predicate<T> {
/**
* Evaluates this predicate on the given argument.
*
* @param t the input argument
* @return {@code true} if the input argument matches the predicate,
* otherwise {@code false}
*/
boolean test(T t);
}
/**
 * Mirrors the declaration of {@code java.util.function.Supplier}: takes no argument
 * and returns a result.
 *
 * @param <T> the type of results supplied
 */
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
/**
 * Mirrors the declaration of {@code java.util.function.Consumer}: accepts one
 * argument and returns no result.
 *
 * @param <T> the type of the input to the operation
 */
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
}
十四、Stream流计算

1614504027082.png
package chapter04;
import java.util.Arrays;
import java.util.List;
/**
 * Demonstrates a stream pipeline (chained calls, lambdas, functional interfaces)
 * over a small list of {@code User} objects.
 *
 * @Author Noperx
 * @Create 2021-02-28 17:20
 */
public class JUCTest14 {
    /**
     * Filters, maps, sorts and limits the sample users, printing the single remaining name.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        /*
        Requirements:
        1. id is even
        2. age is greater than 20
        3. name converted to lower case
        4. names sorted in descending order
        5. only one user is printed
        */
        User user1 = new User(1, "A", 24);
        User user2 = new User(2, "B", 18);
        User user3 = new User(3, "C", 19);
        User user4 = new User(4, "D", 24);
        User user5 = new User(5, "E", 32);
        User user6 = new User(6, "F", 43);
        List<User> users = Arrays.asList(user1, user2, user3, user4, user5, user6);
        users.stream()
                .filter(u -> u.getId() % 2 == 0)
                // Strictly greater than 20, as requirement 2 states (previously >= 20).
                .filter(u -> u.getAge() > 20)
                .map(u -> u.getName().toLowerCase())
                // Reversed natural order gives the descending sort.
                .sorted((left, right) -> right.compareTo(left))
                .limit(1)
                .forEach(System.out::println);
    }
}
/**
 * Mutable value holder for a user's id, name and age.
 */
class User {
    private int id;
    private String name;
    private int age;

    /**
     * Creates a user.
     *
     * @param id   the user's id
     * @param name the user's name
     * @param age  the user's age
     */
    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    /** @return the user's id */
    public int getId() {
        return this.id;
    }

    /** @param id the new id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the user's name */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's age */
    public int getAge() {
        return this.age;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders the user as {@code User{id=..., name='...', age=...}}.
     *
     * @return the textual form of this user
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", age=").append(age);
        text.append('}');
        return text.toString();
    }
}
十五、ForkJoinPool
ForkJoin:用于分解任务,提高计算效率;工作窃取(双端队列)
package chapter04;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
/**
 * Times the summation of 0..1,000,000,000 using a {@link ForkJoinPool}; two
 * alternative approaches are kept as commented-out code for comparison.
 *
 * @Author Noperx
 * @Create 2021-02-28 18:23
 */
public class JUCTest15 {
    /**
     * Runs the fork/join summation and prints the sum followed by the elapsed milliseconds.
     *
     * @param args unused
     * @throws ExecutionException   if the submitted task threw
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Alternative 1: plain sequential loop (recorded timing: 3085).
        // Long start = System.currentTimeMillis();
        // Long sum = 0L;
        // for (long i = 0L; i <= 10_0000_0000L; i++) {
        //     sum += i;
        // }
        // System.out.println(sum);
        // Long end = System.currentTimeMillis();
        // System.out.println(end-start);

        // Active variant: fork/join (recorded timing: 5078).
        long startedAt = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> pending = pool.submit(new ForkJoinTest(0L, 10_0000_0000L));
        System.out.println(pending.get());
        long finishedAt = System.currentTimeMillis();
        System.out.println(finishedAt - startedAt);

        // Alternative 2: parallel stream (recorded timing: 223).
        // Long start = System.currentTimeMillis();
        // long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0L, Long::sum);
        // System.out.println(reduce);
        // Long end = System.currentTimeMillis();
        // System.out.println(end-start);
    }
}
/**
 * Fork/join task that sums every {@code long} in the inclusive range
 * {@code [start, end]}, splitting the range in half until a piece is small
 * enough to add up directly.
 */
class ForkJoinTest extends RecursiveTask<Long> {
    /** Ranges spanning fewer than this many values are summed sequentially. */
    private static final long THRESHOLD = 1_000L;

    private final long start;
    private final long end;

    /**
     * @param start first value of the range, inclusive
     * @param end   last value of the range, inclusive
     */
    public ForkJoinTest(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range, directly when it is small and by divide-and-conquer otherwise.
     *
     * @return the sum of all values in {@code [start, end]}; {@code 0} when {@code start > end}
     */
    @Override
    protected Long compute() {
        if (end - start < THRESHOLD) {
            // Primitive accumulator and index: boxed Long here would allocate on every iteration.
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        // start + (end - start) / 2 avoids the overflow that (start + end) / 2 can hit.
        long middle = start + (end - start) / 2;
        ForkJoinTest left = new ForkJoinTest(start, middle);
        ForkJoinTest right = new ForkJoinTest(middle + 1, end);
        // Fork one half and compute the other on this thread, so the current worker
        // does useful work instead of only waiting on two forked subtasks.
        left.fork();
        long rightSum = right.compute();
        return left.join() + rightSum;
    }
}
十六、异步回调
package chapter04;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates asynchronous callbacks with {@link CompletableFuture}: one task
 * with no result and one task whose failure is handled by callbacks.
 *
 * @Author Noperx
 * @Create 2021-02-28 21:53
 */
public class JUCTest16 {
    /**
     * Runs both demonstrations one after the other.
     *
     * @param args unused
     * @throws ExecutionException   if a future that is waited on completed exceptionally
     * @throws InterruptedException if interrupted while waiting on a future
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Asynchronous task without a return value.
        Runnable delayedReport = () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "runAsync");
        };
        CompletableFuture<Void> fireAndForget = CompletableFuture.runAsync(delayedReport);
        System.out.println("hello");
        fireAndForget.get();

        // Asynchronous task with a return value; the division by zero makes it fail on purpose.
        CompletableFuture<Integer> failing = CompletableFuture.supplyAsync(() -> {
            int boom = 1 / 0;
            return 200;
        });
        // whenComplete receives both the result and the error and prints each.
        CompletableFuture<Integer> observed = failing.whenComplete((value, error) -> {
            System.out.println(value);
            System.out.println(error);
        });
        // exceptionally prints the stack trace and substitutes 400 as the result.
        CompletableFuture<Integer> recovered = observed.exceptionally(error -> {
            error.printStackTrace();
            return 400;
        });
        recovered.get();
    }
}