datax:https://github.com/alibaba/DataX
测试:https://github.com/linkingli/dataxmysql
由于服务器最近被我折腾 iptables 搞挂了,所以这里只做 localhost 测试。
如果想进行远程测试,在 MySQL 中给远程主机授权即可。
mongodbreader插件源码
自定义异常类
/**
 * Error codes raised by the MongoDB reader plugin.
 *
 * <p>Every constant carries a code string and a description string, both of
 * which are exposed through the {@code ErrorCode} accessors below.
 */
public enum MongoDBReaderErrorCode implements ErrorCode {

    /** A configuration parameter is missing or has an unacceptable value. */
    ILLEGAL_VALUE("ILLEGAL_PARAMETER_VALUE", "参数不合法"),

    /** A configured MongoDB address could not be used. */
    ILLEGAL_ADDRESS("ILLEGAL_ADDRESS", "不合法的Mongo地址"),

    /** Any failure that does not fit the categories above. */
    UNEXCEPT_EXCEPTION("UNEXCEPT_EXCEPTION", "未知异常");

    private final String code;
    private final String description;

    // Enum constructors are implicitly private, so no modifier is needed.
    MongoDBReaderErrorCode(String errorCode, String errorDescription) {
        code = errorCode;
        description = errorDescription;
    }

    /** @return the code string of this error */
    @Override
    public String getCode() {
        return this.code;
    }

    /** @return the description string of this error */
    @Override
    public String getDescription() {
        return this.description;
    }
}
mg的初始化属性封装
package com.alibaba.datax.plugin.reader.mongodbreader;
//mg的链接属性,封装为一个class
/**
 * Names of the configuration keys understood by the MongoDB reader, plus a
 * small helper for recognising the array column type.
 */
public class KeyConstant {

    /** Value of a column's {@link #COLUMN_TYPE} that marks it as an array. */
    public static final String ARRAY_TYPE = "array";

    /** Key of the MongoDB server address list. */
    public static final String MONGO_ADDRESS = "address";

    /** Key of the MongoDB user name. */
    public static final String MONGO_USER_NAME = "userName";

    /** Key of the MongoDB password. */
    public static final String MONGO_USER_PASSWORD = "userPassword";

    /** Key of the MongoDB database name. */
    public static final String MONGO_DB_NAME = "dbName";

    /** Key of the MongoDB collection name. */
    public static final String MONGO_COLLECTION_NAME = "collectionName";

    /** Key of the optional MongoDB query (filter) document. */
    public static final String MONGO_QUERY = "query";

    /** Key of the column definition list. */
    public static final String MONGO_COLUMN = "column";

    /** Key, inside one column definition, of the column name. */
    public static final String COLUMN_NAME = "name";

    /** Key, inside one column definition, of the column type. */
    public static final String COLUMN_TYPE = "type";

    /** Key, inside one column definition, of the separator used to join array values. */
    public static final String COLUMN_SPLITTER = "splitter";

    /** Key of the number of documents a task skips before it starts reading. */
    public static final String SKIP_COUNT = "skipCount";

    /** Key of the number of documents a task reads. */
    public static final String BATCH_SIZE = "batchSize";

    /** Name of the MongoDB primary-id field. */
    public static final String MONGO_PRIMIARY_ID_META = "_id";

