java整合kettle文件资源库工具方法

1. 整合所需的 jar 包
Kettle 运行所依赖的 jar 包

2. 基于文件资源库的 API

@Slf4j
@Component
public class KettleManager {
    private KettleFileRepository kettleFileRepository = null;
    @Resource
    private KettleProperties kettleProperties;

    @PostConstruct
    public void init() {
        try {
            // 环境
            KettleEnvironment.init();
            EnvUtil.environmentInit();
            log.info("Kettle环境初始化成功");
        } catch (Exception e) {
            log.error("Kettle环境初始化失败", e);
        }
    }

    public KettleFileRepository getKettleFileRepository() throws KettleException {
        if (null == kettleFileRepository) {
            // 初始化
            kettleFileRepository = this.fileRepositoryCon();
        }
        return kettleFileRepository;
    }

    /**
     * 执行文件资源库转换 -- 转换
     *
     * @param dataTypeCode  数据类型
     * @param xlsFile   文件
     * @param namedParams 命名参数 
     */
    public String callTrans(String dataTypeCode, String tempFileName, File xlsFile, Map<String, String> namedParams) throws KettleException {
        // 请求
        Map<String, String> variables = Maps.newConcurrentMap();
        variables.put("file", xlsFile.getAbsolutePath()); // 直接文件全路径
        variables.put("path", xlsFile.getParent()); // 跟通配符协同使用,通配符的格式: .*.xls
        return this.callTrans(dataTypeCode, tempFileName, namedParams, variables);
    }

    /**
     * 执行文件资源库转换 -- 转换
     *
     * @param transPath   转换路径(相对于资源库)
     * @param transName   转换名称(不需要后缀)
     * @param namedParams 命名参数
     * @param variables   全局参数
     */
    private String callTrans(String transPath, String transName, Map<String, String> namedParams, Map<String, String> variables) throws KettleException {
        log.info("transPath = {}, transName = {}, namedParams = {}, variables={}", transPath, transName, JSON.toJSONString(namedParams), JSON.toJSONString(variables));
        TransMeta transMeta = this.loadTrans(getKettleFileRepository(), transPath, transName);
        // 转换
        Trans trans = new Trans(transMeta);
        // 数据库信息
        initDbInfo(trans);
        // 设置命名参数
        if (null != namedParams && namedParams.size() > 0) {
            for (Map.Entry<String, String> entry : namedParams.entrySet()) {
                trans.setParameterValue(entry.getKey(), entry.getValue());
            }
        }
        // 设置全局参数
        if (null != variables && variables.size() > 0) {
            for (Map.Entry<String, String> entry : variables.entrySet()) {
                trans.setVariable(entry.getKey(), entry.getValue());
            }
        }
        trans.setLogLevel(this.getLoggerLevel());
        //执行
        String[] cmdParams = new String[]{};
        trans.execute(cmdParams);
        //记录日志
        checkErrors(trans.getLogChannelId(), trans.getErrors());
        // 同步,等待执行完成
        trans.waitUntilFinished();
        return trans.getLogChannelId();
    }

    /**
     * 数据库信息
     */
    private void initDbInfo(Trans trans) {
        trans.setVariable("driverClassName", kettleProperties.getDriverClassName());
        trans.setVariable("username", kettleProperties.getUsername());
        trans.setVariable("password", kettleProperties.getPassword());

        trans.setVariable("sjzl-url", kettleProperties.getSjzlUrl());
        trans.setVariable("ztk-url", kettleProperties.getZtkUrl());
        trans.setVariable("xxcx-url", kettleProperties.getXxcxUrl());
    }

    /**
     * 加载转换
     *
     * @param repo      kettle文件资源库
     * @param transPath 相对路径 - 目录
     * @param transName 转换名称
     */
    private TransMeta loadTrans(KettleFileRepository repo, String transPath, String transName) throws KettleException {
        String msg;
        RepositoryDirectoryInterface dir = repo.findDirectory(transPath);//根据指定的字符串路径找到目录
        if (null == dir) {
            msg = "kettle资源库转换路径不存在【" + repo.getRepositoryMeta().getBaseDirectory() + transPath + "】!";
            throw new RuntimeException(msg);
        }
        TransMeta transMeta = repo.loadTransformation(repo.getTransformationID(transName, dir), null);
        if (null == transMeta) {
            msg = "kettle资源库【" + dir.getPath() + "】不存在该转换【" + transName + "】!";
            throw new RuntimeException(msg);
        }
        return transMeta;
    }

    private void checkErrors(String logChannelId, int errors) {
        LoggingBuffer appender = KettleLogStore.getAppender();
        String logText = appender.getBuffer(logChannelId, true).toString();
        log.info(logText);
        log.info("------------{},{}----------------", logChannelId, errors);
        //抛出异常
        if (errors <= 0) {
            return;
        }
        String msg = "There are errors during transformation exception!(转换过程中发生异常)";
        log.error(msg);
        throw new RuntimeException(msg);
    }

    /**
     * 取得kettle的日志级别
     */
    private LogLevel getLoggerLevel() {
        String level = kettleProperties.getKettleLogLevel();
        switch (level.toLowerCase()) {
            case "basic": return LogLevel.BASIC;
            case "detailed": return LogLevel.DETAILED;
            case "error": return LogLevel.ERROR;
            case "debug": return LogLevel.DEBUG;
            case "minimal": return LogLevel.MINIMAL;
            case "rowlevel": return LogLevel.ROWLEVEL;
            case "nothing": return LogLevel.NOTHING;
            default: return null;
        }
    }

    /**
     * 配置kettle文件库资源库环境
     **/
    public KettleFileRepository fileRepositoryCon() throws KettleException {

        String kettleFileId = kettleProperties.getKettleFileId();
        String kettleFileName = kettleProperties.getKettleFileName();
        String kettleFilePath = kettleProperties.getKettleFilePath();
        String kettleFileDesc = kettleProperties.getKettleFileDesc();

        //资源库元对象
        KettleFileRepositoryMeta fileRepositoryMeta = new KettleFileRepositoryMeta(kettleFileId, kettleFileName, kettleFileDesc, kettleFilePath);
        // 文件形式的资源库
        KettleFileRepository repo = new KettleFileRepository();
        repo.init(fileRepositoryMeta);
        //连接到资源库
        repo.connect("", "");//默认的连接资源库的用户名和密码

        String msg;
        if (repo.isConnected()) {
            msg = "kettle文件库资源库【" + kettleFilePath + "】连接成功";
            log.info(msg);
            return repo;
        } else {
            msg = "kettle文件库资源库【" + kettleFilePath + "】连接失败";
            log.error(msg);
            throw new RuntimeException(msg);
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容