本篇文章描述了开发人员如何为Kafka Connect编写新的connector。
核心概念与API
Connectors和Tasks
Connector的实现类本身不执行数据复制:它们的配置描述了要复制的数据集,connector负责将该作业分解成一组task,这些task可以分发给Kafka Connect worker。Connector的实现类可以监视外部系统的数据更改和请求task的重新配置。
有了要复制的数据的分配信息,每个task需要将其数据子集复制到Kafka或从Kafka复制到别处。connector复制的数据要表示为分区的流,类似于Kafka topic,其中每个分区是带有偏移量的有序记录序列。每个task处理其中的一些partition。有时这种划分是很清晰的:一组log文件中的每个文件都被视为一个分区,每一行被视为一条记录,偏移量就是这条记录在文件中的位置。其他情况下,映射到这个模型可能需要更多工作:JDBC connector可以将每个表映射到一个分区,但是偏移量不是那么清晰。一种可能的映射是使用时间戳列生成查询,以增量地返回新数据,最后查询的时间戳用作偏移量。
Partitions和Recoeds
每个分区都是key-value记录的有序序列。key和value都可以具有复杂结构,由org.apache.kafka.connect中的数据结构表示。许多基本类型以及arrays、structs和嵌套数据结构都是支持的。对于结构化数据,应该使用Struct类。
为了跟踪分区中记录的结构和兼容性,可以在每个记录中包含schema。因为schema通常是基于数据源动态生成的,包含一个SchemaBuilder类会使得构建schema非常容易。
这种运行时数据格式不采用任何特定的序列化格式,其转换由Converter的实现指定,它将org.apache.kafka.connect.data运行时格式和序列化数据表示为byte[]。Connector开发人员不需要关心转换细节。
除了key和value之外,record还有partition ID和offset。框架使用它们定期提交已处理数据的偏移量。在失败的情况下,可以从最后提交的偏移量恢复处理,避免不必要的重复事件。
动态connectors
并不是所有连接器都有静态的partition,因此connector还负责监视外部系统中的任何更改。例如,在JDBCSourceConnector中,Connector将一组table中的每个table都分配一个task。当一个新表创建时,Connector可以发现它并通过更新配置为其分配一个新的task。
开发一个简单的Connector
开发一个connector只需要实现两个接口,Connector和Task。Kafka Connect源代码中包含了一个简单的connector示例,可以从文件中读写行。SourceConnector/SourceTask类实现从文件中读取行的source connector,SinkConnector/SinkTask实现将每个记录写入文件的sink connector。
Connector Example
首先创建从SourceConnector继承的类,并添加两个字段来存储已解析的配置信息(要从中读取的文件名和要向其发送数据的主题):
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
最简单的方法是getTaskClass(),它定义了应该在worker进程中实例化的类,以实际读取数据:
@Override
public Class<? extends Task> getTaskClass() {
return FileStreamSourceTask.class;
}
我们将在下面定义FileStreamSourceTask类。接下来,添加一些标准的生命周期方法start()和stop():
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
}
@Override
public void stop() {
// Nothing to do since no background monitoring is required
}
最后,实现的真正核心是在taskConfigs()中。在这种情况下,我们只处理一个文件,所以即使我们可能被允许按照maxTasks参数生成更多的任务,我们返回一个只有一个条目的列表:
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input partition makes sense.
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
即使有多个task,这个方法的实现通常也非常简单。它只需要确定输入task的数量,这需要与抽取数据的远程服务进行通信,将它们进行分配。由于在task间分配工作的一些模式非常常见,ConnectorUtils中提供了一些使用的工具来简化这些情况。
注意,这个简单示例不包含动态输入。有关如何触发对task配置的更新,请参见下一节讨论。
Task示例-Source Task
接下来,我们来描述SourceTask的实现。这个类很小,但是对于这篇指导来说太长了。我们只提供部分的实现代码。
与connector一样,我们需要创建从适当的Task类继承的类。它也有一些标准的生命周期方法:
public class FileStreamSourceTask extends SourceTask {
private String filename;
private InputStream stream;
private String topic;
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
stream = openOrThrowError(filename);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
}
@Override
public synchronized void stop() {
stream.close()
}
这些是稍微简化的版本,但是显示这些方法应该相对简单,它们执行的唯一工作是分配或释放资源。关于此实现,有两点需要注意。首先,start()方法还没有处理从上一个偏移量恢复,这将在后面的部分中讨论。其次,stop()方法是同步的。这是必要的,因为SourceTasks被赋予了一个专用线程,它们可以无限期地阻塞该线程,因此需要使用来自工作程序中另一个线程的调用来停止它们。
接下来,我们实现任务的主要功能:poll()方法,它从输入系统获取记录并返回一个List<SourceRecord>:
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
ArrayList<SourceRecord> records = new ArrayList<>();
while (streamValid(stream) && records.isEmpty()) {
LineAndOffset line = readToNextLine(stream);
if (line != null) {
Map sourcePartition = Collections.singletonMap("filename", filename);
Map sourceOffset = Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
} else {
Thread.sleep(1);
}
}
return records;
} catch (IOException e) {
// Underlying stream was killed, probably as a result of calling stop. Allow to return
// null, and driving thread will handle any shutdown if necessary.
}
return null;
}
同样,我们忽略了一些细节,但是我们可以看到一些重要的步骤:poll()方法将被重复调用,对于每个调用,它将循环尝试从文件中读取记录。对于读取的每一行,它还跟踪文件偏移量。它使用这些信息来创建一个输出SourceRecord,其包含四条信息:源partition(只有一个,单文件读取),源offset(在文件中的位置),输出topic名称和输出值(包含schema,指示其为一个String)。SourceRecord构造函数的其他变体还可以包括特定的输出分区和键。
注意,此实现使用普通的Java InputStream接口,如果数据不可用,则可能休眠。这是可以接受的,因为Kafka Connect为每个任务提供了一个专用线程。虽然任务实现必须符合基本的poll()接口,但它们在如何实现方面有很大的灵活性。在这种情况下,基于nio的实现会更有效,但是这种简单的方法可以工作,实现起来很快,并且与旧版本的Java兼容。
虽然在示例中没有使用,但SourceTask还提供了两个api来在源系统中提交偏移量:commit()和commitRecord()。这些api是为具有消息确认机制的源系统提供的。覆盖这些方法允许源连接器在将消息写入Kafka之后,在源系统中确认消息,无论是批量的还是单独的。
commit() API将偏移量存储在源系统中,直到poll()返回偏移量为止。此API的实现应该阻塞,直到提交完成。commitRecord() API在将每个源记录写入Kafka之后,在源系统中为每个源记录保存偏移量。由于Kafka Connect将自动记录偏移量,因此不需要SourceTask来实现它们。在连接器确实需要确认源系统中的消息的情况下,通常只需要其中一个api。
Sink Tasks
SourceTask和SinkTask有非常不同的接口,因为SourceTask使用pull接口,而SinkTask使用push接口。两者都有相同的生命周期方法,但是SinkTask接口是完全不同的:
public abstract class SinkTask implements Task {
... [ lifecycle methods omitted ] ...
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void put(Collection<SinkRecord> records);
public abstract void flush(Map<TopicPartition, Long> offsets);
public void open(Collection<TopicPartition> partitions) {}
public void close(Collection<TopicPartition> partitions) {}
}
put()方法应该包含大部分实现、接受一组SinkRecords、执行任何所需的转换并将它们存储在目标系统中.此流程不需要在返回之前确保数据已完全写入目标系统。事实上,在许多情况下,一些内部缓冲是有用的,这样可以一次发送整批记录(很像Kafka的生产者),从而减少向下游数据存储插入事件的开销。SinkRecords基本上包含与源记录相同的信息:Kafka主题、分区和偏移量以及事件键和值。
flush()方法在偏移量提交过程中使用,它允许任务从失败中恢复,并从安全点恢复,这样就不会错过任何事件。该方法应该将任何未完成的数据推送到目标系统,然后阻塞,直到确认写入。偏移量参数通常可以忽略,但在实现希望在目标存储中存储偏移量信息以提供精确的一次交付的某些情况下,该参数非常有用。例如,HDFS连接器可以这样做,并使用原子移动操作来确保flush()操作以原子方式将数据和偏移量提交到HDFS中的最终位置。
在内部,SinkTask使用Kafka消费者来轮询数据。connector的task中使用的consumer实例属于同一consumer组。任务重新配置或失败将触发使用者组的重新平衡。在重新平衡期间,主题分区将被重新分配到新的任务集。有关kafka消费者再平衡的更多解释,请参见消费者部分。
请注意,由于consumer是单线程的,应该确保put()或flush()花费的时间不会超过使用者会话超时时间。否则,consumer会被踢出组,从而触发分区的rebalance,组织所有其他task在rebalance完成之前取得进展。
为了确保在重新平衡期间正确地释放和分配资源,SinkTask提供了两个额外的方法:close()和open(),它们与驱动SinkTask的KafkaConsumer的底层再平衡回调相关联。
close()方法用于关闭分配给SinkTask的分区的writers。此方法将在使用者重新平衡操作启动之前和SinkTask停止获取数据之后调用。在关闭之后,Connect将不会向任务写入任何记录,直到打开了一组新的分区。close()方法可以在重新平衡开始之前访问分配给SinkTask的所有主题分区。通常,我们建议关闭所有主题分区的writers,并确保所有主题分区的状态都得到了正确维护。然而,你可以在实现中关闭topic partition的子集的writer。
open()方法用于在使用者重新平衡时为新分配的分区创建writers。此方法将在分区重新分配完成后以及SinkTask开始获取数据之前调用。
注意,从close()或open()引发的任何错误都将导致任务停止,报告失败状态,并关闭相应的consumer实例。此consumer关闭触发重新平衡,此任务的主题分区将被重新分配到此connector的其他任务。
从之前的偏移量恢复
SourceTask实现包含每条record的分区ID(输入文件名)和偏移量(文件中的位置)。框架会定期的提交偏移量,这样,在失败的情况下,任务可以恢复和减少事件的数量再加工。这个提交过程由框架完全自动化,但是只有connector知道如何在输入中查找要从该位置恢复的正确位置。
要在启动时正确恢复,任务可以使用传递到其initialize()方法中的SourceContext来访问偏移量数据。在initialize()中,我们将添加更多的代码来读取偏移量(如果它存在),并寻找到那个位置:
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Long lastRecordedOffset = (Long) offset.get("position");
if (lastRecordedOffset != null)
seekToOffset(stream, lastRecordedOffset);
}
关于此实现,有两点需要注意。首先,这个连接器的偏移量是long,这是一种基本类型。然而,包括映射和列表在内的更复杂的结构也可以用作偏移量。其次,返回的数据是“无模式的”。这是必要的,因为不能保证序列化偏移量的底层转换器能够跟踪模式。这使得可靠的偏移量解析对于连接器开发人员来说更加具有挑战性,但是使得序列化格式的选择更加灵活。
动态输入/输出分区
Kafka Connect旨在定义批量数据复制作业,例如复制整个数据库,而不是创建多个作业来单独复制每个表。这种设计的一个结果是连接器的输入或输出分区集可能随时间而变化。
源连接器需要监视源系统中的更改,例如数据库中的表添加/删除。当它们接收到更改时,它们应该通过ConnectorContext对象通知框架需要重新配置。例如,在SourceConnector中:
if (inputsChanged()) {
this.context.requestTaskReconfiguration();
}
框架将迅速请求新的配置信息并更新任务,允许它们在重新配置之前优雅地提交进度。注意,在SourceConnector中,这种监视目前由connector实现来完成。如果执行此监视需要一个额外的线程,connector必须自己分配它。
理想情况下,用于监视变更的代码应该被connector隔离,task不需要担心它们。然而,变更也会影响到task,最常见的情况是当其中一个输入partition在输入系统中被销毁时,例如,一个表从数据库中删除。如果task在Connector之前遇到了问题,则task需要处理随后的错误。幸运的是,这通常可以通过捕获和处理适当的异常来处理。
SinkConnector通常只需要处理添加的partition,这些partition可能会转换为其输出中的新条目。Kafka Connect框架管理Kafka输入的任何更改。SinkTask管理新的输入partition,这可能需要再下游系统中创建新的资源,例如数据库中的新表。这种情况下,最棘手的情况是多个SinkTask在看到新的输入partition时,同时尝试创建新资源,并发生冲突。
处理Schema
FileStream连接器是很好的例子,因为它们很简单,但是它们也有一些结构简单的数据——每行只是一个字符串。几乎所有连接器都需要具有更复杂数据格式的模式。
要创建更复杂的数据,您需要使用org.apache.kafka.connect API。除了基本类型之外,大多数结构化记录还需要与两个类交互:Schema和Struct。
API文档提供了完整的参考,下面是一个创建Schema和Struct的简单示例:
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.build();
Struct struct = new Struct(schema)
.put("name", "Barbara Liskov")
.put("age", 75)
.build();
如果您正在实现source connector,则需要决定何时以及如何创建schema。如果可能的话,应该避免重新计算它们。例如,如果connector保证有固定的schema,那么静态地创建它并重用单个实例。
然而,许多connector将具有动态模式。其中一个例子是数据库连接器。即使只考虑单个表,在连接器的生命周期中也不会为单个表固定模式,因为用户可能会执行ALTER table命令。连接器必须能够检测这些更改,并通过创建更新的模式作出适当的响应。
接收器连接器通常更简单,因为它们正在使用数据,因此不需要创建模式。但是,他们应该同样小心地验证接收到的模式是否具有预期的格式。当模式不匹配时——通常表示上游生产者正在生成无法正确转换到目标系统的无效数据——接收器连接器应该抛出异常,将此错误指示给Kafka Connect框架。
在使用Confluent平台中包含的AvroConverter时,模式在Confluent的schema registry下注册,因此任何新模式都必须满足目标主题的兼容性要求。
Schema进化
Kafka Connect在SchemaProjector中提供了schema的兼容模式以及不兼容时如何抛出异常。SchemaProjector的使用很简单。下面的示例展示了如何将sourceStruct从版本2的源模式投射到版本3的目标模式,这将添加一个具有默认值的字段。由于这两个模式是兼容的,我们看到targetStruct有两个字段,field2填充123,这是该字段的默认值。
Schema source = SchemaBuilder.struct()
.version(2)
.field("field", Schema.INT32_SCHEMA)
.build();
Struct sourceStruct = new Struct(source);
sourceStruct.put("field", 1);
Schema target = SchemaBuilder.struct()
.version(3)
.field("field", Schema.INT32_SCHEMA)
.field("field2", SchemaBuilder.int32().defaultValue(123).build())
.build();
Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
该实用程序对于需要处理模式演化和维护模式兼容性的连接器非常有用。例如,如果我们希望HDFS连接器保持向后兼容性,因为每个文件只能有一个模式,那么我们需要在将消息写入HDFS之前,将具有旧模式的消息投影到连接器看到的最新模式。这确保写入HDFS的最新文件将具有可用于查询整个数据的最新模式,从而保持向后兼容性。