Vert.x Java开发指南——第四章 重构为Vert.x服务

感兴趣的朋友,可以关注微信服务号“猿学堂社区”,或加入“猿学堂社区”微信交流群

版权声明:本文由作者自行翻译,未经作者授权,不得随意转发。

与我们最早的实现相比,前面的重构已经是向前一大步,因为我们提取出了独立且可配置的Verticle,并且在事件总线之上使用异步消息进行链接。我们还看到,我们可以同时部署一个指定Verticle的几个实例,以便更好地处理负载以及更好地利用CPU内核。

在这一节,我们将看到如何设计和使用Vert.x服务。服务的主要优势是,它定义了一个接口用于执行Verticle公开的特定操作。对于所有事件总线消息工作,我们还可以利用代码生成,而不是像前一节那样自己创建它。

step-3/src/main/java/
└── io
    └── vertx
        └── guides
            └── wiki
                ├── MainVerticle.java
                ├── database
                │   ├── ErrorCodes.java
                │   ├── SqlQuery.java
                │   ├── WikiDatabaseService.java
                │   ├── WikiDatabaseServiceImpl.java
                │   ├── WikiDatabaseVerticle.java
                │   └── package-info.java
                └── http
                    └── HttpServerVerticle.java

io.vertx.guides.wiki现在包含主Verticle,io.vertx.guides.wiki.database包含数据库Verticle和服务,io.vertx.guides.wiki.http包含HTTP Server Verticle。

4.1 Maven配置变更

首先,我们需要添加下面两个依赖到我们的项目。很明显,我们需要vertx-service-proxy的API:

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-service-proxy</artifactId>
</dependency>

我们需要Vert.x代码生成模块作为一个编译时依赖(所以是provided范围):

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-codegen</artifactId>
    <scope>provided</scope>
</dependency>

接下来,我们必须稍微调整一下maven-compiler-plugin的配置来使用代码生成,它通过一个javac注解处理器完成:

<plugin>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.5.1</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <useIncrementalCompilation>false</useIncrementalCompilation>
        <annotationProcessors>
            <annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
        </annotationProcessors>
        <generatedSourcesDirectory>${project.basedir}/src/main/generated</generatedSourcesDirectory>
        <compilerArgs>
            <arg>-AoutputDirectory=${project.basedir}/src/main</arg>
        </compilerArgs>
    </configuration>
</plugin>

注意,生成代码放置在src/main/generated目录下,一些集成开发环境诸如IntelliJ IDEA将自动识别为类路径。

更新maven-clean-plugin插件移除这些生成文件也是一个好注意:

<plugin>
    <artifactId>maven-clean-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <filesets>
            <fileset>
                <directory>${project.basedir}/src/main/generated</directory>
            </fileset>
        </filesets>
    </configuration>
</plugin>

关于Vert.x Service的完整文档位于http://vertx.io/docs/vertxservice-
proxy/java/

4.2 数据库服务接口

定义一个服务接口与定义一个Java接口一样简单,除此之外,有一些规则需要遵守,以使代码生成可以工作,并且还要确保与Vert.x中的其它代码的互操作性。

接口的开始定义如下:

@ProxyGen
public interface WikiDatabaseService {
    @Fluent
    WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler);
    @Fluent
    WikiDatabaseService fetchPage(String name, Handler<AsyncResult<JsonObject>> resultHandler);
    @Fluent
    WikiDatabaseService createPage(String title, String markdown, Handler<AsyncResult<Void>> resultHandler);
    @Fluent
    WikiDatabaseService savePage(int id, String markdown, Handler<AsyncResult<Void>> resultHandler);
    @Fluent
    WikiDatabaseService deletePage(int id, Handler<AsyncResult<Void>> resultHandler);
    // (...)

1、ProxyGen注解用于触发该服务的客户端代理代码生成。

2、Fluent注解是可选的,但是允许fluent接口,操作可以通过返回服务实例被链式调用(chained)。这对于代码生成器非常有用,当服务将被其它JVM语言消费时。

3、参数类型需要是字符串、Java原始数据类型、JSON对象或者数组、任何枚举类型或者前面类型的java.util集合(List/Set/Map)。支持任意Java类的唯一方法是使用@DataObject注解,使它们作为Vert.x数据对象。传递其它类型的最后机会是服务引用类型。

4、由于服务提供异步结果,一个服务的最后参数需要是Handler<AsyncResult<T>>,T是上面描述的适合于代码生成的任何类型。

