使用Vert.x
上一个示例使用Quarkus提供的服务。这里使用Vert.x。
激活扩展:
mvn io.quarkus:quarkus-maven-plugin:1.13.2.Final:add-extensions \
-Dextensions=vertx
或quarkus-vertx手动添加到您的依赖项中。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
有Vert.x API的Mutiny版本。此API分为几个工件,可以独立导入它们:
groupId:artifactId |
描述 |
io.smallrye.reactive:smallrye-mutiny-vertx-core |
Vert.x Core的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-mail-client |
Vert.x邮件客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-web-client |
Vert.x Web客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-mongo-client |
Vert.x Mongo客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-redis-client |
Vert.x Redis客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-cassandra-client |
Vert.x Cassandra客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-consul-client |
Vert.x Consul客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-kafka-client |
Vert.x Kafka客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-amqp-client |
Vert.x AMQP客户端的Mutiny API |
io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client |
Vert.x RabbitMQ客户端的Mutiny API |
还可以在http://smallrye.io/smallrye-reactive-utils/apidocs/上查看可用的API 。
例子:将以下依赖项添加到应用程序:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
它提供了Vert.x Web客户端的Mutiny API。然后,可以按如下方式使用Web客户端:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
public ResourceUsingWebClient(Vertx vertx) {
this.client = WebClient.create(vertx,
new WebClientOptions().setDefaultHost("fruityvice.com").setDefaultPort(443).setSsl(true)
.setTrustAll(true));
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(@PathParam("name") String name) {
return client.get("/api/fruit/" + name)
.send()
.map(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
有两个要点:
- 注入的Vert.x实例的io.vertx.mutiny.core.Vertx类型是Vert.x的Mutiny变体。
- Web客户端是从创建的io.vertx.mutiny.ext.web.client.WebClient。
Vert.x API的Mutiny版本还提供:
- andAwait诸如的方法sendAndAwait。andAwait指示在结果可用之前,调用者线程被阻止。注意不要那样阻塞事件循环/ IO线程。
- andForget诸如的方法writeAndForget。andForget可用于返回的方法Uni。 andForget指示您不需要Uni指示操作成功或失败的结果。但是,请记住,如果您不订阅,则不会触发该操作。 andForget为您管理并管理订阅。
- toMulti允许将Vert.xReadStream转换为Multi
- toBlockingIterable/toBlockingStream允许将Vert.xReadStream转换为可迭代或阻塞的阻止的方法java.util.Stream
使用RxJava或Reactor API
Mutiny提供实用程序将RxJava 2和Project Reactor类型转换为Uni和Multi。
RxJava 2转换器具有以下依赖性:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-rxjava</artifactId>
</dependency>
所以,如果你有一个API返回RxJava 2种类型(Completable,Single,Maybe,Observable,Flowable),您可以创建Unis和Multis如下:
import io.smallrye.mutiny.converters.multi.MultiRxConverters;
import io.smallrye.mutiny.converters.uni.UniRxConverters;
// ...
Uni<Void> uniFromCompletable = Uni.createFrom().converter(UniRxConverters.fromCompletable(), completable);
Uni<String> uniFromSingle = Uni.createFrom().converter(UniRxConverters.fromSingle(), single);
Uni<String> uniFromMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), maybe);
Uni<String> uniFromEmptyMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), emptyMaybe);
Uni<String> uniFromObservable = Uni.createFrom().converter(UniRxConverters.fromObservable(), observable);
Uni<String> uniFromFlowable = Uni.createFrom().converter(UniRxConverters.fromFlowable(), flowable);
Multi<Void> multiFromCompletable = Multi.createFrom().converter(MultiRxConverters.fromCompletable(), completable);
Multi<String> multiFromSingle = Multi.createFrom().converter(MultiRxConverters.fromSingle(), single);
Multi<String> multiFromMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), maybe);
Multi<String> multiFromEmptyMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), emptyMaybe);
Multi<String> multiFromObservable = Multi.createFrom().converter(MultiRxConverters.fromObservable(), observable);
Multi<String> multiFromFlowable = Multi.createFrom().converter(MultiRxConverters.fromFlowable(), flowable);
还可以转换Unis并Multis转换为RxJava类型:
Completable completable = uni.convert().with(UniRxConverters.toCompletable());
Single<Optional<String>> single = uni.convert().with(UniRxConverters.toSingle());
Single<String> single2 = uni.convert().with(UniRxConverters.toSingle().failOnNull());
Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Observable<String> observable = uni.convert().with(UniRxConverters.toObservable());
Flowable<String> flowable = uni.convert().with(UniRxConverters.toFlowable());
// ...
Completable completable = multi.convert().with(MultiRxConverters.toCompletable());
Single<Optional<String>> single = multi.convert().with(MultiRxConverters.toSingle());
Single<String> single2 = multi.convert().with(MultiRxConverters
.toSingle().onEmptyThrow(() -> new Exception("D'oh!")));
Maybe<String> maybe = multi.convert().with(MultiRxConverters.toMaybe());
Observable<String> observable = multi.convert().with(MultiRxConverters.toObservable());
Flowable<String> flowable = multi.convert().with(MultiRxConverters.toFlowable());
Project Reactor转换器具有以下依赖关系:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-reactor</artifactId>
</dependency>
所以,如果你有一个API返回反应器类型(Mono,Flux),可以创建Unis和Multis如下:
import io.smallrye.mutiny.converters.multi.MultiReactorConverters;
import io.smallrye.mutiny.converters.uni.UniReactorConverters;
// ...
Uni<String> uniFromMono = Uni.createFrom().converter(UniReactorConverters.fromMono(), mono);
Uni<String> uniFromFlux = Uni.createFrom().converter(UniReactorConverters.fromFlux(), flux);
Multi<String> multiFromMono = Multi.createFrom().converter(MultiReactorConverters.fromMono(), mono);
Multi<String> multiFromFlux = Multi.createFrom().converter(MultiReactorConverters.fromFlux(), flux);
还可以转换Unis并Multis转换为Reactor类型:
Mono<String> mono = uni.convert().with(UniReactorConverters.toMono());
Flux<String> flux = uni.convert().with(UniReactorConverters.toFlux());
Mono<String> mono2 = multi.convert().with(MultiReactorConverters.toMono());
Flux<String> flux2 = multi.convert().with(MultiReactorConverters.toFlux());
使用CompletionStages或Publisher API
如果您使用CompletionStage,CompletableFuture或来面对API Publisher,则可以来回转换。首先,两者Uni和Multi都可以从CompletionStage或从中创建Supplier<CompletionStage>。例如:
CompletableFuture<String> future = Uni
// Create from a Completion Stage
.createFrom().completionStage(CompletableFuture.supplyAsync(() -> "hello"));
在上Uni,还可以产生一个CompletionStageuse subscribeAsCompletionStage(),该use可以产生一个CompletionStage,该物品将获得或发出的故障Uni。