I. Development Background
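As the saying goes, a craftsman who wants to do good work must first sharpen his tools. If we adopt Kettle as our toolchain for offline or near-real-time ETL, developing custom Kettle plugins is a step we cannot avoid. For example, we may need to apply special functions (such as encryption or decryption) to the data flowing out of a step; an existing step may not capture source-side data the way we need; or an existing step may lack support for re-consuming data.
In short, when a business process has special requirements that Kettle's built-in steps cannot meet, or cannot fully meet, we have to develop custom steps to handle data management, data validation, data transformation, and extraction from certain special types of data sources.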
II. Basic Framework
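Taking the Splunk query plugin in the figure above as an example, we explain step by step how a Kettle transformation plugin works. The four classes below make up a basic Kettle step, and each one plays its own distinct role.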
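SplunkInput: step class
Extends the BaseStep parent class and implements the StepInterface interface. When the transformation runs, an instance of this class is where the data is actually processed; each execution thread is represented by one instance of this class.
SplunkInputData: data class
Extends the BaseStepData parent class and implements the StepDataInterface interface. It stores data and is unique to each execution thread while the plugin runs. At runtime it mainly holds custom metadata objects, database connections, caches, file handles, and similar objects.
SplunkInputDialog: dialog class
Extends the BaseStepDialog parent class and implements the StepDialogInterface interface. This class implements the interface through which the ETL developer configures the step: the developer fills in the predefined input and output options to tailor the step to a particular ETL job.
SplunkInputMeta: metadata class
Extends the BaseStepMeta parent class and implements the StepMetaInterface interface. It stores and serializes the configuration of a particular step instance, that is, it keeps track of the settings the developer makes in SplunkInputDialog. In this example it holds the step properties, the Splunk connection properties, and the names, types, and other properties of the output fields.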
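To make this division of labour concrete, the sketch below models the roles in plain Java, without any Kettle dependency. The dialog is left out because it is only UI. SplunkMeta, SplunkStep and SplunkData are hypothetical stand-ins for the real meta, step and data classes. createStep() and createData() play the part of the meta class's getStep(…) and getStepData() factory methods. The point it illustrates is the one made above: a single configuration object is shared, while every step copy gets its own data object.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the Kettle step roles; all names are hypothetical stand-ins.
class StepRolesModel {

    // Stand-in for the data class: per-thread scratch state (connections, buffers, ...).
    static class SplunkData {
        final List<String> buffer = new ArrayList<>();
    }

    // Stand-in for the metadata class: holds configuration and creates the other objects.
    static class SplunkMeta {
        String host = "127.0.0.1";
        String port = "8089";

        // Mirrors getStep(...): builds a step instance bound to this configuration.
        SplunkStep createStep(int copyNr) {
            return new SplunkStep(this, createData(), copyNr);
        }

        // Mirrors getStepData(): every call returns a fresh, unshared data object.
        SplunkData createData() {
            return new SplunkData();
        }
    }

    // Stand-in for the step class: reads configuration from meta, keeps state in data.
    static class SplunkStep {
        final SplunkMeta meta;
        final SplunkData data;
        final int copyNr;

        SplunkStep(SplunkMeta meta, SplunkData data, int copyNr) {
            this.meta = meta;
            this.data = data;
            this.copyNr = copyNr;
        }

        String describe() {
            return "copy " + copyNr + " -> " + meta.host + ":" + meta.port;
        }
    }

    public static void main(String[] args) {
        SplunkMeta meta = new SplunkMeta();
        SplunkStep first = meta.createStep(0);
        SplunkStep second = meta.createStep(1);
        System.out.println(first.describe());          // copy 0 -> 127.0.0.1:8089
        System.out.println(second.describe());         // copy 1 -> 127.0.0.1:8089
        // Both copies share one configuration but own separate data objects.
        System.out.println(first.meta == second.meta); // true
        System.out.println(first.data == second.data); // false
    }
}
```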
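Plugin display configuration
This configuration defines how the plugin step is displayed in Kettle's visual UI workbench (Spoon). It sets the plugin's unique ID, name, and description, specifies the metadata class Kettle should load for the plugin, and lists the dependency JARs that must be loaded in advance. There are two ways to provide it, and this example uses the first.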
1. Using the @Step annotation
@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
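id: must be globally unique among Kettle plugins
name: the display name of the plugin in the Spoon UI; it does not need to be unique
description: what the plugin does, or brief usage instructions
image: the icon shown in the Spoon UI. SVG images are preferred, and suitable icons can be found and downloaded from Alibaba's iconfont library (https://www.iconfont.cn/)
categoryDescription: the category (folder) in the step tree that the plugin belongs to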
……
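Roughly speaking, registration through @Step works because the annotation is available at runtime. Kettle finds the annotated classes when it scans the plugin JARs, then reads the id, name and other attributes through reflection.

The sketch below imitates that mechanism. It uses a hypothetical @StepInfo annotation and a tiny registry that enforces the uniqueness of id. It is a simplified model, not Kettle's PluginRegistry.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Simplified model of annotation-based step registration.
// StepInfo is a hypothetical stand-in for Kettle's @Step; this is NOT Kettle's PluginRegistry.
class AnnotationRegistryModel {

    @Retention(RetentionPolicy.RUNTIME) // must survive to runtime so reflection can read it
    @Target(ElementType.TYPE)
    @interface StepInfo {
        String id();
        String name();
        String description() default "";
        String image() default "";
        String categoryDescription() default "";
    }

    @StepInfo(id = "KettleSplunkInput", name = "Splunk Input",
              description = "Read data from Splunk", image = "splunk.svg",
              categoryDescription = "Input")
    static class SplunkInputMeta { }

    private final Map<String, Class<?>> byId = new HashMap<>();

    // Reads the annotation and rejects duplicate ids, since ids must be globally unique.
    void register(Class<?> metaClass) {
        StepInfo info = metaClass.getAnnotation(StepInfo.class);
        if (info == null) {
            throw new IllegalArgumentException(metaClass.getName() + " has no @StepInfo");
        }
        Class<?> existing = byId.putIfAbsent(info.id(), metaClass);
        if (existing != null && existing != metaClass) {
            throw new IllegalStateException("Duplicate step id: " + info.id());
        }
    }

    Class<?> lookup(String id) {
        return byId.get(id);
    }

    public static void main(String[] args) {
        AnnotationRegistryModel registry = new AnnotationRegistryModel();
        registry.register(SplunkInputMeta.class);
        StepInfo info = SplunkInputMeta.class.getAnnotation(StepInfo.class);
        System.out.println(info.name() + " in category " + info.categoryDescription());
        System.out.println(registry.lookup("KettleSplunkInput").getSimpleName());
    }
}
```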
2. Using plugin.xml
<?xml version="1.0" encoding="UTF-8"?>
<plugin
id="KafkaConsumer"
iconfile="logo.png"
description="Apache Kafka Consumer"
tooltip="This plug-in allows reading messages from a specific topic in a Kafka stream"
category="Input"
classname="com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerMeta">
<libraries>
<library name="pentaho-kafka-consumer.jar"/>
……
<library name="lib/zookeeper-3.4.6.jar"/>
</libraries>
<localized_category>
<category locale="en_US">Input</category>
</localized_category>
<localized_description>
<description locale="en_US">Apache Kafka Consumer</description>
</localized_description>
<localized_tooltip>
<tooltip locale="en_US">This plug-in allows reading messages from a specific topic in a Kafka stream</tooltip>
</localized_tooltip>
</plugin>
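id: must be globally unique among Kettle plugins
iconfile: the icon shown in the Spoon UI; PNG images are preferred
description: what the plugin does
tooltip: the hint shown when the mouse hovers over the step in the tree menu
category: the category (folder) the plugin belongs to
classname: the fully qualified name of the metadata class
libraries: the list of JARs the plugin depends on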
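To show what the descriptor carries, the sketch below reads the id, classname, category and library list out of a plugin.xml like the one above. It uses the JDK's DOM parser. It only illustrates the file's structure and is not Kettle's plugin loader. The PluginDescriptor holder is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Reads the key attributes of a plugin.xml descriptor with the JDK DOM parser.
// Illustrative only: this is not how Kettle's own plugin loader is implemented.
class PluginXmlReader {

    static final class PluginDescriptor {
        final String id;
        final String classname;
        final String category;
        final List<String> libraries;

        PluginDescriptor(String id, String classname, String category, List<String> libraries) {
            this.id = id;
            this.classname = classname;
            this.category = category;
            this.libraries = libraries;
        }
    }

    static PluginDescriptor read(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Element plugin = doc.getDocumentElement(); // the <plugin> root element
            List<String> libs = new ArrayList<>();
            NodeList nodes = plugin.getElementsByTagName("library");
            for (int i = 0; i < nodes.getLength(); i++) {
                libs.add(((Element) nodes.item(i)).getAttribute("name"));
            }
            return new PluginDescriptor(plugin.getAttribute("id"),
                    plugin.getAttribute("classname"), plugin.getAttribute("category"), libs);
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid plugin.xml", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
                + "<plugin id=\"KafkaConsumer\" category=\"Input\""
                + " classname=\"com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerMeta\">"
                + "<libraries>"
                + "<library name=\"pentaho-kafka-consumer.jar\"/>"
                + "<library name=\"lib/zookeeper-3.4.6.jar\"/>"
                + "</libraries></plugin>";
        PluginDescriptor d = read(xml);
        System.out.println(d.id + " -> " + d.classname);
        System.out.println(d.libraries);
    }
}
```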
III. Implementation Code
1. Step class
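import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowDataUtil;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

import com.splunk.JobResultsArgs;
import com.splunk.ResultsReaderXml;
import com.splunk.Service;

// SplunkConnection, ReturnValue, SplunkInputMeta and SplunkInputData belong to the plugin's own package.
public class SplunkInput extends BaseStep implements StepInterface {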
private SplunkInputMeta meta;
private SplunkInputData data;
public SplunkInput(StepMeta stepMeta, StepDataInterface stepDataInterface,
int copyNr, TransMeta transMeta, Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}
@Override
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
meta = (SplunkInputMeta) smi;
data = (SplunkInputData) sdi;
// Is the step getting input?
// List<StepMeta> steps = getTransMeta().findPreviousSteps(
// getStepMeta() );
// Validate the Splunk connection settings
if (StringUtils.isEmpty(meta.getHost())) {
log.logError("You need to specify a Splunk connection Host to use in this step");
return false;
}
if (StringUtils.isEmpty(meta.getPort())) {
log.logError("You need to specify a Splunk connection Port to use in this step");
return false;
}
if (StringUtils.isEmpty(meta.getUsername())) {
log.logError("You need to specify a Splunk connection Username to use in this step");
return false;
}
if (StringUtils.isEmpty(meta.getPassword())) {
log.logError("You need to specify a Splunk connection Password to use in this step");
return false;
}
// To correct lazy programmers who built certain PDI steps...
//System.setProperty("https.protocols", "TLSv1.2,TLSv1.1,SSLv3");
//Security.setProperty("jdk.tls.disabledAlgorithms","SSLv3, RC4, MD5withRSA, DH keySize < 768");
/* Overriding the static method setSslSecurityProtocol to implement the security protocol of choice */
// HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
/* end comment for overriding the method setSslSecurityProtocol */
data.splunkConnection = new SplunkConnection(meta.getName(),
meta.getHost(), meta.getPort(), meta.getUsername(),
meta.getPassword());
data.splunkConnection.initializeVariablesFrom(this);
try {
data.serviceArgs = data.splunkConnection.getServiceArgs();
data.service = Service.connect(data.serviceArgs);
} catch (Exception e) {
log.logError(
"Unable to connect to the Splunk server for connection '"
+ data.splunkConnection.getName() + "'", e);
return false;
}
return super.init(smi, sdi);
}
@Override
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
meta = (SplunkInputMeta) smi;
data = (SplunkInputData) sdi;
super.dispose(smi, sdi);
}
@Override
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
throws KettleException {
meta = (SplunkInputMeta) smi;
data = (SplunkInputData) sdi;
if (first) {
first = false;
// get the output fields...
data.outputRowMeta = new RowMeta();
meta.getFields(data.outputRowMeta, getStepname(), null,
getStepMeta(), this, repository, data.metaStore);
// Run a one shot search in blocking mode
JobResultsArgs args = new JobResultsArgs();
//不限制返回数据量,即返回所有数据
args.setCount(0);
args.setOutputMode(JobResultsArgs.OutputMode.XML);
data.eventsStream = data.service.oneshotSearch(getTransMeta()
.environmentSubstitute(meta.getQuery()), args);
}
try {
ResultsReaderXml resultsReader = new ResultsReaderXml(
data.eventsStream);
HashMap<String, String> event;
while ((event = resultsReader.getNextEvent()) != null) {
Object[] outputRow = RowDataUtil
.allocateRowData(data.outputRowMeta.size());
for (int i = 0; i < meta.getReturnValues().size(); i++) {
ReturnValue returnValue = meta.getReturnValues().get(i);
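String value = event.get(returnValue.getSplunkName());
// Splunk returns every value as a String; output fields declared with a non-String type need converting first
outputRow[i] = value;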
}
incrementLinesInput();
putRow(data.outputRowMeta, outputRow);
}
} catch (Exception e) {
throw new KettleException(
"Error reading from Splunk events stream", e);
} finally {
try {
data.eventsStream.close();
} catch (IOException e) {
throw new KettleException("Unable to close events stream", e);
}
}
setOutputDone();
return false;
}
}
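The inner loop of processRow() above copies each Splunk event into an Object[] row. An event is a map of field name to string value, and the row follows the order of the configured return values, storing the raw String. If an output field is declared as, say, Integer in getFields(), the value may need converting first, because Kettle's Integer type holds Long values and its Number type holds Double values.

Below is a standalone sketch of that mapping with a simple conversion. The Field class and the handful of supported type names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of turning one Splunk event into an output row.
// Field is a hypothetical stand-in for ReturnValue (output name, Splunk name, type).
class EventToRow {

    static final class Field {
        final String name;
        final String splunkName;
        final String type;

        Field(String name, String splunkName, String type) {
            this.name = name;
            this.splunkName = splunkName;
            this.type = type;
        }
    }

    // Converts the raw string according to the declared type; missing values stay null.
    static Object convert(String raw, String type) {
        if (raw == null) {
            return null;
        }
        switch (type) {
            case "Integer": return Long.valueOf(raw.trim());   // Kettle Integer is backed by Long
            case "Number":  return Double.valueOf(raw.trim()); // Kettle Number is backed by Double
            default:        return raw;                        // everything else kept as String
        }
    }

    // Position i in the row corresponds to fields.get(i), as in processRow().
    static Object[] toRow(Map<String, String> event, List<Field> fields) {
        Object[] row = new Object[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            Field f = fields.get(i);
            row[i] = convert(event.get(f.splunkName), f.type);
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> event = new HashMap<>();
        event.put("host", "web-01");
        event.put("count", "42");
        List<Field> fields = Arrays.asList(
                new Field("server", "host", "String"),
                new Field("hits", "count", "Integer"),
                new Field("missing", "nope", "String"));
        System.out.println(Arrays.toString(toRow(event, fields))); // [web-01, 42, null]
    }
}
```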
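init(): called before the transformation runs; the transformation only actually starts once every step has initialized successfully. In this example it checks that the Splunk host, port, username, and password are configured and that the Splunk connection can be established.
dispose(): called after the step has finished, to release resources such as caches and file handles.
run(): drives the actual processing of the data stream's records. A plugin like this one does not override it; the framework's execution thread calls processRow() repeatedly until it returns false.
processRow(): processes the data stream. It typically calls getRow() to fetch the next single row to process. This call blocks when necessary, for example while waiting for upstream rows or when the step needs to slow down. processRow() then transforms the row and calls putRow() to pass the processed row to the downstream step. Returning true asks Kettle to call processRow() again, while returning false after setOutputDone() signals that the step has finished. Because SplunkInput is an input step, it does not call getRow() at all: it runs the search on the first call, emits every event, and returns false.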
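The call sequence described above can be modelled in a few lines of plain Java. In the sketch, MiniStep, its getRow()/putRow() and the run() loop are hypothetical simplifications of what BaseStep and Kettle's execution thread do. The point is the contract:

- init() gates execution.
- processRow() handles one row per call and returns true while there is more work.
- dispose() always runs at the end.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal model of the step lifecycle: init() -> processRow() until false -> dispose().
// MiniStep, getRow() and putRow() are hypothetical simplifications, not Kettle's BaseStep API.
class LifecycleModel {

    abstract static class MiniStep {
        private final Deque<Object[]> input;
        final List<Object[]> output = new ArrayList<>();
        boolean first = true;

        MiniStep(Deque<Object[]> input) {
            this.input = input;
        }

        // Returns the next upstream row, or null when the input is exhausted.
        Object[] getRow() {
            return input.pollFirst();
        }

        // Hands a processed row to the downstream step.
        void putRow(Object[] row) {
            output.add(row);
        }

        boolean init() { return true; }

        abstract boolean processRow();

        void dispose() { }
    }

    // Example step: upper-cases the first field of every row.
    static class UpperCaseStep extends MiniStep {
        UpperCaseStep(Deque<Object[]> input) { super(input); }

        @Override
        boolean processRow() {
            Object[] row = getRow();
            if (row == null) {
                return false; // no more input: tell the framework this step is done
            }
            if (first) {
                first = false; // one-time setup (e.g. building output metadata) goes here
            }
            Object[] out = row.clone();
            out[0] = String.valueOf(out[0]).toUpperCase();
            putRow(out);
            return true; // ask the framework to call processRow() again
        }
    }

    // Plays the role of the framework's execution loop.
    static void run(MiniStep step) {
        if (!step.init()) {
            return; // in Kettle, a failed init() keeps the transformation from starting
        }
        try {
            while (step.processRow()) {
                // keep calling processRow() until it returns false
            }
        } finally {
            step.dispose(); // release connections, caches, file handles
        }
    }

    public static void main(String[] args) {
        Deque<Object[]> input = new ArrayDeque<>();
        input.add(new Object[] {"a", 1});
        input.add(new Object[] {"b", 2});
        UpperCaseStep step = new UpperCaseStep(input);
        run(step);
        System.out.println(step.output.size() + " rows, first = " + step.output.get(0)[0]); // 2 rows, first = A
    }
}
```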
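2. Data class
During development, most steps need temporary buffers or scratch data, and the data class is the right place to keep them. Each execution thread gets its own data class instance, so it runs in an isolated space.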
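import java.io.InputStream;

import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.step.BaseStepData;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.metastore.api.IMetaStore;

import com.splunk.Service;
import com.splunk.ServiceArgs;

public class SplunkInputData extends BaseStepData implements StepDataInterface {
public RowMetaInterface outputRowMeta;//metadata of the output rows
public SplunkConnection splunkConnection;//Splunk connection
public int[] fieldIndexes;//array of field indexes
public String query;//Splunk SPL statement (must start with search)
public IMetaStore metaStore;//metastore object
public ServiceArgs serviceArgs;//Splunk query service arguments
public Service service;//Splunk query service
public InputStream eventsStream;//input event stream
}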
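This per-thread isolation is what lets Kettle run several copies of the same step in parallel without locking. The sketch below shows the idea in plain Java. Each worker thread receives its own data object from a factory, so a non-thread-safe StringBuilder and a plain long counter are safe to use. RowCounterData and runCopies are hypothetical names; in Kettle the data object comes from the meta class's getStepData().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch: one data object per step copy (thread), created by a factory.
// RowCounterData is a hypothetical stand-in for a class like SplunkInputData.
class PerCopyDataDemo {

    static class RowCounterData {
        long rowsSeen;                                     // plain field, no locking needed
        final StringBuilder scratch = new StringBuilder(); // per-copy buffer, never shared
    }

    // Starts one thread per copy; each copy processes its own rows.
    static List<RowCounterData> runCopies(int copies, int rowsPerCopy,
                                          Supplier<RowCounterData> factory) {
        List<RowCounterData> dataObjects = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < copies; c++) {
            RowCounterData data = factory.get(); // fresh instance for this copy
            dataObjects.add(data);
            final int copyNr = c;
            Thread t = new Thread(() -> {
                for (int r = 0; r < rowsPerCopy; r++) {
                    data.rowsSeen++;
                    data.scratch.append(copyNr);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // join() also makes the workers' writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return dataObjects;
    }

    public static void main(String[] args) {
        List<RowCounterData> result = runCopies(3, 1000, RowCounterData::new);
        for (RowCounterData d : result) {
            System.out.println(d.rowsSeen); // 1000 for each copy
        }
    }
}
```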
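3. Dialog class
This class mainly implements the listeners on the input properties, initialization and constraint checks of the configuration data, Splunk connection testing, and a preview of the properties of the result-set fields.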
private static Class<?> PKG = SplunkInputMeta.class; // for i18n purposes, needed by Translator2!!
private Text wStepname;
private Text wHost;//Splunk domain name or IP address
private Text wPort;//Splunk port
private Text wUsername;//Splunk user name
private Text wPassword;//Splunk password
private Text wQuery;//Splunk SPL statement
private TableView wReturns;//output fields of the Splunk step and their properties
private SplunkInputMeta input;
……
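The constraint checks mentioned above can be kept out of the SWT code by putting them in a plain method. The dialog would call it before copying widget values into SplunkInputMeta. This is a sketch under stated assumptions: the helper name validateSplunkSettings, the 1 to 65535 port range check, and the rule that the query must begin with search are illustrative choices, not code from the original plugin. The search rule is taken from the data-class comment. A port that contains a Kettle variable such as ${SPLUNK_PORT} is accepted as-is, because it can only be resolved at runtime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validation helper a dialog could call before accepting its settings.
class SplunkSettingsValidator {

    static List<String> validateSplunkSettings(String host, String port, String username,
                                               String password, String query) {
        List<String> errors = new ArrayList<>();
        if (isBlank(host)) errors.add("Host is required");
        if (isBlank(username)) errors.add("Username is required");
        if (isBlank(password)) errors.add("Password is required");
        if (isBlank(port)) {
            errors.add("Port is required");
        } else if (!port.contains("${")) { // variables are resolved at runtime, so skip the check
            try {
                int p = Integer.parseInt(port.trim());
                if (p < 1 || p > 65535) errors.add("Port must be between 1 and 65535");
            } catch (NumberFormatException e) {
                errors.add("Port must be a number");
            }
        }
        if (isBlank(query) || !query.trim().startsWith("search")) {
            errors.add("Query must start with 'search'");
        }
        return errors;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(validateSplunkSettings("127.0.0.1", "8089", "query", "query",
                "search * | head 100")); // []
        System.out.println(validateSplunkSettings("", "80a", "u", "p", "index=main"));
    }
}
```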
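4. Metadata class
The following lists several key methods of the metadata class for the Splunk query step. The fields that this step passes on to the next step are kept in a private member variable: outputField in the generic method list below, and the returnValues list in this Splunk example.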
@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
@InjectionSupported(localizationPrefix = "Cypher.Injection.", groups = {
"PARAMETERS", "RETURNS" })
public class SplunkInputMeta extends BaseStepMeta implements StepMetaInterface {
public static final String HOST = "host";
public static final String PORT = "port";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String QUERY = "query";
public static final String RETURNS = "returns";
public static final String RETURN = "return";
public static final String RETURN_NAME = "return_name";
public static final String RETURN_SPLUNK_NAME = "return_splunk_name";
public static final String RETURN_TYPE = "return_type";
public static final String RETURN_LENGTH = "return_length";
public static final String RETURN_FORMAT = "return_format";
@Injection(name = HOST)
private String host;
@Injection(name = PORT)
private String port;
@Injection(name = USERNAME)
private String username;
@Injection(name = PASSWORD)
private String password;
@Injection(name = QUERY)
private String query;
@InjectionDeep
private List<ReturnValue> returnValues;
public SplunkInputMeta() {
super();
returnValues = new ArrayList<>();
}
……
}
// Track the step's input and output settings:
//   public String getOutputField()
//   public void setOutputField(…)
// Initialize the configuration parameters with default values:
//   public void setDefault()
@Override
public void setDefault() {
host = "127.0.0.1";
port = "8089";
username = "query";
password = "query";
query = "search * | head 100";
}
// Serialize the step settings to XML (getXML) and load them back (loadXML):
//   public String getXML()
@Override
public String getXML() {
StringBuilder xml = new StringBuilder();
xml.append(XMLHandler.addTagValue(HOST, host));
xml.append(XMLHandler.addTagValue(PORT, port));
xml.append(XMLHandler.addTagValue(USERNAME, username));
xml.append(XMLHandler.addTagValue(PASSWORD, password));
xml.append(XMLHandler.addTagValue(QUERY, query));
xml.append(XMLHandler.openTag(RETURNS));
for (ReturnValue returnValue : returnValues) {
xml.append(XMLHandler.openTag(RETURN));
xml.append(XMLHandler.addTagValue(RETURN_NAME,
returnValue.getName()));
xml.append(XMLHandler.addTagValue(RETURN_SPLUNK_NAME,
returnValue.getSplunkName()));
xml.append(XMLHandler.addTagValue(RETURN_TYPE,
returnValue.getType()));
xml.append(XMLHandler.addTagValue(RETURN_LENGTH,
returnValue.getLength()));
xml.append(XMLHandler.addTagValue(RETURN_FORMAT,
returnValue.getFormat()));
xml.append(XMLHandler.closeTag(RETURN));
}
xml.append(XMLHandler.closeTag(RETURNS));
return xml.toString();
}
//   public void loadXML(…)
// Read the step settings from, and save them to, a repository:
//   public void readRep(…)
@Override
public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId,
List<DatabaseMeta> databases) throws KettleException {
host = rep.getStepAttributeString(stepId, HOST);
port = rep.getStepAttributeString(stepId, PORT);
username = rep.getStepAttributeString(stepId, USERNAME);
password = rep.getStepAttributeString(stepId, PASSWORD);
query = rep.getStepAttributeString(stepId, QUERY);
returnValues = new ArrayList<>();
int nrReturns = rep.countNrStepAttributes(stepId, RETURN_NAME);
for (int i = 0; i < nrReturns; i++) {
String name = rep.getStepAttributeString(stepId, i, RETURN_NAME);
String splunkName = rep.getStepAttributeString(stepId, i,
RETURN_SPLUNK_NAME);
String type = rep.getStepAttributeString(stepId, i, RETURN_TYPE);
int length = (int) rep.getStepAttributeInteger(stepId, i,
RETURN_LENGTH);
String format = rep
.getStepAttributeString(stepId, i, RETURN_FORMAT);
returnValues.add(new ReturnValue(name, splunkName, type, length,
format));
}
}
//   public void saveRep(…)
// Describe how the step changes the field structure (row metadata) of the data stream:
//   public void getFields(…)
@Override
public void getFields(RowMetaInterface rowMeta, String name,
RowMetaInterface[] info, StepMeta nextStep, VariableSpace space,
Repository repository, IMetaStore metaStore)
throws KettleStepException {
for (ReturnValue returnValue : returnValues) {
try {
int type = ValueMetaFactory.getIdForValueMeta(returnValue
.getType());
ValueMetaInterface valueMeta = ValueMetaFactory
.createValueMeta(returnValue.getName(), type);
valueMeta.setLength(returnValue.getLength());
valueMeta.setOrigin(name);
rowMeta.addValueMeta(valueMeta);
} catch (KettlePluginException e) {
throw new KettleStepException("Unknown data type '"
+ returnValue.getType() + "' for value named '"
+ returnValue.getName() + "'");
}
}
}
// Perform extended validation checks on the step:
//   public void check(…)
// Provide Kettle with instances of the step, data, and dialog classes:
//   public StepInterface getStep(…)
@Override
public StepInterface getStep(StepMeta stepMeta,
StepDataInterface stepDataInterface, int i, TransMeta transMeta,
Trans trans) {
return new SplunkInput(stepMeta, stepDataInterface, i, transMeta, trans);
}
//   public StepDataInterface getStepData()
@Override
public StepDataInterface getStepData() {
return new SplunkInputData();
}
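loadXML(…) is elided above, but it has to read back exactly the tags that getXML() writes, including the repeated <return> elements inside <returns>. The sketch below checks that round trip using only the JDK's DOM parser instead of Kettle's XMLHandler. It is not the plugin's actual loadXML(…), and it makes these simplifications:

- The <step> wrapper element is assumed.
- The Settings holder is a hypothetical name.
- Only a reduced set of tags is handled: host, query, return_name and return_splunk_name.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Round-trip sketch of the getXML()/loadXML() tag layout, using only the JDK.
class StepXmlRoundTrip {

    static final class Settings {
        String host;
        String query;
        final List<String[]> returns = new ArrayList<>(); // {return_name, return_splunk_name}
    }

    // Wraps a value in <name>value</name>, escaping the XML special characters.
    static String tag(String name, String value) {
        String v = value == null ? "" : value.replace("&", "&amp;")
                .replace("<", "&lt;").replace(">", "&gt;");
        return "<" + name + ">" + v + "</" + name + ">";
    }

    // Mirrors getXML(): scalar settings first, then one <return> per output field.
    static String toXml(String host, String query, List<String[]> returns) {
        StringBuilder xml = new StringBuilder("<step>");
        xml.append(tag("host", host)).append(tag("query", query)).append("<returns>");
        for (String[] r : returns) {
            xml.append("<return>").append(tag("return_name", r[0]))
               .append(tag("return_splunk_name", r[1])).append("</return>");
        }
        return xml.append("</returns></step>").toString();
    }

    static String text(Element parent, String name) {
        NodeList nl = parent.getElementsByTagName(name);
        return nl.getLength() == 0 ? null : nl.item(0).getTextContent();
    }

    // Mirrors loadXML(): read the scalars, then iterate over the <return> elements.
    static Settings load(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Element root = doc.getDocumentElement();
            Settings s = new Settings();
            s.host = text(root, "host");
            s.query = text(root, "query");
            NodeList nodes = root.getElementsByTagName("return");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element r = (Element) nodes.item(i);
                s.returns.add(new String[] {text(r, "return_name"), text(r, "return_splunk_name")});
            }
            return s;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse step XML", e);
        }
    }

    public static void main(String[] args) {
        List<String[]> returns = new ArrayList<>();
        returns.add(new String[] {"server", "host"});
        Settings s = load(toXml("127.0.0.1", "search * | head 100", returns));
        System.out.println(s.host + " / " + s.query + " / " + s.returns.get(0)[1]);
    }
}
```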
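5. Additional classes
The plugin also needs a SplunkConnection class to build the Splunk connection and a ReturnValue class to standardize the field information of the Splunk output stream. Below is getServiceArgs() from SplunkConnection, followed by the ReturnValue class: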
public ServiceArgs getServiceArgs() {
ServiceArgs args = new ServiceArgs();
args.setUsername(getRealUsername());
args.setPassword(getRealPassword());
args.setHost(getRealHostname());
args.setPort(Const.toInt(getRealPort(), 8089));
args.setSSLSecurityProtocol(SSLSecurityProtocol.TLSv1_2);//the protocols actually supported depend on the server configuration
return args;
}
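getServiceArgs() relies on helpers such as getRealPort() that are not shown. Judging by the name and by the initializeVariablesFrom(this) call in init(), they presumably resolve Kettle variables like ${SPLUNK_PORT} before the value is used. Const.toInt(…, 8089) then falls back to 8089, Splunk's default management port, when the result is not a valid integer.

Below is a self-contained sketch of that resolve-then-parse behaviour. resolveVariables and toInt are hypothetical helpers written for illustration, not Kettle's implementations.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of "substitute variables, then parse with a default", as getServiceArgs() appears to do.
class PortResolution {

    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)}");

    // Replaces ${NAME} with its value; unknown variables are left untouched.
    static String resolveVariables(String text, Map<String, String> vars) {
        if (text == null) return null;
        Matcher m = VAR.matcher(text);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = vars.getOrDefault(m.group(1), m.group());
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    // Similar in spirit to Const.toInt(String, int): return the default on any parse failure.
    static int toInt(String s, int defaultValue) {
        try {
            return Integer.parseInt(s.trim());
        } catch (RuntimeException e) { // NumberFormatException, or NullPointerException for null
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("SPLUNK_PORT", "9089");
        System.out.println(toInt(resolveVariables("${SPLUNK_PORT}", vars), 8089)); // 9089
        System.out.println(toInt(resolveVariables("${UNSET}", vars), 8089));       // 8089
    }
}
```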
import java.util.Objects;

import org.pentaho.di.core.injection.Injection;

public class ReturnValue {
@Injection(name = "RETURN_NAME", group = "RETURNS")
private String name;
@Injection(name = "RETURN_SPLUNK_NAME", group = "RETURNS")
private String splunkName;
@Injection(name = "RETURN_TYPE", group = "RETURNS")
private String type;
@Injection(name = "RETURN_LENGTH", group = "RETURNS")
private int length;
@Injection(name = "RETURN_FORMAT", group = "RETURNS")
private String format;
public ReturnValue(String name, String splunkName, String type, int length,
String format) {
this.name = name;
this.splunkName = splunkName;
this.type = type;
this.length = length;
this.format = format;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReturnValue that = (ReturnValue) o;
return Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public String toString() {
return "ReturnValue{" + "name='" + name + '\'' + '}';
}
/**
* Gets name
*
* @return value of name
*/
public String getName() {
return name;
}
/**
* @param name
* The name to set
*/
public void setName(String name) {
this.name = name;
}
/**
* Gets splunkName
*
* @return value of splunkName
*/
public String getSplunkName() {
return splunkName;
}
/**
* @param splunkName
* The splunkName to set
*/
public void setSplunkName(String splunkName) {
this.splunkName = splunkName;
}
/**
* Gets type
*
* @return value of type
*/
public String getType() {
return type;
}
/**
* @param type
* The type to set
*/
public void setType(String type) {
this.type = type;
}
/**
* Gets length
*
* @return value of length
*/
public int getLength() {
return length;
}
/**
* @param length
* The length to set
*/
public void setLength(int length) {
this.length = length;
}
/**
* Gets format
*
* @return value of format
*/
public String getFormat() {
return format;
}
/**
* @param format
* The format to set
*/
public void setFormat(String format) {
this.format = format;
}
}
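Note that ReturnValue.equals() and hashCode() compare only the output field name. Two entries that differ only in type or Splunk field name therefore count as the same field. One practical use of that choice is detecting duplicate output names before getFields() adds them to the row metadata.

Below is a standalone sketch with a trimmed-down copy of the class and a LinkedHashSet. FieldDef and duplicateNames are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Trimmed-down stand-in for ReturnValue with the same name-only equality.
class DuplicateFieldCheck {

    static final class FieldDef {
        final String name;
        final String splunkName;

        FieldDef(String name, String splunkName) {
            this.name = name;
            this.splunkName = splunkName;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            return Objects.equals(name, ((FieldDef) o).name); // name only, like ReturnValue
        }

        @Override
        public int hashCode() {
            return Objects.hash(name);
        }
    }

    // Returns the output names that appear more than once, in first-seen order.
    static List<String> duplicateNames(List<FieldDef> fields) {
        Set<FieldDef> seen = new LinkedHashSet<>();
        Set<String> dups = new LinkedHashSet<>();
        for (FieldDef f : fields) {
            if (!seen.add(f)) {
                dups.add(f.name);
            }
        }
        return new ArrayList<>(dups);
    }

    public static void main(String[] args) {
        List<FieldDef> fields = Arrays.asList(
                new FieldDef("host", "host"),
                new FieldDef("count", "count"),
                new FieldDef("host", "sourcetype")); // same output name, different source field
        System.out.println(duplicateNames(fields)); // [host]
    }
}
```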
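IV. Summary
The walkthrough above covers the end-to-end process of developing a custom Kettle plugin. If you need the source code, or want to learn about more custom plugins and integration approaches, follow the author's account "游走在数据之间" ("Wandering Among Data"). You can also reach the author there with any questions or suggestions about development or usage.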