Kettle插件开发之Splunk

一、开发背景

工欲善其事,必先利其器。如果我们把Kettle离线或准实时ETL的工具链,那就绕不开Kettle定制化插件开发的环节。比如:我们需要对某个组件流出的数据进行特殊函数处理(如加解密);又或者我现有版本的组件不能满足我们对源端数据捕获的需求;再或者现有版本的组件缺失对重复消费的需求。

简而言之,就是业务流程的特殊性,kettle原有流程处理组件不能满足或者完全满足我们的数据处理需求,就需要我们定制开发流程处理组件,以满足数据的管理、数据的验证、数据的转换和某些特殊类型数据源的抽取。

二、基本框架

image.png

我们以上图splunk查询插件为例,来一步步阐述Kettle转换插件的工作原理,这四个类构成了基础的Kettle步骤/节点。当然,存在即合理,每一个类都扮演者不同的角色及其特定的作用。

SplunkInput:步骤类

继承了BaseStep父类,并实现了StepInterface接口,在转换运行时,他的实例即是数据实际处理的位置,每一个执行线程都表示一个此类的示例。

image.png

SplunkInputData:数据类

继承了BaseStepData父类,并实现了StepDataInterface接口,用来存储数据,当插件执行时,对于每一个执行线程都是唯一的。执行时里面存储的主要有自定义的元数据对象、数据库连接、缓存、文件句柄等其他对象信息。

image.png

SplunkInputDialog:对话框类

继承了BaseStepDialog父类,并实现了StepDialogInterface接口,该类主要实现组件步骤与ETL开发者交互配置的界面,ETL开发者按照设定好的输入和输出选项配置,来实现个性化ETL开发。

image.png

SplunkInputMeta:元数据类

继承了BaseStepMeta父类,并实现了StepMetaInterface接口。他的作用是保存和序列化特定步骤实例的配置,即跟踪SplunkInputDialog类的开发者配置。在本例子中,它负责保存开发者设置的步骤属性、splunk连接配置属性和输出字段的名称类型等属性信息。

插件展现配置

负责定义插件步骤在Kettle可视化UI工作台中的显示效果。设置插件的唯一ID、名称及描述,说明kettle插件要加载的元数据类,以及需要预先加载的依赖Jar包列表。当然它有以下两种实现方式,本例采用第一种。

一、通过@Step注解实现

@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")

id:在Kettle插件中必须保证全局唯一

name:自定义插件Spoon UI中的显示名称,无强制唯一性要求

description:描述该插件的具体作用,或者简要使用说明

