什么高深的道理,一篇好的例子都能讲透;如果不行,那就两篇。如果还不行,文末看答案。
CompletableFuture是Java 8引入的,一个很好用的特性,但被Lambda、Stream等的光环给盖住了。
总的来说,CompletableFuture把Java 5引入的Future的锅都补上了,具体直接看例子吧。
例子说明
本文所述的例子纯属虚构,如有雷同,纯属巧合(可联系作者删除),版权归作者所有。
例子主要服务于问题/原理讲解,以通熟易懂为主。可能与现实或常识脱钩,请别太较真。
这是一个虚构的订单分拣的例子:
一个订单只包括一个商品,数量不限(>=1)
-
4个仓库中心(010,020,021,023),包含每个商品在具体哪个货架多少数量等明细
- 仓库不总是可用的(譬如错峰放假、运送区域不支持等)
有一个全局的库存汇总(无明细,可快速查询全国某商品的总数)
-
为了提高分库扣减库存的有效性
- 先查询总库,只有总库存数量足够,才开始进行分库库存分拣
在分拣的同时,订单可能被撤销。一旦撤销,本次分拣结束
-
为使代码简单
库存分拣只是查询分库库存
库存回撤没有任何实现
实现是有状态的,所以不是线程安全的
彩色文字是后面对应的代码调用。看代码的时候可以结合流程图加深理解。
大概的接口定义(延迟模拟远程调用):
-
int StockService.query(String product)
: 查总库库存 (快速)- 随机延迟100~200ms
-
int StockService.pick(String repo, String product)
: 查/锁定分库库存(含货架)- 随机延迟500~2500ms
-
Availability RepoService.isAvailable(String repo)
: 查分库是否可用 (快速)- 随机延迟100ms
-
void EventService.listenOrderCancel(String order)
: 监听订单取消事件- 延迟2000ms
-
boolean PackageService.pack(String oid, String pid)
: 分拣订单- 分成2个子类SingleRepoPackageService和MultiRepoPackage类分别演示单仓库和多仓库的实现
-
共用代码存放入AbstractRepoPackageService
例子实战 - 单仓库
文末有完整代码
流程总体控制
几个阀门(CompletableFuture,下简称CF)来控制什么时候终止流程:
/* 监听订单取消事件 */
CompletableFuture<Boolean> cancelListener =
runAsync(() -> eventService.listenOrderCancel(oid))
.thenApply(reason -> false);
/* 分拣结束标记 */
CompletableFuture<Boolean> allocated = new CompletableFuture<>();
/* 总开关(权衡订单取消和分拣结束) */
CompletableFuture<Boolean> terminator =
allocated.applyToEitherAsync(cancelListener, Function.identity());
监听订单取消事件
使用CF.runAsync
来创建CF实例,并在收到信号后回调(thenApply
)声明分拣失败
分拣结束标记
直接创建一个新实例,其结束将由后面的分拣流程控制(直接调用其complete(boolean)
)
总开关
使用CF的二选一(applyToEitherAsync
)来创建一个新的总控CF:即任何一个CF执行完毕,这个总控就结束。
分拣
supplyAsync(() -> stockService.query(pid))
.whenComplete((stock, e) -> {
if (e != null) {
e.printStackTrace();
allocated.complete(false);
} else if (stock < q) {
logger.log("not enough stock (%d < %d)", stock, q);
allocated.complete(false);
} else {
logger.log("total stock is enough (%d >= %d). " +
"allocating from repos...", stock, q);
startPick(pid);
}
});
先检查总库存(快速)
使用CF.supplyAsync(Supplier<Integer>)
开启异步总库存查询。
根据库存情况
whenComplete(BiConsumer<Integer, Throwable>)
是其执行结果的回调入口,根据库存情况:
- 不足,标识分拣以失败结束(
allocated.complete(false)
)(级联触发总开关结束) - 充足,开始分库分拣(
startPick()
) -
异常处理,标记失败结束(
allocated.complete(false)
)(级联触发总开关结束)
分库分拣
supplyAsync(() -> repoService.isAvailable(repo))
.thenCompose(this::pick)
.thenAccept(this::allocateStock)
.thenRun(this::isAllocatedFully)
// <-- 到这里返回就是startPickFromOneRepo(),后面多仓库会调用
.whenComplete(this::completeAllocate);
检查分库的可用性
supplyAsync(Supplier<Availability>)
创建CF实例,检查分库的可用性
分拣
thenCompose(Function<Availability, Stock>)
回调,根据分库的可用性:
-
不可用,直接创建一个空的CF<Stock>,标识数量为0
CompletableFuture<Stock> dummy = new CompletableFuture<Stock>(); dummy.complete(new Stock(a.repo, 0)); return dummy;
可用,创建新的查询库存的CF<Stock> (
supplyAsync(Supplier<Stock>)
)
return supplyAsync(() -> stockService.pick(a.repo, pid));
分配库存
thenAccept(Consumer<Stock>)
回调,执行实际的库存分配
检查是否库存分配达标
thenRun(Runnable)
回调,如果达标,allocated.complete(true)
标记分拣以成功结束(级联触发总开关结束)
打完收工
whenComplete(BiConsumer<Void, Throwable>)
收尾,处理分配标识还没结束的case(前面库存分配达标时,会标识结束)。这种遗漏的情况标记分拣以失败结束(级联触发总开关结束)
返回结果
return terminator.get(5, TimeUnit.SECONDS);
例子实战 - 多仓库
final CompletableFuture[] queries = new CompletableFuture[repos.size()];
final Iterator<String> iter = repos.iterator();
for (int i = 0; i < queries.length; i++) {
queries[i] = startPickFromOneRepo(iter.next(), pid);
}
allOf(queries).whenComplete(this::completeAllocate);
基本和单仓库类似,主要是循环让单仓库的分拣并行跑。
allOf(CF...)
:必须所有的CF都完成了,才开始回调whenComplete(BiConsumer<Void, Throwable>)
。注意之前的流程图和单仓库处理逻辑,中间:
- 库存分配满额/达标,会提前标识成功分拣结束(其实其他仓库的分配还在并行进行,实际代码需要注意中断/或防止额外再分配库存)
总结
在整个例子中,
创建/开启CF实例
用到的:
CF.supplyAsync(Supplier<T>)
CF.runAsync(Runnable)
new CF()
遗漏的:
handleAsync(Function<T, U>)
关于回调
用到的:
thenApply(Function<T, U>)
whenComplete(BiConsumier<T, Throwable>)
thenCompose(Function<T, U>)
thenAccept(Consumer<T>)
thenRun(Runnable)
遗漏的:
都在了
关于二合一或二选一
用到的:
applyToEitherAsync(CF, Function<T, U>)
遗漏的:
acceptEitherAsync(CF, Consumer<T>)
runAfterBothAsync(CF, Runnable)
runAfterEitherAsync(CF, Runnable)
thenAcceptBothAsync(CF, BiConsumer<T, U>)
thenCombineAsync(CF, BiFunction<T, U, V)
全部或任一
用到的:
allOf(CF...)
遗漏的:
都在了
状态检查
用到的:
isDone()
遗漏的:
isCancelled()
isCompletedExceptionally()
手动标识完成/或异常/取消
用到的:
complete(T)
遗漏的:
completedFuture(U)
completeExceptionally(Throwable)
exceptionally(Function<Throwable, T>)
obtrudeException(Throwable)
obtrudeValue(T)
等待/阻塞
用到的:
get
遗漏的:
getNow
join
CF的接口设计是很有规律的,譬如:
- 一个方法xxx,其相应的会有
xxxAsync
以及xxxAsync(.., Executor)
的变体- 建议使用Executor的那个,可以避免
ForkJoinPool.commonPool()
堆积问题(有空会单独成篇讲一下)
- 建议使用Executor的那个,可以避免
- 参数支持了
Function<T, U>
的,必然有其他类似但接受类型为Consumer<T>
和Runnable
的
理解了一种,对接受其他几个变体应该难度不大。
我们这个例子涵盖了每个大类的1~N个方法,对于CF的使用大概就应该是这样了。
调用套路
- 起手:
supplyAsync
或其他 - 序盘:
thenXxx
或其他的多个串行回调 - 中盘:二合一或二选一
thenCombineAsync
或其他 - 官子:全部allOf + 阻塞get等
注意:不是所有的问题都需要走完整的套路的。
回答开头的问题:
什么高深的道理,一篇好的例子都能讲透;如果不行,那就两篇。如果还不行:
放出所有源代码(guava是唯一的外部依赖)。注意,由于这里有很多随机数,请多跑几遍,应该能跑出所有的可能:
- 总的库存不足,分配失败
[ 109ms] initialized repos: [023, 020, 010, 021]
[ 11ms] pre-checking stock quickly...
[ 135ms] stocks: [total=3, repos={023=0, 020=0, 010=1, 021=2}]
[ 141ms] not enough stock (3 < 5)
[ 142ms] allocated: false
- 订单无故被取消
[ 110ms] initialized repos: [023, 020, 010, 021]
[ 14ms] pre-checking stock quickly...
[ 149ms] stocks: [total=7, repos={023=1, 020=1, 010=2, 021=3}]
[ 158ms] total stock is enough (7 >= 5). allocating from repos...
[ 165ms] repo 021 NOT available
[ 165ms] repo 021 was allocated 0
[ 194ms] repo 020 is available
[ 250ms] repo 010 is available
[ 255ms] repo 023 is available
[1336ms] repo 020 was allocated 1
[2027ms] cancelled with no reason
[2028ms] allocated: false
- 总库存足,但分库的分配没有满额
[ 126ms] initialized repos: [023, 020, 010, 021]
[ 27ms] pre-checking stock quickly...
[ 194ms] stocks: [total=6, repos={023=1, 020=1, 010=2, 021=2}]
[ 203ms] total stock is enough (6 >= 5). allocating from repos...
[ 259ms] repo 023 is available
[ 288ms] repo 020 is available
[ 291ms] repo 021 NOT available
[ 292ms] repo 021 was allocated 0
[ 294ms] repo 010 is available
[1355ms] repo 020 was allocated 1
[1664ms] repo 023 was allocated 1
[1930ms] repo 010 was allocated 2
[1930ms] didn't get enough stock.
[1930ms] allocated: false
- 分配成功
[ 104ms] initialized repos: [023, 020, 010, 021]
[ 13ms] pre-checking stock quickly...
[ 206ms] stocks: [total=6, repos={023=1, 020=1, 010=2, 021=2}]
[ 213ms] total stock is enough (6 >= 5). allocating from repos...
[ 237ms] repo 020 is available
[ 238ms] repo 023 is available
[ 241ms] repo 010 is available
[ 284ms] repo 021 is available
[ 937ms] repo 021 was allocated 2
[1052ms] repo 020 was allocated 1
[1278ms] repo 023 was allocated 1
[1718ms] repo 010 was allocated 2
[1719ms] 6 >= 5
[1719ms] allocated: true
代码版权归作者,仅非商业使用时可无需作者授权即可使用(非常感谢使用时标注来源)。
希望这篇博文能对你有所帮助,喜欢的话点个赞吧!
PackageService.java
package cf;
/**
* @author zhhe.me@gmail.com
* @since 10/9/2018
*/
public interface PackageService {
boolean pack(String oid, String pid);
}
AbstractPackageService.java
package cf;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static java.util.concurrent.CompletableFuture.*;
import static cf.Util.*;
/**
* Not ThreadSafe and just for demo.
*
* @author zhhe.me@gmail.com
* @since 10/9/2018
*/
public abstract class AbstractPackageService implements PackageService {
protected StockService stockService = new StockService();
protected RepoService repoService = new RepoService();
protected EventService eventService = new EventService();
/* stateful variables. */
protected AtomicInteger queriedQ = new AtomicInteger(0);
protected CompletableFuture<Boolean> allocated = new CompletableFuture<>();
@Override
public boolean pack(String oid, String pid) {
/* set global repos since it's used by single repo + multi repo demos. */
repos.clear();
repos.addAll(getRepos());
logger.log("initialized repos: %s", repos);
/* set started time. */
logger.start();
/* a listener to monitor if this order's cancellation event was emitted. */
final CompletableFuture<Boolean> cancelListener =
runAsync(() -> eventService.listenOrderCancel(oid))
.thenApply(reason -> false);
/* a control to indicate if package was completed. */
final CompletableFuture<Boolean> terminator =
allocated.applyToEitherAsync(cancelListener, Function.identity());
logger.log("pre-checking stock quickly...");
supplyAsync(() -> stockService.query(pid))
.whenComplete((stock, e) -> {
if (e != null) {
e.printStackTrace();
allocated.complete(false);
} else if (stock < q) {
logger.log("not enough stock (%d < %d)", stock, q);
allocated.complete(false);
} else {
logger.log("total stock is enough (%d >= %d). allocating from repos...", stock, q);
startPick(pid);
}
});
try {
// wait until package was completed.
return terminator.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/** repos used to initialize global repos and generate stocks. */
protected abstract Collection<String> getRepos();
/** the entry to kick off pick process. */
protected abstract void startPick(String pid);
/** a process to pick up stock from one repo. */
protected CompletableFuture<Void> startPickFromOneRepo(String repo, String pid) {
return supplyAsync(() -> repoService.isAvailable(repo))
.thenCompose(this::pick)
.thenAccept(this::allocateStock)
.thenRun(this::isAllocatedFully)
;
}
/** pick up stock based on repo's availability. */
protected CompletableFuture<Stock> pick(RepoService.Availability a) {
if (!a.available) {
CompletableFuture<Stock> dummy = new CompletableFuture<Stock>();
dummy.complete(new Stock(a.repo, 0));
return dummy;
} else {
return supplyAsync(() -> stockService.pick(a.repo, pid));
}
}
/** allocate stock. */
protected void allocateStock(Stock stock) {
queriedQ.addAndGet(stock.count);
logger.log("repo %s was allocated %d", stock.repo, stock.count);
}
/** check if all stocks are allocated enough. If yes, stop process. */
protected boolean isAllocatedFully() {
final int i = queriedQ.get();
if (i >= q) {
logger.log("%d >= %d", i, q);
allocated.complete(true);
}
return i >= q;
}
/** complete allocation process. */
protected void completeAllocate(Void v, Throwable e) {
if (e != null) {
e.printStackTrace();
}else if (!allocated.isDone()) {
allocated.complete(false);
logger.log("didn't get enough stock.");
}
}
}
SingleRepoPackageService.java
package cf;
import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import static cf.Util.*;
/**
* Not ThreadSafe and just for demo.
*
* @author zhhe.me@gmail.com
* @since 10/9/2018
*/
public class SingleRepoPackageService extends AbstractPackageService {
private final String repo = "021";
public static void main(String... args) throws Exception {
final boolean result = new SingleRepoPackageService().pack(oid, pid);
logger.log("allocated: %s", result);
}
@Override
protected Collection<String> getRepos() {
return ImmutableSet.of(repo);
}
protected void startPick(String pid) {
startPickFromOneRepo(repo, pid).whenComplete(this::completeAllocate);
}
}
MultiRepoPackageService.java
package cf;
import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import static cf.Util.*;
import static java.util.concurrent.CompletableFuture.allOf;
/**
* Not ThreadSafe and just for demo.
*
* @author zhhe.me@gmail.com
* @since 9/9/2018
*/
public class MultiRepoPackageService extends AbstractPackageService {
public static void main(final String... args) throws Exception {
final boolean result = new MultiRepoPackageService().pack(oid, pid);
logger.log("allocated: %s", result);
}
@Override
protected Collection<String> getRepos() {
return ImmutableSet.of("010", "020", "021", "023");
}
@Override
protected void startPick(String pid) {
final CompletableFuture[] queries = new CompletableFuture[repos.size()];
final Iterator<String> iter = repos.iterator();
for (int i = 0; i < queries.length; i++) {
queries[i] = startPickFromOneRepo(iter.next(), pid);
}
allOf(queries).whenCompleteAsync(this::completeAllocate);
}
}
Util.java
package cf;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.time.LocalTime;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
/**
* @author zhhe.me@gmail.com
* @since 9/9/2018
*/
public interface Util {
Logger logger = Logger.getInstance();
int q = 5;
Set<String> repos = new HashSet<>();
String oid = "jianshu";
String pid = "Samsung S10";
Random r = new SecureRandom(ByteBuffer.allocate(4).putInt(LocalTime.now().getNano()).array());
static void delay(int base, int random) {
try {
Thread.sleep(base + r.nextInt(random));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Logger.java
package cf;
/**
* @author zhhe.me@gmail.com.
* @since 9/9/2018
*/
public class Logger {
private static final Logger INSTANCE = new Logger();
private long started;
private Logger() {
started = System.nanoTime();
}
static Logger getInstance() {
return INSTANCE;
}
Logger start() {
started = System.nanoTime();
return this;
}
void log(String s, Object... args) {
if (args==null)
args = new Object[0];
final String formatS = "[%4dms] " + s + "%n"; //+ "\t<<<%s>>>%n";
final int argLength = args.length + 2;
final Object[] args2 = new Object[argLength];
args2[0] = (System.nanoTime()-started)/1_000_000;
System.arraycopy(args, 0, args2, 1, args.length);
args2[argLength-1] = Thread.currentThread().getName();
System.out.format(formatS, args2);
}
}
EventService.java
package cf;
import static cf.Util.*;
/**
* @author zhhe.me@gmail.com
* @since 9/9/2018
*/
public class EventService {
public void listenOrderCancel(String order) {
delay(2000, 300);
logger.log("cancelled with no reason");
}
}
RepoService.java
package cf;
import static cf.Util.*;
/**
* @author zhhe.me@gmail.com
* @since 9/9/2018
*/
public class RepoService {
public Availability isAvailable(String repo) {
delay(0, 100);
final Availability availability = new Availability(repo, r.nextInt(5) != 0);
logger.log("repo %s %s available", repo, availability.available ? "is" : "NOT");
return availability;
}
public static class Availability {
String repo;
boolean available;
public Availability(String repo, boolean available) {
this.repo = repo;
this.available = available;
}
}
}
Stock.java
package cf;
/**
* @author zhhe.me@gmail.com
* @since 9/9/2018
*/
class Stock {
String repo;
int count;
public Stock(String repo, int count) {
this.repo = repo;
this.count = count;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Stock{");
sb.append("repo='").append(repo).append('\'');
sb.append(", count=").append(count);
sb.append('}');
return sb.toString();
}
}
StockService.java
package cf;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static cf.Util.*;
/**
* @author zhhe.me@gmail.com
* @since 9/9/2018
*/
public class StockService {
private Map<String, Integer> stocks = new HashMap<>();
public int query(String prd) {
delay(100, 100);
int q2 = (q-q/4-1) + r.nextInt(q);
generateStock(q2);
return q2;
}
public Stock pick(String repo, String prd) {
final Stock stock = new Stock(repo, stocks.get(repo));
delay(500, 2000);
return stock;
}
private void generateStock(int q) {
final Iterator<String> iter = repos.iterator();
if (repos.size() == 1) {
stocks = ImmutableMap.of(iter.next(), q);
} else {
stocks = ImmutableMap.of(
iter.next(), q / 5,
iter.next(), q / 4,
iter.next(), q / 3,
iter.next(), (q - q / 5 - q / 4 - q / 3)
);
}
logger.log("stocks: [total=%d, repos=%s]", q, stocks);
}
}
希望这篇博文能对你有所帮助,喜欢的话点个赞吧!