Vert.x Java开发指南——第三章 重构为独立可重用的Verticle

感兴趣的朋友,可以关注微信服务号“猿学堂社区”,或加入“猿学堂社区”微信交流群

版权声明:本文由作者自行翻译,未经作者授权,不得随意转发。

通过第一次迭代,我们得到了一个可工作的Wiki应用。然而它的实现存在以下问题:

  1. HTTP请求处理和数据库访问代码交织在相同的方法中。
  2. 大量配置数据(如端口号、JDBC驱动等)是代码中的硬编码字符串。

3.1 架构和技术选择

第二次迭代是关于重构代码为独立可重用Verticle的:

image

我们将部署两个Verticle来处理HTTP请求,一个Verticle封装数据库持久化。由此产生的Verticle将没有相互的直接引用,它们将只商定事件总线中的目的地名称以及消息格式。这种方式提供了一个简单但有效的解耦。

发送到事件总线的消息将解码为JSON。虽然Vert.x的事件总线支持灵活的串行化方案用于高要求或者高度定制的上下文,但是使用JSON数据通常是明智的选择。使用JSON的另一个优势是它是一种语言无关的格式。由于Vert.x是支持多语言的,对于使用不同语言编写的Verticle之间的通讯,JSON是非常理想的。

3.2 HTTP Server Verticle

Verticle类开端及start方法看起来如下:

public class HttpServerVerticle extends AbstractVerticle {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpServerVerticle.class);
    public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"; ①
    public static final String CONFIG_WIKIDB_QUEUE = "wikidb.queue";
    private String wikiDbQueue = "wikidb.queue";
    @Override
    public void start(Future<Void> startFuture) throws Exception {
        wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue"); ②
        HttpServer server = vertx.createHttpServer();
        Router router = Router.router(vertx);
        router.get("/").handler(this::indexHandler);
        router.get("/wiki/:page").handler(this::pageRenderingHandler);
        router.post().handler(BodyHandler.create());
        router.post("/save").handler(this::pageUpdateHandler);
        router.post("/create").handler(this::pageCreateHandler);
        router.post("/delete").handler(this::pageDeletionHandler);
        int portNumber = config().getInteger(CONFIG_HTTP_SERVER_PORT, 8080); ③
        server
            .requestHandler(router::accept)
            .listen(portNumber, ar -> {
                if (ar.succeeded()) {
                    LOGGER.info("HTTP server running on port " + portNumber);
                    startFuture.complete();
                } else {
                    LOGGER.error("Could not start a HTTP server", ar.cause());
                    startFuture.fail(ar.cause());
                }
            });
    }
    // (...)

① 我们暴露了公开的常量用于Verticle配置参数:HTTP端口号以及发送消息到数据库Verticle的事件总线目的地名称。

② AbstractVerticle#config()方法允许访问已提供的Verticle配置。对于没有指定值的情况,第二个参数是默认值。

③ 配置值不只可以是字符串,也可以是整数、布尔值以及复杂的JSON数据等。

该类剩余部分主要是提取HTTP部分的代码,以前的数据库代码通过事件总线消息替换。这是indexHandler方法的代码:

private final FreeMarkerTemplateEngine templateEngine = FreeMarkerTemplateEngine.create();
private void indexHandler(RoutingContext context) {
    DeliveryOptions options = new DeliveryOptions().addHeader("action", "all-pages"); ②
    vertx.eventBus().send(wikiDbQueue, new JsonObject(), options, reply -> { ①
        if (reply.succeeded()) {
            JsonObject body = (JsonObject) reply.result().body(); ③
            context.put("title", "Wiki home");
            context.put("pages", body.getJsonArray("pages").getList());
            templateEngine.render(context, "templates", "/index.ftl", ar -> {
                if (ar.succeeded()) {
                    context.response().putHeader("Content-Type", "text/html");
                    context.response().end(ar.result());
                } else {
                    context.fail(ar.cause());
                }
            });
        } else {
            context.fail(reply.cause());
        }
    });
}

① vertx对象提供了对事件总线的访问,我们发送一个消息到数据库Verticle的队列。