image:Spoon UI中的显示图标,尽量使用svg格式图片,可到阿里Icon图库查找和下载所需的图标(https://www.iconfont.cn/

categoryDescription:插件归属目录

……

二、通过plugin.xml实现

<?xml version="1.0" encoding="UTF-8"?>
<plugin
    id="KafkaConsumer"
    iconfile="logo.png"
    description="Apache Kafka Consumer"
    tooltip="This plug-in allows reading messages from a specific topic in a Kafka stream"
    category="Input"
    classname="com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerMeta">
    
    <libraries>
        <library name="pentaho-kafka-consumer.jar"/>
        ……
        <library name="lib/zookeeper-3.4.6.jar"/>
    </libraries>
    
    <localized_category>
        <category locale="en_US">Input</category>
    </localized_category>
    <localized_description>
        <description locale="en_US">Apache Kafka Consumer</description>
    </localized_description>
    <localized_tooltip>
        <tooltip locale="en_US">This plug-in allows reading messages from a specific topic in a Kafka stream</tooltip>
    </localized_tooltip>
</plugin>

id:在Kettle插件中必须保证全局唯一
iconfile:Spoon UI中的显示图标,尽量使用png格式图片
description:描述该插件的具体作用
tooltip:树形菜单中,鼠标滑过显示的提示信息
category:插件归属目录
classname:元数据类
libraries:插件依赖Jar包列表

三、实现代码

一、步骤类

public class SplunkInput extends BaseStep implements StepInterface {
    private SplunkInputMeta meta;
    private SplunkInputData data;
    public SplunkInput(StepMeta stepMeta, StepDataInterface stepDataInterface,
            int copyNr, TransMeta transMeta, Trans trans) {
        super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
    }
    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;
        // Is the step getting input?
        // List<StepMeta> steps = getTransMeta().findPreviousSteps(
        // getStepMeta() );
        // Connect to Neo4j
        if (StringUtils.isEmpty(meta.getHost())) {
            log.logError("You need to specify a Splunk connection Host to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getPort())) {
            log.logError("You need to specify a Splunk connection Port to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getUsername())) {
            log.logError("You need to specify a Splunk connection Username to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getPassword())) {
            log.logError("You need to specify a Splunk connection Password to use in this step");
            return false;
        }
        // To correct lazy programmers who built certain PDI steps...
        //System.setProperty("https.protocols", "TLSv1.2,TLSv1.1,SSLv3");
        
        //Security.setProperty("jdk.tls.disabledAlgorithms","SSLv3, RC4, MD5withRSA, DH keySize < 768");
         /* Overriding the static method setSslSecurityProtocol to implement the security protocol of choice */
        // HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
        /* end comment for overriding the method setSslSecurityProtocol */
        
        data.splunkConnection = new SplunkConnection(meta.getName(),
                meta.getHost(), meta.getPort(), meta.getUsername(),
                meta.getPassword());
        data.splunkConnection.initializeVariablesFrom(this);
        try {
            data.serviceArgs = data.splunkConnection.getServiceArgs();
            
            data.service = Service.connect(data.serviceArgs);
        } catch (Exception e) {
            log.logError(
                    "Unable to get or create Neo4j database driver for database '"
                            + data.splunkConnection.getName() + "'", e);
            return false;
        }
        return super.init(smi, sdi);
    }
    @Override
    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;
        super.dispose(smi, sdi);
    }
    @Override
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;
        if (first) {
            first = false;
            // get the output fields...
            data.outputRowMeta = new RowMeta();
            meta.getFields(data.outputRowMeta, getStepname(), null,
                    getStepMeta(), this, repository, data.metaStore);
            // Run a one shot search in blocking mode
            JobResultsArgs args = new JobResultsArgs();
            //不限制返回数据量,即返回所有数据
            args.setCount(0);
            args.setOutputMode(JobResultsArgs.OutputMode.XML);
            data.eventsStream = data.service.oneshotSearch(getTransMeta()
                    .environmentSubstitute(meta.getQuery()), args);
        }
        try {
            ResultsReaderXml resultsReader = new ResultsReaderXml(
                    data.eventsStream);
            HashMap<String, String> event;
            while ((event = resultsReader.getNextEvent()) != null) {
                Object[] outputRow = RowDataUtil
                        .allocateRowData(data.outputRowMeta.size());
                for (int i = 0; i < meta.getReturnValues().size(); i++) {
                    ReturnValue returnValue = meta.getReturnValues().get(i);
                    String value = event.get(returnValue.getSplunkName());
                    outputRow[i] = value;
                }
                incrementLinesInput();
                putRow(data.outputRowMeta, outputRow);
            }
        } catch (Exception e) {
            throw new KettleException(
                    "Error reading from Splunk events stream", e);
        } finally {
            try {
                data.eventsStream.close();
            } catch (IOException e) {
                throw new KettleException("Unable to close events stream", e);
            }
        }
        setOutputDone();
        return false;
    }
}

init(),该方法是在转换执行前被调用,只有所有步骤的初始化成功时,转换才会真正被执行。此例是检查splunk连接ip、port等属性是否配置,以及splunk连接初始化是否成功
dispose(),该方法作用是在转换步骤执行完后执行,完成如缓存、文件句柄等资源的关闭操作。
run(),该方法是在实际处理数据流记录集时调用。
processRow(),该方法作用是处理所有数据流。通常通过调用getRow()来获取需要处理的单条记录。 这个方法如果有需要将会被阻塞,例如当此步骤希望放慢脚步处理数据时。processRow()随后的流程将执行转换工作并调用putRow()方法将处理过的记录放到它的下游步骤。

二、数据类

开发中,大多数环节都需要临时的缓冲或者临时的数据。数据类就是这些数据合适的存放位置。每一个执行线程将得到其拥有的数据类实例,所以它能在独立的空间里面运行。

public class SplunkInputData extends BaseStepData implements StepDataInterface {
    public RowMetaInterface outputRowMeta;
    public SplunkConnection splunkConnection;//splunk连接
    public int[] fieldIndexes;//字段索引数组
    public String query;//splunk spl语句(必须以search开头)
    public IMetaStore metaStore;//元数据仓库对象
    public ServiceArgs serviceArgs;//splunk查询服务参数
    public Service service;//splunk查询服务
    public InputStream eventsStream;//输入事件流
}

三、对话框类

此处主要实现输入属性监听器配置,配置数据初始化约束检查,splunk连接校验以及返回结果集字段属性预览等功能模块

private static Class<?> PKG = SplunkInputMeta.class; // for i18n purposes,
                                                            // needed by
                                                            // Translator2!!
    private Text wStepname;
    private Text wHost;//splunk域名或IP
    private Text wPort;//splunk端口
    private Text wUsername;//splunk用户名
    private Text wPassword;//splunk密码
    private Text wQuery;//splunk spl语句
    private TableView wReturns;//splunk组件输出字段及属性
    private SplunkInputMeta input;
    
    ……

四、元数据类

下面针对splunk查询组件,列举了元数据类的几个关键的方法,其中私有成员变量outputField 存放了下一个步骤的输出流字段。

@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
@InjectionSupported(localizationPrefix = "Cypher.Injection.", groups = {
        "PARAMETERS", "RETURNS" })
public class SplunkInputMeta extends BaseStepMeta implements StepMetaInterface {
    public static final String HOST = "host";
    public static final String PORT = "port";
    public static final String USERNAME = "username";
    public static final String PASSWORD = "password";
    public static final String QUERY = "query";
    public static final String RETURNS = "returns";
    public static final String RETURN = "return";
    public static final String RETURN_NAME = "return_name";
    public static final String RETURN_SPLUNK_NAME = "return_splunk_name";
    public static final String RETURN_TYPE = "return_type";
    public static final String RETURN_LENGTH = "return_length";
    public static final String RETURN_FORMAT = "return_format";
    @Injection(name = HOST)
    private String host;
    @Injection(name = PORT)
    private String port;
    @Injection(name = USERNAME)
    private String username;
    @Injection(name = PASSWORD)
    private String password;
    @Injection(name = QUERY)
    private String query;
    @InjectionDeep
    private List<ReturnValue> returnValues;
    public SplunkInputMeta() {
        super();
        returnValues = new ArrayList<>();
    }
    ……
}

// 跟踪组件步骤输入和输出设置
public String getOutputField()
public void setOutputField(…)
public void setDefault() //配置参数初始化

@Override
    public void setDefault() {
        host = "127.0.0.1";
        port = "8089";
        username = "query";
        password = "query";
        query = "search * | head 100";
    }

// 依赖获取和载入xml,序列化步骤属性设置
public String getXML()

@Override
    public String getXML() {
        StringBuilder xml = new StringBuilder();
        xml.append(XMLHandler.addTagValue(HOST, host));
        xml.append(XMLHandler.addTagValue(PORT, port));
        xml.append(XMLHandler.addTagValue(USERNAME, username));
        xml.append(XMLHandler.addTagValue(PASSWORD, password));
        xml.append(XMLHandler.addTagValue(QUERY, query));
        xml.append(XMLHandler.openTag(RETURNS));
        for (ReturnValue returnValue : returnValues) {
            xml.append(XMLHandler.openTag(RETURN));
            xml.append(XMLHandler.addTagValue(RETURN_NAME,
                    returnValue.getName()));
            xml.append(XMLHandler.addTagValue(RETURN_SPLUNK_NAME,
                    returnValue.getSplunkName()));
            xml.append(XMLHandler.addTagValue(RETURN_TYPE,
                    returnValue.getType()));
            xml.append(XMLHandler.addTagValue(RETURN_LENGTH,
                    returnValue.getLength()));
            xml.append(XMLHandler.addTagValue(RETURN_FORMAT,
                    returnValue.getFormat()));
            xml.append(XMLHandler.closeTag(RETURN));
        }
        xml.append(XMLHandler.closeTag(RETURNS));
        return xml.toString();
    }

public void loadXML(…)
// 从资源库读取和保存步骤属性设置
public void readRep(…)

@Override
    public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId,
            List<DatabaseMeta> databases) throws KettleException {
        host = rep.getStepAttributeString(stepId, HOST);
        port = rep.getStepAttributeString(stepId, PORT);
        username = rep.getStepAttributeString(stepId, USERNAME);
        password = rep.getStepAttributeString(stepId, PASSWORD);
        query = rep.getStepAttributeString(stepId, QUERY);
        returnValues = new ArrayList<>();
        int nrReturns = rep.countNrStepAttributes(stepId, RETURN_NAME);
        for (int i = 0; i < nrReturns; i++) {
            String name = rep.getStepAttributeString(stepId, i, RETURN_NAME);
            String splunkName = rep.getStepAttributeString(stepId, i,
                    RETURN_SPLUNK_NAME);
            String type = rep.getStepAttributeString(stepId, i, RETURN_TYPE);
            int length = (int) rep.getStepAttributeInteger(stepId, i,
                    RETURN_LENGTH);
            String format = rep
                    .getStepAttributeString(stepId, i, RETURN_FORMAT);
            returnValues.add(new ReturnValue(name, splunkName, type, length,
                    format));
        }
    }

