Kafka Connect transform初体验

可以配置Kafka Connector transforms以进行一个轻量级的消息修改。transforms可以方面的修改数据以及事件路由。

接下来我们将介绍如何配置一个transform。

首先,我们要在任务中配置transform,无论source 还是sink都可以配置transform

{
"name": "test-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"tombstones.on.delete": "false",
"database.server.id": "1",
"database.server.name": "test-server",
"database.history.kafka.topic": "test-server-history",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"include.schema.changes": "true",
"database.serverTimezone": "Asia/Shanghai",
"database.driver": "com.mysql.jdbc.Driver",
"database.history.kafka.recovery.poll.interval.ms": "3000",
"defaultFetchSize": "1000",
"database.tinyInt1isBit": "false",
"snapshot.locking.mode": "none",
"decimal.handling.mode": "string",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type":"org.apache.kafka.connect.transforms.HoistFieldValue", "transforms.MakeMap.field":"line", "transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertFieldValue",
"transforms.InsertSource.static.field":"data_source",
"transforms.InsertSource.static.value":"test-file-source"
}
}

这是官方配置一个transforms的例子。主要配置有transforms,这个key下面主要配置有哪些transforms,在有多个transforms的情况下,transform执行的顺序是按照配置的顺序,在本文后面我们会贴出源码来向大家说明。

官方以及第三方提供了很多transforms,在满足我们需求的情况下可以直接使用,如果不能满足我们的需求我们将需要自己开发或者是二次开发transform。

我们来介绍下官方目前有哪些transforms:

  1. Cast

  2. ExtractField

  3. Flatten

  4. HoistField

  5. InsertField

  6. MaskField

  7. RegexRouter

  8. ReplaceField

  9. TimestampConverter

  10. SetSchemaMetadata

  11. TimestampRouter

  12. ValueToKey

confluent支持的transform有:

  1. Cast 使用的是Kafka的包(org.apache.kafka.connect.transforms.CastKey 或者是 org.apache.kafka.connect.transforms.CastValue)
  2. Drop(io.confluent.connect.transforms.DropKey 或者是 io.confluent.connect.transforms.DropValue)
  3. ExtractField 使用的是Kafka的包 org.apache.kafka.connect.transforms.ExtractFieldValue or org.apache.kafka.connect.transforms.ExtractFieldKey
  4. ExtractTopic (io.confluent.connect.transforms.ExtractTopicKey 或者是 io.confluent.connect.transforms.ExtractTopicValue)
  5. Filter (io.confluent.connect.transforms.FilterKey 或者是 io.confluent.connect.transforms.FilterValue)
  6. Flatten (org.apache.kafka.connect.transforms.FlattenKey 或者是 org.apache.kafka.connect.transforms.FlattenValue)
  7. HoistField (org.apache.kafka.connect.transforms.HoistFieldKey 或者是 org.apache.kafka.connect.transforms.HoistFieldValue)
  8. InsertField (org.apache.kafka.connect.transforms.InsertFieldKey 或者是 org.apache.kafka.connect.transforms.InsertFieldValue)
  9. MaskField (org.apache.kafka.connect.transforms.MaskFieldKey 或者是 org.apache.kafka.connect.transforms.MaskFieldValue)
  10. MessageTimeStampRouter (io.confluent.connect.transforms.MessageTimestampRouter)
  11. RegexRouter 使用Kafka的包 org.apache.kafka.connect.transforms.RegexRouter
  12. ReplaceField (org.apache.kafka.connect.transforms.ReplaceFieldKey 或者是 org.apache.kafka.connect.transforms.ReplaceFieldValue)
  13. SetSchemaMetadata (org.apache.kafka.connect.transforms.SetSchemaMetadataKey 或者是 org.apache.kafka.connect.transforms.SetSchemaMetadataValue)
  14. TimestampConverter (org.apache.kafka.connect.transforms.TimestampConverterKey 或者是org.apache.kafka.connect.transforms.TimestampConverterValue)
  15. TimestampRouter (org.apache.kafka.connect.transforms.TimestampRouter)
  16. TombstoneHandler (io.confluent.connect.transforms.TombstoneHandler)
  17. ValueToKey (org.apache.kafka.connect.transforms.ValueToKey)

