1. Quick Introduction
In real-world production, for reasons such as data security and data exchange between different companies, txt and csv files are frequently used to exchange data.
TxtFileWriter writes one or more table files in a CSV-like format to the local filesystem. Its intended users are mainly DataX developers and testers.
The content written to a local file is a logical two-dimensional table, for example CSV-formatted text.
2. Features and Limitations
TxtFileWriter converts data from the DataX protocol into local TXT files; a local file itself is unstructured data storage.
TxtFileWriter makes the following commitments:
- It supports writing TXT files only, and requires the TXT schema to be a single two-dimensional table.
- It supports CSV-like files with a custom delimiter.
- It supports text compression; the available formats are gzip and bzip2.
- It supports multi-threaded writing, with each thread writing a separate sub-file.
- File rolling: when a file exceeds a certain size or row count, it is switched to a new file. [not yet supported]
What it cannot do:
- Concurrent writes to a single file are not supported.
3. Feature Description
3.1 Configuration Sample
{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/home/haiwei.luo/case00/data"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "boolean"
                            },
                            {
                                "index": 2,
                                "type": "double"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "type": "date",
                                "format": "yyyy.MM.dd"
                            }
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/home/haiwei.luo/case00/result",
                        "fileName": "luohw",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}
3.2 Parameter Description
3.2.1 path
Description: the local filesystem path; TxtFileWriter writes multiple files under this directory.
Required: yes
Default: none
3.2.2 fileName
Description: the name of the files TxtFileWriter writes; a random suffix is appended to it to form the actual file name used by each writer thread.
Required: yes
Default: none
3.2.3 writeMode
Description: how TxtFileWriter cleans up existing data before writing:
- truncate: before writing, delete all files in the directory whose names start with the fileName prefix.
- append: no cleanup before writing; DataX TxtFileWriter writes directly using fileName and guarantees that the file names do not conflict.
- nonConflict: if files with the fileName prefix already exist in the directory, report an error.
Required: yes
Default: none
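As an illustration, the three modes behave roughly like the following Python sketch. This is not DataX's actual implementation; the helper name `prepare_target` and the random-suffix scheme are assumptions for demonstration only.

```python
import glob
import os
import uuid

def prepare_target(path, file_name, write_mode):
    """Emulate TxtFileWriter's pre-write cleanup for the three writeMode values."""
    existing = glob.glob(os.path.join(path, file_name + "*"))
    if write_mode == "truncate":
        # Remove every file under `path` whose name starts with `file_name`.
        for f in existing:
            os.remove(f)
    elif write_mode == "nonConflict":
        # Fail fast if any file with the prefix already exists.
        if existing:
            raise RuntimeError("files with prefix %r already exist: %s" % (file_name, existing))
    elif write_mode == "append":
        # No cleanup; the random suffix keeps new file names from colliding.
        pass
    else:
        raise ValueError("unsupported writeMode: %r" % write_mode)
    # Each writer thread appends a random suffix to the configured fileName.
    return os.path.join(path, "%s__%s" % (file_name, uuid.uuid4().hex))
```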
3.2.4 fieldDelimiter
Description: the field delimiter used when writing fields.
Required: no
Default: ,
3.2.5 compress
Description: text compression type; leaving it unset means no compression. The supported compression formats are gzip and bzip2.
Required: no
Default: no compression
3.2.6 encoding
Description: the encoding of the files being written.
Required: no
Default: utf-8
3.2.7 nullFormat
Description: a text file cannot represent null (a null pointer) with a standard string, so DataX provides nullFormat to define which string stands for null. For example, if the user configures nullFormat="\N", then DataX treats the data "\N" as a null field.
Required: no
Default: \N
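For instance, with the default nullFormat, a null column is rendered as the literal string \N. A minimal sketch of this substitution (`render_field` is a hypothetical helper, not a DataX API):

```python
NULL_FORMAT = "\\N"  # the default nullFormat, i.e. the two characters \N

def render_field(value, null_format=NULL_FORMAT):
    # Null (None) columns are serialized as the configured nullFormat string.
    return null_format if value is None else str(value)

row = [1, None, "alice"]
line = ",".join(render_field(v) for v in row)
print(line)  # 1,\N,alice
```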
3.2.8 dateFormat
Description: the format used when serializing date-typed data into the file, e.g. "dateFormat": "yyyy-MM-dd".
Required: no
Default: none
3.2.9 fileFormat
Description: the output file format, either csv (http://zh.wikipedia.org/wiki/%E9%80%97%E5%8F%B7%E5%88%86%E9%9A%94%E5%80%BC) or text. csv is strict CSV: if the data to be written contains the column delimiter, it is escaped according to CSV syntax, with the double quote " as the quoting character. text simply joins fields with the column delimiter and performs no escaping when the data contains the delimiter.
Required: no
Default: text
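The difference between the two formats only shows when a field itself contains the column delimiter. The following sketch uses Python's csv module to illustrate the two behaviors; it is illustrative, not DataX code:

```python
import csv
import io

row = ["1", "a,b", "ok"]  # the middle field contains the column delimiter

# fileFormat=text: fields are simply joined, so the embedded comma is ambiguous
text_line = ",".join(row)
print(text_line)  # 1,a,b,ok -- now reads as four columns

# fileFormat=csv: strict CSV quoting, the embedded comma is protected by double quotes
buf = io.StringIO()
csv.writer(buf, lineterminator="").writerow(row)
csv_line = buf.getvalue()
print(csv_line)  # 1,"a,b",ok
```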
3.2.10 header
Description: the header row written at the top of the file, e.g. ['id', 'name', 'age'].
Required: no
Default: none
3.3 Type Conversion
A local file itself carries no data types; the types below are defined by DataX TxtFileWriter:
Where:
- Local-file Long is the string form of an integer in the text, e.g. "19901219".
- Local-file Double is the string form of a double, e.g. "3.1415".
- Local-file Boolean is the string form of a boolean, e.g. "true" or "false", case-insensitive.
- Local-file Date is the string form of a date, e.g. "2014-12-31"; a format can be specified for Date.
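A rough Python sketch of how such values map to their text form. `format_column` is a hypothetical helper, not a DataX API; DataX's dateFormat uses Java SimpleDateFormat patterns like yyyy-MM-dd, shown here with the strftime equivalent:

```python
import datetime

def format_column(value, date_format="%Y-%m-%d"):
    """Render a typed value the way the text file would store it (illustrative only)."""
    if value is None:
        return "\\N"                        # default nullFormat
    if isinstance(value, bool):             # check bool before int: bool subclasses int
        return "true" if value else "false"
    if isinstance(value, (int, float)):     # Long / Double: plain string form
        return str(value)
    if isinstance(value, datetime.date):    # Date honors the configured format
        return value.strftime(date_format)
    return str(value)                       # String: written as-is

print(format_column(19901219))                      # 19901219
print(format_column(3.1415))                        # 3.1415
print(format_column(True))                          # true
print(format_column(datetime.date(2014, 12, 31)))   # 2014-12-31
```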
4. Test Case
Now let's test a case that reads from a CSV file and writes it back out as CSV.
4.1 Data Preparation
Using the test data from the earlier Superset article:
Test data
Write the MySQL table data to a CSV file:
mysql> use test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql>
mysql> select * from fact_sale INTO OUTFILE '/home/backup/fact_sale.csv' FIELDS TERMINATED BY ',';
Query OK, 684371 rows affected (0.69 sec)
mysql>
Check the format of the CSV file:
Transfer the file to the server where DataX is deployed:
4.2 JSON File Preparation
txtfilereader.json:
{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/home/backup"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "date",
                                "format": "yyyy-MM-dd"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "type": "long"
                            },
                            {
                                "index": 4,
                                "type": "long"
                            }
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/home/backup",
                        "fileName": "fact_sale_new",
                        "writeMode": "truncate",
                        "format": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}
4.3 Run the Script
cd $datax_home/bin
python datax.py ./txtfilereader.json
Run log:
[root@10-31-1-119 bin]# python datax.py ./txtfilereader.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2021-11-23 11:16:33.256 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-11-23 11:16:33.263 [main] INFO Engine - the machine info =>
osInfo: Oracle Corporation 1.8 25.242-b08
jvmInfo: Linux amd64 3.10.0-1127.el7.x86_64
cpu num: 8
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2021-11-23 11:16:33.277 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"txtfilereader",
"parameter":{
"column":[
{
"format":"yyyy-MM-dd",
"index":0,
"type":"date"
},
{
"index":1,
"type":"string"
},
{
"index":2,
"type":"string"
},
{
"index":3,
"type":"long"
},
{
"index":4,
"type":"long"
}
],
"encoding":"UTF-8",
"fieldDelimiter":",",
"path":[
"/home/backup"
]
}
},
"writer":{
"name":"txtfilewriter",
"parameter":{
"fileName":"fact_sale_new",
"format":"yyyy-MM-dd",
"path":"/home/backup",
"writeMode":"truncate"
}
}
}
],
"setting":{
"speed":{
"channel":2
}
}
}
2021-11-23 11:16:33.291 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2021-11-23 11:16:33.295 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-11-23 11:16:33.295 [main] INFO JobContainer - DataX jobContainer starts job.
2021-11-23 11:16:33.299 [main] INFO JobContainer - Set jobId = 0
2021-11-23 11:16:33.319 [job-0] WARN TxtFileWriter$Job - 您使用format配置日期格式化, 这是不推荐的行为, 请优先使用dateFormat配置项, 两项同时存在则使用dateFormat.
2021-11-23 11:16:33.339 [job-0] WARN UnstructuredStorageWriterUtil - 您的encoding配置为空, 将使用默认值[UTF-8]
2021-11-23 11:16:33.340 [job-0] WARN UnstructuredStorageWriterUtil - 您没有配置列分隔符, 使用默认值[,]
2021-11-23 11:16:33.340 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2021-11-23 11:16:33.341 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] do prepare work .
2021-11-23 11:16:33.343 [job-0] INFO TxtFileReader$Job - add file [/home/backup/fact_sale.csv] as a candidate to be read.
2021-11-23 11:16:33.344 [job-0] INFO TxtFileReader$Job - 您即将读取的文件数为: [1]
2021-11-23 11:16:33.345 [job-0] INFO JobContainer - DataX Writer.Job [txtfilewriter] do prepare work .
2021-11-23 11:16:33.345 [job-0] INFO TxtFileWriter$Job - 由于您配置了writeMode truncate, 开始清理 [/home/backup] 下面以 [fact_sale_new] 开头的内容
2021-11-23 11:16:33.348 [job-0] INFO JobContainer - jobContainer starts to do split ...
2021-11-23 11:16:33.349 [job-0] INFO JobContainer - Job set Channel-Number to 2 channels.
2021-11-23 11:16:33.351 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] splits to [1] tasks.
2021-11-23 11:16:33.351 [job-0] INFO TxtFileWriter$Job - begin do split...
2021-11-23 11:16:33.365 [job-0] INFO TxtFileWriter$Job - splited write file name:[fact_sale_new__7b975784_087a_4270_94a4_11d55d290a68]
2021-11-23 11:16:33.365 [job-0] INFO TxtFileWriter$Job - end do split.
2021-11-23 11:16:33.365 [job-0] INFO JobContainer - DataX Writer.Job [txtfilewriter] splits to [1] tasks.
2021-11-23 11:16:33.387 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2021-11-23 11:16:33.390 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-11-23 11:16:33.391 [job-0] INFO JobContainer - Running by standalone Mode.
2021-11-23 11:16:33.399 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-11-23 11:16:33.407 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-11-23 11:16:33.407 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-11-23 11:16:33.416 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-11-23 11:16:33.418 [0-0-0-writer] INFO TxtFileWriter$Task - begin do write...
2021-11-23 11:16:33.418 [0-0-0-reader] INFO TxtFileReader$Task - reading file : [/home/backup/fact_sale.csv]
2021-11-23 11:16:33.418 [0-0-0-writer] INFO TxtFileWriter$Task - write to file : [/home/backup/fact_sale_new__7b975784_087a_4270_94a4_11d55d290a68]
2021-11-23 11:16:33.457 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":",","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2021-11-23 11:16:35.753 [0-0-0-writer] INFO TxtFileWriter$Task - end do write
2021-11-23 11:16:35.821 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[2406]ms
2021-11-23 11:16:35.822 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2021-11-23 11:16:43.415 [job-0] INFO StandAloneJobContainerCommunicator - Total 684371 records, 16363343 bytes | Speed 1.56MB/s, 68437 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.652s | All Task WaitReaderTime 0.076s | Percentage 100.00%
2021-11-23 11:16:43.415 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-11-23 11:16:43.416 [job-0] INFO JobContainer - DataX Writer.Job [txtfilewriter] do post work.
2021-11-23 11:16:43.416 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] do post work.
2021-11-23 11:16:43.416 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2021-11-23 11:16:43.418 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /home/software/datax/hook
2021-11-23 11:16:43.420 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 11 | 11 | 11 | 0.067s | 0.067s | 0.067s
2021-11-23 11:16:43.420 [job-0] INFO JobContainer - PerfTrace not enable!
2021-11-23 11:16:43.421 [job-0] INFO StandAloneJobContainerCommunicator - Total 684371 records, 16363343 bytes | Speed 1.56MB/s, 68437 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.652s | All Task WaitReaderTime 0.076s | Percentage 100.00%
2021-11-23 11:16:43.422 [job-0] INFO JobContainer -
任务启动时刻 : 2021-11-23 11:16:33
任务结束时刻 : 2021-11-23 11:16:43
任务总计耗时 : 10s
任务平均流量 : 1.56MB/s
记录写入速度 : 68437rec/s
读出记录总数 : 684371
读写失败总数 : 0
[root@10-31-1-119 bin]#
The file that was written has the random suffix appended to the configured fileName to form its actual name: