data-pipeline目前已支持mysql、oracle驱动的数据传输,均基于jdbc实现数据库的增删改查。但在实际开发中,我们可能需要支持其他存储介质,这就需要根据实际情况集成新的驱动。下面以集成mysql为例进行说明:
- DbEnum添加驱动类型;
- data-pipeline的驱动实现统一继承基类:AbstractDatasource.java;
- 创建MysqlDatasource.java类:
package cn.juque.datapipeline.datasource;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.KeyType;
import cn.hutool.crypto.asymmetric.RSA;
import cn.juque.common.constants.MessageEnum;
import cn.juque.common.exception.AppException;
import cn.juque.datapipeline.api.enums.DbEnum;
import cn.juque.datapipeline.bo.WriteParamBO;
import cn.juque.datapipeline.config.PropertiesConfig;
import cn.juque.datapipeline.converter.ResultSetConverter;
import cn.juque.datapipeline.entity.DatabaseInfo;
import cn.juque.datapipeline.utils.PreparedStatementUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.*;
import java.util.List;
import java.util.Map;
/**
* @author juque
* @version 1.0.0
* <ul>
* <li>MysqlDatasource</li>
* </ul>
* @date 2023-04-04 22:22:04
**/
@Slf4j
@Service("mysqlDatasource")
public class MysqlDatasource extends AbstractDatasource {
@Resource
private PropertiesConfig propertiesConfig;
@Resource
private ResultSetConverter resultSetConverter;
/**
* 标志当前驱动的db类型
*
* @param dbEnum DbEnum
*/
@Override
public void markDb(DbEnum dbEnum) {
super.markDb(DbEnum.MYSQL);
}
/**
* 创建链接
*
* @param databaseInfo 驱动信息
* @return 读写驱动
*/
@Override
public Object createConnection(DatabaseInfo databaseInfo) {
try {
return this.createMysqlConnection(
databaseInfo.getDriverInfo(),
databaseInfo.getUserName(),
databaseInfo.getPassword(),
databaseInfo.getRemoteUrl());
} catch (Exception e) {
log.error("创建mysql驱动失败,驱动id:{},error:{}", databaseInfo.getId(), e.getMessage(), e);
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "创建mysql驱动异常");
}
}
/**
* 读取数据
*
* @param connection 连接
* @param script 脚本
* @return 结果集
*/
@Override
public List<Map<String, Object>> inData(Object connection, String script) {
if (!(connection instanceof Connection)) {
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "连接方式检测非法");
}
Connection mysqlConnection = (Connection) connection;
try (PreparedStatement statement = mysqlConnection.prepareStatement(script)) {
ResultSet resultSet = statement.executeQuery();
List<Map<String, Object>> list = this.resultSetConverter.resultSetToList(resultSet);
resultSet.close();
return list;
} catch (Exception e) {
log.error("读取数据出现异常, error:{}", e.getMessage(), e);
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "读取数据出现异常");
}
}
/**
* 插入
*
* @param writeParamBO 参数
* @return 操作的数据量
*/
@Override
public Long insert(WriteParamBO writeParamBO) {
try {
return PreparedStatementUtil.batchInsert(writeParamBO);
} catch (Exception e) {
log.error("mysql insert error: {}", e.getMessage(), e);
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "mysql insert error");
}
}
/**
* 更新
*
* @param writeParamBO 参数
* @return 操作的数据量
*/
@Override
public Long update(WriteParamBO writeParamBO) {
try {
return PreparedStatementUtil.batchUpdate(writeParamBO);
} catch (Exception e) {
log.error("mysql update error: {}", e.getMessage(), e);
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "mysql update error");
}
}
/**
* 更新或删除
*
* @param writeParamBO 参数
* @return 操作的数据量
*/
@Override
public Long saveOrUpdate(WriteParamBO writeParamBO) {
return null;
}
/**
* 删除
*
* @param writeParamBO 参数
* @return 操作的数据量
*/
@Override
public Long delete(WriteParamBO writeParamBO) {
try {
return PreparedStatementUtil.batchDelete(writeParamBO);
} catch (Exception e) {
log.error("mysql delete error: {}", e.getMessage(), e);
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "mysql delete error");
}
}
/**
* 清空目标表的数据
*
* @param databaseInfo 驱动信息
* @param tableName 表名
*/
@Override
public void truncate(DatabaseInfo databaseInfo, String tableName) {
String template = "TRUNCATE TABLE {}";
String sql = CharSequenceUtil.format(template, tableName);
Connection connection = (Connection) this.createConnection(databaseInfo);
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.execute();
} catch (Exception e) {
log.error("databaseId:{},truncate mysql table:{} error", databaseInfo.getId(), tableName);
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "执行清空表数据异常");
}
}
/**
* 创建连接方式
*
* @param driver 驱动
* @param rawUsername 加密的用户名
* @param rawPassword 加密的密码
* @param url 链接
* @return Connection
* @throws ClassNotFoundException ClassNotFoundException
* @throws SQLException SQLException
*/
private Connection createMysqlConnection(String driver, String rawUsername, String rawPassword, String url) throws ClassNotFoundException, SQLException {
Class.forName(driver);
String publicKey = this.propertiesConfig.getRsaPublicKey();
String privateKey = this.propertiesConfig.getRsaPrivateKey();
RSA rsa = SecureUtil.rsa(privateKey, publicKey);
// 解密用户名、密码
String password = rsa.decryptStr(rawPassword, KeyType.PrivateKey);
return DriverManager.getConnection(url, rawUsername, password);
}
}
- 扩展DatasourceHelper#render方法:
/**
 * Resolves the Spring-managed driver implementation for a database configuration.
 * <p>
 * To support a new database type, add a {@link DbEnum} constant and a matching case here
 * that returns the new driver bean.
 *
 * @param databaseInfo database configuration; its {@code dbType} selects the driver
 * @return the driver bean for the configured database type
 * @throws AppException with {@code DataPipelineMsgEnum.DB_TYPE_MISSING} if the type is unknown or unsupported
 */
