vert.x两大核心类

故事从 Vert.x 开始

它是 Vert.x 的控制中心,也是您做几乎一切事情的基础,包括创建客户端和服务器、 获取事件总线的引用、设置定时器等等。

创建实例的两种方式

1.Vertxvertx=Vertx.vertx(); 大部分应用将只会需要一个vert.x实例,

2.Vertxvertx=Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));如果默认缺省的配置不适合您,可以创建vertx对象的同时指定配置项。VertxOptions

对象有很多配置,包括集群,高可用,池大小等,在javadoc中描述了所有配置的细节。

创建集群模式的 Vert.x 对象

// 注意要添加对应的集群管理器依赖,详情见集群管理器章节VertxOptionsoptions=newVertxOptions();Vertx.clusteredVertx(options, res -> {if(res.succeeded()) {Vertxvertx=res.result();// 获取到了集群模式下的 Vertx 对象// 做一些其他的事情}else{// 获取失败,可能是集群管理器出现了问题}});

Vert.x API调用您提供的 处理器 来处理事件。 例如每隔一秒发送一个事件的计时器:

vertx.setPeriodic(1000, id -> {// 这个处理器将会每隔一秒被调用一次System.out.println("timer fired!");});

又比如收到一个 HTTP 请求:

server.requestHandler(request -> {// 服务器每次收到一个HTTP请求时这个处理器将被调用request.response().end("hello world!");});

对象有很多配置,包括集群,高可用,池大小等,在javadoc中描述了所有配置的细节。


Future的异步结果

vert.x 4使用future承载异步结果

异步的方法会返回一个future对象,包含成功或者失败的异步结果,我们不能直接操作future的异步结果,而应该设置future的handler,党future执行完毕,结果可用时,会调用handler进行处理,

FileSystemfs=vertx.fileSystem();Future future = fs.props("/my_file.txt");future.onComplete((AsyncResult ar) -> {if(ar.succeeded()) {FilePropsprops=ar.result(); System.out.println("File size = "+ props.size()); }else{ System.out.println("Failure: "+ ar.cause().getMessage()); }});

futures代表的是异步结果的读取端,

promise代表的是异步结果的写入端。在大多数情况vert.x程序不需要自信创建promise对象,

future组合和future协作 提供了转换和合并异步结果的工具。

Promise<String> promise = Promise.promise();

legacyGreetAsync(promise);

Future<String> greeting = promise.future();

小心:onSuccess,onFailure和onComplete之类的终端操作并不能保证回调的调用顺序。

future.onComplete(ar -> {// 做些什么});

future.onComplete(ar -> {// 可能先被调用});

第二个回调完全有可能在第一个回调之前被调用。

如果需要保证顺序调用,可以将future组合与andThen一起使用。


Future组合

compose方法作用顺序组合future:

若当前future成功,执行compose方法指定的方法,该方法返回新的future,当返回的新future完成时,future组合成功。若future失败则组合失败。

FileSystemfs=vertx.fileSystem();

Future future = fs .createFile("/foo") .compose(v -> {// createFile文件创建完成后执行returnfs.writeFile("/foo", Buffer.buffer()); }) .compose(v -> {// writeFile文件写入完成后执行returnfs.move("/foo","/bar"); });

这里例子中,有三个操作被串起来了,

1.一个文件被创建 createFile

2.一些东西被写入到文件 writefile

3文件被移走move

如果这三个步骤全部成功,则最终的future会是成功的,其中任何一步失败,则最终future就是失败的。

除了上诉方法,future还提供了更多方法,map,recover,otherwise,andThen及flatMap(等同compose方法)


future协作

vert.x中的futures支持协调多个future。支持并发组合(并执行多个异步调用)和顺序组合(依次执行异步调用)

CompositeFuture.all 方法接受多个future对象作为参数(最多6个,可以传入一个list),当所有的future都成功完成,该方法将返回一个成功的future,当人一个future执行失败,则返回一个失败的future。

Future httpServerFuture = httpServer.listen();

Future netServerFuture = netServer.listen();

CompositeFuture.all(httpServerFuture, netServerFuture).onComplete(ar -> {if(ar.succeeded()) {

// 所有服务器启动完成}else{// 有一个服务器启动失败}});

所有被合并的future中的操作同时运行,当组合的处理操作完成时,该方法返回的future上绑定的处理器handler会被调用,只要有一个操作失败(其中的某一个future的状态被标记成失败),则返回的future会被标记为失败,如果所有的操作都成功,则返回的future将会成功完成。

当操作成功时,需要按照 CompositeFuture.all 的参数顺序来调用resultAt方法以获取操作的执行结果,以上面的代码为力,无论是那个操作先完成,通过resultAt(0)获取到的都是httpServer 的结果,而通过resultAt(1)获取到netService的结果 

CompositeFuture.all(Arrays.asList(future1, future2, future3));传入list也是如此

all 方法的合并会 等待 所有的 Future 成功执行(或任一失败),而 any 方法的合并会 等待 第一个成功执行的 Future。CompositeFuture.any 方法接受多个future作为参数最多6个可以传入list,当任意一个future成功得到结果,则future成功,党所有future都执行失败,则future失败。

CompositeFuture.any(future1, future2).onComplete(ar -> {if(ar.succeeded()) {// 至少一个成功}else{// 所有的都失败}});

CompositeFuture.any(Arrays.asList(f1, f2, f3));

join方法的合并会 等待所有future完成,无论成败,

composeiteFuture.join方法接受多个future作为参数最多6个,并将结果归并成一个future,当全部future成功执行完成,得到的future是成功状态的,党至少一个future执行失败,得到的future是失败状态。

CompositeFuture.join(future1, future2, future3).onComplete(ar -> {if(ar.succeeded()) {// 所有都成功}else{// 全部完成(无论成功还是失败),且至少一个失败}});

CompositeFuture.join(Arrays.asList(future1, future2, future3));

兼容completionStage

jdk的completionStage接口用于组合异步操作,vert.x的future Api可兼容completionState,

我们可以用toCOmpletionStage方法将vert.x的future对象转成completionStage对象,如:

Future future = vertx.createDnsClient().lookup("vertx.io");future.toCompletionStage().whenComplete((ip, err) -> {if(err !=null) { System.err.println("Could not resolve vertx.io"); err.printStackTrace(); }else{ System.out.println("vertx.io => "+ ip); }});

相应地,可使用 Future.fromCompletionStage方法将CompletionStage对象转唯vert.x的future对象,

Future.fromCompletionStage 有两个重载方法

第一个重载方法只接收一个 CompletionStage 参数,会在执行 CompletionStage 实例的线程中调用 Future 的方法;

第二个重载方法额外多接收一个 Context 参数,会在Vert.x的Context中调用 Future 的方法。

重要:由于vert.x的future通常会与vert.x的代码,库以及客户端等一起使用,为了与vert.x的线程模型更好地配合,大部分场景下使用:

Future.fromCompletionStage(CompletionStage, Context) 方法。

下面的例子展示了如何将 CompletionStage 对象转为Vert.x的 Future 对象,这里选择使用Vert.x的Context执行:

Future.fromCompletionStage(completionStage, vertx.getOrCreateContext()) .

flatMap(str -> {Stringkey=UUID.randomUUID().toString();returnstoreInDb(key, str); }) .

onSuccess(str -> { System.out.println("We have a result: "+ str); }) .

onFailure(err -> { System.err.println("We have a problem"); err.printStackTrace(); });

verticles

vert.x通过开箱即用的方式提供了一个简单便捷的,可扩展的部署和并发模型机制,可以用次模型机制来保管代码组件。

vertical的实现必须实现verticle接口或者实现抽象类abstractVerticle继承更简单

public class MyVerticle extends AbstractVerticle {// Verticle部署时调用public void start() {}

// 可选 - Verticle撤销时调用public void stop({}}

vertical类分2种

1.standard verticals 这是最常用的一类vertical它们永远运行在event loop线程上,

2.worker verticles会运行在worker pool种的线程种

standard verticals被创建时,它会分派给一个event loop线程,并在这个event loop种执行它的start方法,当您在一个event loop上调用了core api中的方法并传入处理器时,vert.x将保证用于调用该方法时相同的event loop来执行这些处理器

Worker verticles和standard vertical很像,但它并不是由一个event loop来执行,而是由vert.x中的worker poo;中线程执行。

部署vertical

方式1

VerticlemyVerticle=new MyVerticle();

vertx.deployVerticle(myVerticle);

方式2

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");

// 部署JavaScript的Verticlevertx.deployVerticle("verticles/myverticle.js");

// 部署Ruby的Verticlevertx.deployVerticle("verticles/my_verticle.rb");

等待部署完成

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {

if(res.succeeded()) { System.out.println("Deployment id is: "+ res.result()); }

else{ System.out.println("Deployment failed!"); }});

如果部署成功,这个完成处理器的结果中将包含部署ID的字符串。

撤销vertical

可以通过underlay方法来撤销部署好的vertical

撤销操作也是异步的,想要在撤销完成后收到通知,可以指定令一个完成处理器。

vertx.undeploy(deploymentID, res -> {if(res.succeeded()) { System.out.println("Undeployed ok"); }else{ System.out.println("Undeploy failed!"); }});


设置vertical实例数量:

使用名称部署vertical时,可以指定需要部署的vertical实例数量。

DeploymentOptionsoptions=newDeploymentOptions().setInstances(16);

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

以上功能对于跨多核扩展时很有用,例如:有一个带web服务的vertical需要部署在多核的机器上,可以部署多个实例来利用所有的核。

向vertical传入配置

可在部署时传给vertical一个json格式的配置

JsonObjectconfig=newJsonObject().put("name","tim").put("directory","/blah");

DeploymentOptionsoptions=newDeploymentOptions().setConfig(config);

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

传入之后,这个配置可以通过context对象或使用config方法访问,这个配置会以json对象的形式返回,可以通过一下代码读取数据

System.out.println("Configuration: "+ config().getString("name"));

在vertical中访问环境变量

环境变量和系统属性可以中介通过java api访问

System.getProperty("prop");

System.getenv("HOME");

高可用性

vertical可以启用高可以方式部署,在这种方式下,当其中一个部署在vert.x实例中的vertical突然挂掉,这个vertical可以在集群环境中的另一个vert.x实例中重新部署。

若要启用高可用方式运行一个vertical,仅需要追加-ha参数

当启用高可用方式时,不需要追加- cluster参数

从命令行运行vertical

可以从命令行直接运行 Vert.x 的 Verticle。

注意:在path设置jdk是为了支持java代码运行时编译 on the fly compilation

退出vert.x环境 

您可以调用 close 方法关闭它。

这将关闭所有内部线程池并关闭其他资源,允许JVM退出。

Context 对象

Contextcontext=vertx.getOrCreateContext();

Verticle 中自动清除定时器

如果您在 Verticle 中创建了计时器, 当这个 Verticle 被撤销时这个计时器会被自动关闭。

Verticle worker pool

Verticle 使用 Vert.x 中的 Worker Pool 来执行阻塞式行为,例如 executeBlocking 或 Worker Verticle。

可以在部署配置项中指定不同的 Worker 线程池:

vertx.deployVerticle("the-verticle",newDeploymentOptions().setWorkerPoolName("the-specific-pool"));

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容