Kettle支持的jar包
2. 基于文件目录的 API
/**
 * Spring component that runs Kettle (Pentaho Data Integration) transformations
 * stored in a file-based repository.
 *
 * <p>The Kettle environment is initialised once after construction, and the
 * repository connection is opened on first use and then reused.
 */
@Slf4j
@Component
public class KettleManager {

    /** Repository connection, created on the first call to {@link #getKettleFileRepository()}. */
    private KettleFileRepository kettleFileRepository = null;

    @Resource
    private KettleProperties kettleProperties;

    /**
     * Initialises the Kettle runtime environment.
     *
     * <p>A failure is logged and not rethrown, so the bean is still created
     * even when initialisation fails.
     */
    @PostConstruct
    public void init() {
        try {
            KettleEnvironment.init();
            EnvUtil.environmentInit();
            log.info("Kettle环境初始化成功");
        } catch (Exception e) {
            log.error("Kettle环境初始化失败", e);
        }
    }

    /**
     * Returns the file repository, connecting to it on first use.
     *
     * <p>NOTE(review): this lazy initialisation is not synchronised; concurrent
     * first calls may each open a connection. Confirm whether that matters.
     *
     * @return the connected file repository
     * @throws KettleException if the repository cannot be initialised or connected
     */
    public KettleFileRepository getKettleFileRepository() throws KettleException {
        if (null == kettleFileRepository) {
            kettleFileRepository = this.fileRepositoryCon();
        }
        return kettleFileRepository;
    }

    /**
     * Runs a repository transformation against an input file.
     *
     * <p>The file is exposed to the transformation through two variables:
     * {@code file} (absolute path of the file) and {@code path} (its directory,
     * intended to be combined with a wildcard such as {@code .*.xls}).
     *
     * @param dataTypeCode data type code; used as the transformation directory inside the repository
     * @param tempFileName transformation name, without file extension
     * @param xlsFile      input file handed to the transformation
     * @param namedParams  named parameters, may be {@code null} or empty
     * @return the log channel id of the executed transformation
     * @throws KettleException if the transformation cannot be loaded or executed
     */
    public String callTrans(String dataTypeCode, String tempFileName, File xlsFile, Map<String, String> namedParams) throws KettleException {
        Map<String, String> variables = Maps.newConcurrentMap();
        variables.put("file", xlsFile.getAbsolutePath());

        // File.getParent() is null for a bare relative name such as "a.xls", and the
        // concurrent map rejects null values, so fall back to the absolute parent.
        String parentDir = xlsFile.getParent();
        if (null == parentDir) {
            parentDir = xlsFile.getAbsoluteFile().getParent();
        }
        if (null != parentDir) {
            variables.put("path", parentDir);
        }
        return this.callTrans(dataTypeCode, tempFileName, namedParams, variables);
    }

    /**
     * Loads a transformation from the file repository, runs it and waits for it to finish.
     *
     * @param transPath   transformation directory, relative to the repository
     * @param transName   transformation name, without file extension
     * @param namedParams named parameters, may be {@code null} or empty
     * @param variables   variables set on the transformation, may be {@code null} or empty
     * @return the log channel id of the executed transformation
     * @throws KettleException  if loading or executing the transformation fails
     * @throws RuntimeException if the transformation is not found or finished with errors
     */
    private String callTrans(String transPath, String transName, Map<String, String> namedParams, Map<String, String> variables) throws KettleException {
        log.info("transPath = {}, transName = {}, namedParams = {}, variables={}", transPath, transName, JSON.toJSONString(namedParams), JSON.toJSONString(variables));
        TransMeta transMeta = this.loadTrans(getKettleFileRepository(), transPath, transName);
        Trans trans = new Trans(transMeta);

        // Database connection settings come from configuration.
        initDbInfo(trans);

        if (null != namedParams && namedParams.size() > 0) {
            for (Map.Entry<String, String> entry : namedParams.entrySet()) {
                trans.setParameterValue(entry.getKey(), entry.getValue());
            }
        }
        if (null != variables && variables.size() > 0) {
            for (Map.Entry<String, String> entry : variables.entrySet()) {
                trans.setVariable(entry.getKey(), entry.getValue());
            }
        }
        trans.setLogLevel(this.getLoggerLevel());

        String[] cmdParams = new String[]{};
        trans.execute(cmdParams);

        // Wait for completion BEFORE inspecting the error count: checking right after
        // execute() would miss errors raised while the transformation is still running
        // and would dump an incomplete log buffer.
        trans.waitUntilFinished();
        checkErrors(trans.getLogChannelId(), trans.getErrors());
        return trans.getLogChannelId();
    }

    /**
     * Copies the configured database settings onto the transformation as variables.
     *
     * @param trans transformation that receives the variables
     */
    private void initDbInfo(Trans trans) {
        trans.setVariable("driverClassName", kettleProperties.getDriverClassName());
        trans.setVariable("username", kettleProperties.getUsername());
        trans.setVariable("password", kettleProperties.getPassword());
        trans.setVariable("sjzl-url", kettleProperties.getSjzlUrl());
        trans.setVariable("ztk-url", kettleProperties.getZtkUrl());
        trans.setVariable("xxcx-url", kettleProperties.getXxcxUrl());
    }

