JUC并发编程(中)

八、Callable

package chapter04;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest08 {
    /**
     * Runs a {@link MyThread} on a separate thread through a {@link FutureTask}
     * and prints the value the task produced.
     *
     * @param args unused
     * @throws ExecutionException   if the task itself failed
     * @throws InterruptedException if the main thread is interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // A Thread only accepts a Runnable; FutureTask adapts the Callable and keeps its result.
        FutureTask<String> task = new FutureTask<String>(new MyThread());
        Thread worker = new Thread(task);
        worker.start();

        // get() blocks until the result is ready; when the computation is slow,
        // fetch the result asynchronously instead of waiting here.
        String result = task.get();
        System.out.println(result);
    }
}

/**
 * Demo task that prints a marker line and yields a fixed string.
 *
 * <p>Declared as {@code Callable<String>} rather than the raw {@code Callable} so that it can be
 * handed to a {@code FutureTask<String>} without an unchecked conversion.
 */
class MyThread implements Callable<String> {

    /**
     * Prints {@code call()} and returns the constant result.
     *
     * @return the string {@code "3.1415926"}
     * @throws Exception never thrown by this implementation; kept to match {@link Callable#call()}
     */
    @Override
    public String call() throws Exception {
        System.out.println("call()");
        return "3.1415926";
    }
}

1614445280651.png

Callable 接口可以抛异常,可以有返回值

/**
 * A task that yields a value of type {@code V} from its single method and is allowed to fail
 * with a checked exception.
 *
 * @param <V> the type of the value returned by {@link #call()}
 */
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

FutureTask.get() 会阻塞,直到 call() 执行完成;同一个 FutureTask 只会执行一次 call(),其结果会被缓存

以下例子说明有缓存

1614446199391.png

九、并发辅助类

1、CountDownLatch

1614476070266.png
package chapter04;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest09 {
    /**
     * Starts eight threads that each print a line and count the latch down once, then waits
     * for the latch to reach zero before printing the closing message.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        final int parties = 8;
        CountDownLatch latch = new CountDownLatch(parties);

        // Every thread performs the same work, so one Runnable instance is shared.
        Runnable leave = () -> {
            System.out.println(Thread.currentThread().getName() + " out");
            latch.countDown();
        };

        int started = 0;
        while (started < parties) {
            new Thread(leave).start();
            started++;
        }

        // Blocks until all eight countDown() calls have happened.
        latch.await();
        System.out.println("close the door!");
    }
}

2、CyclicBarrier

1614477504381.png
package chapter04;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

/**
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest09 {
    /**
     * Starts five threads that each report the card they collected and then wait at a shared
     * barrier; the barrier action prints a message once all five have arrived.
     *
     * @param args unused
     * @throws InterruptedException declared for signature compatibility; not thrown by this body
     */
    public static void main(String[] args) throws InterruptedException {
        final int cards = 5;
        // The second argument is the barrier action, run once when the last party arrives.
        CyclicBarrier barrier = new CyclicBarrier(cards, () -> System.out.println("五福到"));

        int index = 0;
        while (index < cards) {
            // Lambdas may only capture effectively final locals, hence the per-iteration copy.
            final int collected = index++;
            Thread collector = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "收集福卡" + collected + "张");
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            collector.start();
        }
    }
}

3、Semaphore

1614479009671.png
package chapter04;

import java.util.concurrent.*;

/**
 * @Author Noperx
 * @Create 2021-02-27 22:07
 */