    /**
     * Tells whether a column type denotes an array.
     *
     * @param type the configured column type, may be {@code null}
     * @return {@code true} only when {@code type} equals {@link #ARRAY_TYPE}
     */
    public static boolean isArrayType(String type) {
        if (type == null) {
            return false;
        }
        return type.equals(ARRAY_TYPE);
    }
}
mg的reader类
package com.alibaba.datax.plugin.reader.mongodbreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil;
import com.alibaba.datax.plugin.reader.mongodbreader.util.MongoUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.mongodb.*;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.Document;
import java.util.*;
/*
*
* public abstract class Reader extends BaseObject
* {
* public static abstract class Job extends AbstractJobPlugin
* {
* public abstract List<Configuration> split(int adviceNumber);
* }
*
* public static abstract class Task extends AbstractTaskPlugin {
public abstract void startRead(RecordSender recordSender);
}
* }
*
*public abstract class AbstractTaskPlugin extends AbstractPlugin
*
*public abstract class AbstractPlugin extends BaseObject implements Pluginable
*
*public interface Pluginable{
* Configuration getPluginJobConf();
* }
*
* public class BaseObject
* {
* 重写了hashcode(),equals(),tostring();
* }
* */
public class MongoDBReader extends Reader {
public static class Job extends Reader.Job {
//Configuration 提供多级JSON配置信息无损存储
private Configuration originalConfig = null;
//mongodb_java_driver
private MongoClient mongoClient;
//username,passwd
private String userName = null;
private String password = null;
//extends com.alibaba.datax.common.spi;
//adviceNumber是框架建议插件切分的任务数,调用自定义utils,CollectionSplitUtil
@Override
public List<Configuration> split(int adviceNumber) {
return CollectionSplitUtil.doSplit(originalConfig,adviceNumber,mongoClient);
}
//KeyConstant获取user.passwd,db,config,并且建立client
@Override
public void init() {
/*
super.getPluginJobConf
* public interface Pluginable{
Configuration getPluginJobConf();
}
* */
this.originalConfig = super.getPluginJobConf();
this.userName = originalConfig.getString(KeyConstant.MONGO_USER_NAME);
this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME);
if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) {
this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,database);
} else {
this.mongoClient = MongoUtil.initMongoClient(originalConfig);
}
}
/ //distory
@Override
public void destroy() {
}
}
/*
*在Task中重新获取链接属性
* Job和Task之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。
* 两者之间只能通过配置文件进行依赖。*/
public static class Task extends Reader.Task {
//Configuration 提供多级JSON配置信息无损存
private Configuration readerSliceConfig;
private MongoClient mongoClient;
private String userName = null;
private String password = null;
private String database = null;
private String collection = null;
private String query = null;
private JSONArray mongodbColumnMeta = null;
/**
* 批量获取的记录数
*/
private Long batchSize = null;
/**
* 用来控制每个task取值的offset
*/
private Long skipCount = null;
/**
* 每页数据的大小
*/
private int pageSize = 1000;
/*
* 读取数据,进行分页处理
*
* */
@Override
public void startRead(RecordSender recordSender) {
/*
* 校验,调用自定义异常MongoDBReaderErrorCode
* */
if(batchSize == null ||
mongoClient == null || database == null ||
collection == null || mongodbColumnMeta == null) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
}
MongoDatabase db = mongoClient.getDatabase(database);
MongoCollection col = db.getCollection(this.collection);
BsonDocument sort = new BsonDocument();
sort.append(KeyConstant.MONGO_PRIMIARY_ID_META, new BsonInt32(1));
long pageCount = batchSize / pageSize;
int modCount = (int)(batchSize % pageSize);
for(int i = 0; i <= pageCount; i++) {
if(modCount == 0 && i == pageCount) {
break;
}
if (i == pageCount) {
pageSize = modCount;
}
MongoCursor<Document> dbCursor = null;
if(!Strings.isNullOrEmpty(query)) {
dbCursor = col.find(BsonDocument.parse(query)).sort(sort)
.skip(skipCount.intValue()).limit(pageSize).iterator();
} else {
dbCursor = col.find().sort(sort)
.skip(skipCount.intValue()).limit(pageSize).iterator();
}
while (dbCursor.hasNext()) {
Document item = dbCursor.next();
Record record = recordSender.createRecord();
Iterator columnItera = mongodbColumnMeta.iterator();
while (columnItera.hasNext()) {
JSONObject column = (JSONObject)columnItera.next();
Object tempCol = item.get(column.getString(KeyConstant.COLUMN_NAME));
if (tempCol == null) {
continue;
}
if (tempCol instanceof Double) {
record.addColumn(new DoubleColumn((Double) tempCol));
} else if (tempCol instanceof Boolean) {
record.addColumn(new BoolColumn((Boolean) tempCol));
} else if (tempCol instanceof Date) {
record.addColumn(new DateColumn((Date) tempCol));
} else if (tempCol instanceof Integer) {
record.addColumn(new LongColumn((Integer) tempCol));
}else if (tempCol instanceof Long) {
record.addColumn(new LongColumn((Long) tempCol));
} else {
if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
if(Strings.isNullOrEmpty(splitter)) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
} else {
ArrayList array = (ArrayList)tempCol;
String tempArrayStr = Joiner.on(splitter).join(array);
record.addColumn(new StringColumn(tempArrayStr));
}
} else {
record.addColumn(new StringColumn(tempCol.toString()));
}
}
}
recordSender.sendToWriter(record);
}
skipCount += pageSize;
}
}
/*
* 初始化
* */
@Override
public void init() {
/*
super.getPluginJobConf
* public interface Pluginable{
Configuration getPluginJobConf();
}
* */
this.readerSliceConfig = super.getPluginJobConf();
this.userName = readerSliceConfig.getString(KeyConstant.MONGO_USER_NAME);
this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME);
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
/*
* 调用自定义utils,
* MongoUtil.initCredentialMongoClient
*
* MongoUtil.initMongoClient
*
* */
mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,database);
} else {
mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
}
this.collection = readerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
this.query = readerSliceConfig.getString(KeyConstant.MONGO_QUERY);
this.mongodbColumnMeta = JSON.parseArray(readerSliceConfig.getString(KeyConstant.MONGO_COLUMN));
this.batchSize = readerSliceConfig.getLong(KeyConstant.BATCH_SIZE);
this.skipCount = readerSliceConfig.getLong(KeyConstant.SKIP_COUNT);
}
/*
* destroy
* */
@Override
public void destroy() {
}
}
}
2个工具类,主要是mg的链接校验,mg传输切片处理
package com.alibaba.datax.plugin.reader.mongodbreader.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant;
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Helpers that turn the configured address list into a {@link MongoClient},
 * with or without credentials.
 */
