Importing and Exporting Data with Kafka's Built-in Connect Scripts

Preface:

Kafka 0.9.0.x or later is required, since Kafka Connect was introduced in 0.9.0. (The single message transform used below, transforms=MakeMap, additionally requires Kafka 0.10.2 or later.)


Kafka ships with two Connect launcher scripts: connect-standalone.sh (standalone mode) and connect-distributed.sh (distributed mode). Both are located in the bin directory of the Kafka installation.


1. Importing data from an external file into Kafka

This article covers only connect-standalone.sh. When data is imported with this script, it is written to the configured Kafka topic, where any consumer in the cluster can read it.

For example, suppose the file to import is named testkafka.txt with the following contents:

yesy

alipay

wenxin
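The sample file can be created like this (a sketch; run it from the Kafka installation directory so that the relative path configured in connect-file-source.properties resolves):

```shell
# Create the sample input file with one record per line:
printf 'yesy\nalipay\nwenxin\n' > testkafka.txt
cat testkafka.txt
```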


The import command is: bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties


The worker configuration file connect-standalone.properties contains:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
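One practical consequence of offset.storage.file.filename: the FileStreamSource connector records how far it has read into this file, so restarting the standalone worker resumes from the last position rather than re-reading the input. To force a full re-import, the stored offsets can be deleted before restarting (a sketch, assuming the /tmp/connect.offsets path configured above):

```shell
# Delete the standalone worker's stored offsets so the source
# connector re-reads testkafka.txt from the beginning on next start:
rm -f /tmp/connect.offsets
```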


The connect-file-source.properties configuration is shown below; file sets the name of the file to import, and topic sets the target Kafka topic:

name=local-file-source-test
connector.class=FileStreamSource
tasks.max=1
file=testkafka.txt
topic=connect-test
transforms=MakeMap
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=data
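The HoistField$Value transform wraps each raw line into a single-field structure named data before the value converter serializes it, which is why plain lines end up in Kafka as JSON objects rather than bare strings. The effect can be sketched in shell (an illustration of the resulting record format, not the connector's actual code path):

```shell
# Simulate HoistField$Value + JsonConverter for each input line:
while IFS= read -r line; do
  printf '{"data":"%s"}\n' "$line"
done <<'EOF'
yesy
alipay
wenxin
EOF
```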



As the key.converter and value.converter settings above indicate, records are written to Kafka as JSON. The imported data looks like this:

{"data":"yesy"}

{"data":"alipay"}

{"data":"wenxin"}



2. Exporting data from Kafka to a file

The export command is: bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties

The connect-standalone.properties file is the same one used for importing.

The connect-file-sink.properties configuration is shown below; file sets the name of the output file, and topics sets the Kafka topic(s) to read from:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sinkp.txt
topics=connect-test

Note that the exported lines are not strict JSON. With value.converter.schemas.enable=false, the JsonConverter deserializes each record into a Java Map, and the FileStreamSink connector writes that map's toString() representation to the file:

{data=alipay}

{data=wenxin}

{data=yesy}
