Preface:
Kafka Connect requires Kafka version 0.9.0.x or later.
Kafka ships with two Connect launcher scripts, connect-standalone.sh (standalone mode) and connect-distributed.sh (distributed mode); both live in the bin directory of the Kafka installation.
1. Importing data from an external file into Kafka
This article covers only connect-standalone.sh. Data imported with this script is written to the Kafka cluster, so any consumer of the target topic can read it.
For example, suppose we want to import a file named testkafka.txt with the following contents:
yesy
alipay
wenxin
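The sample input file can be created with a quick shell snippet (filename and contents taken from the example above):

```shell
# Create the sample input file used by the source connector example
cat > testkafka.txt <<'EOF'
yesy
alipay
wenxin
EOF
```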
The import command is: bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
The configuration file connect-standalone.properties contains:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
connect-file-source.properties contains the following; file specifies the file to import, and topic specifies the target Kafka topic:
name=local-file-source-test
connector.class=FileStreamSource
tasks.max=1
file=testkafka.txt
topic=connect-test
transforms=MakeMap
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=data
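The MakeMap transform (HoistField$Value) wraps each raw line in a single-field structure named data before the value is serialized. Its effect can be approximated in shell; this is an illustration only — the real transform runs inside Connect, and this sketch assumes the input lines contain no characters that need JSON escaping:

```shell
# Approximate what HoistField$Value does: wrap each line as {"data": ...}
hoisted=$(while IFS= read -r line; do
  printf '{"data":"%s"}\n' "$line"
done <<'EOF'
yesy
alipay
wenxin
EOF
)
echo "$hoisted"
```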
Format of the imported data in Kafka: given the key.converter and value.converter settings above, records are stored as JSON, for example:
{"data":"yesy"}
{"data":"alipay"}
{"data":"wenxin"}
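To verify that the records actually landed in the topic, the console consumer that ships with Kafka can be used (this assumes a broker running on localhost:9092 and a newer Kafka release; very old releases used a --zookeeper flag instead). It reads the topic from the beginning and prints the JSON values shown above:

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic connect-test --from-beginning
```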
2. Exporting Kafka data to a file
The export command is: bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
The connect-standalone.properties file is configured exactly as for the import above.
connect-file-sink.properties contains the following; file specifies the output file for the exported data, and topics specifies the Kafka topic(s) to export:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sinkp.txt
topics=connect-test
Note that the exported lines are not strict JSON: FileStreamSink writes each record value using its Java toString(), so the Map that the JsonConverter deserialized is rendered without quotes:
{data=alipay}
{data=wenxin}
{data=yesy}
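Seen side by side, the topic stores JSON while the sink file gets the value's Map.toString() form. That rendering can be mimicked in shell for comparison (an illustration only, not how Connect produces it):

```shell
# Mimic Java's Map.toString() rendering of the sink output lines
sink_like=$(for v in alipay wenxin yesy; do
  printf '{data=%s}\n' "$v"
done)
echo "$sink_like"
```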