vert.x是一个关于异步和反应式的java框架,异步(asynchronous)和反应式(reactive)是现代应用程序中的重要主题。
分布式和网络化是当前软件的常态:
20多年前,应用软件多数采用了一种ALL-IN-ONE的形态,这种软件具备用户界面、后端业务、数据存储等完整的业务链。但其实那个时候网络就已经在发展了,已经有软件开始利用网络进行数据存储、文件存储和远程代码执行等操作。
现在,最终用户通过web和移动端接口来访问应用的场景越来越多,网络发挥的作用越来越大,单体软件最终进化成了分布式系统。面向服务的体系结构(SOA)让企业可以把自身的业务通过网络服务的形式提供,用户也只需要使用这些服务就可以最大化发挥这些服务的价值,而且很多情况下,这些服务对于他的用户而言是不受控制的。例如你可以用QQ、微信、微博等进行第三方登录;也可以将支付处理交给微信或者支付宝。
我们现在并非生活在一个孤岛之上:
现在的网络服务遍及方方面面,这些服务可能包括在线数据库、在线搜索引擎、持久化文档存储、在线消息引擎、身份验证、监控等等。做一个现代的软件,都绕不开要跟这些网络服务打交道。
网络并非免费的午餐:
网络是业务处理过程中最容易出错的部分,主要表现在如下方面:
- 网络波动:不同服务之间可能无法享受相同的带宽,服务期间的网络通讯也远比进程之间的通讯要慢。
- 延迟波动:一项业务经过多个服务间的调用,最终执行耗时将是每个服务的耗时加上服务间调用延迟的总和。
- 可用性无法保证:网络本身、路由器、代理都可能出问题,有时候问题甚至是因为网线被不小心碰掉了。网络出问题的时候,你根本不知道是服务无法访问还是网络down掉了。
分布式系统依赖网络,而网络本身就是个不确定的因素。
简单粗暴的阻塞式API:
网络并发在很多应用中的模型都是每个连接分配一个线程,例如JavaEE 3.x版本之前的servlet、5.x版本之前的Spring框架、Ruby on Rails、Python的Flaskand等等。
这些模型非常简单,因为他们是同步的,同步就意味着阻塞。
阻塞式API的每个连接都是一个新的IO线程,当有IO操作时,这些线程就会阻塞在那里。
阻塞式API浪费资源、增加成本:
线程并不是一个廉价的资源,每个线程都需要内存,线程越多,操作系统内核调度器的压力就越大,它要为每个线程来分配CPU时间。
线程池一定程度上解决了线程创建和销毁的消耗,但是只要你采用的是这种模型,那么同一时间仍然需要n个线程来处理n个连接。
现代操作系统可以正确地处理数千个并发线程,虽然并不是每一个网络服务都会面对同样多的并发请求,但是有成千上万的并发连接时,这个模型就会显得非常局限。
而且现在的应用往往部署在一个容器化或者虚拟化的环境中,这就意味着应用无法看到有多少个CPU是可以使用的,同时也就意味着这个应用使用CPU的时间有一个上限。同样进程可用的内存也可能受限、线程太多就会消耗很多内存。如果所有的程序都使用阻塞式IO,当服务器压力增大时,很快就有会大量的线程需要管理和调度,随之就需要更多的服务器/容器实例资源,这些都会直接增加运营成本。
使用非阻塞式IO进行异步编程:
为了让线程不再阻塞等待IO操作完成,可以使用非阻塞式IO。
非阻塞式IO的核心思想就是在进行阻塞的操作时,不等待操作完成,继续执行下一项任务,直到阻塞的操作完成。例如,非阻塞请求读取可能会要求Socket一次最多传输256个字节过来,执行线程把数据放到缓冲区,然后就去干其他事了(比如处理另一个连接)。在这种模型下,并发连接可以在单个线程上实现多路复用,网络延迟时间往往超过读取传入字节所需要的CPU时间。
非阻塞式IO可以一定程度上解决线程问题,但是在编码的复杂度上,非阻塞IO要远高于阻塞IO。原本接收请求-等待执行的业务至少会拆分成两部分:请求和响应。这种运作模式可以基于状态机来实现更为复杂的TCP协议模型。
java提供的NIO的API和其他大部分的API一样,nio只关注它所做的事情,即IO的API,不会提供HTTP客户端和服务器那样的高层抽象。另外java的nio没有规定线程模型,线程模型对于正确利用CPU内核、处理异步IO事件和表达应用处理逻辑是非常重要的。
其实这也就是为什么实践开发中,开发人员很少直接使用java nio的原因。
像Netty和Mina这种框架,解决了java nio的一些缺点,许多工具包和框架都是基于它们来构建的,vert.x正是其中之一。
多路复用事件驱动处理机制:事件循环(Event Loop)
处理异步事件的一个流行的线程模型是事件循环模型,英文为Event Loop。
如上图,事件发生时,将会被压入一个队列,这些事件可以是I/O事件,例如已准备好消费数据,或者完成Socket缓冲区写入;也可以是任何其他事件,比如计时器的一次触发。一个线程被分配给一个事件循环,处理事件不应该执行任何阻塞或长时间运行的操作。否则,整个线程就会被阻塞,从而破坏了使用事件循环的目的。事件循环非常流行:浏览器中运行的JavaScript代码就建立在事件循环模型之上,许多图形界面工具包(如Java Swing)也有事件循环。
什么是反应式系统?
反应式,即Reactive,这个词的本意是对刺激作出响应。
到目前为止,我们一直在讨论如何:
- 利用异步编程和非阻塞I/O来处理更多的并发连接和使用更少的线程.
- 使用一个线程模型进行异步事件处理(事件循环)。
通过结合这两种技术,可以构建可伸缩且资源高效的应用。下面我们将探讨什么是反应系统,以及反应系统如何超越“基础版”异步编程。
反应系统有四大特性:
响应性、可恢复性、可伸缩性和消息驱动
- 可伸缩性 Elastic
可伸缩性是指应用在处理不同数量级的请求时,可以动态变化自身实例数量的能力。这是一种非常有用的能力,因为可伸缩性允许通过启动新实例来响应流量峰值,并在实例之间实现负载均衡。这个能力需要应用具有一些特殊的设计,首先应用需要限制实例之间的共享状态(例如,服务器端web session),另外我们还需要计算出实例的一些指标,有了这些指标,协调器才能根据网络流量和指标来决定何时启动或停止实例。
- 可恢复性 Resilient
可恢复性和可伸缩性就像一个硬币的正反面,当一组弹性实例中的一个实例崩溃时,可以自然地自动恢复,流量可以自动转到其他实例,必要时还可以启动一个新的实例。可恢复性还包含其他的一些内涵,例如当一个实例由于某些原因而不能执行某个请求时,它仍然可以尝试以降级模式进行响应。根据问题领域的不同,可以使用返回缓存中的数据,可以返回空值或者默认值,还可以把请求转发到其他一些未报错的实例上。即使最坏的情况下,实例也应该及时返回一个报错,而不是卡在那里。
- 响应性 Responsivity
响应性,又可以称之为有求必应。响应性是可伸缩性和可恢复性的结合,一个请求一定会在一定时间内得到响应,这使得一个服务非常的健壮。要达到这一点不仅要求服务可以在需要的时候启动更多的实例来保证响应时间在可接受的范围内,同样要求实例在出现错误时仍能快速响应。需要注意的是,如果一个组件依赖于一个不可扩展的资源,比如一个单一的中央数据库,那么响应性是不可能实现的。实际上,如果所有的实例都向同一个资源请求数据,那么启动更多的实例是不能解决问题的,这个资源将成为性能瓶颈,请求它的实例越多,这个资源超负载来的越快。
- 消息驱动 Message-driven
区别于远程过程调用这种的阻塞IO典型例子,消息驱动要求消息通过异步的方式进行传递。消息驱动是实现可伸缩性、可恢复性和响应性的关键推动者。消息驱动允许消息发送到不同的实例、控制消息生产者和消息消费者之间的消息流(即背压Backpressure)。背压是反应式编程里的重要概念之一,后面会讲。
一个反应式系统拥有上述四个特性。这些特性可以使得一个应用更加可靠和高效。
异步是否意味着反应式?
这是一个重要的问题,因为异步常常被认为是软件问题的灵丹妙药。
显然,反应式意味着异步,但反过来异步并不一定是反应式。例如,假设有一个购物web应用程序,用户可以将商品放入购物车。这通常是通过在服务器端web session中进行存储的。当session存储在内存或本地文件中时,即使系统内部使用非阻塞I/O和异步编程,系统也不具备响应性。这种模式下,应用程序的实例不能接管另一个实例,因为session是应用的状态,在这种情况下,该状态不会在节点之间复制和共享。要让这个例子实现响应性,需要使用一个内存网格服务(例如Hazelcast、Redis或Infinispan)来存储web会话,这样传入的请求可以被路由到任何实例进行处理。
反应式还意味着什么?
由于反应式是一个时髦的术语,这个术语在不同的地方的含义完全不同。上面我们刚刚讲了什么是反应式系统,下表包含了反应式另外两个流行定义:
反应式? | 含义 |
---|---|
反应式系统 | 可靠的、消息驱动的、可恢复的、可伸缩的、具备响应性的应用程序。 |
反应式编程 | 这个反应指对变化和事件作出反应。电子表格就是反应式编程的一个很好的例子:当单元格数据发生变化时,重新计算受影响的带计算公式的单元格。我们RxJava就是一种流行的针对Java的反应扩展API,它极大地帮助协调异步事件和数据处理。还有一种函数反应式编程,这种编程风格这里不会涉及,可以在网上搜索相关的资料。 |
反应流 | 当系统间传递连续的数据流时,经典的生产者/消费者问题就会出现。反应流的核心是要提供背压机制,以便消费者可以在数据传递过快时通知生产者。反应流的主要目标是在系统之间达到最佳吞吐量。 |
什么是Vert.X
根据Vert.X网站的说法,Vert.X是Eclipse旗下的一款用于在JVM上构建反应式系统的工具包。
Vert.X在2012年由Tim Fox发起,现在由Eclipse基金会开发的开源项目。最初Vert.X是在Node.js部分思想上开发的,后来的开发思路明显偏离了Node.js的路线,Vert.X针对JVM的细节进行了大量的定制。
Vert.X的本质就是本文所讲的异步事件处理,思想的核心就是非阻塞式IO以及事件循环的线程模型。
Vert.X是一个工具包,而不是框架:它没有提供任何预定义的脚手架,所以可以随意使用Vert.X,可以作为大型应用的一个基础库。Vert.X基本上与应用的构建、打包和部署环境无关。
Vert.x包含如下组件:
- 一个名为vertx-core的核心工程,为异步编程、非阻塞I/O、流式编程以及方便地访问网络协议(如TCP、UDP、DNS、HTTP或WebSockets)提供了api。
- 社区基于Vert.x写了一部分套件,如web API(vertx-web)和数据访问套件(vertx-redis, vertx-mongo等)
- 提供更多功能的更广泛的项目生态系统,例如连接Apache Cassandra、在系统进程之间进行通信的非阻塞I/O等等。
Vert.X是多语言的,它支持大多数流行可以运行在JVM上的语言,例如JavaScript、Ruby、Kotlin、Scala、Groovy。Vert.X对这些语言的支持不仅限于可以将它们与Java进行相互转换,实际上各种语言的特有写法也都是支持的,所以即使不是在用java写相关的应用,语法看起来仍然很自然。比如你可以使用Scala的future api,可以使用Kotlin定制的DSL和带有命名的参数函数来简化代码构造。而且你可以在同一个应用内混用多种语言。
Vert.x的事件循环模型
Vert.x的核心模型就是事件循环,如上文,事件循环的目的是使用极少的线程来处理海量的业务:
事件循环提出的线程模型有一个巨大的好处:它简化了并发编程。因为只有一个线程,所以总是由同一个线程调用,从不进行并发调用。然而,它也有一个非常重要的规则,你必须遵守:
不要阻塞事件循环 --Vert.x的黄金法则
想象一下,在一个事件循环中,有一天你打破了这个规则,因为请求的事件只会被这同一个线程来处理,所以如果请求被阻塞在那里了,那么剩下的请求就只能堆在队列里得等待处理。这样我们就失去了可伸缩性和高效率的优势。
哪些任务是阻塞式的任务呢?比如最常见的查询数据库就是阻塞式任务;还有各种CPU密集型的任务比如算出π,精确到第20万位。不过不用担心,Vert.x对于如何处理阻塞式任务有它自己的设计。
单线程事件循环带来的结果是只有一个CPU核心会进行工作,而多核心才是现在的标配,所以Vert.x对单线程事件循环进行了改良,每个Vert.x实例都可能包含多个事件循环线程,如下图:
事件由不同的事件循环来分派,但是,事件循环开始执行处理程序后,该事件循环将
始终调用该处理程序,从而发挥异步模型的并发优势。
第一个Vert.x程序
Maven中加入依赖:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.9.2</version>
</dependency>
代码:
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.createHttpServer()
.requestHandler(request -> request.response().end("hello Vert.x"))
.listen(8080);
}
启动后访问localhost:8080,将看到如下响应:
hello Vert.x
Verticle-Vert.x的代码块
Verticle是可以由Vert.x部署和运行的代码块。
Regular Verticle
运行在Vert.x的事件循环之上,这些Verticle绝对不能被阻塞,Vert.x确保这些代码永远单线程运行,不会有任何并发问题。在java里,这个Verticle是一个AbstractVerticle的实现类:
public class RegularVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
// verticle部署时,执行这段代码
}
@Override
public void stop() throws Exception {
// verticle卸载时,执行这段代码
}
}
Worker Verticle
与Regular Verticle
不同,Worker Verticle
不会运行在事件循环之上,可以执行任意阻塞代码,但是会限制应用的伸缩性。
Verticle可以访问vert.x的组件(由AbstractVerticle基类提供),这些组件可以创建服务器和客户端,可以与其他Verticle交互。Verticle还可以部署其他Verticle,配置它们,并设置要创建的实例数量。这些实例可以关联不同的事件循环(实现多反应式模式),Vert.x可以在这些实例之间实现平衡负载。
从回调到观察者
前面的例子可以看到,Vert.x的开发模型使用了回调。用回调来编排几个异步的操作时,这个开发模型将非常容易把简单问题复杂化。例如,我们从数据库中查询数据,首先我们要连接到数据库,然后我们查库,处理结果之后,释放数据库连接,这些操作都是异步的。使用回调,你可以用Vert.x编写下面的JDBC客户端代码:
client.getConnection(conn -> {
if (conn.failed()) {/* 异常处理 */}
else {
SQLConnection connection = conn.result();
connection.query("SELECT * from PRODUCTS", rs -> {
if (rs.failed()) {/* 异常处理*/}
else {
List<JsonArray> lines =
rs.result().getResults();
for (JsonArray l : lines) {
System.out.println(new Product(l));
}
connection.close(done -> {
if (done.failed()) {/* 异常处理 */}
});
}
});
}
});
这个示例尚在可控范围内,但是这种一层嵌一层的玩法在业务更加复杂时,将导致代码非常复杂且不可读。你可以使用Vert.x的Future
来处理异步操作,这个Future
非java的Future
,Vert.x的Future
完全没有阻塞。Future
提供了一个高级抽象,以便开发人员可以组合几个参数来指定操作执行顺序或并行执行几个操作。用Future
来写就是这样的:
Future<SQLConnection> future = getConnection();
future
.compose(conn -> {
connection.set(conn);
// 返回一个ResultSet的Future
return selectProduct(conn);
})
// 返回一个将每一行映射成一个Producet的集合
.map(result -> toProducts(result.getResults()))
.setHandler(ar -> {
if (ar.failed()) { /* 异常处理 */ }
else {
ar.result().forEach(System.out::println);
}
connection.get().close(done -> {
if (done.failed()) { /* 异常处理 */ }
});
});
不过,尽管Future
可以使代码更清晰,但是我们把所有的数据一次性检索出来然后进行处理。这个结果集可能会非常大,需要花费大量的时间,而且你并不需要一次性处理所有的数据,我们可以一行一行的来。Vert.x为这种场景提供了一个解决方案,提供了一种使用动态编程开发模型来实现反应式微服务的方法,Vert.x提供了RxJava的API来实现:
- 合并和协调异步任务
- 将传入的消息作为输入流进行响应
我们用RxJava的玩法重写上述代码就是:
// 获取一个连接,并且缓存下来,之后的操作可以直接使用这个连接
Single<SQLConnection> connection = client
.rxGetConnection();
connection
.flatMapObservable(conn ->
conn
// 执行查询SQL语句
.rxQueryStream("SELECT * from PRODUCTS")
// 把每一行发布为一个可观察对象(Observable)
.flatMapObservable(SQLRowStream::toObservable)
// 不要忘了关闭连接
.doAfterTerminate(conn::close)
)
// 每一行映射成一个Product对象
.map(Product::new)
// 打印每一行
.subscribe(System.out::println);
除了提高可读性之外,反应式编程还允许你在结果流和流程项出来的时候立即订阅消费。Vert.x允许你选择你喜欢的开发模型,在这一段中我们同时使用了回调和RxJava。