Before building a custom KuduWriter for DataX, let's first look at what DataX is and what it can do.
DataX
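DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS) and DRDS.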
Features
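As a data synchronization framework, DataX abstracts synchronization between different data sources into Reader plugins, which read data from the source, and Writer plugins, which write data to the target. In principle, the framework can therefore synchronize any type of data source. The plugin system also forms an ecosystem: as soon as a new data source is added, it can exchange data with every existing one.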
DataX 3.0 framework design
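DataX is an offline data synchronization framework built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted as Reader/Writer plugins, which plug into the overall synchronization framework.
- Reader: the data collection module. It collects data from the data source and sends it to the Framework.
- Writer: the data writing module. It continuously fetches data from the Framework and writes it to the destination.
- Framework: connects the Reader and the Writer, acts as the data transport channel between them, and handles core technical concerns such as buffering, flow control, concurrency and data conversion.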
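The division of labour above can be pictured as a producer/consumer pipeline. The following is a minimal JDK-only sketch, not DataX code: the class PipelineSketch and its end-of-stream marker are hypothetical, and a bounded ArrayBlockingQueue stands in for the Framework's channel to show buffering and flow control (a full queue blocks the Reader until the Writer catches up).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the Reader -> Framework -> Writer pipeline; all names are hypothetical.
public class PipelineSketch {

    // End-of-stream marker, playing the role of getFromReader() returning null.
    private static final String END = "__END__";

    public static List<String> run(List<String> source, int capacity) {
        // Bounded queue = the framework's channel: it buffers records and blocks the
        // reader when full (back-pressure), so a slow writer throttles a fast reader.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(capacity);
        List<String> sink = new ArrayList<>();

        // Reader: collects data from the source and sends it to the channel.
        Thread reader = new Thread(() -> {
            try {
                for (String record : source) {
                    channel.put(record);
                }
                channel.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Writer: keeps taking records from the channel and writes them to the sink.
        Thread writer = new Thread(() -> {
            try {
                String record;
                while (!END.equals(record = channel.take())) {
                    sink.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        writer.start();
        try {
            reader.join();
            writer.join(); // join() makes the writer's additions to sink visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pipeline", e);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("r1", "r2", "r3"), 2)); // [r1, r2, r3]
    }
}
```

With a single reader and a single writer the FIFO queue preserves record order; DataX itself runs many such channels in parallel, which this sketch does not attempt to model.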
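Why does DataX use a plugin mechanism?
From the very beginning, DataX's mission has been synchronizing heterogeneous data sources. To cope with the differences between data sources while offering consistent synchronization primitives and extensibility, DataX naturally adopted a framework + plugin model:
- Plugins only need to care about reading or writing the data itself.
- Common synchronization concerns, such as type conversion, performance and statistics, are handled by the framework.
As a plugin developer, you need to focus on two things:
- The correctness of reading and writing data on the data source itself.
- How to communicate with the framework and use it correctly.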
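Next, let's look at the readers and writers DataX currently supports.
According to the official documentation, writing to Kudu is not supported, that is, there is no Kudu writer plugin. Building one is the subject of this article.
Before the hands-on part, it is worth spending some time on the official DataX plugin development guide; the underlying principles are not repeated here.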
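Quick start
Environment setup
JDK 1.8 or later (1.8 recommended)
Python (Python 2.6.x recommended)
Download: DataX download link
After downloading, extract the archive to a local directory and change into its bin directory to run a sync job: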
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
Self-check script: python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
2019-08-18 21:38:38.640 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-08-18 21:38:38.651 [main] INFO Engine - the machine info =>
osInfo: Oracle Corporation 1.8 25.212-b10
jvmInfo: Windows 10 amd64 10.0
cpu num: 8
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2019-08-18 21:38:38.671 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"streamreader",
"parameter":{
"column":[
{
"type":"string",
"value":"DataX"
},
{
"type":"long",
"value":19890604
},
{
"type":"date",
"value":"1989-06-04 00:00:00"
},
{
"type":"bool",
"value":true
},
{
"type":"bytes",
"value":"test"
}
],
"sliceRecordCount":100000
}
},
"writer":{
"name":"streamwriter",
"parameter":{
"encoding":"UTF-8",
"print":false
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"byte":10485760
}
}
}
2019-08-18 21:38:38.694 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2019-08-18 21:38:38.697 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2019-08-18 21:38:38.697 [main] INFO JobContainer - DataX jobContainer starts job.
2019-08-18 21:38:38.699 [main] INFO JobContainer - Set jobId = 0
2019-08-18 21:38:38.722 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2019-08-18 21:38:38.723 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do prepare work .
2019-08-18 21:38:38.724 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2019-08-18 21:38:38.724 [job-0] INFO JobContainer - jobContainer starts to do split ...
2019-08-18 21:38:38.726 [job-0] INFO JobContainer - Job set Max-Byte-Speed to 10485760 bytes.
2019-08-18 21:38:38.727 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2019-08-18 21:38:38.727 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2019-08-18 21:38:38.751 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2019-08-18 21:38:38.756 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2019-08-18 21:38:38.759 [job-0] INFO JobContainer - Running by standalone Mode.
2019-08-18 21:38:38.770 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2019-08-18 21:38:38.775 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2019-08-18 21:38:38.775 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2019-08-18 21:38:38.787 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2019-08-18 21:38:39.090 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[304]ms
2019-08-18 21:38:39.092 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2019-08-18 21:38:48.781 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.020s | All Task WaitReaderTime 0.034s | Percentage 100.00%
2019-08-18 21:38:48.781 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2019-08-18 21:38:48.790 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work.
2019-08-18 21:38:48.793 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work.
2019-08-18 21:38:48.794 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2019-08-18 21:38:48.796 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: E:\envs\datax\hook
2019-08-18 21:38:48.797 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
2019-08-18 21:38:48.800 [job-0] INFO JobContainer - PerfTrace not enable!
2019-08-18 21:38:48.801 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.020s | All Task WaitReaderTime 0.034s | Percentage 100.00%
2019-08-18 21:38:48.811 [job-0] INFO JobContainer -
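Job start time            : 2019-08-18 21:38:38
Job end time              : 2019-08-18 21:38:48
Total elapsed time        : 10s
Average throughput        : 253.91KB/s
Record write speed        : 10000rec/s
Total records read        : 100000
Total read/write failures : 0
Once this final summary with the job end time is printed, the job has finished.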
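Example
Background
Batch-import data from a relational database into Kudu, either as an overwrite (upsert) or as an incremental (insert-only) update.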
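The two modes differ in how they treat an existing primary key. The sketch below is an in-memory model, not the Kudu client API (WriteModeSketch is hypothetical): INSERT rejects a row whose key already exists, which Kudu reports as a per-row error, while UPSERT inserts new rows and overwrites existing ones. In practice this means that re-running the same import in incremental mode leaves existing rows unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory model (NOT the Kudu API) of the two write modes; the class is hypothetical.
public class WriteModeSketch {
    private final Map<Integer, String> table = new LinkedHashMap<>();
    private int rowErrors = 0;

    // Models INSERT: a duplicate primary key becomes a per-row error, the old row stays.
    public void insert(int id, String value) {
        if (table.containsKey(id)) {
            rowErrors++; // Kudu reports this case as an "already present" row error
            return;
        }
        table.put(id, value);
    }

    // Models UPSERT: insert if the key is new, overwrite otherwise.
    public void upsert(int id, String value) {
        table.put(id, value);
    }

    public String get(int id) { return table.get(id); }
    public int rowErrors() { return rowErrors; }
    public int size() { return table.size(); }

    public static void main(String[] args) {
        WriteModeSketch t = new WriteModeSketch();
        t.insert(1, "a");
        t.insert(1, "b"); // rejected: row 1 keeps "a"
        t.upsert(2, "c");
        t.upsert(2, "d"); // overwritten: row 2 becomes "d"
        System.out.println(t.get(1) + " " + t.get(2) + " errors=" + t.rowErrors()); // a d errors=1
    }
}
```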
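Project structure
- package.xml: the assembly descriptor that specifies what is packaged for the plugin; the plugin's output is also registered in DataX's global package.xml.
- plugin.json: describes the plugin itself. The key fields are name and class: name is the plugin name and class is the plugin's entry class; both must be exactly right.
- plugin_job_template.json: a sample configuration for the plugin.
- KuduWriter: the custom write logic.
- ColumnType, Key and KuduWriterErrorCode are helper classes this plugin defines for its own needs; they are not required for plugin development in general.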
package.xml
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/kuduwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>kuduwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/kuduwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/kuduwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
plugin.json
{
"name": "kuduwriter",
"class": "com.alibaba.datax.plugin.writer.kuduwriter.KuduWriter",
"description": "useScene: prod. mechanism: via KuduClient connect Kudu, write data concurrently.",
"developer": ""
}
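- name: the plugin name, case-sensitive. The framework looks up the plugin by the name given in the user's job configuration. Very important.
- class: the fully qualified name of the entry class; the framework creates an instance of it through reflection. Very important.
- description: a description of the plugin.
- developer: the developer.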
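To make the roles of name and class concrete, here is a minimal sketch of a registry that maps a case-sensitive plugin name to a fully qualified class name and instantiates that class through reflection. PluginLoaderSketch, Plugin and DemoWriter are hypothetical; DataX's real loader is more involved (for example, it also has to put each plugin's libs directory on the class path).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: turning plugin.json's "name" and "class" into an instance.
public class PluginLoaderSketch {

    // Stand-in for a plugin base type (hypothetical, not DataX's Writer).
    public interface Plugin {
        String describe();
    }

    // Example entry class; reflection needs a public no-arg constructor.
    public static class DemoWriter implements Plugin {
        public DemoWriter() { }

        @Override
        public String describe() { return "demo writer"; }
    }

    // name -> fully qualified class name, as read from each plugin.json
    private final Map<String, String> registry = new HashMap<>();

    public void register(String name, String className) {
        registry.put(name, className); // HashMap keys are case-sensitive, like plugin names
    }

    public Plugin load(String name) {
        String className = registry.get(name);
        if (className == null) {
            throw new IllegalArgumentException("plugin not found: " + name);
        }
        try {
            Class<?> clazz = Class.forName(className);
            return (Plugin) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        PluginLoaderSketch loader = new PluginLoaderSketch();
        loader.register("demowriter", DemoWriter.class.getName());
        System.out.println(loader.load("demowriter").describe()); // demo writer
    }
}
```

A wrong class value surfaces as a reflection failure at load time, and a name with different casing is simply not found, which is why both fields must match exactly.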
plugin_job_template.json
{
"name": "kuduwriter",
"parameter": {
"table": "",
"column": [
],
"master":"",
"primaryKey": [
],
"batch-size":"",
"mutationBufferSpace": "",
"isUpsert":""
}
}
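These are the plugin's own parameters; inside the plugin, this JSON is available through super.getPluginJobConf(). As used by the KuduWriter code:
- table (required): the Kudu table to write to.
- master (required): the Kudu master address(es).
- column: a list of {index, name, type}; index is the position of the value in the Reader's record, name is the Kudu column, and type (for example int or string) is mapped through ColumnType.
- primaryKey: the primary-key column(s), in the same format as column; records whose key value is null are collected as dirty records.
- batch-size: number of operations per flush (default 1000).
- mutationBufferSpace: the session's buffer size in operations (default 3000).
- isUpsert: true for overwrite (UPSERT), false for incremental (INSERT, the default).
The actual key strings are defined in the Key class, which is not shown. Note that the verification job at the end uses masterAddr rather than master, so make sure the JSON matches Key.KEY_KUDU_MASTER.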
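The following JDK-only sketch mirrors how the Task reads these parameters: required values fail fast, optional ones fall back to defaults, and every column entry must carry index, name and type. A plain Map stands in for DataX's Configuration, and KuduWriterConfSketch with its helpers is hypothetical; treating a blank value (as in the template's "batch-size": "") as missing is an assumption of this sketch. Columns are sorted by index rather than inserted with ArrayList.add(index, element), which throws IndexOutOfBoundsException when the indexes are not listed in ascending order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for reading kuduwriter parameters; a Map replaces DataX's Configuration.
public class KuduWriterConfSketch {

    // Like getNecessaryValue: fail fast when a required key is missing or blank.
    static String required(Map<String, Object> conf, String key) {
        Object v = conf.get(key);
        if (v == null || v.toString().trim().isEmpty()) {
            throw new IllegalArgumentException("missing required parameter: " + key);
        }
        return v.toString();
    }

    // Like getInt(key, default); a blank value counts as missing in this sketch.
    static int intOrDefault(Map<String, Object> conf, String key, int dflt) {
        Object v = conf.get(key);
        if (v == null || v.toString().trim().isEmpty()) {
            return dflt;
        }
        return Integer.parseInt(v.toString().trim());
    }

    // Every column needs index, name and type; the result is ordered by index.
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> columns(Map<String, Object> conf) {
        Object raw = conf.get("column");
        if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
            throw new IllegalArgumentException("column must be a non-empty list");
        }
        List<Map<String, Object>> cols = new ArrayList<>((List<Map<String, Object>>) raw);
        for (Map<String, Object> c : cols) {
            if (!c.containsKey("index") || !c.containsKey("name") || !c.containsKey("type")) {
                throw new IllegalArgumentException("column needs index, name and type: " + c);
            }
        }
        cols.sort(Comparator.comparingInt(c -> ((Number) c.get("index")).intValue()));
        return cols;
    }

    static Map<String, Object> col(int index, String name, String type) {
        Map<String, Object> c = new HashMap<>();
        c.put("index", index);
        c.put("name", name);
        c.put("type", type);
        return c;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("table", "kudu_person_test");
        conf.put("batch-size", "");
        conf.put("column", Arrays.asList(col(1, "name", "string"), col(0, "id", "int")));

        System.out.println(required(conf, "table"));                // kudu_person_test
        System.out.println(intOrDefault(conf, "batch-size", 1000)); // 1000
        System.out.println(columns(conf).get(0).get("name"));       // id
    }
}
```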
KuduWriter
package com.alibaba.datax.plugin.writer.kuduwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSONObject;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static com.alibaba.datax.plugin.writer.kuduwriter.KuduWriterErrorCode.*;
public class KuduWriter extends Writer {
public static final class Job extends Writer.Job {
private static final Logger log = LoggerFactory.getLogger(Job.class);
//stores the multi-level JSON configuration losslessly, in JSON form
private Configuration originalConfig;
/**
* init: Job initialization; super.getPluginJobConf() returns the configuration of this plugin
*/
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
}
/**
* split: splits the Job into Tasks and returns one configuration per Task.
* @param mandatoryNumber
* the required number of Tasks. To keep the Reader and Writer Task counts equal, a Writer
* must split into exactly as many Tasks as the Reader did; otherwise the framework reports an error.
* @return the list of Task configurations
*/
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> list = new ArrayList<Configuration>();
for (int i = 0; i < mandatoryNumber; i++) {
//copy the current Configuration; clone() is a deep copy, so Tasks do not share state
list.add(originalConfig.clone());
}
return list;
}
/**
* destroy: cleanup of the Job itself
*/
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private static final Logger log = LoggerFactory.getLogger(Task.class);
private String tableName;
private int batchSize;
private Configuration sliceConfig;
private KuduClient kuduClient;
private KuduSession session;
private KuduTable kuduTable;
private List<JSONObject> columnsList = new ArrayList<JSONObject>();
private List<JSONObject> primaryKeyList = new ArrayList<JSONObject>();
//true: overwrite (UPSERT); false: incremental (INSERT), the default
private boolean isUpsert;
/**
* init: Task initialization; super.getPluginJobConf() returns the configuration of this Task
*/
@Override
public void init() {
//configuration of this Task (one of the copies produced by Job.split)
this.sliceConfig = super.getPluginJobConf();
//the table name is required
tableName = sliceConfig.getNecessaryValue(Key.KEY_KUDU_TABLE, KUDU_ERROR_TABLE);
//Kudu master address(es), also required
String masterAddresses = sliceConfig.getNecessaryValue(Key.KEY_KUDU_MASTER, KUDU_ERROR_MASTER);
//operations per flush; recommended not to exceed 1024
batchSize = sliceConfig.getInt(Key.KEY_KUDU_FLUSH_BATCH, 1000);
//max number of buffered operations per session (Kudu's own default is 1000)
int mutationBufferSpace = sliceConfig.getInt(Key.KEY_KUDU_MUTATION_BUFFER_SPACE, 3000);
//with MANUAL_FLUSH, apply() fails once the buffer is full, so a whole batch must fit into it
if (batchSize <= 0 || batchSize > mutationBufferSpace) {
throw DataXException.asDataXException(ILLEGAL_VALUES_ERROR, "batch-size must be in (0, mutationBufferSpace]: " + batchSize + " / " + mutationBufferSpace);
}
//overwrite or incremental update, incremental by default
isUpsert = sliceConfig.getBool(Key.KEY_KUDU_ISUPSERT, false);
//primary key column(s); several may be given
primaryKeyList = (List<JSONObject>) sliceConfig.get(Key.KEY_KUDU_PRIMARY, List.class);
//column definitions
List<JSONObject> _columnList = (List<JSONObject>) sliceConfig.get(Key.KEY_KUDU_COLUMN, List.class);
if (primaryKeyList == null || primaryKeyList.isEmpty()) {
throw DataXException.asDataXException(KUDU_ERROR_PRIMARY_KEY, KUDU_ERROR_PRIMARY_KEY.getDescription());
}
if (_columnList == null || _columnList.isEmpty()) {
throw DataXException.asDataXException(KUDU_ERROR_CONF_COLUMNS, KUDU_ERROR_CONF_COLUMNS.getDescription());
}
for (JSONObject column : _columnList) {
//every column needs index, name and type
if (!column.containsKey("index") || !column.containsKey("name") || !column.containsKey("type")) {
throw DataXException.asDataXException(KUDU_ERROR_CONF_COLUMNS, KUDU_ERROR_CONF_COLUMNS.getDescription());
}
//append in configuration order: startWrite() looks values up by "index", so no positional insert is needed
//(ArrayList.add(index, e) throws IndexOutOfBoundsException when indexes are not in ascending order)
columnsList.add(column);
}
log.info("Initializing KuduClient");
kuduClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
session = kuduClient.newSession();
/**
* 1. Prefer MANUAL_FLUSH: it performs best, and write errors are reported by flush(), which keeps the logic clear.
* 2. When performance is less critical, AUTO_FLUSH_SYNC is also a good choice.
* 3. Use AUTO_FLUSH_BACKGROUND only in demos: without error handling the code is simple and fast, but it is
*    not recommended in production because rows may be written out of order and error handling becomes cumbersome.
*/
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
session.setMutationBufferSpace(mutationBufferSpace);
try {
kuduTable = kuduClient.openTable(tableName);
} catch (KuduException e) {
throw DataXException.asDataXException(KUDU_ERROR_TABLE, "Failed to open Kudu table: " + tableName, e);
}
}
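/**
* startWrite: reads records from the RecordReceiver and writes them to the target; the records come from the buffer queue between Reader and Writer.
* @param recordReceiver
*/
@Override
public void startWrite(RecordReceiver recordReceiver) {
Record record;
//operations applied since the last flush
long counter = 0;
while ((record = recordReceiver.getFromReader()) != null) {
if (columnsList.size() != record.getColumnNumber()) {
throw DataXException.asDataXException(ILLEGAL_VALUES_ERROR, ILLEGAL_VALUES_ERROR.getDescription() + " columns read: " + record.getColumnNumber() + ", columns configured: " + columnsList.size());
}
//a record whose primary-key value is null is collected as a dirty record
boolean isDirtyRecord = false;
for (int i = 0; i < primaryKeyList.size() && !isDirtyRecord; i++) {
JSONObject col = primaryKeyList.get(i);
Column column = record.getColumn(col.getInteger("index"));
isDirtyRecord = (column.getRawData() == null);
}
if (isDirtyRecord) {
super.getTaskPluginCollector().collectDirtyRecord(record, "primary key is null");
continue;
}
//one operation per record: UPSERT overwrites existing rows, INSERT only adds new ones
Operation operation = isUpsert ? kuduTable.newUpsert() : kuduTable.newInsert();
PartialRow row = operation.getRow();
for (JSONObject col : columnsList) {
String columnName = col.getString("name");
ColumnType columnType = ColumnType.getByTypeName(col.getString("type"));
if (columnType == null) {
throw DataXException.asDataXException(KUDU_ERROR_CONF_COLUMNS, "Unsupported column type: " + col.getString("type"));
}
Column column = record.getColumn(col.getInteger("index"));
Object data = column.getRawData();
if (data == null) {
row.setNull(columnName);
continue;
}
switch (columnType) {
case INT:
row.addInt(columnName, Integer.parseInt(data.toString()));
break;
case FLOAT:
row.addFloat(columnName, Float.parseFloat(data.toString()));
break;
case STRING:
row.addString(columnName, data.toString());
break;
case DOUBLE:
row.addDouble(columnName, Double.parseDouble(data.toString()));
break;
case LONG:
row.addLong(columnName, Long.parseLong(data.toString()));
break;
case BOOLEAN:
//parseBoolean parses the text; Boolean.getBoolean would read a JVM system property instead
row.addBoolean(columnName, Boolean.parseBoolean(data.toString()));
break;
case SHORT:
row.addShort(columnName, Short.parseShort(data.toString()));
break;
default:
throw DataXException.asDataXException(KUDU_ERROR_CONF_COLUMNS, "Unsupported column type: " + columnType);
}
}
try {
session.apply(operation);
//flush once a full batch is buffered, then reset the counter so later records do not each trigger a flush
if (++counter >= batchSize) {
flushAndCheck();
counter = 0;
}
} catch (KuduException e) {
throw DataXException.asDataXException(KUDU_ERROR_TABLE, "Failed to write to Kudu table: " + tableName, e);
}
}
//flush the remaining operations
try {
if (session.hasPendingOperations()) {
flushAndCheck();
}
} catch (KuduException e) {
throw DataXException.asDataXException(KUDU_ERROR_TABLE, "Failed to write to Kudu table: " + tableName, e);
}
}
/**
* Flushes the session and checks per-row errors, which MANUAL_FLUSH returns in the responses instead of throwing.
*/
private void flushAndCheck() throws KuduException {
for (OperationResponse response : session.flush()) {
if (!response.hasRowError()) {
continue;
}
RowError error = response.getRowError();
//in INSERT (incremental) mode an existing key is expected: keep the existing row and log it
if (!isUpsert && error.getErrorStatus().isAlreadyPresent()) {
log.warn("Row already exists, skipped: {}", error);
} else {
throw DataXException.asDataXException(KUDU_ERROR_TABLE, "Kudu row error: " + error);
}
}
}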
/**
* destroy: cleanup of the Task itself
*/
@Override
public void destroy() {
if (session != null) {
try {
session.close();
} catch (Exception e) {
log.warn("Failed to close KuduSession", e);
}
}
if (kuduClient != null) {
try {
//close() also shuts the client down, so a separate shutdown() call is not needed
kuduClient.close();
} catch (Exception e) {
log.warn("Failed to close KuduClient", e);
}
}
}
}
}
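The core of startWrite() converts each raw value to the Kudu column type named in the configuration and skips records whose primary key is null. Below is a JDK-only sketch of that step with a local Type enum standing in for the article's ColumnType (ValueConversionSketch and Type are both hypothetical). It also highlights a common pitfall: Boolean.getBoolean(String) looks up a JVM system property of that name instead of parsing the string, so Boolean.parseBoolean is the method to use.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the per-value conversion; Type stands in for ColumnType.
public class ValueConversionSketch {

    enum Type { INT, LONG, SHORT, FLOAT, DOUBLE, BOOLEAN, STRING }

    // Returns the typed value, or null when the raw value is null (the writer calls setNull then).
    static Object convert(Type type, Object raw) {
        if (raw == null) {
            return null;
        }
        String s = raw.toString();
        switch (type) {
            case INT:     return Integer.parseInt(s);
            case LONG:    return Long.parseLong(s);
            case SHORT:   return Short.parseShort(s);
            case FLOAT:   return Float.parseFloat(s);
            case DOUBLE:  return Double.parseDouble(s);
            case BOOLEAN: return Boolean.parseBoolean(s); // parses "true"/"false"
            case STRING:  return s;
            default:      throw new IllegalArgumentException("unsupported type: " + type);
        }
    }

    // A record with a null value in any primary-key position is treated as dirty.
    static boolean hasNullKey(List<Object> record, int[] keyIndexes) {
        for (int i : keyIndexes) {
            if (record.get(i) == null) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(convert(Type.BOOLEAN, "true")); // true
        System.out.println(Boolean.getBoolean("true"));     // false: system property lookup, not parsing
        System.out.println(convert(Type.INT, "42"));        // 42
        System.out.println(hasNullKey(Arrays.<Object>asList(null, "tom", 18), new int[]{0})); // true
    }
}
```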
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>kuduwriter</artifactId>
<name>kuduwriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
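Packaging and deployment
Option 1: package with mvn clean package -DskipTests assembly:assembly. By default this builds every plugin, which is slow; comment out the other plugins in the global build files (the <module> entries in the root pom.xml and the matching entries in the global package.xml) so that only the plugins you need are compiled.
Option 2: package from IntelliJ IDEA through its Maven plugin.
After packaging succeeds, go to the output directory,
then copy the kuduwriter folder.
Paste the copied folder into DataX's plugin directory: a custom reader goes under plugin/reader, a custom writer under plugin/writer.
As defined in package.xml, the kuduwriter directory contains plugin.json, plugin_job_template.json, kuduwriter-0.0.1-SNAPSHOT.jar and a libs directory with the runtime dependencies.
Development is now complete; next, verify the plugin.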
1. Prepare the verification JSON
{
"job": {
"content": [{
"reader": {
"parameter": {
"password": "",
"connection": [{
"querySql": ["select id,name,age from person"],
"jdbcUrl": ["jdbc:mysql://:3306/datax?useUnicode=true&characterEncoding=utf8"]
}],
"username": ""
},
"name": "mysqlreader"
},
"writer": {
"parameter": {
"batch-size": 1024,
"column": [{
"index": 0,
"name": "id",
"type": "int"
}, {
"index": 1,
"name": "name",
"type": "string"
}, {
"index": 2,
"name": "age",
"type": "int"
}],
"mutationBufferSpace": 5000,
"isUpsert": true,
"table": "kudu_person_test",
"masterAddr": "",
"primaryKey": [{
"index": 0,
"name": "id",
"type": "int"
}]
},
"name": "kuduwriter"
}
}],
"setting": {
"errorLimit": {
"record": 0,
"percentage": 0.20
},
"speed": {
"channel": 4
}
}
}
}
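Put the JSON file into the job folder under the DataX installation directory; any file name will do. Make sure the writer's key names match the Key class (for example, this job uses masterAddr while plugin_job_template.json uses master).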
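The job above sets batch-size to 1024 and mutationBufferSpace to 5000. With MANUAL_FLUSH, the Kudu client rejects further apply() calls once the session buffer holds mutationBufferSpace operations, so the writer should flush every batch-size operations, reset its counter, and flush the remainder at the end. The sketch below models only that bookkeeping; FakeSession and FlushBatchingSketch are hypothetical and merely imitate the buffer limit, they are not the Kudu API.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only model of MANUAL_FLUSH batching; FakeSession is hypothetical, not Kudu's KuduSession.
public class FlushBatchingSketch {

    static class FakeSession {
        final int bufferSpace;
        int pending = 0;
        final List<Integer> flushedBatchSizes = new ArrayList<>();

        FakeSession(int bufferSpace) { this.bufferSpace = bufferSpace; }

        // Imitates the manual-flush rule: no more than bufferSpace pending operations.
        void apply() {
            if (pending >= bufferSpace) {
                throw new IllegalStateException("buffer full: flush before applying more");
            }
            pending++;
        }

        void flush() {
            if (pending > 0) {
                flushedBatchSizes.add(pending);
                pending = 0;
            }
        }
    }

    // Applies `records` operations, flushing every `batchSize`, then flushes the tail.
    static List<Integer> write(int records, int batchSize, int bufferSpace) {
        if (batchSize <= 0 || batchSize > bufferSpace) {
            throw new IllegalArgumentException("batch-size must be in (0, mutationBufferSpace]");
        }
        FakeSession session = new FakeSession(bufferSpace);
        int counter = 0;
        for (int i = 0; i < records; i++) {
            session.apply();
            if (++counter >= batchSize) {
                session.flush();
                counter = 0; // without the reset, every later record would trigger a flush
            }
        }
        session.flush(); // final flush for the remaining operations
        return session.flushedBatchSizes;
    }

    public static void main(String[] args) {
        System.out.println(write(2500, 1024, 5000)); // [1024, 1024, 452]
    }
}
```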
2. Run the job
python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/{YOUR TEST JSON}.json
If anything above is wrong, corrections are very welcome!