服务接口提供一个静态方法用于创建实际服务实现以及事件总线上客户端代码使用的代理,这是一种好的习惯。

我们声明了create方法,简单的委托给实现类及其构造函数:

static WikiDatabaseService create(JDBCClient dbClient, HashMap<SqlQuery, String> sqlQueries, Handler<AsyncResult
<WikiDatabaseService>> readyHandler) {
    return new WikiDatabaseServiceImpl(dbClient, sqlQueries, readyHandler);
}

Vert.x代码生成器创建代理类,并且类名以VertxEBProxy作为后缀。这些代理类的构造方法需要一个Vert.x上下文的引用以及事件总线的目的地址作为参数:

static WikiDatabaseService createProxy(Vertx vertx, String address) {
    return new WikiDatabaseServiceVertxEBProxy(vertx, address);
}

在上次迭代中作为内部类的SqlQuery和ErrorCodes枚举类型,本次迭代已被提取为包保护(package-protected)类型,具体查看SqlQuery.java和ErrorCodes.java。

4.3 Database服务实现

服务实现是先前WikiDatabaseVerticle类代码的直截了当的移植。主要区别在于,服务实现在构造函数(报告初始化结果)和服务方法(报告操作成功)中支持异步处理结果处理(Handler)。

类代码如下:

class WikiDatabaseServiceImpl implements WikiDatabaseService {
    
    private static final Logger LOGGER = LoggerFactory
            .getLogger(WikiDatabaseServiceImpl.class);
    
    private final HashMap<SqlQuery, String> sqlQueries;
    
    private final JDBCClient dbClient;

    WikiDatabaseServiceImpl(JDBCClient dbClient,
            HashMap<SqlQuery, String> sqlQueries,
            Handler<AsyncResult<WikiDatabaseService>> readyHandler) {
        this.dbClient = dbClient;
        this.sqlQueries = sqlQueries;
        dbClient.getConnection(ar -> {
            if (ar.failed()) {
                LOGGER.error("Could not open a database connection", ar.cause());
                readyHandler.handle(Future.failedFuture(ar.cause()));
            } else {
                SQLConnection connection = ar.result();
                connection.execute(sqlQueries.get(SqlQuery.CREATE_PAGES_TABLE),
                        create -> {
                            connection.close();
                            if (create.failed()) {
                                LOGGER.error("Database preparation error",
                                        create.cause());
                                readyHandler.handle(Future.failedFuture(create
                                        .cause()));
                            } else {
                                readyHandler.handle(Future
                                        .succeededFuture(this));
                            }
                        });
            }
        });

    }

    @Override
    public WikiDatabaseService fetchAllPages(
            Handler<AsyncResult<JsonArray>> resultHandler) {
        dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
            if (res.succeeded()) {
                JsonArray pages = new JsonArray(res.result().getResults()
                        .stream().map(json -> json.getString(0)).sorted()
                        .collect(Collectors.toList()));
                resultHandler.handle(Future.succeededFuture(pages));
            } else {
                LOGGER.error("Database query error", res.cause());
                resultHandler.handle(Future.failedFuture(res.cause()));
            }
        });
        return this;
    }

    @Override
    public WikiDatabaseService fetchPage(String name,
            Handler<AsyncResult<JsonObject>> resultHandler) {
        dbClient.queryWithParams(
                sqlQueries.get(SqlQuery.GET_PAGE),
                new JsonArray().add(name),
                fetch -> {
                    if (fetch.succeeded()) {
                        JsonObject response = new JsonObject();
                        ResultSet resultSet = fetch.result();
                        if (resultSet.getNumRows() == 0) {
                            response.put("found", false);
                        } else {
                            response.put("found", true);
                            JsonArray row = resultSet.getResults().get(0);
                            response.put("id", row.getInteger(0));
                            response.put("rawContent", row.getString(1));
                        }
                        resultHandler.handle(Future.succeededFuture(response));
                    } else {
                        LOGGER.error("Database query error", fetch.cause());
                        resultHandler.handle(Future.failedFuture(fetch.cause()));
                    }
                });
        return this;
    }

    @Override
    public WikiDatabaseService createPage(String title, String markdown,
            Handler<AsyncResult<Void>> resultHandler) {
        JsonArray data = new JsonArray().add(title).add(markdown);
        dbClient.updateWithParams(sqlQueries.get(SqlQuery.CREATE_PAGE), data,
                res -> {
                    if (res.succeeded()) {
                        resultHandler.handle(Future.succeededFuture());
                    } else {
                        LOGGER.error("Database query error", res.cause());
                        resultHandler.handle(Future.failedFuture(res.cause()));
                    }
                });
        return this;
    }

    @Override
    public WikiDatabaseService savePage(int id, String markdown,
            Handler<AsyncResult<Void>> resultHandler) {
        JsonArray data = new JsonArray().add(markdown).add(id);
        dbClient.updateWithParams(sqlQueries.get(SqlQuery.SAVE_PAGE), data,
                res -> {
                    if (res.succeeded()) {
                        resultHandler.handle(Future.succeededFuture());
                    } else {
                        LOGGER.error("Database query error", res.cause());
                        resultHandler.handle(Future.failedFuture(res.cause()));

                    }
                });
        return this;
    }

    @Override
    public WikiDatabaseService deletePage(int id,
            Handler<AsyncResult<Void>> resultHandler) {
        JsonArray data = new JsonArray().add(id);
        dbClient.updateWithParams(sqlQueries.get(SqlQuery.DELETE_PAGE), data,
                res -> {
                    if (res.succeeded()) {
                        resultHandler.handle(Future.succeededFuture());
                    } else {
                        LOGGER.error("Database query error", res.cause());
                        resultHandler.handle(Future.failedFuture(res.cause()));
                    }
                });
        return this;
    }
}