public class JUCTest09 {
    /**
     * Parking-lot demo: five named threads compete for three permits; each holder keeps its
     * permit for two seconds and then gives it back.
     *
     * @param args unused
     * @throws InterruptedException declared for signature compatibility; not thrown by this body
     */
    public static void main(String[] args) throws InterruptedException {

        Semaphore semaphore = new Semaphore(3); // typically used for throttling
        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    // No permit was obtained, so nothing may be released: releasing here
                    // would silently raise the number of available permits above three.
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                    return;
                }
                // From here on the permit is held and must be returned on every path.
                try {
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}

十、ReadWriteLock

1614481159436.png
package chapter04;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @Author Noperx
 * @Create 2021-02-28 10:42
 */
public class JUCTest10 {
    /**
     * Starts five writer threads followed by five reader threads against one shared
     * {@link MyCache}; thread {@code n} writes or reads the key {@code "n"}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyCache cache = new MyCache();

        // Writers: key and value are both the loop index rendered as text.
        for (int id = 0; id < 5; id++) {
            String key = String.valueOf(id);
            Thread writer = new Thread(() -> cache.write(key, key), key);
            writer.start();
        }

        // Readers: started only after every writer thread has been started.
        for (int id = 0; id < 5; id++) {
            String key = String.valueOf(id);
            Thread reader = new Thread(() -> cache.read(key), key);
            reader.start();
        }
    }
}


/**
 * A string-to-string cache guarded by a read/write lock: writes are exclusive, reads may
 * proceed concurrently with each other.
 */
class MyCache {
    // The reference is never reassigned, so it is final rather than volatile; the map's
    // contents are only touched while holding the lock below.
    private final Map<String, String> myMap = new HashMap<>();
    final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * Stores a mapping while holding the write lock, logging before and after the store.
     *
     * @param k key to store under
     * @param v value to associate with {@code k}
     */
    public void write(String k, String v) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"开始存入"+v);
            myMap.put(k, v);
            System.out.println(Thread.currentThread().getName()+"完成存入"+v);
        } finally {
            // Always unlock, even if the body fails.
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Looks up a key while holding the read lock, logging the attempt.
     *
     * @param k key to look up
     * @return the cached value, or {@code null} if {@code k} has not been written
     */
    public String read(String k) {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"开始读取"+k);
            return myMap.get(k);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

十一、阻塞队列

1614484125864.png
package chapter04;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Demonstrates the exception-throwing queue operations: {@code add}, {@code element}
     * and {@code remove}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int capacity = 3;
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);

        for (int value = 1; value <= capacity; value++) {
            queue.add(value);
        }
        // A fourth add(...) here would throw java.lang.IllegalStateException: Queue full

        // element() inspects the head without removing it.
        System.out.println(queue.element());
        while (!queue.isEmpty()) {
            System.out.println(queue.remove());
        }
        // One more remove() here would throw java.util.NoSuchElementException
    }
}

package chapter04;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Demonstrates the non-throwing queue operations: {@code offer}, {@code peek} and
     * {@code poll}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

        // Four offers against a capacity of three: the last one is rejected without an exception.
        for (int value = 1; value <= 4; value++) {
            queue.offer(value);
        }

        // peek() inspects the head without removing it.
        System.out.println(queue.peek());
        // Four polls against three stored elements: the last one prints null.
        for (int attempt = 0; attempt < 4; attempt++) {
            System.out.println(queue.poll());
        }
    }
}

package chapter04;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Demonstrates the blocking queue operations: {@code put} and {@code take}.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted during {@code put} or {@code take}
     */
    public static void main(String[] args) throws InterruptedException {
        final int capacity = 3;
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);

        for (int value = 1; value <= capacity; value++) {
            queue.put(value);
        }
        // A fourth put(...) here would block forever because the queue is full.

        for (int taken = 0; taken < capacity; taken++) {
            System.out.println(queue.take());
        }
        // One more take() here would block forever because the queue is empty.
    }
}

SynchronousQueue

package chapter04;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @Author Noperx
 * @Create 2021-02-28 11:49
 */