public void saveRep(…)
// 提供有关步骤如何处理数据流行的字段结构的信息
public void getFields(…)

@Override
    public void getFields(RowMetaInterface rowMeta, String name,
            RowMetaInterface[] info, StepMeta nextStep, VariableSpace space,
            Repository repository, IMetaStore metaStore)
            throws KettleStepException {
        for (ReturnValue returnValue : returnValues) {
            try {
                int type = ValueMetaFactory.getIdForValueMeta(returnValue
                        .getType());
                ValueMetaInterface valueMeta = ValueMetaFactory
                        .createValueMeta(returnValue.getName(), type);
                valueMeta.setLength(returnValue.getLength());
                valueMeta.setOrigin(name);
                rowMeta.addValueMeta(valueMeta);
            } catch (KettlePluginException e) {
                throw new KettleStepException("Unknown data type '"
                        + returnValue.getType() + "' for value named '"
                        + returnValue.getName() + "'");
            }
        }
    }

// 对步骤执行扩展验证检查
public void check(…)
// 向Kettle提供步骤、数据和对话框类的实例
public StepInterface getStep(…)

@Override
    public StepInterface getStep(StepMeta stepMeta,
            StepDataInterface stepDataInterface, int i, TransMeta transMeta,
            Trans trans) {
        return new SplunkInput(stepMeta, stepDataInterface, i, transMeta, trans);
    }

