Apache Pulsar 之 Java Function 实践篇

作者 | 冉小龙

审校 | Anonymitaet

编辑 | Susan + Anonymitaet

阅读本文需要约 15 分钟。

导读:在 一篇文章了解 Pulsar Functions 中,我们介绍了什么是 Pulsar Functions,它的运行机制以及原理。这篇文章具体介绍如何在 Java 中编写、部署并运行 Java Function。

部署 standalone Pulsar

前面我们说到 Functions 是 Pulsar 的计算层,Functions 的运行依赖于 Pulsar 的运行,为了方便演示,我们使用 Docker 运行一个 standalone Pulsar。

1. 从 Docker Hub 上获取 apachepulsar/pulsar:2.3.0 的镜像。

docker pull apachepulsar/pulsar:2.3.0

2. 启动 Pulsar。

docker run -it \

-p 6650:6650 \

-p 8080:8080 \

-v $PWD/pulsardata:/pulsar/data \

apachepulsar/pulsar:2.3.0 \

bin/pulsar standalone

其中 PWD/pulsardata 是本地目录,使用 -v 参数将 docker 镜像中的 /pulsar/data 目录映射到本地的PWD/pulsardata 目录。

3. 进入 Pulsar 镜像。

docker ps

找到 apachepulsar/pulsar:2.3.0 的 CONTAINER ID 并执行:

docker exec -it [CONTAINER ID] /bin/bash

到目前为止,你成功启动了 standalone Pulsar。

编写 Java Function

下面开始编写自己的第一个 Java Function:

首先,新建一个 maven 工程,pom 文件具体如下:

image.gif

注意:

<manifest>

<mainClass>org.example.test.ExclamationFunction</mainClass>

</manifest>

这里的 mainClass 需要修改为自己的路径。

  • Java Function 代码示例:

package org.example.test;

import java.util.function.Function;

public class ExclamationFunction implements Function<String, String> {
@Override
public String apply(String s) {
return "This is my function!";
}
}

在上述示例代码中,我们引用了 Java 8 提供的 Functions 接口,在 Pulsar Functions 中,同样提供了 Functions 接口,二者之间的主要区别是:Pulsar Functions 提供了 Context 接口,当你在编写 functions 时,如果需要与 Pulsar Functions 进行交互,可以使用 Context 来获取,示例如下:

image

部署 Java Function

如下图所示,Pulsar Functions 目前支持两种状态:start 和 stop。start 到处理 function 之间,是 Pulsar Functions 的初始化阶段,例如,setupProducer、setupConsumer、setupLogTopic 等。

当初始化工作完成之后,Pulsar Functions 会动态加载用户的 function 代码,作为 Pulsar Functions 处理逻辑的一部分,如果用户编写的 function 有输出,在图中第二步 HandlerMessage 中,会有相应的 output 输出到下游来做相应的处理。

处理 function 到 stop 之间是 Pulsar Functions 处理 output 的过程。如图第三步所示,Pulsar Functions 会调用 processResult 函数,将相应的 output 处理之后,输出到 output topic 中,同时关闭用户在初始化工作时打开的系统资源。

image.gif

1. 打包 Function。

mvn package

这个时候打开 target 目录,查看是否有一个类似 java-function-1.0-SNAPSHOT.jar 的 jar 包。

2. 执行 Function。

上面你已经准备好了 standalone Pulsar, 但是 Pulsar 环境中目前还没有你打包好的 jar 文件,所以首先需要 copy 打包好的 jar 文件到 Pulsar 镜像中:

docker cp

$PWD/javafunction/target/java-function-1.0-SNAPSHOT.jar CONTAINER ID:/pulsar

执行如下命令,运行 Function:

./bin/pulsar-admin functions localrun --classname org.example.test.ExclamationFunction --jar java-function-1.0-SNAPSHOT.jar --inputs persistent://public/default/my-topic-1 --output persistent://public/default/test-1 --tenant public --namespace default --name ExclamationFunctio6

以下是每个选项的解释:

image.gif

上述启动选项仅是作为 Demo 演示,更多详细选项可以通过 ./bin/pulsar-admin functions 直接查看。

启动之后,如果看到下述日志,证明启动成功:

image.gif

当你将上述启动命令的 localrun 替换为 create,就能以集群的模式启动,启动后会输出一行日志,具体如下:

"Created successfully"

除了启动 Pulsar Functions 之外,Pulsar 提供了如下命令,用于控制 function 的状态。

停止 Java Function

停止 Pulsar Function 的命令为 stop,它提供了如下参数列表:

image

下面以 tenant、namespace、name 为例,停止 ExclamationFunctio123 Function,具体如下:

root@856932ba3474:/pulsar# ./bin/pulsar-admin functions stop
--tenant public
--namespace default
--name ExclamationFunctio6

Stopped successfully

启动 Java Function

启动 Pulsar Functions 的命令为 start,它提供了如下参数列表:

image

下面以 tenant、namespace、name 为例,启动 ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions start
--tenant public
--namespace default
--name ExclamationFunctio123

Started successfully

重启 Java Function

重启 Pulsar Functions 的命令为 restart,它提供了如下参数列表:

image.gif

下面以 tenant、namespace、name 为例,重启 ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions restart
--tenant public
--namespace default
--name ExclamationFunctio123

Restarted successfully

更新 Java Function

当启动或者运行一段时间 Pulsar Functions 之后,如果想要更改 Function 的内部运行参数,可以使用 update 命令,update 提供的参数列表与创建一个 Function 基本一致,可以使用 ./bin/pulsar-admin functions 查看参数细节。

下面示例将 ExclamationFunctio123 的 cpu 核数更新为 10,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions update --tenant public --namespace default --name ExclamationFunctio123 --cpu 10

"Updated successfully"

删除 Java Function

删除 Pulsar Functions 的命令为 delete,它提供了如下参数列表:

image.gif

下面以 tenant、namespace、name 为例,删除 ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions delete
--tenant public
--namespace default
--name ExclamationFunctio123

"Deleted successfully"

更多关于 Pulsar 的干货和动态分享,请关注 StreamNative 微信公众号。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容