在代理代码生成可以工作之前,还需要最后一步:服务包需要有一个package-info.java注解来声明一个Vert.x模块:

@ModuleGen(groupPackage = "io.vertx.guides.wiki.database", name = "wiki-database")
package io.vertx.guides.wiki.database;
import io.vertx.codegen.annotations.ModuleGen;

4.4 从数据库Verticle公开数据库服务

由于大多数数据库处理代码已经被移动到WikiDatabaseServiceImpl类,WikiDatabaseVerticle类现在包含两个方法:start方法用来注册服务,以及一个工具方法加载SQL查询:

public class WikiDatabaseVerticle extends AbstractVerticle {

    public static final String CONFIG_WIKIDB_JDBC_URL = "wikidb.jdbc.url";
    public static final String CONFIG_WIKIDB_JDBC_DRIVER_CLASS = "wikidb.jdbc.driver_class";
    public static final String CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE = "wikidb.jdbc.max_pool_size";
    public static final String CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE = "wikidb.sqlqueries.resource.file";
    public static final String CONFIG_WIKIDB_QUEUE = "wikidb.queue";

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        HashMap<SqlQuery, String> sqlQueries = loadSqlQueries();
        JDBCClient dbClient = JDBCClient.createShared(
                vertx,
                new JsonObject()
                        .put("url",config().getString(CONFIG_WIKIDB_JDBC_URL,"jdbc:hsqldb:file:db/wiki"))
                        .put("driver_class",config().getString(CONFIG_WIKIDB_JDBC_DRIVER_CLASS,"org.hsqldb.jdbcDriver"))
                        .put("max_pool_size",config().getInteger(CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE, 30)));
        WikiDatabaseService.create(dbClient, sqlQueries, ready -> {
            if (ready.succeeded()) {
                ProxyHelper.registerService(WikiDatabaseService.class, vertx,
                        ready.result(), CONFIG_WIKIDB_QUEUE); ①
                startFuture.complete();
            } else {
                startFuture.fail(ready.cause());
            }
        });
    }

    /*
     * Note: this uses blocking APIs, but data is small...
     */
    private HashMap<SqlQuery, String> loadSqlQueries() throws IOException {
        String queriesFile = config().getString(
                CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE);
        InputStream queriesInputStream;
        if (queriesFile != null) {
            queriesInputStream = new FileInputStream(queriesFile);
        } else {
            queriesInputStream = getClass().getResourceAsStream(
                    "/db-queries.properties");
        }
        Properties queriesProps = new Properties();
        queriesProps.load(queriesInputStream);
        queriesInputStream.close();
        HashMap<SqlQuery, String> sqlQueries = new HashMap<>();
        sqlQueries.put(SqlQuery.CREATE_PAGES_TABLE,queriesProps.getProperty("create-pages-table"));
        sqlQueries.put(SqlQuery.ALL_PAGES,queriesProps.getProperty("all-pages"));
        sqlQueries.put(SqlQuery.GET_PAGE, queriesProps.getProperty("get-page"));
        sqlQueries.put(SqlQuery.CREATE_PAGE,queriesProps.getProperty("create-page"));
        sqlQueries.put(SqlQuery.SAVE_PAGE,queriesProps.getProperty("save-page"));
        sqlQueries.put(SqlQuery.DELETE_PAGE,queriesProps.getProperty("delete-page"));
        return sqlQueries;
    }
}