public class JUCTest11 {
    /**
     * Hands three elements from thread "A" to thread "B" through a {@link SynchronousQueue}.
     *
     * @param args unused
     * @throws InterruptedException declared for signature compatibility; not thrown by this body
     */
    public static void main(String[] args) throws InterruptedException {

        // A SynchronousQueue stores nothing: each put must be matched by a take.
        BlockingQueue<Object> handoff = new SynchronousQueue<>();

        Runnable producer = () -> {
            try {
                for (int item = 1; item <= 3; item++) {
                    System.out.println(Thread.currentThread().getName() + "存" + item);
                    handoff.put(String.valueOf(item));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable consumer = () -> {
            try {
                for (int round = 0; round < 3; round++) {
                    // The pause makes sure the producer has already offered an element.
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + "取" + handoff.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        new Thread(producer, "A").start();
        new Thread(consumer, "B").start();
    }
}

十二、线程池

线程池:优化资源使用,提高效率

1614495637892.png

1、通过 Executors 创建线程池的三种方法(不建议使用)

package chapter04;

import java.util.concurrent.*;

/**
 * @Author Noperx
 * @Create 2021-02-28 14:49
 */
public class JUCTest12 {
    /**
     * Submits ten tasks to a fixed pool of three threads; each task prints the name of the
     * worker thread that runs it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Three factory methods are being compared; swap the right-hand side to try the others:
        //   Executors.newSingleThreadExecutor()
        //   Executors.newCachedThreadPool()
        ExecutorService pool = Executors.newFixedThreadPool(3);

        Runnable printWorkerName = () -> System.out.println(Thread.currentThread().getName());

        try {
            int submitted = 0;
            while (submitted < 10) {
                pool.execute(printWorkerName);
                submitted++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The pool must always be shut down to release its threads.
            pool.shutdown();
        }
    }
}


2、ThreadPoolExecutor 构造方法(建议使用)

Executors 的三个工厂方法底层调用的都是 ThreadPoolExecutor 的构造方法,该构造方法有 7 个参数,建议直接使用它来创建线程池

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // Numeric arguments are validated first, before any field is assigned.
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // The three collaborators are checked explicitly; unit is not checked here,
        // so a null unit fails at the toNanos call below instead.
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        // Stored in nanoseconds regardless of the unit the caller supplied.
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

3、自定义线程池

package chapter04;

import java.util.concurrent.*;

/**
 * @Author Noperx
 * @Create 2021-02-28 14:49
 */
public class JUCTest12 {
    /**
     * Builds a {@link ThreadPoolExecutor} by hand and submits nine tasks that each print the
     * worker thread's name followed by {@code " work"}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        /*
        Rejection policies that can be passed as the last constructor argument:
        AbortPolicy          throws java.util.concurrent.RejectedExecutionException
        CallerRunsPolicy     the submitting thread runs the task itself (prints "main work" here)
        DiscardPolicy        drops the new task, no exception
        DiscardOldestPolicy  drops the oldest queued task and retries the new one, no exception
         */
        int maxThreads = Runtime.getRuntime().availableProcessors();
        BlockingQueue<Runnable> backlog = new ArrayBlockingQueue<>(3);
        RejectedExecutionHandler onOverflow = new ThreadPoolExecutor.DiscardOldestPolicy();

        // core = 2 threads, idle extra threads are kept for 5 seconds, at most 3 tasks wait in the queue
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, maxThreads, 5, TimeUnit.SECONDS, backlog, Executors.defaultThreadFactory(), onOverflow);

        Runnable job = () -> System.out.println(Thread.currentThread().getName() + " work");

        try {
            int submitted = 0;
            while (submitted < 9) {
                pool.execute(job);
                submitted++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}

4、CPU密集型 IO密集型

CPU密集型:充分利用CPU个数,有多少个用多少个Runtime.getRuntime().availableProcessors()

IO密集型:保证最大线程数大于程序中执行耗时 IO 操作的任务数(一般设置为该任务数的两倍)

十三、函数式接口

package chapter04;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * @Author Noperx
 * @Create 2021-02-28 16:51
 */
public class JUCTest13 {
    /**
     * Exercises the four core functional interfaces once each and prints what they produce.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Function: one argument in, one result out (here: the argument itself).
        Function<String, String> identity = input -> input;
        System.out.println(identity.apply("hello"));

        // Predicate: one argument in, boolean out.
        Predicate<String> isEmpty = String::isEmpty;
        System.out.println(isEmpty.test(""));

        // Supplier: no argument, one result out.
        Supplier<String> greeting = () -> "hello";
        System.out.println(greeting.get());

        // Consumer: one argument in, nothing out; this one ignores its argument.
        Consumer<String> printer = ignored -> System.out.println("hello");
        printer.accept("hello");
    }
}
/**
 * A function that accepts one argument and produces a result.
 *
 * <p>Marked {@code @FunctionalInterface} so the compiler enforces that it keeps exactly one
 * abstract method and can therefore be the target of a lambda or method reference.
 *
 * @param <T> the type of the input to the function
 * @param <R> the type of the result of the function
 */
@FunctionalInterface
public interface Function<T, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);
}


/**
 * A boolean-valued test of one argument.
 *
 * @param <T> the type of the input to the predicate
 */
@FunctionalInterface
public interface Predicate<T> {

    /**
     * Evaluates this predicate on the given argument.
     *
     * @param t the input argument
     * @return {@code true} if the input argument matches the predicate,
     * otherwise {@code false}
     */
    boolean test(T t);
}


/**
 * A source of results that takes no argument.
 *
 * @param <T> the type of results supplied by {@link #get()}
 */
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}


/**
 * An operation that accepts a single argument and returns no result.
 *
 * @param <T> the type of the input to the operation
 */
@FunctionalInterface
public interface Consumer<T> {

    /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);
}

十四、Stream流计算

1614504027082.png
package chapter04;

import java.util.Arrays;
import java.util.List;

/**
 * @Author Noperx
 * @Create 2021-02-28 17:20
 */
public class JUCTest14 {
    /**
     * Filters, transforms and sorts a fixed list of users with a stream pipeline and prints
     * the single name that survives.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        /*
        Requirements:
        1. id is even
        2. age is greater than 20
        3. name converted to lower case
        4. names sorted in descending order
        5. print only one user
         */
        User user1 = new User(1, "A", 24);
        User user2 = new User(2, "B", 18);
        User user3 = new User(3, "C", 19);
        User user4 = new User(4, "D", 24);
        User user5 = new User(5, "E", 32);
        User user6 = new User(6, "F", 43);

        // Chained calls, lambda expressions, functional interfaces and stream computation together.
        List<User> users = Arrays.asList(user1, user2, user3, user4, user5, user6);
        users.stream()
                .filter(u -> u.getId() % 2 == 0)
                // Strictly greater than 20, as requirement 2 states (the previous >= also admitted 20).
                .filter(u -> u.getAge() > 20)
                .map(u -> u.getName().toLowerCase())
                // Reversed argument order yields descending order.
                .sorted((left, right) -> right.compareTo(left))
                .limit(1)
                .forEach(System.out::println);

    }
}


/**
 * Mutable value holder describing a user by id, name and age.
 */
class User {
    private int id;
    private String name;
    private int age;

    /**
     * Creates a user with all three attributes set.
     *
     * @param id   numeric identifier
     * @param name display name
     * @param age  age in years
     */
    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    /** @return the numeric identifier */
    public int getId() {
        return this.id;
    }

    /** @param id the new numeric identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the display name */
    public String getName() {
        return this.name;
    }

    /** @param name the new display name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the age in years */
    public int getAge() {
        return this.age;
    }

    /** @param age the new age in years */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders the user as {@code User{id=..., name='...', age=...}}.
     *
     * @return the textual form of this user
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("id=").append(this.id);
        text.append(", name='").append(this.name).append('\'');
        text.append(", age=").append(this.age);
        text.append('}');
        return text.toString();
    }
}

十五、ForkJoinPool

ForkJoin:用于分解任务,提高计算效率;工作窃取(双端队列)

package chapter04;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * @Author Noperx
 * @Create 2021-02-28 18:23
 */
public class JUCTest15 {
    /**
     * Sums the integers from 0 to 1,000,000,000 with a {@link ForkJoinPool} and prints the sum
     * followed by the elapsed milliseconds. Two alternative implementations are kept as
     * comments for comparison.
     *
     * @param args unused
     * @throws ExecutionException   if the submitted task failed
     * @throws InterruptedException if the main thread is interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // Alternative 1: plain loop (the author measured 3085 ms).
//        Long start = System.currentTimeMillis();
//        Long sum = 0L;
//        for (long i = 0L; i <= 10_0000_0000L; i++) {
//            sum += i;
//        }
//        System.out.println(sum);
//        Long end = System.currentTimeMillis();
//        System.out.println(end-start);//3085

        // Timestamps are primitives; there is no reason to box them.
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<Long> submit = forkJoinPool.submit(new ForkJoinTest(0L, 10_0000_0000L));
            System.out.println(submit.get());
        } finally {
            // Release the pool's worker threads even if the task failed.
            forkJoinPool.shutdown();
        }
        long end = System.currentTimeMillis();
        System.out.println(end-start); // the author measured 5078 ms

        // Alternative 2: parallel stream (the author measured 223 ms).
//        Long start = System.currentTimeMillis();
//        long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0L, Long::sum);
//        System.out.println(reduce);
//        Long end = System.currentTimeMillis();
//        System.out.println(end-start); //223
    }
}


/**
 * Recursive task that sums every integer in the closed range {@code [start, end]}.
 *
 * <p>Ranges narrower than {@link #THRESHOLD} are summed with a plain loop; wider ranges are
 * split in half and the halves are summed as subtasks. All arithmetic uses primitive
 * {@code long} so the inner loop does not allocate boxed values.
 */
class ForkJoinTest extends RecursiveTask<Long> {
    /** Ranges whose width ({@code end - start}) is below this value are summed directly. */
    private static final long THRESHOLD = 1_000L;

    private final long start;
    private final long end;

    /**
     * @param start first value of the range, inclusive
     * @param end   last value of the range, inclusive; a value below {@code start} yields a sum of 0
     */
    public ForkJoinTest(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range, splitting it into two subtasks when it is at least {@link #THRESHOLD} wide.
     *
     * @return the sum of all integers from {@code start} to {@code end}
     */
    @Override
    protected Long compute() {
        if (end - start < THRESHOLD) {
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }

        // start + (end - start) / 2 cannot overflow the way (end + start) / 2 can.
        long middle = start + (end - start) / 2;
        ForkJoinTest left = new ForkJoinTest(start, middle);
        ForkJoinTest right = new ForkJoinTest(middle + 1, end);

        // Queue only one half; the current worker computes the other half itself instead of
        // idling in join() while both halves wait to be picked up.
        left.fork();
        long rightSum = right.compute();
        return rightSum + left.join();
    }
}

十六、异步回调

package chapter04;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @Author Noperx
 * @Create 2021-02-28 21:53
 */
public class JUCTest16 {
    /**
     * Shows two asynchronous callbacks: one without a result, and one whose supplier fails so
     * that the completion and recovery callbacks can be observed.
     *
     * @param args unused
     * @throws ExecutionException   if a future that is waited on completed exceptionally
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Asynchronous callback without a return value.
        CompletableFuture<Void> fireAndForget = CompletableFuture.runAsync(JUCTest16::sleepThenReport);
        // Printed right away: the async task is still sleeping at this point.
        System.out.println("hello");
        fireAndForget.get();

        // Asynchronous callback with a return value.
        CompletableFuture<Integer> failing = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0; // deliberately fails so the error path below is taken
            return 200;
        });
        // whenComplete receives (result, error); exactly one of them is non-null.
        CompletableFuture<Integer> reported = failing.whenComplete((value, error) -> {
            System.out.println(value);
            System.out.println(error);
        });
        // exceptionally substitutes a fallback value when the stage failed.
        CompletableFuture<Integer> recovered = reported.exceptionally(error -> {
            error.printStackTrace();
            return 400;
        });
        recovered.get();
    }

    /** Sleeps for two seconds, then prints the current thread's name followed by "runAsync". */
    private static void sleepThenReport() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "runAsync");
    }
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容