具体使用将在后期的文档中详细说明。

在这些transform不能满足我们需求的情况下,我们只能开发自己的transform了,

开发transform主要是实现这个接口:Transformation

实现的方法有:

  1. configure 因为Transformation继承了Configurable类,这里从source、sink 配置的json中获取transform的配置信息

  2. apply 主要处理数据的地方,例如类型转换、数据结果转换、数据值的转换都在这里处理

  3. close 因为 Transformation 继承了Closeable类

  4. config 这里主要配置transform相关的配置,例如我们上面提到的:

"transforms.MakeMap.type":"org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field":"line",

这里配置的值在config值能否在transform中能获取到可以在这个方法里做处理。

接下来我们将介绍一个transform的源码:

public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> {

public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string."
        + "<p/>Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. "
        + "If the pattern matches the input topic, <code>java.util.regex.Matcher#replaceFirst()</code> is used with the replacement string to obtain the new topic.";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH,
                "Regular expression to use for matching.")
        .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
                "Replacement string.");

private interface ConfigName {
    String REGEX = "regex";
    String REPLACEMENT = "replacement";
}

private Pattern regex;
private String replacement;

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    regex = Pattern.compile(config.getString(ConfigName.REGEX));
    replacement = config.getString(ConfigName.REPLACEMENT);
}

@Override
public R apply(R record) {
    final Matcher matcher = regex.matcher(record.topic());
    if (matcher.matches()) {
        final String topic = matcher.replaceFirst(replacement);
        return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }
    return record;
}

@Override
public void close() {
}

@Override
public ConfigDef config() {
    return CONFIG_DEF;
}

}
从这个代码片段可以看出,这个transform只接受两个配置项的值:

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH,
"Regular expression to use for matching.")
.define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
"Replacement string.");
一个是ConfigName.REGEX(replacement), 另一个是ConfigName.REPLACEMENT(replacement)

从配置configure方法中可以看出根据我们配置的CONFIG_DEF获取其对于配置项的值

public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
regex = Pattern.compile(config.getString(ConfigName.REGEX));
replacement = config.getString(ConfigName.REPLACEMENT);
}
而在这个例子中close方法是空的,说明这边在调用close方法的时候没有需要执行的,例如不需要释放资源、 不需要关闭连接等。

接下来我们将说这个transform中的重点了:apply方法,其主要思路是获取这条数据(record)的topic,然后根据我们配置的regex去匹配,如果能匹配上,将对topic的值替换成我们配置的值,并重新生成新的数据(record);如果没有匹配上则返回其原始值。

@Override
public R apply(R record) {
final Matcher matcher = regex.matcher(record.topic());
if (matcher.matches()) {
final String topic = matcher.replaceFirst(replacement);
return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
return record;
}
自此一个tranfrom的处理就完成了,会继续往下传,如果有多个transfrom将会一一执行。我们来看下transform在Kafka Connet运行的时候是如何执行的。

源码在 org.apache.kafka.connect.runtime.WorkerSourceTask#sendRecords 发送记录给下游的时候调用

在发送每条数据(record)的时候会调用:final SourceRecord record = transformationChain.apply(preTransformRecord);

而在apply方法的实现是:

public R apply(R record) {
if (transformations.isEmpty()) return record;

for (Transformation<R> transformation : transformations) {
    record = transformation.apply(record);
    if (record == null) break;
}

return record;

}
代码简洁,如果我们没有配置transform直接原记录返回,如果有配置transform,会按照顺序,对数据(record)应用一次transform,然后返回结果。

transform配置项的获取源码:

public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);

final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
    final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
    final Transformation<R> transformation;
    try {
        transformation = getClass(prefix + "type").asSubclass(Transformation.class).newInstance();
    } catch (Exception e) {
        throw new ConnectException(e);
    }
    transformation.configure(originalsWithPrefix(prefix));
    transformations.add(transformation);
}

return transformations;

}
...

public static final String TRANSFORMS_CONFIG = "transforms";
Okay, 到这里我们我们介绍完了 transform的定义、使用、如何开发一个transform以及transform在Kafka Connect中是如何执行的。

后面我们将介绍Kafka Connect source connector 以及如何开发一个source connector。敬请期待,如果有问题可以和我留言哦~

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容