public class MongoUtil {

    /** Shape required of an address entry: non-blank host, colon, numeric port. */
    private static final String HOST_PORT_REGEX = "(\\S+):([0-9]+)";

    /**
     * Creates an unauthenticated client for the configured addresses.
     *
     * @param conf configuration holding the address list under {@code address}
     * @return a new client
     * @throws DataXException {@code ILLEGAL_VALUE} when the list is missing or
     *         empty or a port is not a valid integer, {@code ILLEGAL_ADDRESS}
     *         when an entry cannot be turned into a server address,
     *         {@code UNEXCEPT_EXCEPTION} for anything else
     */
    public static MongoClient initMongoClient(Configuration conf) {
        List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
        if (addressList == null || addressList.isEmpty()) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");
        }
        try {
            return new MongoClient(parseServerAddress(addressList));
        } catch (UnknownHostException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址", e);
        } catch (NumberFormatException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数", e);
        } catch (Exception e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION, "未知异常", e);
        }
    }

    /**
     * Creates a client that authenticates against {@code database}.
     *
     * @param conf     configuration holding the address list under {@code address}
     * @param userName MongoDB user name
     * @param password MongoDB password
     * @param database database the credential is defined in
     * @return a new client
     * @throws DataXException {@code ILLEGAL_VALUE} when the list is missing,
     *         empty or contains an entry that is not {@code host:port},
     *         otherwise as for {@link #initMongoClient(Configuration)}
     */
    public static MongoClient initCredentialMongoClient(
            Configuration conf, String userName, String password, String database) {
        List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
        // Reject a missing list here instead of failing with a NullPointerException below.
        if (addressList == null || addressList.isEmpty() || !isHostPortPattern(addressList)) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");
        }
        try {
            MongoCredential credential =
                    MongoCredential.createCredential(userName, database, password.toCharArray());
            return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential));
        } catch (UnknownHostException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址", e);
        } catch (NumberFormatException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数", e);
        } catch (Exception e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION, "未知异常", e);
        }
    }

    /**
     * Checks that every entry is a string of the form {@code host:port}.
     *
     * @param addressList entries to check, must not be {@code null}
     * @return {@code false} as soon as one entry is not a matching string
     */
    private static boolean isHostPortPattern(List<Object> addressList) {
        for (Object address : addressList) {
            if (!(address instanceof String) || !((String) address).matches(HOST_PORT_REGEX)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts raw {@code host:port} entries into driver server addresses.
     *
     * @param rawAddressList entries to convert
     * @return one {@link ServerAddress} per entry, in the same order
     * @throws UnknownHostException  when an entry is not a string, has no
     *         host or port part, or is rejected by {@link ServerAddress}
     * @throws NumberFormatException when the port part is not a valid integer
     */
    private static List<ServerAddress> parseServerAddress(List<Object> rawAddressList)
            throws UnknownHostException {
        List<ServerAddress> addressList = new ArrayList<ServerAddress>();
        for (Object address : rawAddressList) {
            if (!(address instanceof String)) {
                throw new UnknownHostException(String.valueOf(address));
            }
            String raw = (String) address;
            // The port is whatever follows the LAST colon, so a host part that
            // itself contains colons does not shift the port to a wrong token.
            int separator = raw.lastIndexOf(':');
            if (separator <= 0 || separator == raw.length() - 1) {
                throw new UnknownHostException(raw);
            }
            String host = raw.substring(0, separator);
            // Deliberately outside the try: a bad port surfaces as
            // NumberFormatException, which the callers map to ILLEGAL_VALUE.
            int port = Integer.parseInt(raw.substring(separator + 1));
            try {
                addressList.add(new ServerAddress(host, port));
            } catch (Exception e) {
                UnknownHostException wrapped = new UnknownHostException(raw);
                wrapped.initCause(e);
                throw wrapped;
            }
        }
        return addressList;
    }

    /** Manual smoke check of the host:port pattern; prints {@code true} for a valid entry. */
    public static void main(String[] args) {
        List<Object> hostAddress = new ArrayList<Object>();
        hostAddress.add("127.0.0.1:27017");
        System.out.println(MongoUtil.isHostPortPattern(hostAddress));
    }
}
package com.alibaba.datax.plugin.reader.mongodbreader.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant;
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
import com.google.common.base.Strings;
import com.mongodb.*;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.BsonDocument;
import java.util.ArrayList;
import java.util.List;
/**
 * Cuts a MongoDB collection into contiguous slices, one per task.
 */
public class CollectionSplitUtil {

    /**
     * Produces one task configuration per slice.
     *
     * <p>Each returned configuration is a clone of {@code originalSliceConfig}
     * with {@code skipCount} and {@code batchSize} set for that slice.
     *
     * @param originalSliceConfig job configuration to clone
     * @param adviceNumber        number of slices suggested by the framework
     * @param mongoClient         connected client used to count documents
     * @return the slice configurations
     * @throws DataXException {@code ILLEGAL_VALUE} when the database name,
     *         the collection name or the client is missing
     */
    public static List<Configuration> doSplit(
            Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) {
        List<Configuration> confList = new ArrayList<Configuration>();
        String dbName = originalSliceConfig.getString(KeyConstant.MONGO_DB_NAME);
        String collectionName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
        if (Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collectionName) || mongoClient == null) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
        }
        String query = originalSliceConfig.getString(KeyConstant.MONGO_QUERY);
        MongoDatabase db = mongoClient.getDatabase(dbName);
        MongoCollection<?> collection = db.getCollection(collectionName);
        for (Entry interval : doSplitInterval(adviceNumber, collection, query)) {
            Configuration conf = originalSliceConfig.clone();
            conf.set(KeyConstant.SKIP_COUNT, interval.interval);
            conf.set(KeyConstant.BATCH_SIZE, interval.batchSize);
            confList.add(conf);
        }
        return confList;
    }

    /**
     * Computes the offset and size of every slice.
     *
     * <p>Example: 100 documents in 6 slices gives five slices of 16 and a last
     * slice of 16 + 4 = 20, so the slices cover exactly the counted documents.
     *
     * @param adviceNumber suggested slice count; values below 1 are treated as 1
     * @param collection   collection to count
     * @param query        optional filter, {@code null} or empty for none
     * @return one entry per slice, ordered by offset
     */
    private static List<Entry> doSplitInterval(int adviceNumber, MongoCollection<?> collection, String query) {
        List<Entry> intervalCountList = new ArrayList<Entry>();
        long totalCount;
        if (!Strings.isNullOrEmpty(query)) {
            totalCount = collection.count(BsonDocument.parse(query));
        } else {
            totalCount = collection.count();
        }
        if (totalCount < 0) {
            return intervalCountList;
        }
        // Guard against division by zero when the framework passes 0 (or less).
        int sliceCount = Math.max(adviceNumber, 1);
        long batchSize = totalCount / sliceCount;
        long remainder = totalCount % sliceCount;
        for (int i = 0; i < sliceCount; i++) {
            Entry entry = new Entry();
            entry.interval = batchSize * i;
            // Only the last slice absorbs the documents left over by the
            // integer division; it gets exactly the remainder, no more.
            entry.batchSize = (i == sliceCount - 1) ? batchSize + remainder : batchSize;
            intervalCountList.add(entry);
        }
        return intervalCountList;
    }
}
/**
 * Offset and size of one slice produced by {@code CollectionSplitUtil}.
 */
class Entry {

    /** Written to the slice configuration as {@code skipCount}. */
    Long interval;

    /** Written to the slice configuration as {@code batchSize}. */
    Long batchSize;
}