public StepDataInterface getStepData()

@Override
    public StepDataInterface getStepData() {
        return new SplunkInputData();
    }

五、附加类

该插件还需要SplunkConnection类来构建splunk连接,ReturnValue类来标准化splunk输出流字段信息。

public ServiceArgs getServiceArgs() {
        ServiceArgs args = new ServiceArgs();
        args.setUsername(getRealUsername());
        args.setPassword(getRealPassword());
        args.setHost(getRealHostname());
        args.setPort(Const.toInt(getRealPort(), 8089));
        args.setSSLSecurityProtocol(SSLSecurityProtocol.TLSv1_2);//具体支持协议依赖于服务端配置
        return args;
    }
public class ReturnValue {
    @Injection(name = "RETURN_NAME", group = "RETURNS")
    private String name;
    @Injection(name = "RETURN_SPLUNK_NAME", group = "RETURNS")
    private String splunkName;
    @Injection(name = "RETURN_TYPE", group = "RETURNS")
    private String type;
    @Injection(name = "RETURN_LENGTH", group = "RETURNS")
    private int length;
    @Injection(name = "RETURN_FORMAT", group = "RETURNS")
    private String format;
    public ReturnValue(String name, String splunkName, String type, int length,
            String format) {
        this.name = name;
        this.splunkName = splunkName;
        this.type = type;
        this.length = length;
        this.format = format;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ReturnValue that = (ReturnValue) o;
        return Objects.equals(name, that.name);
    }
    @Override
    public int hashCode() {
        return Objects.hash(name);
    }
    @Override
    public String toString() {
        return "ReturnValue{" + "name='" + name + '\'' + '}';
    }
    /**
     * Gets name
     *
     * @return value of name
     */
    public String getName() {
        return name;
    }
    /**
     * @param name
     *            The name to set
     */
    public void setName(String name) {
        this.name = name;
    }
    /**
     * Gets splunkName
     *
     * @return value of splunkName
     */
    public String getSplunkName() {
        return splunkName;
    }
    /**
     * @param splunkName
     *            The splunkName to set
     */
    public void setSplunkName(String splunkName) {
        this.splunkName = splunkName;
    }
    /**
     * Gets type
     *
     * @return value of type
     */
    public String getType() {
        return type;
    }
    /**
     * @param type
     *            The type to set
     */
    public void setType(String type) {
        this.type = type;
    }
    /**
     * Gets length
     *
     * @return value of length
     */
    public int getLength() {
        return length;
    }
    /**
     * @param length
     *            The length to set
     */
    public void setLength(int length) {
        this.length = length;
    }
    /**
     * Gets format
     *
     * @return value of format
     */
    public String getFormat() {
        return format;
    }
    /**
     * @param format
     *            The format to set
     */
    public void setFormat(String format) {
        this.format = format;
    }
}

四、总结

通过以上对于Kettle自定义插件开发的阐述,我们基本上可以掌握端到端的开发流程,如果你需要源码或者了解更多自定义插件及集成方式,抑或有开发过程或者使用过程中的任何疑问或建议,请关注小编"游走在数据之间"

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,548评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,497评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,990评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,618评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,618评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,246评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,819评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,725评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,268评论 1 320
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,356评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,488评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,181评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,862评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,331评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,445评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,897评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,500评论 2 359

推荐阅读更多精彩内容