    /**
     * Loads a transformation definition from the repository.
     *
     * @param repo      Kettle file repository
     * @param transPath directory of the transformation, relative to the repository
     * @param transName transformation name
     * @return the loaded transformation metadata, never {@code null}
     * @throws KettleException  if the repository reports a failure
     * @throws RuntimeException if the directory or the transformation does not exist
     */
    private TransMeta loadTrans(KettleFileRepository repo, String transPath, String transName) throws KettleException {
        RepositoryDirectoryInterface dir = repo.findDirectory(transPath);
        if (null == dir) {
            String msg = "kettle资源库转换路径不存在【" + repo.getRepositoryMeta().getBaseDirectory() + transPath + "】!";
            throw new RuntimeException(msg);
        }

        String notFoundMsg = "kettle资源库【" + dir.getPath() + "】不存在该转换【" + transName + "】!";
        // Guard the id as well: a missing transformation may surface as a null id
        // (TODO confirm against the Kettle version in use), and passing that on
        // would fail before the intended "not found" message is produced.
        TransMeta transMeta = repo.loadTransformation(requireFound(repo.getTransformationID(transName, dir), notFoundMsg), null);
        if (null == transMeta) {
            throw new RuntimeException(notFoundMsg);
        }
        return transMeta;
    }

    /**
     * Returns {@code value} unchanged, or throws when it is {@code null}.
     *
     * @param value   value that must be present
     * @param message exception message used when {@code value} is {@code null}
     * @return {@code value}
     * @throws RuntimeException if {@code value} is {@code null}
     */
    private static <T> T requireFound(T value, String message) {
        if (null == value) {
            throw new RuntimeException(message);
        }
        return value;
    }

    /**
     * Writes the transformation log to the application log and fails when errors were reported.
     *
     * @param logChannelId log channel id of the transformation
     * @param errors       error count reported by the transformation
     * @throws RuntimeException if {@code errors} is greater than zero
     */
    private void checkErrors(String logChannelId, int errors) {
        LoggingBuffer appender = KettleLogStore.getAppender();
        String logText = appender.getBuffer(logChannelId, true).toString();
        log.info(logText);
        log.info("------------{},{}----------------", logChannelId, errors);
        if (errors <= 0) {
            return;
        }
        String msg = "There are errors during transformation exception!(转换过程中发生异常)";
        log.error(msg);
        throw new RuntimeException(msg);
    }

    /**
     * Maps the configured log level name to a Kettle {@link LogLevel}.
     *
     * <p>Matching ignores case and surrounding whitespace. A missing or
     * unrecognised value falls back to {@link LogLevel#BASIC} so that a
     * {@code null} level is never set on a transformation.
     *
     * @return the configured log level, or {@link LogLevel#BASIC} as fallback
     */
    private LogLevel getLoggerLevel() {
        String level = kettleProperties.getKettleLogLevel();
        if (null == level) {
            return LogLevel.BASIC;
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (e.g. Turkish maps 'I' to a dotless 'i', which would break "BASIC").
        switch (level.trim().toLowerCase(java.util.Locale.ROOT)) {
            case "basic":
                return LogLevel.BASIC;
            case "detailed":
                return LogLevel.DETAILED;
            case "error":
                return LogLevel.ERROR;
            case "debug":
                return LogLevel.DEBUG;
            case "minimal":
                return LogLevel.MINIMAL;
            case "rowlevel":
                return LogLevel.ROWLEVEL;
            case "nothing":
                return LogLevel.NOTHING;
            default:
                log.warn("Unknown kettle log level '{}', falling back to BASIC", level);
                return LogLevel.BASIC;
        }
    }

    /**
     * Creates a file repository from the configured settings and connects to it.
     *
     * @return the connected file repository
     * @throws KettleException  if initialising or connecting the repository fails
     * @throws RuntimeException if the repository reports that it is not connected
     */
    public KettleFileRepository fileRepositoryCon() throws KettleException {
        String kettleFileId = kettleProperties.getKettleFileId();
        String kettleFileName = kettleProperties.getKettleFileName();
        String kettleFilePath = kettleProperties.getKettleFilePath();
        String kettleFileDesc = kettleProperties.getKettleFileDesc();

        // Repository metadata describing where the file repository lives.
        KettleFileRepositoryMeta fileRepositoryMeta = new KettleFileRepositoryMeta(kettleFileId, kettleFileName, kettleFileDesc, kettleFilePath);

        KettleFileRepository repo = new KettleFileRepository();
        repo.init(fileRepositoryMeta);
        // Connect with empty user name and password.
        repo.connect("", "");

        String msg;
        if (repo.isConnected()) {
            msg = "kettle文件库资源库【" + kettleFilePath + "】连接成功";
            log.info(msg);
            return repo;
        } else {
            msg = "kettle文件库资源库【" + kettleFilePath + "】连接失败";
            log.error(msg);
            throw new RuntimeException(msg);
        }
    }
}