故事从 Vert.x 开始
它是 Vert.x 的控制中心,也是您做几乎一切事情的基础,包括创建客户端和服务器、 获取事件总线的引用、设置定时器等等。
创建实例的两种方式
1.Vertxvertx=Vertx.vertx(); 大部分应用将只会需要一个vert.x实例,
2.Vertxvertx=Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));如果默认缺省的配置不适合您,可以创建vertx对象的同时指定配置项。VertxOptions
对象有很多配置,包括集群,高可用,池大小等,在javadoc中描述了所有配置的细节。
创建集群模式的 Vert.x 对象
// 注意要添加对应的集群管理器依赖,详情见集群管理器章节VertxOptionsoptions=newVertxOptions();Vertx.clusteredVertx(options, res -> {if(res.succeeded()) {Vertxvertx=res.result();// 获取到了集群模式下的 Vertx 对象// 做一些其他的事情}else{// 获取失败,可能是集群管理器出现了问题}});
Vert.x API调用您提供的 处理器 来处理事件。 例如每隔一秒发送一个事件的计时器:
vertx.setPeriodic(1000, id -> {// 这个处理器将会每隔一秒被调用一次System.out.println("timer fired!");});
又比如收到一个 HTTP 请求:
server.requestHandler(request -> {// 服务器每次收到一个HTTP请求时这个处理器将被调用request.response().end("hello world!");});
对象有很多配置,包括集群,高可用,池大小等,在javadoc中描述了所有配置的细节。
Future的异步结果
vert.x 4使用future承载异步结果
异步的方法会返回一个future对象,包含成功或者失败的异步结果,我们不能直接操作future的异步结果,而应该设置future的handler,党future执行完毕,结果可用时,会调用handler进行处理,
FileSystemfs=vertx.fileSystem();Future future = fs.props("/my_file.txt");future.onComplete((AsyncResult ar) -> {if(ar.succeeded()) {FilePropsprops=ar.result(); System.out.println("File size = "+ props.size()); }else{ System.out.println("Failure: "+ ar.cause().getMessage()); }});
futures代表的是异步结果的读取端,
promise代表的是异步结果的写入端。在大多数情况vert.x程序不需要自信创建promise对象,
future组合和future协作 提供了转换和合并异步结果的工具。
Promise<String> promise = Promise.promise();
legacyGreetAsync(promise);
Future<String> greeting = promise.future();
小心:onSuccess,onFailure和onComplete之类的终端操作并不能保证回调的调用顺序。
future.onComplete(ar -> {// 做些什么});
future.onComplete(ar -> {// 可能先被调用});
第二个回调完全有可能在第一个回调之前被调用。
如果需要保证顺序调用,可以将future组合与andThen一起使用。
Future组合
compose方法作用顺序组合future:
若当前future成功,执行compose方法指定的方法,该方法返回新的future,当返回的新future完成时,future组合成功。若future失败则组合失败。
FileSystemfs=vertx.fileSystem();
Future future = fs .createFile("/foo") .compose(v -> {// createFile文件创建完成后执行returnfs.writeFile("/foo", Buffer.buffer()); }) .compose(v -> {// writeFile文件写入完成后执行returnfs.move("/foo","/bar"); });
这里例子中,有三个操作被串起来了,
1.一个文件被创建 createFile
2.一些东西被写入到文件 writefile
3文件被移走move
如果这三个步骤全部成功,则最终的future会是成功的,其中任何一步失败,则最终future就是失败的。
除了上诉方法,future还提供了更多方法,map,recover,otherwise,andThen及flatMap(等同compose方法)
future协作
vert.x中的futures支持协调多个future。支持并发组合(并执行多个异步调用)和顺序组合(依次执行异步调用)
CompositeFuture.all 方法接受多个future对象作为参数(最多6个,可以传入一个list),当所有的future都成功完成,该方法将返回一个成功的future,当人一个future执行失败,则返回一个失败的future。
Future httpServerFuture = httpServer.listen();
Future netServerFuture = netServer.listen();
CompositeFuture.all(httpServerFuture, netServerFuture).onComplete(ar -> {if(ar.succeeded()) {
// 所有服务器启动完成}else{// 有一个服务器启动失败}});
所有被合并的future中的操作同时运行,当组合的处理操作完成时,该方法返回的future上绑定的处理器handler会被调用,只要有一个操作失败(其中的某一个future的状态被标记成失败),则返回的future会被标记为失败,如果所有的操作都成功,则返回的future将会成功完成。
当操作成功时,需要按照 CompositeFuture.all 的参数顺序来调用resultAt方法以获取操作的执行结果,以上面的代码为力,无论是那个操作先完成,通过resultAt(0)获取到的都是httpServer 的结果,而通过resultAt(1)获取到netService的结果
CompositeFuture.all(Arrays.asList(future1, future2, future3));传入list也是如此
all 方法的合并会 等待 所有的 Future 成功执行(或任一失败),而 any 方法的合并会 等待 第一个成功执行的 Future。CompositeFuture.any 方法接受多个future作为参数最多6个可以传入list,当任意一个future成功得到结果,则future成功,党所有future都执行失败,则future失败。
CompositeFuture.any(future1, future2).onComplete(ar -> {if(ar.succeeded()) {// 至少一个成功}else{// 所有的都失败}});
CompositeFuture.any(Arrays.asList(f1, f2, f3));
join方法的合并会 等待所有future完成,无论成败,
composeiteFuture.join方法接受多个future作为参数最多6个,并将结果归并成一个future,当全部future成功执行完成,得到的future是成功状态的,党至少一个future执行失败,得到的future是失败状态。
CompositeFuture.join(future1, future2, future3).onComplete(ar -> {if(ar.succeeded()) {// 所有都成功}else{// 全部完成(无论成功还是失败),且至少一个失败}});
CompositeFuture.join(Arrays.asList(future1, future2, future3));
兼容completionStage
jdk的completionStage接口用于组合异步操作,vert.x的future Api可兼容completionState,
我们可以用toCOmpletionStage方法将vert.x的future对象转成completionStage对象,如:
Future future = vertx.createDnsClient().lookup("vertx.io");future.toCompletionStage().whenComplete((ip, err) -> {if(err !=null) { System.err.println("Could not resolve vertx.io"); err.printStackTrace(); }else{ System.out.println("vertx.io => "+ ip); }});
相应地,可使用 Future.fromCompletionStage方法将CompletionStage对象转唯vert.x的future对象,
Future.fromCompletionStage 有两个重载方法
第一个重载方法只接收一个 CompletionStage 参数,会在执行 CompletionStage 实例的线程中调用 Future 的方法;
第二个重载方法额外多接收一个 Context 参数,会在Vert.x的Context中调用 Future 的方法。
重要:由于vert.x的future通常会与vert.x的代码,库以及客户端等一起使用,为了与vert.x的线程模型更好地配合,大部分场景下使用:
Future.fromCompletionStage(CompletionStage, Context) 方法。
下面的例子展示了如何将 CompletionStage 对象转为Vert.x的 Future 对象,这里选择使用Vert.x的Context执行:
Future.fromCompletionStage(completionStage, vertx.getOrCreateContext()) .
flatMap(str -> {Stringkey=UUID.randomUUID().toString();returnstoreInDb(key, str); }) .
onSuccess(str -> { System.out.println("We have a result: "+ str); }) .
onFailure(err -> { System.err.println("We have a problem"); err.printStackTrace(); });
verticles
vert.x通过开箱即用的方式提供了一个简单便捷的,可扩展的部署和并发模型机制,可以用次模型机制来保管代码组件。
vertical的实现必须实现verticle接口或者实现抽象类abstractVerticle继承更简单
public class MyVerticle extends AbstractVerticle {// Verticle部署时调用public void start() {}
// 可选 - Verticle撤销时调用public void stop({}}
vertical类分2种
1.standard verticals 这是最常用的一类vertical它们永远运行在event loop线程上,
2.worker verticles会运行在worker pool种的线程种
standard verticals被创建时,它会分派给一个event loop线程,并在这个event loop种执行它的start方法,当您在一个event loop上调用了core api中的方法并传入处理器时,vert.x将保证用于调用该方法时相同的event loop来执行这些处理器
Worker verticles和standard vertical很像,但它并不是由一个event loop来执行,而是由vert.x中的worker poo;中线程执行。
部署vertical
方式1
VerticlemyVerticle=new MyVerticle();
vertx.deployVerticle(myVerticle);
方式2
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");
// 部署JavaScript的Verticlevertx.deployVerticle("verticles/myverticle.js");
// 部署Ruby的Verticlevertx.deployVerticle("verticles/my_verticle.rb");
等待部署完成
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {
if(res.succeeded()) { System.out.println("Deployment id is: "+ res.result()); }
else{ System.out.println("Deployment failed!"); }});
如果部署成功,这个完成处理器的结果中将包含部署ID的字符串。
撤销vertical
可以通过underlay方法来撤销部署好的vertical
撤销操作也是异步的,想要在撤销完成后收到通知,可以指定令一个完成处理器。
vertx.undeploy(deploymentID, res -> {if(res.succeeded()) { System.out.println("Undeployed ok"); }else{ System.out.println("Undeploy failed!"); }});
设置vertical实例数量:
使用名称部署vertical时,可以指定需要部署的vertical实例数量。
DeploymentOptionsoptions=newDeploymentOptions().setInstances(16);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
以上功能对于跨多核扩展时很有用,例如:有一个带web服务的vertical需要部署在多核的机器上,可以部署多个实例来利用所有的核。
向vertical传入配置
可在部署时传给vertical一个json格式的配置
JsonObjectconfig=newJsonObject().put("name","tim").put("directory","/blah");
DeploymentOptionsoptions=newDeploymentOptions().setConfig(config);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
传入之后,这个配置可以通过context对象或使用config方法访问,这个配置会以json对象的形式返回,可以通过一下代码读取数据
System.out.println("Configuration: "+ config().getString("name"));
在vertical中访问环境变量
环境变量和系统属性可以中介通过java api访问
System.getProperty("prop");
System.getenv("HOME");
高可用性
vertical可以启用高可以方式部署,在这种方式下,当其中一个部署在vert.x实例中的vertical突然挂掉,这个vertical可以在集群环境中的另一个vert.x实例中重新部署。
若要启用高可用方式运行一个vertical,仅需要追加-ha参数
当启用高可用方式时,不需要追加- cluster参数
从命令行运行vertical
可以从命令行直接运行 Vert.x 的 Verticle。
注意:在path设置jdk是为了支持java代码运行时编译 on the fly compilation
退出vert.x环境
您可以调用 close 方法关闭它。
这将关闭所有内部线程池并关闭其他资源,允许JVM退出。
Context 对象
Contextcontext=vertx.getOrCreateContext();
Verticle 中自动清除定时器
如果您在 Verticle 中创建了计时器, 当这个 Verticle 被撤销时这个计时器会被自动关闭。
Verticle worker pool
Verticle 使用 Vert.x 中的 Worker Pool 来执行阻塞式行为,例如 executeBlocking 或 Worker Verticle。
可以在部署配置项中指定不同的 Worker 线程池:
vertx.deployVerticle("the-verticle",newDeploymentOptions().setWorkerPoolName("the-specific-pool"));