如何用DataX自定义KuduWriter插件

如何用DataX自定义KuduWriter之前,先了解一下什么是DataX,它能做什么事

DataX

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

Features

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。


93b7fc1c-6927-11e6-8cda-7cf8420fc65f.png

DataX3.0框架设计

ec7e36f4-6927-11e6-8f5f-ffc43d6a468b.png

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX为什么要使用插件机制?
从设计之初,DataX就把异构数据源同步作为自身的使命,为了应对不同数据源的差异、同时提供一致的同步原语和扩展能力,DataX自然而然地采用了框架 + 插件 的模式:

  • 插件只需关心数据的读取或者写入本身。
  • 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。

作为插件开发人员,则需要关注两个问题:

  • 数据源本身的读写数据正确性。
  • 如何与框架沟通、合理正确地使用框架。

接下来我们看一下DataX目前支持的读写


2019-08-18_212233.png

从官方提供的资料中显示,并没有支持往kudu中写数据,也就是说并没有这个writer插件,这也就是今天需要做的分享。

在接下来的实践之前,建议花费一点时间看一下官方提供的DataX插件开发宝典,这里就不浪费过多篇幅介绍底层原理实现。

快速启动

环境准备
JDK1.8以上(推荐1.8)
Python(推荐Python2.6.X)

快速下载DataX下载地址

下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

自检脚本: python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

2019-08-18 21:38:38.640 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-08-18 21:38:38.651 [main] INFO  Engine - the machine info  => 

    osInfo: Oracle Corporation 1.8 25.212-b10
    jvmInfo:    Windows 10 amd64 10.0
    cpu num:    8

    totalPhysicalMemory:    -0.00G
    freePhysicalMemory: -0.00G
    maxFileDescriptorCount: -1
    currentOpenFileDescriptorCount: -1

    GC Names    [PS MarkSweep, PS Scavenge]

    MEMORY_NAME                    | allocation_size                | init_size                      
    PS Eden Space                  | 256.00MB                       | 256.00MB                       
    Code Cache                     | 240.00MB                       | 2.44MB                         
    Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
    PS Survivor Space              | 42.50MB                        | 42.50MB                        
    PS Old Gen                     | 683.00MB                       | 683.00MB                       
    Metaspace                      | -0.00MB                        | 0.00MB                         


2019-08-18 21:38:38.671 [main] INFO  Engine - 
{
    "content":[
        {
            "reader":{
                "name":"streamreader",
                "parameter":{
                    "column":[
                        {
                            "type":"string",
                            "value":"DataX"
                        },
                        {
                            "type":"long",
                            "value":19890604
                        },
                        {
                            "type":"date",
                            "value":"1989-06-04 00:00:00"
                        },
                        {
                            "type":"bool",
                            "value":true
                        },
                        {
                            "type":"bytes",
                            "value":"test"
                        }
                    ],
                    "sliceRecordCount":100000
                }
            },
            "writer":{
                "name":"streamwriter",
                "parameter":{
                    "encoding":"UTF-8",
                    "print":false
                }
            }
        }
    ],
    "setting":{
        "errorLimit":{
            "percentage":0.02,
            "record":0
        },
        "speed":{
            "byte":10485760
        }
    }
}

2019-08-18 21:38:38.694 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2019-08-18 21:38:38.697 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2019-08-18 21:38:38.697 [main] INFO  JobContainer - DataX jobContainer starts job.
2019-08-18 21:38:38.699 [main] INFO  JobContainer - Set jobId = 0
2019-08-18 21:38:38.722 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2019-08-18 21:38:38.723 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do prepare work .
2019-08-18 21:38:38.724 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2019-08-18 21:38:38.724 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2019-08-18 21:38:38.726 [job-0] INFO  JobContainer - Job set Max-Byte-Speed to 10485760 bytes.
2019-08-18 21:38:38.727 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2019-08-18 21:38:38.727 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2019-08-18 21:38:38.751 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2019-08-18 21:38:38.756 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2019-08-18 21:38:38.759 [job-0] INFO  JobContainer - Running by standalone Mode.
2019-08-18 21:38:38.770 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2019-08-18 21:38:38.775 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2019-08-18 21:38:38.775 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2019-08-18 21:38:38.787 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2019-08-18 21:38:39.090 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[304]ms
2019-08-18 21:38:39.092 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2019-08-18 21:38:48.781 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.020s |  All Task WaitReaderTime 0.034s | Percentage 100.00%
2019-08-18 21:38:48.781 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2019-08-18 21:38:48.790 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2019-08-18 21:38:48.793 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.
2019-08-18 21:38:48.794 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2019-08-18 21:38:48.796 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: E:\envs\datax\hook
2019-08-18 21:38:48.797 [job-0] INFO  JobContainer - 
     [total cpu info] => 
        averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
        -1.00%                         | -1.00%                         | -1.00%
                        

     [total gc info] => 
         NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
         PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
         PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2019-08-18 21:38:48.800 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-08-18 21:38:48.801 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.020s |  All Task WaitReaderTime 0.034s | Percentage 100.00%
