data-pipeline实现驱动类型扩展

data-pipeline目前已支持mysql、oracle驱动的数据传输,都是基于jdbc实现的数据库增删改查。但在实际开发中,我们可能需要支持其他的存储介质,这就需要我们根据实际情况进行驱动集成。下面已集成mysql进行举例说明

  • DbEnum添加驱动类型;
  • data-pipeline的驱动实现统一集成基类:AbstractDatasource.java;
  • 创建MysqlDatasource.java类:
package cn.juque.datapipeline.datasource;

import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.KeyType;
import cn.hutool.crypto.asymmetric.RSA;
import cn.juque.common.constants.MessageEnum;
import cn.juque.common.exception.AppException;
import cn.juque.datapipeline.api.enums.DbEnum;
import cn.juque.datapipeline.bo.WriteParamBO;
import cn.juque.datapipeline.config.PropertiesConfig;
import cn.juque.datapipeline.converter.ResultSetConverter;
import cn.juque.datapipeline.entity.DatabaseInfo;
import cn.juque.datapipeline.utils.PreparedStatementUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.sql.*;
import java.util.List;
import java.util.Map;

/**
 * @author juque
 * @version 1.0.0
 * <ul>
 *     <li>MysqlDatasource</li>
 * </ul>
 * @date 2023-04-04 22:22:04
 **/
@Slf4j
@Service("mysqlDatasource")
public class MysqlDatasource extends AbstractDatasource {

    @Resource
    private PropertiesConfig propertiesConfig;

    @Resource
    private ResultSetConverter resultSetConverter;

    /**
     * 标志当前驱动的db类型
     *
     * @param dbEnum DbEnum
     */
    @Override
    public void markDb(DbEnum dbEnum) {
        super.markDb(DbEnum.MYSQL);
    }

    /**
     * 创建链接
     *
     * @param databaseInfo 驱动信息
     * @return 读写驱动
     */
    @Override
    public Object createConnection(DatabaseInfo databaseInfo) {
        try {
            return this.createMysqlConnection(
                    databaseInfo.getDriverInfo(),
                    databaseInfo.getUserName(),
                    databaseInfo.getPassword(),
                    databaseInfo.getRemoteUrl());
        } catch (Exception e) {
            log.error("创建mysql驱动失败,驱动id:{},error:{}", databaseInfo.getId(), e.getMessage(), e);
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "创建mysql驱动异常");
        }
    }

    /**
     * 读取数据
     *
     * @param connection 连接
     * @param script     脚本
     * @return 结果集
     */
    @Override
    public List<Map<String, Object>> inData(Object connection, String script) {
        if (!(connection instanceof Connection)) {
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "连接方式检测非法");
        }
        Connection mysqlConnection = (Connection) connection;
        try (PreparedStatement statement = mysqlConnection.prepareStatement(script)) {
            ResultSet resultSet = statement.executeQuery();
            List<Map<String, Object>> list = this.resultSetConverter.resultSetToList(resultSet);
            resultSet.close();
            return list;
        } catch (Exception e) {
            log.error("读取数据出现异常, error:{}", e.getMessage(), e);
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "读取数据出现异常");
        }
    }

    /**
     * 插入
     *
     * @param writeParamBO 参数
     * @return 操作的数据量
     */
    @Override
    public Long insert(WriteParamBO writeParamBO) {
        try {
            return PreparedStatementUtil.batchInsert(writeParamBO);
        } catch (Exception e) {
            log.error("mysql insert error: {}", e.getMessage(), e);
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "mysql insert error");
        }
    }

    /**
     * 更新
     *
     * @param writeParamBO 参数
     * @return 操作的数据量
     */
    @Override
    public Long update(WriteParamBO writeParamBO) {
        try {
            return PreparedStatementUtil.batchUpdate(writeParamBO);
        } catch (Exception e) {
            log.error("mysql update error: {}", e.getMessage(), e);
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "mysql update error");
        }
    }

    /**
     * 更新或删除
     *
     * @param writeParamBO 参数
     * @return 操作的数据量
     */
    @Override
    public Long saveOrUpdate(WriteParamBO writeParamBO) {
        return null;
    }

    /**
     * 删除
     *
     * @param writeParamBO 参数
     * @return 操作的数据量
     */
    @Override
    public Long delete(WriteParamBO writeParamBO) {
        try {
            return PreparedStatementUtil.batchDelete(writeParamBO);
        } catch (Exception e) {
            log.error("mysql delete error: {}", e.getMessage(), e);
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "mysql delete error");
        }
    }

    /**
     * 清空目标表的数据
     *
     * @param databaseInfo 驱动信息
     * @param tableName    表名
     */
    @Override
    public void truncate(DatabaseInfo databaseInfo, String tableName) {
        String template = "TRUNCATE TABLE {}";
        String sql = CharSequenceUtil.format(template, tableName);
        Connection connection = (Connection) this.createConnection(databaseInfo);
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.execute();
        } catch (Exception e) {
            log.error("databaseId:{},truncate mysql table:{} error", databaseInfo.getId(), tableName);
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), "执行清空表数据异常");
        }
    }

    /**
     * 创建连接方式
     *
     * @param driver      驱动
     * @param rawUsername 加密的用户名
     * @param rawPassword 加密的密码
     * @param url         链接
     * @return Connection
     * @throws ClassNotFoundException ClassNotFoundException
     * @throws SQLException           SQLException
     */
    private Connection createMysqlConnection(String driver, String rawUsername, String rawPassword, String url) throws ClassNotFoundException, SQLException {
        Class.forName(driver);
        String publicKey = this.propertiesConfig.getRsaPublicKey();
        String privateKey = this.propertiesConfig.getRsaPrivateKey();
        RSA rsa = SecureUtil.rsa(privateKey, publicKey);
        // 解密用户名、密码
        String password = rsa.decryptStr(rawPassword, KeyType.PrivateKey);
        return DriverManager.getConnection(url, rawUsername, password);
    }
}
  • 扩展DatasourceHelper#render方法:
/**
     * 创建驱动
     *
     * @param databaseInfo 驱动信息
     * @return 驱动
     */
    public AbstractDatasource render(DatabaseInfo databaseInfo) {
        DbEnum dbEnum = DbEnum.forEnum(databaseInfo.getDbType());
        switch (dbEnum) {
            case MYSQL:
                return SpringUtil.getBean("mysqlDatasource");
            case ORACLE:
                return SpringUtil.getBean("oracleDatasource");
            default:
                break;
        }
        throw new AppException(DataPipelineMsgEnum.DB_TYPE_MISSING);
    }
  • 执行采数逻辑的时候,程序会根据驱动信息返回驱动实现逻辑,实现逻辑参考:TaskInfoHelper#runTask
/**
     * 执行采数任务
     *
     * @param groupId  任务组ID
     * @param batchNo  批次号
     * @param taskInfo 任务信息
     * @return 写入的数量
     */
    public Long runTask(String groupId, String batchNo, TaskInfo taskInfo) {
        // 禁用状态不执行
        if (TaskStatusEnum.DISABLED.getCode().equals(taskInfo.getTaskStatus())) {
            return 0L;
        }
        TaskExecuteInfo taskExecuteInfo = this.saveExecute(groupId, taskInfo.getId(), batchNo);
        try {
            log.info("任务【{}】开始执行", taskInfo.getTaskName());
            // 获取读连接
            DatabaseInfo databaseInfo = this.databaseInfoMapper.selectById(taskInfo.getSourceDatabase());
            AbstractDatasource datasource = this.datasourceHelper.render(databaseInfo);
            Connection inConnection = (Connection) datasource.createConnection(databaseInfo);
          ......
            // 获取写的驱动
            DatabaseInfo targetDatabase = this.databaseInfoMapper.selectById(taskInfo.getTargetDatabase());
            AbstractDatasource targetDatasource = this.datasourceHelper.render(targetDatabase);
          .......
           
        } catch (Exception e) {
            this.updateExecuteStatus(taskExecuteInfo, TaskExecuteStatusEnum.FAIL.getCode());
            log.error("任务【{}】执行异常", taskInfo.getTaskName(), e);
            throw new AppException(MessageEnum.SYSTEM_ERROR.getCode(), taskInfo.getTaskName() + "执行失败");
        }
    }
  • 实现新驱动的分页,PageUtil#buildPageInfo。为什么一定要实现分页呢?因为我们无法准确预知每一次读数的数据量,我曾经就遇到过一个平时跑的好好的采数任务,某一天突然一只提示OOM异常,定位排查发现,源端数据采集量竟在几天内增长了好几倍,而一次全量读数直接把内存撑爆。
public static String buildPageInfo(String sql, DbEnum dbEnum, Integer pageNo, Integer pageSize) {
        int start = (pageNo - 1) * pageSize;
        String result = CharSequenceUtil.endWith(sql, StringConstants.SEMICOLON) ? CharSequenceUtil.subBefore(sql, StringConstants.SEMICOLON, true) : sql;
        switch (dbEnum) {
            case MYSQL:
                return result + " limit " + start + StringConstants.COMMA + pageSize;
            case ORACLE:
                start = start + 1;
                return "SELECT TMP_CACHE_2.* FROM (SELECT ROWNUM AS RN, TMP_CACHE_1.* FROM (" + result + ") TMP_CACHE_1 WHERE ROWNUM < " + (start + pageSize) + ") TMP_CACHE_2 WHERE TMP_CACHE_2.RN >=" + start;
            default:
                break;
        }
        return CharSequenceUtil.EMPTY;
    }
  • 在data-pipeline中,TaskInfoHelper#startTask一次只接收一个任务组,TaskInfoHelper#runTask则一次只执行一个任务,在执行startTask方法中,每次执行都会先检查任务组的状态,即只要任务组处于“执行中”,都会中断此次执行。主要是考虑到任务的执行极可能会耗时较长,如果任务组执行时间远远超过期望的时间,会导致任务组重复执行,导致采数重复。
  • 需要明确的是,quartz调度和任务调度是两个过程,quartz调度成功不代表任务组就会执行,quartz调度结束也不代表任务组执行结束。比如前面提到的例子,quartz调度成功,但任务组并不会执行。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容