前言:接口测试平台由于越来越多的用例维护在上面,5000~6000条的用例执行线上执行1小时左右,而预发环境由于机器性能的原因可能要达到2个小时,这时候效率对我们来说就很重要了,所以我们将整个执行流程优化,通过多线程执行,来提升我们的效率;简单介绍下我在优化执行过程中用到的一些多线程的知识;
Java 中实现多线程有两种方法:继承 Thread 类、实现 Runnable 接口,大部分以实现Runnable 接口为主,但是在实际开发中,开发很少会在应用中自行显式的去创建线程,所以我们介绍一下常用的多线程类。
一,CompletableFuture
CompletableFuture 是JDK1.8提供的异步线程类,提供了下面两个常见方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
runAsync方法不支持返回值, runAsync 方法返回的对象只能用来确认该操作是否已经完成而不能获取到内容。
supplyAsync可以支持返回值,支持拿到return的值。
默认会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码,缺点:会尽可能起多个线程去执行(数量取决于机器的性能,例如cpu的核数)。
runAsync实例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("run end ...");
});
supplyAsync实例:
List<Owner> after=list.stream().peek(owner -> {
CompletableFuture<Owner> future = CompletableFuture.supplyAsync(() -> {
//执行内容
owner.setName(owner.getName()+"*");
System.out.println("run ..."+owner.getName()+":"+owner.getOwner());
return owner;
});
System.out.print("$"+future.join()+"\n");
}).collect(Collectors.toList());
1.1获取对象
public T get() throws InterruptedException, ExecutionException;
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
public T join();
public T getNow(T valueIfAbsent);
get 和join方法:会在正常的情况下返回值,在遇到异常的情况下将异常抛出,如果线程正在执行,会等待到线程执行完成才会返回;
区别:get 抛出 checked exception,必须用必须用 try-catch 包裹;
join 抛出 unchecked exception,不需要用 try-catch 包裹;
getNow 方法不会阻塞线程,而是立即返回值,如果该 future 当前没有完成,则会立刻返回该方法的传入参数;
2.2错误处理
没有使用 Try的时候,使用 exceptionally 方式处理可能出现的异常:
List<Owner> after = list.stream().peek(owner -> {
CompletableFuture<Owner> future = CompletableFuture.supplyAsync(() -> {
//执行内容
owner.setName(owner.getName() + "*");
System.out.println("run ..." + owner.getName() + ":" + owner.getOwner());
return owner;
}).exceptionally(e -> {
System.out.println("future exception = " + e);
return null;//返回默认值
});
}).collect(Collectors.toList());
handle 是执行任务完成时对结果的处理,(包括结果,异常);
3.3组合 Future
thenApply 与 thenCompose 是 CompletableFuture Monad 的核心 API,对应 Scala 中的 map 和 flatMap;
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "hello")
.thenCompose(r -> CompletableFuture.c(() -> r + " world"));
String result = f2.get();
// hello world
System.out.println(result);
3.4并行处理 Future
当并行处理多个 Future 时,一般需要等待所有 Future 执行结束,将它们的结果作为整体进行处理。
CompletableFuture.allOf 静态方法就是为此而生,它会等待所有参数 Future,直到它们都完成,.allOf 方法,任何一个执行完;
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "beautiful");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "world");
// allOf 等待所有 Future 执行结束,其返回值为 Void
CompletableFuture<Void> f = CompletableFuture.allOf(f1, f2, f3);
但它可以通过流的方式来实现:
总结:我们会发现单纯的通过异步线程来执行,不指定线程池的大小,有多少待执行的任务就会起多少个线程(当然机器性能会限制并不是每个线程都能在执行),会很浪费线程资源;
二,Executors(线程池)
指定线程池,则使用指定的线程池来进行运行;CompletableFuture也提供了指定线程池的方法:
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
合理的使用线程池能够带来以下明显的好处:
1.可以自定义指定线程池,例如大小,超时等等;
2.降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗;
为了合理的利用线程池,指定线程池的大小,就会起指定个数的线程x,且用这x个线程去执行待执行的任务(很多个),避免了大量线程池的创建和销毁;
- 由于场景需要将线程池和账号池关联起来,一个线程对应一个执行账号资源,项目实际多模块并发的代码如下:
private void modelParallelRun(List<ExecuteCaseBean> executeCaseBeanList, Map<Integer, List<ApiDataDTO>> caseForModule, Map<Integer,
List<ApiDataDTO>> caseForModuleDelay, RunSupplies runSupplies, ArrayBlockingQueue<LoginAccountDTO> accountQueue) {
ExecutorService executor = Executors.newFixedThreadPool(accountQueue.size());
executeCaseBeanList.stream().map((executeCaseBean -> CompletableFuture.runAsync(() -> {
try {
LoginAccountDTO loginAccount = accountQueue.take();
if (loginAccount != null) {
this.runParentModule(executeCaseBean, caseForModule ,caseForModuleDelay, runSupplies, loginAccount);
accountQueue.put(loginAccount);
}
} catch (Exception e) {
log.warn("get account has a problem");
}
}, executor))).collect(Collectors.toList()).stream().map(CompletableFuture-> CompletableFuture.join()).collect(Collectors.toList());
executor.shutdown();
}
Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口:
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
但是这种方式并不被阿里推荐(网上解释说大流量的时候会容易出现问题):
同时也建议:
1,创建线程或线程池时请指定有意义的线程名称,方便出错时回溯;
2,不建议在应用中自行显式创建线程;
三,ThreadPoolExecutor
首先,它的线程池的流程图:
step1.调用ThreadPoolExecutor的execute提交线程,首先检查CorePool,如果CorePool内的线程小于CorePoolSize,新创建线程执行任务。
step2.如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
step3.如果不能加入BlockingQueue,在小于MaxPoolSize的情况下创建线程执行任务。
step4.如果线程数大于等于MaxPoolSize,那么执行拒绝策略。
任务提交时,判断的顺序为 corePoolSize --> workQueue --> maximumPoolSize
ThreadPoolExecutor parallelRunExecutor = new ThreadPoolExecutor(accountQueue.size(), accountQueue.size(), 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(executeCaseBeanList.size()));
corePoolSize 核心线程池大小
maximumPoolSize 线程池最大容量大小
keepAliveTime 线程池空闲时,线程存活的时间
TimeUnit 时间单位
ThreadFactory 线程工厂
BlockingQueue任务队列
RejectedExecutionHandler 线程拒绝策略
四,ArrayBlockingQueue阻塞队列+ConcurrentHashMap线程安全map
1,ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,其中put方法和take方法为添加和删除的阻塞方法;加了lock的锁,如果没有取到会一直等待;
2,ConcurrentHashMap 是一个并发散列映射表的实现,它允许完全并发的读取,并且支持给定数量的并发更新。
1)相比于HashMap,它支持并发;
2)相比于 HashTable性能更好,因为HashTable使用一个全局的锁来同步不同线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器;
五,并发编程注意点:
1,线程内的执行逻辑,线程内的资源一定要new一个新对象才会是线程内独享的,否则会在线程之间公享;
2,公共资源一定要用线程安全的类,例如要用ConcurrentHashMap,不能用hashMap;
3,并发编程的时候,调试一定建议通过打log的方式去看,不要打断点去看,会让你怀疑人生(因为取共享资源的时候会有问题);
4,并发编程的代码可能看起来很简单,但整个执行流程为了支持并发占了大头;