Apache Pulsar 之 Java Function实践篇

Apache Pulsar 之 Java Function 实践篇

一篇文章了解 Pulsar Functions 中,我们介绍了什么是 Pulsar Function,它的运行机制以及原理。这篇文章具体介绍如何在 Java 中编写、部署并运行 Java Function。

部署 standalone Pulsar

前面我们说到 function 是 pulsar 的计算层,function 的运行依赖于 pulsar 的运行,为了方便演示,我们使用 docker 的方式运行一个 standalone 的 pulsar。

  1. 从docker hub 上获取 apachepulsar/pulsar:2.3.0 的镜像。
docker pull apachepulsar/pulsar:2.3.0
  1. start pulsar
docker run -it \
 -p 6650:6650 \
 -p 8080:8080 \
 -v $PWD/pulsardata:/pulsar/data \
 apachepulsar/pulsar:2.3.0 \
 bin/pulsar standalone

其中 $PWD/pulsardata 是本地目录,使用 -v 参数将 docker 镜像中的 /pulsar/data 目录映射到本地的 $PWD/pulsardata 目录。

  1. 进入pulsar镜像
docker ps

找到 apachepulsar/pulsar:2.3.0CONTAINER ID 并执行:

docker exec -it [CONTAINER ID] /bin/bash

到目前为止,你成功的以 standalone 形式启动了 Pulsar。

编写 Java Function

下面开始编写自己的第一个Java Function:

首先,新建一个 maven 工程,pom 文件具体如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>java-function</groupId>
    <artifactId>java-function</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-functions-api</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                    <manifest>
                        <mainClass>org.example.test.ExclamationFunction</mainClass>
                    </manifest>
                </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

注意:

<manifest>
        <mainClass>org.example.test.ExclamationFunction</mainClass>
 </manifest>

这里的 mainClass 需要修改为自己的路径。

  • Java Function 代码示例:
package org.example.test;

import java.util.function.Function;

public class ExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String s) {
        return "This is my function!";
    }
}

在上述示例代码中,我们引用了 Java8 提供的 Function 接口,在 Pulsar Functions 中,同样提供了 Function 接口,二者之间的主要区别是:Pulsar Functions 提供了 Context 接口,当你在编写 function 时,如果需要与 Pulsar Functions 进行交互,可以使用 Context 来获取,示例如下:

package org.example.functions;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

public class WordCountFunction implements Function<String, Void> {
    // This function is invoked every time a message is published to the input topic
    @Override
    public Void process(String input, Context context) throws Exception {
        Arrays.asList(input.split(" ")).forEach(word -> {
            String counterKey = word.toLowerCase();
            context.incrCounter(counterKey, 1);
        });
        return null;
    }
}

部署 Java Function

如下图所示,Pulsar Functions 目前支持两种状态:start 和 stop。start 到处理 function 之间,是 Pulsar Functions 的初始化阶段,如:setupProducer、setupConsumer、setupLogTopic 等;当初始化工作完成之后,Pulsar Functions 会动态加载用户的 function 代码,作为 Pulsar Functions 处理逻辑的一部分,如果用户编写的 function 有输出,在上图所示的第二步 HandlerMessage 中,会有相应的 output 输出到下游来做相应的处理;处理 function 到 stop 之间是 Pulsar Functions 处理 output 的过程。如上图第三步所示,Pulsar Functions 会调用 processResult 函数,将相应的 output 处理之后,输出到 output topic 中,同时关闭用户在初始化工作时打开的系统资源。

image.png
  1. 打包 function
mvn package

这个时候打开 target 目录,查看是否有一个类似 java-function-1.0-SNAPSHOT.jar 的jar 包。

  1. 执行 function

上面你已经准备好了standalone pulsar, 但是 pulsar 环境中目前还没有你打包好的 jar 文件,所以首先需要 copy 打包好的 jar 文件到 pulsar 镜像中:

docker cp $PWD/javafunction/target/java-function-1.0-SNAPSHOT.jar  CONTAINER ID:/pulsar

执行如下命令,运行 function:

./bin/pulsar-admin functions localrun \

--classname org.example.test.ExclamationFunction \
--jar java-function-1.0-SNAPSHOT.jar \
--inputs persistent://public/default/my-topic-1 \
--output persistent://public/default/test-1 \
--tenant public \
--namespace default \
--name ExclamationFunctio6

下面我们详细解释一下,每一个选项具体的作用:

Options Describe
functions 通知 pulsar broker,你即将运行 function 实例
classname 指定运行 function 的 ClassName, 需要将包名加上,形如:org.example.test.ExclamationFunction
localrun 当你指定localrun时,function 将以 localrun 的形式运行
create 当你指定create的时,function 将以 cluster 的形式运行
jar 指定 jar 包的运行路径。
inputs 指定 function 数据的来源在哪里,支持多个 topics 作为输入
output 如果该 function 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出
tenant 指定该 function 运行的租户名
namespace 指定该 function 运行的命名空间
name 指定该 function 运行的名称

上述启动选项仅是作为 Demo 演示,更多详细选项文档可以通过./bin/pulsar-admin functions 直接查看。

启动之后,如果看到下述日志,证明启动成功:

07:55:03.724 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully

07:55:34.948 [Timer-0] INFO  org.apache.pulsar.functions.runtime.LocalRunner - {                        │
  "running": true,                                                                                     
  "instanceId": "0"                                                                                    
}                                                                                                      

当你将上述启动命令的 localrun 替换为 create,就能以集群的模式启动,启动后会输出一行日志,具体如下:

"Created successfully"

除了启动 Pulsar Functions 之外,Pulsar 提供了如下命令,用于控制 function 的状态。

停止 Java Function

停止 Pulsar Function 的命令为 stop,它提供了如下参数列表:

Options Describe
tenant The Function's tenant
namespace The Function's namespace
name The Function's name
fqfn fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name
instance-id The Function's instanceID(如果 instanceID 没有被提供,则默认关闭所有的 instances)

下面以 tenantnamespacename 为例,停止 ExclamationFunctio123 Function,具体如下:

root@856932ba3474:/pulsar# ./bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name ExclamationFunctio6

Stopped successfully

启动 Java Function

启动 Pulsar Function 的命令为 start,它提供了如下参数列表:

Options Describe
tenant The Function's tenant
namespace The Function's namespace
name The Function's name
fqfn fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name
instance-id The Function's instanceID(如果 instanceID 没有被提供,则默认启动所有的 instances)

下面以 tenantnamespacename 为例,启动 ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name ExclamationFunctio123

Started successfully

重启 Java Function

重启 Pulsar Function 的命令为 restart,它提供了如下参数列表:

Options Describe
tenant The Function's tenant
namespace The Function's namespace
name The Function's name
fqfn fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name
instance-id The Function's instanceID(如果 instanceID 没有被提供,则默认重启所有的 instances)

下面以 tenantnamespacename 为例,重启 ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions restart \
--tenant public \
--namespace default \
--name ExclamationFunctio123

Restarted successfully

更新 Java Function

当启动或者运行一段时间 Pulsar Functions 之后,如果想要更改 Function 的内部运行参数,可以使用 update 命令,update 提供的参数列表与创建一个 Function 基本一致,可以使用 ./bin/pulsar-admin functions 查看帮助文档。

下面示例将 ExclamationFunctio123cpu 核数更新为 10,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions update --tenant public --namespace default --name ExclamationFunctio123 --cpu 10

"Updated successfully"

删除 Java Function

删除 Pulsar Function 的命令为 restart,它提供了如下参数列表:

Options Describe
tenant The Function's tenant
namespace The Function's namespace
name The Function's name
fqfn fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name

下面以 tenantnamespacename 为例,删除 ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name ExclamationFunctio123

"Deleted successfully"
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容