① 我们在此处注册服务。

注册服务需要一个接口类,一个Vert.x上下文,一个实现类以及一个事件总线目的地址。

WikiDatabaseServiceVertxEBProxy生成类处理事件总线上接收到的消息并分发它们到WikiDatabaseServiceImpl。它做的事情实际上非常接近于我们在前面章节所做的:发送消息使用一个action头指定哪个方法被调用,并且参数被编码为JSON。

4.5 获得一个数据库服务代理

重构为Vert.x服务的最后步骤是改写HTTP Server Verticle来获得数据库服务的代理,并且在处理器(handler)中使用它代替事件总线:

首先,我们需要在启动Verticle时,创建代理:

private WikiDatabaseService dbService;
@Override
public void start(Future<Void> startFuture) throws Exception {
    String wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue"); ①
    dbService = WikiDatabaseService.createProxy(vertx, wikiDbQueue);
    HttpServer server = vertx.createHttpServer();
// (...)

① 我们仅仅需要确保使用的事件总线目的地址与WikiDatabaseVerticle发布的服务相同。

然后,我们需要用数据库服务的调用替换事件总线的调用:

private void indexHandler(RoutingContext context) {
    dbService.fetchAllPages(reply -> {
        if (reply.succeeded()) {
            context.put("title", "Wiki home");
            context.put("pages", reply.result().getList());
            templateEngine.render(
                    context,
                    "templates",
                    "/index.ftl",
                    ar -> {
                        if (ar.succeeded()) {
                            context.response().putHeader("Content-Type",
                                    "text/html");
                            context.response().end(ar.result());
                        } else {
                            context.fail(ar.cause());
                        }
                    });
        } else {
            context.fail(reply.cause());
        }
    });
}

private void pageRenderingHandler(RoutingContext context) {
    String requestedPage = context.request().getParam("page");
    dbService.fetchPage(
            requestedPage,
            reply -> {
                if (reply.succeeded()) {
                    JsonObject payLoad = reply.result();
                    boolean found = payLoad.getBoolean("found");
                    String rawContent = payLoad.getString("rawContent",
                            EMPTY_PAGE_MARKDOWN);
                    context.put("title", requestedPage);
                    context.put("id", payLoad.getInteger("id", -1));
                    context.put("newPage", found ? "no" : "yes");
                    context.put("rawContent", rawContent);
                    context.put("content", Processor.process(rawContent));
                    context.put("timestamp", new Date().toString());
                    templateEngine.render(
                            context,
                            "templates",
                            "/page.ftl",
                            ar -> {
                                if (ar.succeeded()) {
                                    context.response().putHeader(
                                            "Content-Type", "text/html");
                                    context.response().end(ar.result());
                                } else {
                                    context.fail(ar.cause());
                                }
                            });

                } else {
                    context.fail(reply.cause());
                }
            });
}

private void pageUpdateHandler(RoutingContext context) {
    String title = context.request().getParam("title");
    Handler<AsyncResult<Void>> handler = reply -> {
        if (reply.succeeded()) {
            context.response().setStatusCode(303);
            context.response().putHeader("Location", "/wiki/" + title);
            context.response().end();
        } else {
            context.fail(reply.cause());
        }
    };

    String markdown = context.request().getParam("markdown");
    if ("yes".equals(context.request().getParam("newPage"))) {
        dbService.createPage(title, markdown, handler);
    } else {
        dbService.savePage(
                Integer.valueOf(context.request().getParam("id")),
                markdown, handler);
    }
}

private void pageCreateHandler(RoutingContext context) {
    String pageName = context.request().getParam("name");
    String location = "/wiki/" + pageName;
    if (pageName == null || pageName.isEmpty()) {
        location = "/";
    }
    context.response().setStatusCode(303);
    context.response().putHeader("Location", location);
    context.response().end();
}

private void pageDeletionHandler(RoutingContext context) {
    dbService.deletePage(Integer.valueOf(context.request().getParam("id")),
            reply -> {
                if (reply.succeeded()) {
                    context.response().setStatusCode(303);
                    context.response().putHeader("Location", "/");
                    context.response().end();
                } else {
                    context.fail(reply.cause());
                }
            });
}

生成的WikiDatabaseServiceVertxProxyHandler类处理转发调用为事件总线消息。

仍然完全可以直接通过事件总线消息使用Vert.x服务,因为这正是生成的代理做的事情。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容