public AbstractDatasource render(DatabaseInfo databaseInfo) {
    DbEnum dbEnum = DbEnum.forEnum(databaseInfo.getDbType());
    // NOTE(review): assumes forEnum may return null for an unrecognised code — confirm.
    // switch on a null enum would throw NullPointerException, so report it like an unsupported type.
    if (dbEnum == null) {
        throw new AppException(DataPipelineMsgEnum.DB_TYPE_MISSING);
    }
    switch (dbEnum) {
        case MYSQL:
            return SpringUtil.getBean("mysqlDatasource");
        case ORACLE:
            return SpringUtil.getBean("oracleDatasource");
        default:
            break;
    }
    throw new AppException(DataPipelineMsgEnum.DB_TYPE_MISSING);
}
- 执行采数逻辑时,程序会根据驱动信息获取对应的驱动实现,具体逻辑参考:TaskInfoHelper#runTask
/**
 * Runs a single data-collection task.
 * <p>
 * Disabled tasks are skipped and return 0. Otherwise the method saves an execution record, then
 * resolves the source driver and opens a read connection. It also resolves the target driver
 * through {@code datasourceHelper.render}. On any exception the execution record is marked
 * FAIL and an {@link AppException} naming the task is thrown.
 * <p>
 * This is an excerpt: the {@code ......} lines stand for elided code (reading, writing and
 * the success return), so this snippet does not compile as shown.
 *
 * @param groupId  task group ID
 * @param batchNo  batch number
 * @param taskInfo task information
 * @return number of rows written (computed in the elided code)
 */
public Long runTask(String groupId, String batchNo, TaskInfo taskInfo) {
// Disabled tasks are not executed
if (TaskStatusEnum.DISABLED.getCode().equals(taskInfo.getTaskStatus())) {
return 0L;
}
TaskExecuteInfo taskExecuteInfo = this.saveExecute(groupId, taskInfo.getId(), batchNo);
try {
log.info("任务【{}】开始执行", taskInfo.getTaskName());
// Resolve the source driver and open the read connection
DatabaseInfo databaseInfo = this.databaseInfoMapper.selectById(taskInfo.getSourceDatabase());
AbstractDatasource datasource = this.datasourceHelper.render(databaseInfo);
// NOTE(review): inConnection is not closed anywhere in the visible code; make sure the
// elided part closes it (e.g. try-with-resources), otherwise every run leaks a connection.
Connection inConnection = (Connection) datasource.createConnection(databaseInfo);
......
// Resolve the target (write) driver
DatabaseInfo targetDatabase = this.databaseInfoMapper.selectById(taskInfo.getTargetDatabase());
AbstractDatasource targetDatasource = this.datasourceHelper.render(targetDatabase);
.......
} catch (Exception e) {
// Record the failure, log it with the stack trace, and rethrow a task-level error
this.updateExecuteStatus(taskExecuteInfo, TaskExecuteStatusEnum.FAIL.getCode());
log.error("任务【{}】执行异常", taskInfo.getTaskName(), e);
throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), taskInfo.getTaskName() + "执行失败");
}
}
- 实现新驱动的分页,参考PageUtil#buildPageInfo。为什么一定要实现分页呢?因为我们无法准确预知每一次读数的数据量。我曾经就遇到过一个平时跑得好好的采数任务,某一天突然一直提示OOM异常,排查后发现,源端数据量竟在几天内增长了好几倍,一次全量读数直接把内存撑爆了。
/**
 * Wraps a query with database-specific paging.
 * <p>
 * One trailing semicolon (ignoring surrounding whitespace) is removed first, so the query
 * can be embedded in a LIMIT clause or a sub-select.
 *
 * @param sql      the query to page
 * @param dbEnum   target database type
 * @param pageNo   1-based page number; must be at least 1
 * @param pageSize rows per page; must be at least 1
 * @return the paged SQL, or an empty string if {@code dbEnum} is not supported
 * @throws IllegalArgumentException if {@code pageNo} or {@code pageSize} is null or less than 1
 */
public static String buildPageInfo(String sql, DbEnum dbEnum, Integer pageNo, Integer pageSize) {
    if (pageNo == null || pageNo < 1 || pageSize == null || pageSize < 1) {
        throw new IllegalArgumentException(
                "pageNo and pageSize must be >= 1, got pageNo=" + pageNo + ", pageSize=" + pageSize);
    }
    // long arithmetic: deep pages over very large tables can exceed Integer.MAX_VALUE
    long start = (long) (pageNo - 1) * pageSize;
    String trimmed = CharSequenceUtil.trim(sql);
    String result = CharSequenceUtil.endWith(trimmed, StringConstants.SEMICOLON)
            ? CharSequenceUtil.subBefore(trimmed, StringConstants.SEMICOLON, true)
            : trimmed;
    switch (dbEnum) {
        case MYSQL:
            // MySQL: LIMIT offset,count with a 0-based offset
            return result + " limit " + start + StringConstants.COMMA + pageSize;
        case ORACLE: {
            // ROWNUM is 1-based: keep rows firstRow .. firstRow + pageSize - 1
            long firstRow = start + 1;
            return "SELECT TMP_CACHE_2.* FROM (SELECT ROWNUM AS RN, TMP_CACHE_1.* FROM (" + result
                    + ") TMP_CACHE_1 WHERE ROWNUM < " + (firstRow + pageSize)
                    + ") TMP_CACHE_2 WHERE TMP_CACHE_2.RN >=" + firstRow;
        }
        default:
            break;
    }
    return CharSequenceUtil.EMPTY;
}
- 在data-pipeline中,TaskInfoHelper#startTask一次只接收一个任务组,TaskInfoHelper#runTask一次只执行一个任务。startTask每次执行前都会先检查任务组的状态:只要任务组处于“执行中”,就会跳过本次执行。这主要是考虑到任务执行很可能耗时较长,如果任务组的执行时间超过了调度间隔,下一次调度触发时任务组就会被重复执行,导致重复采数。
- 需要明确的是,quartz调度和任务组的执行是两个独立的过程:quartz调度成功不代表任务组一定会执行,quartz调度结束也不代表任务组已执行结束。比如前面提到的场景,quartz调度成功,但由于任务组仍处于“执行中”,本次并不会真正执行。