2019-08-18 21:38:48.811 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-08-18 21:38:38
任务结束时刻                    : 2019-08-18 21:38:48
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

出现最后的任务结束时刻表明任务运行完毕。

实例

背景

从关系型数据库批量导数据到kudu数据库中,可覆盖更新或增量更新

项目结构

项目结构.png
  • package.xml:全局的package,添加插件的打包内容
  • plugin.json:对插件本身的描述,重点是name和class,name表示插件名称,class表示插件的入口类,必须准确无误;
  • plugin_job_template.json:插件的示例配置文件。
  • KuduWriter:自定义流程
  • ColumnType,Key,KuduWriterErrorCode皆为业务需要自定义的类,并不是开发插件必须
package.xml
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/kuduwriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>kuduwriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/kuduwriter</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/kuduwriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
plugin.json
{
    "name": "kuduwriter",
    "class": "com.alibaba.datax.plugin.writer.kuduwriter.KuduWriter",
    "description": "useScene: prod. mechanism: via FileSystem connect KUDU write data concurrent.",
    "developer": ""
}
  • name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要 。
  • class: 入口类的全限定名称,框架通过反射创建入口类的实例。十分重要 。
  • description: 描述信息。
  • developer: 开发人员。
plugin_job_template.json
{
    "name": "kuduwriter",
    "parameter": {
        "table": "",
        "column": [
        ],
        "master":"",
        "primaryKey": [
        ],
        "batch-size":"",
        "mutationBufferSpace": "",
        "isUpsert":""
    }
}

自定义的参数格式,可以根据super.getPluginJobConf()来获取改JSON参数

KuduWriter
package com.alibaba.datax.plugin.writer.kuduwriter;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSONObject;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import static com.alibaba.datax.plugin.writer.kuduwriter.KuduWriterErrorCode.*;

public class KuduWriter extends Writer {

    public static final class Job extends Writer.Job {

        private static final Logger log = LoggerFactory.getLogger(Task.class);
        //提供多级JSON配置信息无损存储,采用json的形式
        private Configuration originalConfig;

        /**
         * init:Job对象初始化工作,测试可以通过super.getPluginJobConf()获取与本插件相关的配置
         */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }

        /**
         * split:拆分Task,参数adviceNumber框架建议的拆分数,一般运行时候所配置的并发度,值返回的是task的配置列表;
         * @param mandatoryNumber
         *            为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
         *
         * @return
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> list = new ArrayList<Configuration>();
            for (int i = 0; i < mandatoryNumber; i++) {
                //拷贝当前Configuration,注意,这里使用了深拷贝,避免冲突
                list.add(originalConfig.clone());
            }
            return list;
        }

        /**
         * destroy:Job对象自身的销毁工作
         */
        @Override
        public void destroy() {
        }
    }

    public static class Task extends Writer.Task {
        private static final Logger log = LoggerFactory.getLogger(Task.class);

        private String tableName;
        private int batchSize;
        private Configuration sliceConfig;
        private KuduClient kuduClient;
        private KuduSession session;
        private KuduTable kuduTable;
        private List<JSONObject> columnsList = new ArrayList<JSONObject>();
        private List<JSONObject> primaryKeyList = new ArrayList<JSONObject>();
        //判断是覆盖更新还是增量更新,默认是增量更新
        private boolean isUpsert;

        /**
         * init:Task对象的初始化,此时可以通过super.getPluginJobConf()获取与本Task相关的配置
         */
        @Override
        public void init() {
            //获取与本task相关的配置
            this.sliceConfig = super.getPluginJobConf();
            //判断表名是必须得参数
            tableName = sliceConfig.getNecessaryValue(Key.KEY_KUDU_TABLE, KUDU_ERROR_TABLE);
            //设置主节点
            String masterAddresses = sliceConfig.getNecessaryValue(Key.KEY_KUDU_MASTER, KUDU_ERROR_MASTER);
            //设置批量大小,官方推荐最多不超过1024
            batchSize = sliceConfig.getInt(Key.KEY_KUDU_FLUSH_BATCH, 1000);
            //设置缓冲大小,系统默认值是1000
            int mutationBufferSpace = sliceConfig.getInt(Key.KEY_KUDU_MUTATION_BUFFER_SPACE, 3000);
            //判断是覆盖更新还是增量更新,默认是增量更新
            isUpsert = sliceConfig.getBool(Key.KEY_KUDU_ISUPSERT, false);
            //获取指定的primary key,可以指定多个
            primaryKeyList = (List<JSONObject>) sliceConfig.get(Key.KEY_KUDU_PRIMARY, List.class);
            //获取列名
            List<JSONObject> _columnList = (List<JSONObject>) sliceConfig.get(Key.KEY_KUDU_COLUMN, List.class);
            if (primaryKeyList == null || primaryKeyList.isEmpty()) {
                throw DataXException.asDataXException(KUDU_ERROR_PRIMARY_KEY, KUDU_ERROR_PRIMARY_KEY.getDescription());
            }

            for (JSONObject column : _columnList) {
                if (!column.containsKey("index")) {
                    throw DataXException.asDataXException(KUDU_ERROR_CONF_COLUMNS, KUDU_ERROR_CONF_COLUMNS.getDescription());
                } else if (!column.containsKey("name")) {
                    throw DataXException.asDataXException(KUDU_ERROR_CONF_COLUMNS, KUDU_ERROR_CONF_COLUMNS.getDescription());
                } else if (!column.containsKey("type")) {
                    throw DataXException.asDataXException(KUDU_ERROR_CONF_COLUMNS, KUDU_ERROR_CONF_COLUMNS.getDescription());
                } else {
                    columnsList.add(column.getInteger("index"), column);
                }
            }

            log.info("初始化KuduClient");
            kuduClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
            session = kuduClient.newSession();
            /**
             * 1. 尽量采用 MANUAL_FLUSH, 性能最好, 如果有写入kudu错误, flush()函数就会抛出异常, 逻辑非常清晰.
             * 2. 在性能要求不高的情况下, AUTO_FLUSH_SYNC 也是一个好的选择.
             * 3. 仅仅在demo场景下使用 AUTO_FLUSH_BACKGROUND, 不考虑异常处理时候代码可以很简单, 性能也很好. 在生产环境下,
             *    不推荐的 原因是: 插入数据可能会是乱序, 一旦考虑捕获异常代码就很拖沓.
             */
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.setMutationBufferSpace(mutationBufferSpace);
            try {
                kuduTable = kuduClient.openTable(tableName);
            } catch (KuduException e) {
                throw DataXException.asDataXException(KUDU_ERROR_TABLE.ILLEGAL_VALUE, "打开Kudu表失败:" + tableName);
            }
        }

        /**
         * startWrite:从RecordReceiver中读取数据,写入目标数据源,RecordReceiver中的数据来自Reader和Writer之间的缓存队列。
         * @param recordReceiver
         */
        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            Record record = null;
            AtomicLong counter = new AtomicLong(0);
            while ((record = recordReceiver.getFromReader()) != null) {
                if (columnsList.size() != record.getColumnNumber()) {
                    throw DataXException.asDataXException(ILLEGAL_VALUES_ERROR, ILLEGAL_VALUES_ERROR.getDescription() + "读出字段个数:" + record.getColumnNumber() + " " + "配置字段个数:" + columnsList.size());
                }
                boolean isDirtyRecord = false;
                for (int i = 0; i < primaryKeyList.size() && !isDirtyRecord; i++) {
                    JSONObject col = primaryKeyList.get(i);
                    Column column = record.getColumn(col.getInteger("index"));
                    isDirtyRecord = (column.getRawData() == null);
                }

                if (isDirtyRecord) {
                    super.getTaskPluginCollector().collectDirtyRecord(record, "primarykey字段为空");
                    continue;
                }

                Upsert upsert = kuduTable.newUpsert();
                Insert insert = kuduTable.newInsert();

                for (int i = 0; i < columnsList.size(); i++) {
                    PartialRow row;
                    if (isUpsert) {
                        //覆盖更新
                        row = upsert.getRow();
                    } else {
                        //增量更新
                        row = insert.getRow();
                    }

                    JSONObject col = columnsList.get(i);
                    String columnName = col.getString("name");
                    ColumnType columnType = ColumnType.getByTypeName(col.getString("type"));
                    Column column = record.getColumn(col.getInteger("index"));
                    Object data = column.getRawData();
                    if (data == null) {
                        row.setNull(columnName);
                        continue;
                    }
                    switch (columnType) {
                        case INT:
                            row.addInt(columnName, Integer.parseInt(data.toString()));
                            break;
                        case FLOAT:
                            row.addFloat(columnName, Float.parseFloat(data.toString()));
                            break;
                        case STRING:
                            row.addString(columnName, data.toString());
                            break;
                        case DOUBLE:
                            row.addDouble(columnName, Double.parseDouble(data.toString()));
                            break;
                        case LONG:
                            row.addLong(columnName, Long.parseLong(data.toString()));
                            break;
                        case BOOLEAN:
                            row.addBoolean(columnName, Boolean.getBoolean(data.toString()));
                            break;
                        case SHORT:
                            row.addShort(columnName, Short.parseShort(data.toString()));
                            break;
                    }
                }
                try {
                    if (isUpsert) {
                        //覆盖更新
                        session.apply(upsert);
                    } else {
                        //增量更新
                        session.apply(insert);
                    }
                    //提前写数据,阈值可自定义
                    if (counter.incrementAndGet() > batchSize * 0.75) {
                        session.flush();
                    }
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
            try {
                if (session.hasPendingOperations()) {
                    session.flush();
                }
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }

        /**
         * destroy:Task对象自身的销毁工作
         */
        @Override
        public void destroy() {
            if (kuduClient != null) {
                try {
                    session.close();
                } catch (Exception e) {
                    // no-op
                }
                try {
                    kuduClient.shutdown();
                } catch (Exception e) {
                    // no-op
                }
                try {
                    kuduClient.close();
                } catch (Exception e) {
                    // no-op
                }
            }
        }
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>kuduwriter</artifactId>
    <name>kuduwriter</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

打包部署

方式一:可以通过mvn clean package -DskipTests assembly:assembly来进行打包,默认会将所有插件进行编译,比较慢,可以在全局package中进行注释只保留需要编译的插件即可
方式二:IDEA插件

打包

打包运行成功,进入目录


2019-08-18_221920.png

进入目录并拷贝文件夹


2019-08-18_222119.png

将拷贝的文件夹,粘贴进入DataX的plugin目录下,如果定义的reader就放在reader目录下,定义的writer就放在writer目录下。
2019-08-18_222334.png

kuduwriter目录下包含的文件


2019-08-18_222558.png

至此,所有的开发已经结束,接下来验证程序。
1.准备验证JSON

{
    "job": {
        "content": [{
            "reader": {
                "parameter": {
                    "password": "",
                    "connection": [{
                        "querySql": ["select id,name,age from person"],
                        "jdbcUrl": ["jdbc:mysql://:3306/datax?useUnicode=true&characterEncoding=utf8"]
                    }],
                    "username": ""
                },
                "name": "mysqlreader"
            },
            "writer": {
                "parameter": {
                    "batch-size": 1024,
                    "column": [{
                        "index": 0,
                        "name": "id",
                        "type": "int"
                    }, {
                        "index": 1,
                        "name": "name",
                        "type": "string"
                    }, {
                        "index": 2,
                        "name": "age",
                        "type": "int"
                    }],
                    "mutationBufferSpace": 5000,
                    "isUpsert": true,
                    "table": "kudu_person_test",
                    "masterAddr": "",
                    "primaryKey": [{
                        "index": 0,
                        "name": "id",
                        "type": "int"
                    }]
                },
                "name": "kuduwriter"
            }
        }],
        "setting": {
            "errorLimit": {
                "record": 0,
                "percentage": 0.20
            },
            "speed": {
                "channel": 4
            }
        }
    }
}

准备的JSON文件放入DataX安装目录下的job文件夹下,JSON文件名字自定义即可
2.执行程序

python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/{YOUR TEST JSON}.json

以上若有不对,请多多指正!

参考文档:https://github.com/alibaba/DataX

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352