建表语句插入的位置
自动建立Writer端的表。下面是一个简单的实现,没有考虑可扩展性,具体代码加入到mysqlwriter这个模块即可。自动建表应该在哪里调用呢?我选择了MysqlWriter#Job#init这个方法:因为commonRdbmsWriterJob.init中会用到Writer端的表,所以必须在它之前先建好表。
// MysqlWriter#Job#init
/**
 * Initializes the MySQL writer job.
 *
 * <p>Auto table creation is delegated to {@code MysqlAutoCreateTable.createTable}, using the
 * reader's (peer) job configuration as the source of the table definition. It must run before
 * {@code commonRdbmsWriterJob.init}, because that init already accesses the writer-side table.
 */
public void init() {
    Configuration jobConf = super.getPluginJobConf();
    this.originalConfig = jobConf;
    this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);

    // Create the writer table first; the common writer init below depends on it existing.
    MysqlAutoCreateTable.createTable(this.getPeerPluginJobConf(), jobConf);

    this.commonRdbmsWriterJob.init(jobConf);
}
建表的功能实现
- 一对一的导入;
- 多对一的导入:如果Reader端配置了多个table或者多个querySql,只会取第一个connection中的第一个table(未配置table时取第一个querySql中FROM后面的表名)来建表。所以如果是多对一,需要保证Reader端多个表导入到Writer端的字段相同,或者将字段最多的表放在第一个table或者querySql中;
- 需要配置:
writer.parameter.autoCreateTable = true,默认为false。注意:目前只支持mysql到mysql,并且Writer端只能配置一张表。
代码:com.alibaba.datax.plugin.writer.mysqlwriter.MysqlAutoCreateTable.java
class MysqlAutoCreateTable {
private static final Logger LOG = LoggerFactory
.getLogger(MysqlAutoCreateTable.class);
private static String AUTO_CREATE_TABLE = "autoCreateTable";
private static final Pattern CREATE_TABLE_PATTERN = Pattern
.compile("\\s*[c,C][r,R][e,E][a,A][t,T][e,E]\\s+[t,T][a,A][b,B][l,L][e,E]\\s+(`?\\w+`?)\\s+");
private static final Pattern SELECT_PATTERN = Pattern
.compile("[f,F][r,R][o,O][m,M]\\s+(`?\\w+`?)");
static void createTable(Configuration readerConfiguration, Configuration writerConfiguration) {
// 首先判断标志位,如果为false直接返回了
if (!writerConfiguration.getBool(AUTO_CREATE_TABLE, false))
return;
List<Object> connections = writerConfiguration.getList(Constant.CONN_MARK,
Object.class);
// 确定目的端表的数目
int tableNum = 0;
for (int i = 0, len = connections.size(); i < len; i++) {
Configuration connConf = Configuration.from(connections.get(i).toString());
String jdbcUrl = connConf.getString(Key.JDBC_URL);
if (StringUtils.isBlank(jdbcUrl)) {
throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE, "您未配置的写入数据库表的 jdbcUrl.");
}
List<String> tables = connConf.getList(Key.TABLE, String.class);
if (null == tables || tables.isEmpty()) {
throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
"您未配置写入数据库表的表名称. 根据配置DataX找不到您配置的表. 请检查您的配置并作出修改.");
}
// 对每一个connection 上配置的table 项进行解析
List<String> expandedTables = TableExpandUtil
.expandTableConf(DataBaseType.MySql, tables);
if (null == expandedTables || expandedTables.isEmpty()) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
"您配置的写入数据库表名称错误. DataX找不到您配置的表,请检查您的配置并作出修改.");
}
tableNum += expandedTables.size();
}
if (tableNum > 1) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
"您配置了自动建表,但是Writer端表的数量大于1,目前只支持Writer端表的数量为1");
}
// 确定Reader端的表名,区分 采用tableMode 还是SqlModel
ConnectionInfo readerConnectionInfo = getReaderConnectionInfo(readerConfiguration);
// 目前Reader端只支持mysql
if (!readerConnectionInfo.getJdbcUrl().toLowerCase().contains("mysql")) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
"您配置了自动建表,目前自动建表只支持mysql到mysql...");
}
Connection readerConnection = DBUtil.getConnection(DataBaseType.MySql, readerConnectionInfo.getJdbcUrl(),
readerConnectionInfo.getUsername(), readerConnectionInfo.getPassword());
String readerCreateSql = getReaderCreateTableSql(readerConnectionInfo, readerConnection);
ConnectionInfo writerConnectionInfo = getWriterConnectionInfo(writerConfiguration);
Connection writerConnection = DBUtil.getConnection(DataBaseType.MySql, writerConnectionInfo.getJdbcUrl(),
writerConnectionInfo.getUsername(), writerConnectionInfo.getPassword());
createWriterTable(getWriterConnectionInfo(writerConfiguration),
writerConnection, readerCreateSql);
}
private static ConnectionInfo getReaderConnectionInfo(Configuration readerConfiguration) {
String firstTableName = readerConfiguration.getString(String.format(
"%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE), null);
String firstQuerySql = readerConfiguration.getString(String.format(
"%s[0].%s[0]", Constant.CONN_MARK, "querySql"), null);
String tableName = null;
if (firstTableName != null) {
tableName = firstTableName;
} else {
Matcher matcher1 = SELECT_PATTERN.matcher(firstQuerySql);
if (matcher1.find()) {
tableName = matcher1.group(1);
}
}
ConnectionInfo result = new ConnectionInfo();
result.setJdbcUrl(readerConfiguration.getString(String.format("%s[0].%s[0]",
Constant.CONN_MARK, Key.JDBC_URL)));
result.setUsername(readerConfiguration.getString(Key.USERNAME));
result.setPassword(readerConfiguration.getString(Key.PASSWORD));
result.setTablename(tableName);
return result;
}
private static ConnectionInfo getWriterConnectionInfo(Configuration writerConfiguration) {
ConnectionInfo result = new ConnectionInfo();
result.setJdbcUrl(writerConfiguration.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL)));
result.setUsername(writerConfiguration.getString(Key.USERNAME));
result.setPassword(writerConfiguration.getString(Key.PASSWORD));
result.setTablename(writerConfiguration.getString(String.format(
"%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE)));
return result;
}
private static String getReaderCreateTableSql(ConnectionInfo connectionInfo, Connection connection) {
String showCreateTableSql = "SHOW CREATE TABLE " + connectionInfo.getTablename();
ResultSet rs = null;
String readerCreateSql = null;
try {
rs = DBUtil.query(connection, showCreateTableSql);
if(rs.next()) {
readerCreateSql = rs.getString(2);
}
} catch (SQLException e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, showCreateTableSql, connectionInfo.getTablename(), null);
} finally {
DBUtil.closeDBResources(rs, null, connection);
}
if (readerCreateSql == null) {
throw DataXException.asDataXException(DBUtilErrorCode.MYSQL_QUERY_SQL_ERROR,
"查询不到Reader端建表语句.");
}
LOG.info("Reader 端建表语句为:" + readerCreateSql);
return readerCreateSql;
}
private static void createWriterTable(ConnectionInfo connectionInfo, Connection connection, String readerCreateSql) {
Matcher matcher = CREATE_TABLE_PATTERN.matcher(readerCreateSql);
String tableName = null;
if (matcher.find()) {
tableName = matcher.group(1);
}
if (tableName == null) {
throw DataXException.asDataXException(DBUtilErrorCode.MYSQL_QUERY_SQL_ERROR,
"找不到Reader端的表名.");
}
String replace = " IF NOT EXISTS " + connectionInfo.getTablename();
String writerCreateSql = readerCreateSql.replaceFirst(tableName, replace);
LOG.info("Writer 端建表语句为:" + writerCreateSql);
Statement statement = null;
try {
statement = connection.createStatement();
DBUtil.executeSqlWithoutResultSet(statement, writerCreateSql);
} catch (SQLException e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, writerCreateSql, connectionInfo.getTablename(), null);
} finally {
DBUtil.closeDBResources(null, statement, connection);
}
}
static class ConnectionInfo {
private String jdbcUrl;
private String username;
private String password;
private String tablename;
public String getJdbcUrl() {
return jdbcUrl;
}
public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getTablename() {
return tablename;
}
public void setTablename(String tablename) {
this.tablename = tablename;
}
}
}
一个配置文件示例:
{
"core": {
"container": {
"taskGroup": {
"channel": 2
}
}
},
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": ["*"],
"connection": [
{
"querySql": ["select * from user where id = 1",
"select * from user where id = 2",
"select * from user where id = 3",
"select * from user where id = 4",
"select * from user where id = 5"
],
"jdbcUrl": ["jdbc:mysql://172.10.10.231:3306/test"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": ["*"],
"autoCreateTable": "true",
"connection": [
{
"jdbcUrl": "jdbc:mysql://172.10.10.231:3306/test1",
"table": [
"userAuto"
]
}
]
}
}
}
]
}
}