② 传递选项(DeliveryOptions)允许我们指定头、有效载荷(payload)编解码器和超时时间。

③ 一旦成功,回复包含有效载荷。

正如我们所看到的,事件总线消息由一个消息体和选项组成,它可以选择性地期待一个答复。对于没有预期答复的情况,有一个send方法的变体,它没有Handler参数。

我们将有效载荷编码为JSON对象,并通过一个称为action的消息头指定数据库Verticle应该执行哪个操作。

Verticle的剩余代码就是路由器处理器,同样使用事件总线获取和存储数据:

private static final String EMPTY_PAGE_MARKDOWN = "# A new page\n" + "\n"
        + "Feel-free to write in Markdown!\n";

private void pageRenderingHandler(RoutingContext context) {
    String requestedPage = context.request().getParam("page");
    JsonObject request = new JsonObject().put("page", requestedPage);
    DeliveryOptions options = new DeliveryOptions().addHeader("action",
            "get-page");
    vertx.eventBus().send(
            wikiDbQueue,
            request,
            options,
            reply -> {
                if (reply.succeeded()) {
                    JsonObject body = (JsonObject) reply.result().body();
                    boolean found = body.getBoolean("found");
                    String rawContent = body.getString("rawContent",
                            EMPTY_PAGE_MARKDOWN);
                    context.put("title", requestedPage);
                    context.put("id", body.getInteger("id", -1));
                    context.put("newPage", found ? "no" : "yes");
                    context.put("rawContent", rawContent);
                    context.put("content", Processor.process(rawContent));
                    context.put("timestamp", new Date().toString());
                    templateEngine.render(
                            context,
                            "templates",
                            "/page.ftl",
                            ar -> {
                                if (ar.succeeded()) {
                                    context.response().putHeader(
                                            "Content-Type", "text/html");
                                    context.response().end(ar.result());
                                } else {
                                    context.fail(ar.cause());
                                }
                            });
                } else {
                    context.fail(reply.cause());
                }
            });
}

private void pageUpdateHandler(RoutingContext context) {
    String title = context.request().getParam("title");
    JsonObject request = new JsonObject()
            .put("id", context.request().getParam("id"))
            .put("title", title)
            .put("markdown", context.request().getParam("markdown"));
    DeliveryOptions options = new DeliveryOptions();
    if ("yes".equals(context.request().getParam("newPage"))) {
        options.addHeader("action", "create-page");
    } else {
        options.addHeader("action", "save-page");
    }

    vertx.eventBus().send(wikiDbQueue, request, options, reply -> {
        if (reply.succeeded()) {
            context.response().setStatusCode(303);
            context.response().putHeader("Location", "/wiki/" + title);
            context.response().end();
        } else {
            context.fail(reply.cause());
        }
    });
}

private void pageCreateHandler(RoutingContext context) {
    String pageName = context.request().getParam("name");
    String location = "/wiki/" + pageName;
    if (pageName == null || pageName.isEmpty()) {
        location = "/";
    }
    context.response().setStatusCode(303);
    context.response().putHeader("Location", location);
    context.response().end();
}

private void pageDeletionHandler(RoutingContext context) {
    String id = context.request().getParam("id");
    JsonObject request = new JsonObject().put("id", id);
    DeliveryOptions options = new DeliveryOptions().addHeader("action",
            "delete-page");
    vertx.eventBus().send(wikiDbQueue, request, options, reply -> {
        if (reply.succeeded()) {
            context.response().setStatusCode(303);
            context.response().putHeader("Location", "/");
            context.response().end();
        } else {
            context.fail(reply.cause());
        }
    });
}

3.3 数据库Verticle

使用JDBC链接到一个数据库当然需要数据库驱动以及配置,这些我们在第一次迭代中采用硬编码的方式实现。

3.3.1 配置SQL查询

在将前面Verticle的硬编码值转换为配置参数的同时,我们还可以进一步从一个配置文件中加载SQL查询。

查询语句将从一个文件中加载,这个文件名作为一个配置参数传递,如果没有提供则从一个默认资源加载。这个方法的优点是Verticle可以适配不同的JDBC驱动和SQL方言。

Verticle类的开端包含了主要的配置键的定义:

public class WikiDatabaseVerticle extends AbstractVerticle {
    public static final String CONFIG_WIKIDB_JDBC_URL = "wikidb.jdbc.url";
    public static final String CONFIG_WIKIDB_JDBC_DRIVER_CLASS = "wikidb.jdbc.driver_class";
    public static final String CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE = "wikidb.jdbc.max_pool_size";
    public static final String CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE = "wikidb.sqlqueries.resource.file";
    public static final String CONFIG_WIKIDB_QUEUE = "wikidb.queue";
    private static final Logger LOGGER = LoggerFactory.getLogger(WikiDatabaseVerticle.class);
    // (...)

SQL查询存储在一个Properties文件中,使用HSQLDB的默认文件位于src/main/resources/db-queries.properties:

create-pages-table=create table if not exists Pages (Id integer identity primary key, Name varchar(255) unique, Content
clob)
get-page=select Id, Content from Pages where Name = ?
create-page=insert into Pages values (NULL, ?, ?)
save-page=update Pages set Content = ? where Id = ?
all-pages=select Name from Pages
delete-page=delete from Pages where Id = ?

WikiDdatabaseVerticle中的以下代码用于从文件中加载SQL查询,并将它们放到一个map中:

private enum SqlQuery {
    CREATE_PAGES_TABLE,
    ALL_PAGES,
    GET_PAGE,
    CREATE_PAGE,
    SAVE_PAGE,
    DELETE_PAGE
}

private final HashMap<SqlQuery, String> sqlQueries = new HashMap<>();

private void loadSqlQueries() throws IOException {
    String queriesFile = config().getString(CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE);
    InputStream queriesInputStream;
    if (queriesFile != null) {
        queriesInputStream = new FileInputStream(queriesFile);
    } else {
        queriesInputStream = getClass().getResourceAsStream("/db-queries.properties");
    }
    Properties queriesProps = new Properties();
    queriesProps.load(queriesInputStream);
    queriesInputStream.close();
    sqlQueries.put(SqlQuery.CREATE_PAGES_TABLE, queriesProps.getProperty("create-pages-table"));
    sqlQueries.put(SqlQuery.ALL_PAGES, queriesProps.getProperty("all-pages"));
    sqlQueries.put(SqlQuery.GET_PAGE, queriesProps.getProperty("get-page"));
    sqlQueries.put(SqlQuery.CREATE_PAGE, queriesProps.getProperty("create-page"));
    sqlQueries.put(SqlQuery.SAVE_PAGE, queriesProps.getProperty("save-page"));
    sqlQueries.put(SqlQuery.DELETE_PAGE, queriesProps.getProperty("delete-page"));
}

在接下来的代码中,我们使用SqlQuery枚举类型以避免字符串常量。Verticle的start方法代码如下:

private JDBCClient dbClient;

@Override
public void start(Future<Void> startFuture) throws Exception {
    /*
    * Note: this uses blocking APIs, but data is small...
    */
    loadSqlQueries(); ①
    dbClient = JDBCClient.createShared(vertx, new JsonObject()
        .put("url", config().getString(CONFIG_WIKIDB_JDBC_URL, "jdbc:hsqldb:file:db/wiki"))
        .put("driver_class", config().getString(CONFIG_WIKIDB_JDBC_DRIVER_CLASS, "org.hsqldb.jdbcDriver"))
        .put("max_pool_size", config().getInteger(CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE, 30)));
    dbClient.getConnection(ar -> {
        if (ar.failed()) {
            LOGGER.error("Could not open a database connection", ar.cause());
            startFuture.fail(ar.cause());
        } else {
            SQLConnection connection = ar.result();
            connection.execute(sqlQueries.get(SqlQuery.CREATE_PAGES_TABLE), create -> { ②
                connection.close();
                if (create.failed()) {
                    LOGGER.error("Database preparation error", create.cause());
                    startFuture.fail(create.cause());
                } else {
                    vertx.eventBus().consumer(config().getString(CONFIG_WIKIDB_QUEUE,                       "wikidb.queue"), this::onMessage); ③
                    startFuture.complete();
                }
            });
        }
    });
}

① 有趣的是,我们打破了Vert.x中的一个重要的原则——就是避免阻塞API,但是因为没有异步API来访问类路径上的资源,所以我们的选择是受限的。我们可以使用Vert.x的executeBlocking方法将阻塞的I/O操作从事件循环转移到工作者线程,但是由于数据非常小,这么做没有明显的效益。

② 这儿是使用SQL查询的一个示例。

③ consumer方法注册了一个事件总线目的地Handler。

3.3.2 分发请求

事件总线消息的Handler是onMessage方法:

public enum ErrorCodes {
    NO_ACTION_SPECIFIED,
    BAD_ACTION,
    DB_ERROR
}
public void onMessage(Message<JsonObject> message) {
    if (!message.headers().contains("action")) {
        LOGGER.error("No action header specified for message with headers {} and body {}",
        message.headers(), message.body().encodePrettily());
        message.fail(ErrorCodes.NO_ACTION_SPECIFIED.ordinal(), "No action header specified");
        return;
    }
    String action = message.headers().get("action");
    switch (action) {
        case "all-pages":
            fetchAllPages(message);
            break;
        case "get-page":
            fetchPage(message);
            break;
        case "create-page":
            createPage(message);
            break;
        case "save-page":
            savePage(message);
            break;
        case "delete-page":
            deletePage(message);
            break;
        default:
            message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
    }
}

我们为错误定义了一个ErrorCodes枚举,用来报回消息发送者。为此,Message类的fail方法提供了一个便捷方式答复错误,原消息发送者得到一个失败的AsyncResult.

3.3.3 减少JDBC客户端样板文件(译者注:原文使用的是boilerplate,应该指的是那些重复的代码)

截止目前,我们可以看到执行一个SQL查询的完整交互:

  1. 获取一个链接
  2. 执行请求
  3. 释放链接

这导致对每个异步操作需要异常处理的地方都需要编码,如下:

dbClient.getConnection(car -> {
    if (car.succeeded()) {
        SQLConnection connection = car.result();
        connection.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
            connection.close();
            if (res.succeeded()) {
                List<String> pages = res.result()
                    .getResults()
                    .stream()
                    .map(json -> json.getString(0))
                    .sorted()
                    .collect(Collectors.toList());
                message.reply(new JsonObject().put("pages", new JsonArray(pages)));
            } else {
                reportQueryError(message, res.cause());
            }
        });
    } else {
        reportQueryError(message, car.cause());
    }
});

自从Vert.x 3.5.0开始,JDBC客户端现在支持一次性(one-shot)操作,获取一个链接执行一个SQL操作,并且在内部释放。与前面相同的代码现在简化如下:

dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
if (res.succeeded()) {
List<String> pages = res.result()
.getResults()
.stream()
.map(json -> json.getString(0))
.sorted()
.collect(Collectors.toList());
message.reply(new JsonObject().put("pages", new JsonArray(pages)));
} else {
reportQueryError(message, res.cause());
}
});

这对于获取数据库链接执行一个单独操作的情况非常有用。但就性能而言,需要注意的是,对于链式的SQL操作,重用数据库链接会更好。

该类的剩余部分包含了onMessage分发接收消息时的私有方法调用:

private void fetchAllPages(Message<JsonObject> message) {
    dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
        if (res.succeeded()) {
            List<String> pages = res.result()
                .getResults()
                .stream()
                .map(json -> json.getString(0))
                .sorted()
                .collect(Collectors.toList());
            message.reply(new JsonObject().put("pages", new JsonArray(pages)));
        } else {
            reportQueryError(message, res.cause());
        }
    });
}

private void fetchPage(Message<JsonObject> message) {
    String requestedPage = message.body().getString("page");
    JsonArray params = new JsonArray().add(requestedPage);
    dbClient.queryWithParams(sqlQueries.get(SqlQuery.GET_PAGE), params, fetch -> {
        if (fetch.succeeded()) {
            JsonObject response = new JsonObject();
            ResultSet resultSet = fetch.result();
            if (resultSet.getNumRows() == 0) {
                response.put("found", false);
            } else {
                response.put("found", true);
                JsonArray row = resultSet.getResults().get(0);
                response.put("id", row.getInteger(0));
                response.put("rawContent", row.getString(1));
            }
            message.reply(response);
        } else {
            reportQueryError(message, fetch.cause());
        }
    });
}

private void createPage(Message<JsonObject> message) {
    JsonObject request = message.body();
    JsonArray data = new JsonArray()
        .add(request.getString("title"))
        .add(request.getString("markdown"));
    dbClient.updateWithParams(sqlQueries.get(SqlQuery.CREATE_PAGE), data, res -> {
        if (res.succeeded()) {
            message.reply("ok");
        } else {
            reportQueryError(message, res.cause());
        }
    });
}

private void savePage(Message<JsonObject> message) {
    JsonObject request = message.body();
    JsonArray data = new JsonArray()
        .add(request.getString("markdown"))
        .add(request.getString("id"));
    dbClient.updateWithParams(sqlQueries.get(SqlQuery.SAVE_PAGE), data, res -> {
        if (res.succeeded()) {
            message.reply("ok");
        } else {
            reportQueryError(message, res.cause());
        }
    });
}

private void deletePage(Message<JsonObject> message) {
    JsonArray data = new JsonArray().add(message.body().getString("id"));
    dbClient.updateWithParams(sqlQueries.get(SqlQuery.DELETE_PAGE), data, res -> {
        if (res.succeeded()) {
            message.reply("ok");
        } else {
            reportQueryError(message, res.cause());
        }
    });
}

private void reportQueryError(Message<JsonObject> message, Throwable cause) {
    LOGGER.error("Database query error", cause);
    message.fail(ErrorCodes.DB_ERROR.ordinal(), cause.getMessage());
}

3.4 从主Verticle部署Verticle

我们依旧有一个MainVerticle类,但它不是像首次迭代一样包含所有逻辑,它的唯一目的是启动应用并且部署其它Verticle。

这些代码包括部署一个WikiDatabaseVerticle实例和两个HttpServerVerticle实例:

public class MainVerticle extends AbstractVerticle {

@Override
public void start(Future<Void> startFuture) throws Exception {
    Future<String> dbVerticleDeployment = Future.future(); ①
    vertx.deployVerticle(new WikiDatabaseVerticle(), dbVerticleDeployment.completer()); ②
    dbVerticleDeployment.compose(id -> { ③
        Future<String> httpVerticleDeployment = Future.future();
        vertx.deployVerticle("io.vertx.guides.wiki.HttpServerVerticle", ④
                new DeploymentOptions().setInstances(2), ⑤
                httpVerticleDeployment.completer());
        return httpVerticleDeployment; ⑥
        }).setHandler(ar -> { ⑦
            if (ar.succeeded()) {
                startFuture.complete();
            } else {
                startFuture.fail(ar.cause());
            }
        });
    }
}

① 部署Verticle是一个异步操作,因此我们需要一个Future。String参数类型是因为一个Verticle部署成功时会返回一个标识。

② 一种选择是使用new创建一个Verticle实例,传递对象引用给deploy方法。completer返回值是一个处理器,简单的完成future。

③ 使用compose的顺序组合允许在一个操作之后运行另一个异步操作。当初始化future成功完成之后,调用组合方法。

④ 指定一个类名字符串也是部署Verticle的一种选项。对于其它JVM语言来说,基于字符串的惯例(conventions)允许指定一个模块/脚本。

⑤ DeploymentOption类允许指定一些参数,尤其部署的实例个数。

⑥ 组合方法返回下一个future。它的完成将会触发组合操作的完成。

⑦ 我们定义了一个Handler,以便完成MainVerticle的启动future。

精明的读者可能会感到惊奇,我们怎么可以在同一个端口部署HTTP Server代码两次,并且对于每个实例都不期望出现由于TCP端口已经被占用而导致的任何错误。对于许多Web框架,我们需要选择不同的TCP端口,并且有一个前端HTTP代理来执行端口之间的负载平衡。

对于Vert.x则不需要这么做,多个Verticle可以共享相同的TCP端口号。传入的连接只是简单的通过接收线程以轮转的方式分发。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容