参考链接
- https://pulsar.apache.org/docs/en/functions-overview/
- https://streaml.io/blog/exactly-once
- https://streaml.io/blog/pulsar-effectively-once
笔记
- 概述
轻量级的计算方法,依赖于Pulsar消息系统,处理逻辑如下:
1. 消费一个或多个Topic
2. 每个消息都执行用户提供的处理逻辑
3. 发布计算后的结果到另一个Topic
目标:
1. 开发人员开发效率
2. 排查故障方便
3. 操作简单(无需外部系统)
灵感:
1. 流式处理引擎(Apache Storm,Apache Heron,Apache Flink)
2. "Serverless", "Function as a Service"(Amazon Web Services Lambda,Google Cloud Functions,Azure Cloud Functions)
Function可如下描述:
a. Lambda样式的函数
b. 使用Pulsar为消息总线而设计
程序设计模型:
提供了广泛的功能,核心编程模型简单.
Functions可以从一个或多个输入Topic中获取数据,并且对于每个收到的消息可以执行如下任务:
1. 对输入消息做逻辑处理,并输出数据至指定的Topic或Bookkeeper
2. 往日志Topic写入处理日志(用于调试)
3. 递增计数器
可以将多个Function组合起来,实现一个消息处理链,如:
输入Message -> 关键字过滤白名单Function->白名单结果Topic->消息统计Function->统计结果Topic->结果入库Function
Functions处理消息类型:
Pulsar Functions默认是以字节数组的形式来接收消息及发送消息;也可以绑定消息的类型(有两种方式)
1. Schema Registry
pulsar内置的schema注册中心
2. SerDe
自定义Serialization,Deserialization函数(必须跟function打在同一个包里),并在发布Function时指定处理类,使用参数--output-serde-classname,如: --output-serde-classname com.example.serde.TweetSerde
FQFN(完全限定函数名)
包含三元素:function tenant, namespace, function name
故相同的函数名可以部署在不同的命名空间下。
支持的开发语言
Java
Python
Go
消息处理三种模式保证
At-most-once delivery: 最多一次投递,发送到函数的每个消息都可能被处理,或者不被处理;处理过程失败后将没有重试机制;
At-least-once delivery:最少一次投递,发送到函数的每个消息都可以被多次处理,Function默认的模式;处理过程失败后消息将会被重新投递处理,故存在消息被多次投递、处理的情况;
Effectively-once delivery:只有一次有效投递,发送到函数的每条消息都将有一个与之关联的输出;
Exactly-once delivery:只投递一次
基本实现的方式有两种:
分布式快照\状态检查点 (Chandy-Lamport分布式快照算法)
定期检查,出现失败情况,回滚至最近的一次全局一致性快照/检查点,重新消费处理对应offset的消息
优点: 较少的性能和资源开销
缺点:故障中恢复时对性能的影响更大;计算网络拓扑越大,对性能的影响越大基于“最少投递一次”增加消息去重插件
消息处理前,先进行去重处理;对于失败的消息直接重新投递,若有计算节点已经处理过,则不处理该消息;需要更多的资源,尤其是存储。
优点:故障对性能的影响是局部的;计算网络拓扑增大,并不会提高故障对性能的影响;
缺点:需要大量存储和基础架构来支持;每个事件/消息都需要额外的性能开销;
Pulsar Broker消息去重:
Broker的重复数据消除逻辑基于 record-keeping系统。每个Broker都会跟踪每个消息生产者最后一个发布“successfully”的消息ID。这些信息存储于内存(以producer -> last sequence ID形式),Pulsar会定期存储快照,并持久化同时复制多份。快照与快照消息日志相结合,可确保Broker崩溃后正确状态的重建。
同时要实现Exactly-once delivery,应用也需要做相应支持,简单样例:
Producer producer = client.createProducer(TOPIC_NAME, conf);
此方式创建的producer,系统将为其分配一个唯一的名称,并且发布消息的序列ID是从0开始;;若应用重启则名称将会变化,故需要做下改进:
设置producer名称,并从最后的序列ID开始发送消息;
并且应用系统需要将发送的消息和序列ID关联起来;
消费端需要将处理结果同序列ID关联起来(如写入DB),故障恢复时可以通过Reader API读取指定ID的消息;
- Functions工作原理
1.) 运行模式
【functions】
线程:function以线程方式运行于Functions worker,使用同一个JVM;只支持java function。
进程:由Functions worker fork出新进程运行function,在相同的宿主机上;支持Java, Python, and Go functions。
Kubernetes:以docker容器化方式运行functions。
【Functions worker】
模式一: 运行于Broker中,可以理解为Broker的一部分;简单方便;
模式二:独立的Functions worker集群模式运行;更好的资源隔离;
2.) functions开发
提供接口类型:
Language-native interface:无需pulsar指定的库,故也没有对应的context;
import java.util.function.Function;
public class JavaNativeExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return String.format("%s!", input);
}
}
Pulsar Function SDK:引入pulsar指定的库,实现的函数带有对应的context;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class ExclamationFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return String.format("%s!", input);
}
}
消息序列化:
a.) Pulsar内置的schema注册中心
b.) 自定义的序列化、反序列化函数(SerDe)
注意:需要将SerDe打包到function的包中
3.) 调试
a.) unit Test
@Test
public void testJavaNativeExclamationFunction() {
JavaNativeExclamationFunction exclamation = new JavaNativeExclamationFunction();
String output = exclamation.apply("foo");
Assert.assertEquals(output, "foo!");
}
@Test
public void testExclamationFunction() {
ExclamationFunction exclamation = new ExclamationFunction();
String output = exclamation.process("foo", mock(Context.class));
Assert.assertEquals(output, "foo!");
}
b.) localrun mode
public class ExclamationFunction implements Function<String, String> {
@Override
public String process(String s, Context context) throws Exception {
return s + "!";
}
public static void main(String[] args) throws Exception {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("exclamation");
functionConfig.setInputs(Collections.singleton("input")); functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput("output");
LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
localRunner.start(false);
}
4.) 部署
使用pulsar-admin functions命令,主要参数如下:
Parameter | Default | 备注 |
---|---|---|
classname | The function's class name | |
jar | Path to the jar file for the function | |
name | The function's name | |
tenant | The function's tenant | |
namespace | The function's namespace | |
output | The function's output topic | |
inputs | The function's input topic or topics | |
subs-name | Pulsar source subscription name | |
processing-guarantees | The processing guarantees (aka delivery semantics) applied to the function | [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE |
parallelism | The function's parallelism factor | |
cpu | The cpu in cores that need to be allocated per function instance | |
log-topic | The topic to which the function's logs are produced | |
schema-type | The builtin schema type or custom schema class name to be used for messages output by the function |
特别说明
【Subscription type】
For at-least-once
and at-most-once
[processing guarantees], the [SHARED
]mode is applied by default; for effectively-once
guarantees, the [FAILOVER
] mode is applied.
支持本地运行(localrun,运行于连接的broker)和集群运行(create,伴随broker集群运行)
并行度设置(Parallelism):
localrun,多次调用命令,命令亦提供参数项parallelism
create,设置参数项parallelism