CompletableFuture异步编程
随着互联网的发展和云计算的兴起,异步编程已成为当今软件开发中的常见需求。Java 8引入了CompletableFuture
类,为异步编程提供了很好的支持。下面将详细介绍CompletableFuture
的使用,包括创建和维护、异步执行、串行和并行执行、异常处理等内容。
创建和维护CompletableFuture
CompletableFuture
类是Java 8中Future
接口的扩展,支持更多的异步编程功能。要创建一个CompletableFuture
对象,可以使用静态方法CompletableFuture.supplyAsync()
或CompletableFuture.runAsync()
。下面是一个CompletableFuture
的创建示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
// 创建CompletableFuture对象
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
// 获取CompletableFuture返回结果
int result = future.join();
System.out.println(result);
}
}
在上面的示例中,我们使用CompletableFuture.supplyAsync()
方法创建了一个返回结果为Integer
类型的CompletableFuture
对象,并使用Lambda表达式定义了异步任务。在异步执行完成后,我们使用join()
方法获取异步执行的结果,并输出到控制台上。
另外,CompletableFuture
还提供了一系列的方法用于维护、组合和拆分异步任务,包括thenApply()
、thenAccept()
、thenRun()
等等。这些方法可以在异步执行完成后,对结果进行操作和处理。下面是一个CompletableFuture
的维护示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
future.thenApply(result -> {
System.out.println(result);
return result * 2;
}).thenAccept(System.out::println);
Thread.sleep(3000);
}
}
在上面的示例中,我们使用thenApply()
方法实现了对异步结果的转换,将结果乘以2。使用thenAccept()
方法输出最终的结果。注意,thenApply()
方法会返回一个新的CompletableFuture
对象,在后续方法中需要对这个新对象进行操作和处理。
异步执行
CompletableFuture
类支持异步执行任务,使用线程池来支持多个并发执行的任务。可以使用thenApplyAsync()
、thenAcceptAsync()
、thenRunAsync()
等方法来异步执行任务。下面是一个异步执行任务的示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static ExecutorService threadPool = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}, threadPool);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
}, threadPool);
CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (result1, result2) -> {
return result1 + result2;
}, threadPool);
System.out.println(future3.join());
}
}
在上面的示例中,我们使用Executors.newFixedThreadPool()
方法创建了一个包含三个线程的线程池,在CompletableFuture.supplyAsync()
方法中传入了这个线程池。然后,我们创建了两个CompletableFuture
对象,并异步执行它们。在异步执行完成后,使用thenCombineAsync()
方法将两个异步结果合并,并在第三个线程中执行合并操作。最后,使用join()
方法等待异步执行完成,并输出结果到控制台上。
串行和并行执行
在异步编程中,有时需要对多个异步任务进行串行或并行执行。CompletableFuture
类提供了一系列的方法来实现这些执行方式,包括thenCompose()
、thenCombine()
、allOf()
、anyOf()
等等。
串行执行
thenCompose()
方法可以用于实现多个异步任务的串行执行。下面是一个串行执行任务的示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
CompletableFuture<Integer> future3 = future1.thenCompose(result1 -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result1 + 100;
}));
CompletableFuture<Integer> future4 = future2.thenCompose(result2 -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result2 + 200;
}));
CompletableFuture<Integer> future5 = future3.thenCombine(future4, (result1, result2) -> {
return result1 + result2;
});
System.out.println(future5.join());
}
}
在上面的示例中,我们使用thenCompose()
方法将两个异步任务串行执行,在第一个任务的结果基础上继续执行第二个任务。在异步任务串行执行完成后,使用thenCombine()
方法将两个异步结果合并。
并行执行
allOf()
方法可以用于实现多个异步任务的并行执行,等待所有的异步任务完成并合并结果。下面是一个并行执行任务的示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 300;
});
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
combinedFuture.join();
System.out.println(future1.join() + future2.join() + future3.join());
}
}
在上面的示例中,我们创建了三个异步任务,并使用CompletableFuture.allOf()
方法将它们并行执行,等待它们全部完成。最后,使用join()
方法获取三个异步任务的结果,并进行合并和输出。注意,CompletableFuture.allOf()
方法返回的是一个对结果无用的Void
对象,实际的结果需要通过异步任务的join()
方法获取。
异常处理
在异步编程中,异常处理也是必不可少的。CompletableFuture
类提供了一系列的方法来支持异常处理,包括exceptionally()
、handle()
、whenComplete()
、whenCompleteAsync()
、handleAsync()
等等。
exceptionally()
exceptionally()
方法支持在异步任务执行过程中的异常情况下,执行备用任务或提供一个默认值。下面是一个exceptionally()
方法的使用示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("exceptionally error!");
});
CompletableFuture<Integer> future2 = future.exceptionally(e -> {
System.out.println(e.getMessage());
return 0;
});
System.out.println(future2.join());
}
}
在上面的示例中,我们创建了一个异步任务并抛出一个运行时异常。在异常捕获后,我们使用exceptionally()
方法进行备用处理,并返回一个默认值0。
handle()
handle()
方法支持处理异步任务的结果和异常。下面是一个handle()
方法的使用示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> future2 = future.handle((result, ex) -> {
if (ex != null) {
return ex.getMessage();
} else {
return "Result = " + result;
}
});
System.out.println(future2.join());
}
}
在上面的示例中,我们创建了一个异步任务并定义了一个处理函数。在异步任务执行完成后,调用处理函数进行处理。如果异步任务出现异常,则处理函数会返回异常的错误消息;如果异步任务正常执行,则处理函数会返回结果字符串。
whenComplete()和whenCompleteAsync()
whenComplete()
和whenCompleteAsync()
方法支持对异步任务的结果和异常进行处理,但不会修改原始结果。这两个方法可以用于在结果或异常处理完成后执行特定的操作。下面是一个whenComplete()
和whenCompleteAsync()
方法的使用示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println(ex.getMessage());
} else {
System.out.println("Result = " + result);
}
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 200;
});
future2.whenCompleteAsync((result, ex) -> {
if (ex != null) {
System.out.println(ex.getMessage());
} else {
System.out.println("Result = " + result);
}
});
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future, future2);
combinedFuture.join();
}
}
在上面的示例中,我们创建了两个异步任务,并使用whenComplete()
和whenCompleteAsync()
方法对它们的结果或异常进行处理。在处理完成后,我们输出相应的消息到控制台上。
handleAsync()
handleAsync()
方法和handle()
方法类似,不同的是handleAsync()
方法是在一个新的线程中执行处理函数。这个方法适用于当结果处理需要较长时间或涉及阻塞操作时。下面是一个handleAsync()
方法的使用示例代码:
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future2 = future.handleAsync((result, ex) -> {
if (ex != null) {
return -1;
} else {
return result * 2;
}
});
System.out.println(future2.join());
Thread.sleep(3000);
}
}
在上面的示例中,我们创建了一个异步任务,并使用handleAsync()
方法对结果进行处理,在处理完成后返回结果的两倍。注意,在输出结果之前,我们使用join()
方法等待异步任务的完成。