Kettle插件开发之Redis

一、背景介绍

1.1、Redis介绍

天下武功,唯快不破。Redis作为NoSQL数据库的性能王者,天生拥有高可用、高性能、高扩展等基因。那么进一步来看看Redis具备的特性:

  • 使用ANSI C语言编写,开源并遵守BSD协议。支持网络、可基于内存亦可持久化的日志型、Key-Value键值对存储数据库,并支持多种语言的API接口
  • 支持单点模式、主从模式、sentinels哨兵模式,cluster集群模式部署,可保证redis高可用
  • 基于内存运行,天生具备高性能
  • 支持分布式,理论上可以无限扩展

那么Redis对比其他类型的数据库,Redis又具备以下特点:

  • C/S通讯模型
  • 单进程单线程模型
  • 丰富的数据类型
  • 操作原子性
  • 持久化
  • 高并发读写
  • 支持lua脚本

1.2、Redis数据类型

最新版本的Redis提供的数据类型主要分为8种自有类型,包括:String类型、列表类型、集合类型、顺序集合类型、哈希类型、位数组、概率数据结构和流。

image

1.3、Redis的数据结构

主要介绍五种常用的数据类型的数据结构,具体如下图:

image

关于上表中的部分释义:

  • 双端链表与单链表十分相似,不同的是第一个链接点与最后一个链接点直接相连。双端链表不是双向链表
image.png
  • 压缩列表是列表键和哈希键的底层实现之一。当一个列表键只包含少量列表项,并且每个列表项要么就是小整数,要么就是长度比较短的字符串,Redis会使用压缩列表来做列表键的底层实现;
  • 整数集合是集合键的底层实现之。当一个集合只包含整数值元素,并且这个集合的元素数量不多时,Redis会使用整数集合作为集合键的底层实现。
  • 跳跃表是一种链表+多级索引的有序数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。
image

1.4、应用场景

  • 分布式锁(共享Session,唯一ID生成器,秒杀系统)
  • 缓存(“热点”数据:高频读、低频写)
  • 限流器、计数器
  • 消息队列
  • 排行榜、社交网络和实时系统

二、代码分析

初步介绍了Redis的基本特征、数据类型和数据结构,下面我们切入正题:如何基于Kettle平台构建Redis读写操作插件。

2.1、开发环境

  • jdk1.7/1.8
  • kettle 6.1.0.1/7.0.0.0
  • redis5.x
  • jedis2.9.0

2.2、代码架构

image.png

2.3、实现功能

通过本组合插件的构建,主要实现了Redis数据以下三个操作插件:

  • 数据的写入(RedisOutput)
  • 数据的读取(RedisInput)
  • 数据的删除(RedisDelete)

2.4、插件代码

本插件所有操作的服务端Redis集群,是以sentinel哨兵模式搭建的高可用集群。

2.4.1、写入插件

image.png

步骤类核心代码1:初始化Redis连接配置

//pool设置为static变量,多组件共享,会存在某一个组件使用完连接池,导致其他组件pool不可用
    private  JedisSentinelPool pool = null;
    ExecutorService service = Executors.newFixedThreadPool(12);
    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        long start = System.currentTimeMillis();
        if (super.init(smi, sdi)) {
            try {
                // Create client and connect to redis server(s)
                Set<Map<String, String>> jedisClusterNodes = ((RedisOutputMeta) smi).getServers();
                // 建立连接池配置参数
                JedisPoolConfig config = new JedisPoolConfig();
                // 设置最大连接数
                config.setMaxTotal(10000);
                // 设置最大阻塞时间,记住是毫秒数milliseconds
                config.setMaxWaitMillis(10000);
                // 设置最大空闲连接数
                config.setMaxIdle(300);
                // jedis实例是否可用
                config.setTestOnBorrow(true);
                // return 一个jedis实例给pool时,是否检查连接可用性(ping())
                // config.setTestOnReturn(true);
                // 设置jedis实例空闲检查连接可用性
                config.setTestWhileIdle(true);
                // 创建连接池
                // 获取redis密码
                String password = null;
                int timeout = 3000;
                String masterName = ((RedisOutputMeta) smi).getMasterName();
                Set<String> sentinels = new HashSet<String>();
                Iterator<Map<String, String>> it = jedisClusterNodes.iterator();
                while (it.hasNext()) {
                    Map<String, String> hostAndPort = it.next();
                    password = hostAndPort.get("auth");
                    sentinels.add(hostAndPort.get("hostname") + ":" + hostAndPort.get("port"));
                }
                pool = new JedisSentinelPool(masterName, sentinels, config, timeout, password);
                long end = System.currentTimeMillis();
                logBasic("建立连接池 毫秒:" + (end - start));
                return true;
            } catch (Exception e) {
                logError(BaseMessages.getString(PKG, "RedisInput.Error.ConnectError"), e);
                return false;
            }
        } else {
            return false;
        }
    }

步骤类核心代码2:redis数据执行写入操作

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (RedisOutputMeta) smi;
        data = (RedisOutputData) sdi;
        // TODO Auto-generated method stub
        Jedis jedis = pool.getResource();
        Object[] r = getRow(); // get row, set busy!
        // If no more input to be expected, stop
        if (r == null) {
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            // clone input row meta for now, we will change it (add or set inline) later
            data.outputRowMeta = getInputRowMeta().clone();
            // Get output field types
            meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
            keyfieldname = meta.getKeyFieldName();
            valuefieldname = meta.getValueFieldName();
            ttlfieldname = meta.getTtlFieldName();
            logBasic("keyfieldname:" + keyfieldname);
            logBasic("valuefieldname:" + valuefieldname);
            logBasic("ttlfieldname:" + ttlfieldname);
        }
        RedisOutputThread thread = new RedisOutputThread(this, jedis, r);
        service.submit(thread);
        if (checkFeedback(getLinesRead())) {
            if (log.isBasic()) {
                logBasic(BaseMessages.getString(PKG, "RedisOutput.Log.LineNumber") + getLinesRead());
            }
        }
        return true;
    }

元数据类核心代码1:用户关键配置信息检查

@Override
    public void check(List<CheckResultInterface> remarks, TransMeta transMeta,
            StepMeta stepMeta, RowMetaInterface prev, String[] input,
            String[] output, RowMetaInterface info) {
        CheckResult cr;
        if (prev == null || prev.size() == 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
                    BaseMessages.getString(PKG,
                            "RedisOutputMeta.CheckResult.NotReceivingFields"),
                    stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG,
                            "RedisOutputMeta.CheckResult.StepRecevingData",
                            prev.size() + ""), stepMeta);
            remarks.add(cr);
        }
        // See if we have input streams leading to this step!
        if (input.length > 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG,
                            "RedisOutputMeta.CheckResult.StepRecevingData2"),
                    stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(
                    CheckResultInterface.TYPE_RESULT_ERROR,
                    BaseMessages
                            .getString(PKG,
                                    "RedisOutputMeta.CheckResult.NoInputReceivedFromOtherSteps"),
                    stepMeta);
            remarks.add(cr);
        }
    }

元数据类核心代码2:对应redis数据写入操作,stepnode关键配置信息读取

private void readData(Node stepnode) throws KettleXMLException {
        try {
            this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
            this.valueFieldName = XMLHandler
                    .getTagValue(stepnode, "valuefield");
            this.ttlFieldName = XMLHandler.getTagValue(stepnode, "ttlfield");
            this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
            Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
            int nrservers = XMLHandler.countNodes(serverNodes, "server");
            allocate(nrservers);
            for (int i = 0; i < nrservers; i++) {
                Node fnode = XMLHandler
                        .getSubNodeByNr(serverNodes, "server", i);
                Map<String, String> hostAndPort = new HashMap<String, String>();
                hostAndPort.put("hostname",
                        XMLHandler.getTagValue(fnode, "hostname"));
                hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
                hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
                if (i == 0) {
                    setJedisServer(hostAndPort);
                }
                servers.add(hostAndPort);
            }
        } catch (Exception e) {
            throw new KettleXMLException(BaseMessages.getString(PKG,
                    "RedisOutputMeta.Exception.UnableToReadStepInfo"), e);
        }
    }

多线程处理类RedisOutputThread

@Override
    public void run() {
        long start = System.currentTimeMillis();
        // Get value from redis, don't cast now, be lazy. TODO change this?
        int keyFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
                redisOutput.meta.getKeyFieldName());
        Object key = r[keyFieldIndex];
        int valueFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
                redisOutput.meta.getValueFieldName());
        Object value = r[valueFieldIndex];
        int ttlFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
                redisOutput.meta.getTtlFieldName());
        Object ttl = r[ttlFieldIndex];
        String keyString = key.toString();
        String valueString = value.toString();
        String ttlString = ttl.toString();
        if (keyString != null && !StringUtil.isEmpty(keyString)) {
            String getKeyValue = jedis.get(keyString);
            boolean existsKey = jedis.exists(keyString);
            // 如果key已存在,就只更新value数据;如果key不存在,更新数据同时设置key过期时间(单位:秒)
            if (existsKey) {
                jedis.set(keyString, valueString);
                redisOutput
                        .logBasic(" This key already exists, so only the corresponding value= "
                                + getKeyValue + " is updated.");
            } else {
                if (ttlString != null && !StringUtil.isEmpty(ttlString)) {
                    jedis.set(keyString, valueString, "NX", "EX",
                            Integer.parseInt(ttlString));
                } else {
                    jedis.set(keyString, valueString, "NX", "EX",
                            2 * 24 * 60 * 60);
                }
            }
        }
        try {
            redisOutput.putRow(redisOutput.data.outputRowMeta, r);
            // redisOutput.rowkey.add(r[idFieldIndex]);
        } catch (KettleStepException e) {
            e.printStackTrace();
        }
        jedis.close();
        long end = System.currentTimeMillis();
        redisOutput.logBasic("Redis_Key:" + keyString + " ,Redis_Value:"
                + valueString + " ,Redis_TTL:" + ttlString + " ,processRow "
                + (end - start) + "  milliseconds .");
    }

2.4.2、读取插件

image.png

步骤类核心代码1:初始化Redis连接配置(同上)

步骤类核心代码2:根据用户配置数据类型,动态选择对应Redis的API读取Redis数据库对应key的数据

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (RedisInputMeta) smi;
        data = (RedisInputData) sdi;
        Object[] r = getRow(); // get row, set busy!
        // If no more input to be expected, stop
        if (r == null) {
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            // clone input row meta for now, we will change it (add or set inline) later
            data.outputRowMeta = getInputRowMeta().clone();
            // Get output field types
            meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
            keytype = meta.getKeyTypeFieldName();
            logBasic("keytype:" + keytype);
            valuetype = meta.getValueTypeName();
            logBasic("valuetype:" + valuetype);
            mastername = meta.getMasterName();
            logBasic("mastername:" + mastername);
        }
        // Get value from redis, don't cast now, be lazy. TODO change this?
        int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
        if (keyFieldIndex < 0) {
            throw new KettleException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.KeyFieldNameNotFound"));
        }
        int key2Index = -1;
        if (keytype.equals("hash")) {
            key2Index = getInputRowMeta().indexOfValue(meta.getKey2FieldName());
            if (key2Index < 0) {
                throw new KettleException(
                        BaseMessages.getString(PKG, "RedisOutputMeta.Exception.Key2FieldNameNotFound"));
            }
        }
        StringBuffer fetchedValue = new StringBuffer("");
        
        Jedis jedis = pool.getResource();
        try {
        if (keytype.equals("string")) {
            String value =jedis.get((String) (r[keyFieldIndex]));
            fetchedValue.append(value).append("|");
        } else if (keytype.equals("hash")) {
            String res = jedis.hget((String) r[keyFieldIndex], (String) (r[key2Index]));
            fetchedValue.append(res + "|");
        } else if (keytype.equals("hashall")) {
            Map<String, String> map = jedis.hgetAll((String) r[keyFieldIndex]);
            for (Map.Entry<String, String> entry : map.entrySet()) {
                fetchedValue.append(entry.getKey() + ":" + entry.getValue() + "|");
            }
        } else if (keytype.equals("list")) {
            List<String> list = jedis.lrange((String) r[keyFieldIndex], 0, -1);
            for (String s : list) {
                fetchedValue.append(s).append("|");
            }
        } else if (keytype.equals("set")) {
            Set<String> set = jedis.smembers((String) r[keyFieldIndex]);
            for (String s : set) {
                fetchedValue.append(s).append("|");
            }
        } else if (keytype.equals("zset")) {
            Set<String> set = jedis.zrangeByScore((String) r[keyFieldIndex], 0, -1);
            for (String s : set) {
                fetchedValue.append(s).append("|");
            }
        } else if (keytype.equals("keys")) {
            Set<String> set = jedis.keys((String) r[keyFieldIndex]);
            for (String s : set) {
                fetchedValue.append(s).append("|");
            }
        }
        } finally {
            jedis.close();
        }
        
        String output;
        if (fetchedValue.length() > 1)
            output = fetchedValue.substring(0, fetchedValue.length() - 1);
        else
            output = fetchedValue.toString();
        // Add Value data name to output, or set value data if already exists
        //logBasic("output:" + output);
        Object[] outputRowData = r;
        int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
        if (valueFieldIndex < 0 || valueFieldIndex > outputRowData.length) {
            // Not found so add it
            outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), output);
        } else {
            // Update value in place
            outputRowData[valueFieldIndex] = output;
        }
        putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
        if (checkFeedback(getLinesRead())) {
            if (log.isBasic()) {
                logBasic(BaseMessages.getString(PKG, "RedisInput.Log.LineNumber") + getLinesRead());
            }
        }
        return true;
    }

元数据类核心代码1:用户关键配置信息检查

@Override
    public void check(List<CheckResultInterface> remarks,
                      TransMeta transMeta, StepMeta stepMeta,
                      RowMetaInterface prev,
                      String[] input, String[] output,
                      RowMetaInterface info) {
        CheckResult cr;
        if (prev == null || prev.size() == 0) {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.NotReceivingFields"), stepMeta);
            remarks.add(cr);
        } else {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.StepRecevingData", prev.size() + ""), stepMeta);
            remarks.add(cr);
        }
        // See if we have input streams leading to this step!
        if (input.length > 0) {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.StepRecevingData2"), stepMeta);
            remarks.add(cr);
        } else {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
            remarks.add(cr);
        }
    }

元数据类核心代码2:对应redis数据读取操作,stepnode关键配置信息读取

private void readData(Node stepnode) throws KettleXMLException {
        try {
            this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
            this.keyTypeFieldName = XMLHandler.getTagValue(stepnode, "keytypefield");
            this.key2FieldName = XMLHandler.getTagValue(stepnode, "key2field");
            this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefield");
            this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
            this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
            Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
            int nrservers = XMLHandler.countNodes(serverNodes, "server");
            allocate(nrservers);
            for (int i = 0; i < nrservers; i++) {
                Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
                Map<String,String> hostAndPort = new HashMap<String,String>();
                hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
                hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
                hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
                if (i == 0) {
                    setJedisServer(hostAndPort);
                }
                servers.add(hostAndPort);
            }
        } catch (Exception e) {
            throw new KettleXMLException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.UnableToReadStepInfo"),
                    e);
        }
    }

2.4.3、删除插件

image.png

  • RedisDelete:步骤类,初始化Redis连接配置,处理数据流数据,执行删除操作
  • RedisDeleteData:数据类,定义kettle删除插件的RowMetaInterface
  • RedisDeleteDialog:对话框类,定义kettle删除插件redis find del的UI Swing的对话框
  • RedisDeleteMeta:元数据类,根据配置执行redis操作,并定义kettle删除插件显示类

步骤类核心代码1:初始化Redis连接配置(同上)

步骤类核心代码2:redis数据执行删除操作

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (RedisDeleteMeta) smi;
        data = (RedisDeleteData) sdi;
        Object[] r = getRow(); // get row, set busy!
        // If no more input to be expected, stop
        if (r == null) {
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            // clone input row meta for now, we will change it (add or set inline) later
            data.outputRowMeta = getInputRowMeta().clone();
            // Get output field types
            meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
            keyfieldname = meta.getKeyFieldName();
            logBasic("keyfieldname:" + keyfieldname);
            valuefieldname = meta.getValueFieldName();
            logBasic("valueFieldName:" + valuefieldname);
            valuetype = meta.getValueTypeName();
            logBasic("valuetype:" + valuetype);
            mastername = meta.getMasterName();
            logBasic("mastername:" + mastername);
        }
        // Get value from redis, don't cast now, be lazy. TODO change this?
        int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
        if (keyFieldIndex < 0) {
            throw new KettleException(BaseMessages.getString(PKG, "RedisDeletetMeta.Exception.KeyFieldNameNotFound"));
        }
        Jedis jedis = pool.getResource();
        String outPutValue = "";
        String[] delKeys=r[keyFieldIndex].toString().split(",");
        try {
            Long delete_flag = jedis.del(delKeys);
            outPutValue = String.valueOf(delete_flag);
        } finally {
            jedis.close();
        }
        // logBasic("output:" + output);
        Object[] outputRowData = r;
        int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
        if (valueFieldIndex < 0 || valueFieldIndex > outputRowData.length) {
            // Not found so add it
            outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), outPutValue);
        } else {
            // Update value in place
            outputRowData[valueFieldIndex] = outPutValue;
        }
        putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
        if (checkFeedback(getLinesRead())) {
            if (log.isBasic()) {
                logBasic(BaseMessages.getString(PKG, "RedisDelete.Log.LineNumber") + getLinesRead());
            }
        }
        return true;
    }

元数据类核心代码1:用户关键配置信息检查

@Override
    public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev,
            String[] input, String[] output, RowMetaInterface info) {
        CheckResult cr;
        if (prev == null || prev.size() == 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NotReceivingFields"), stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData", prev.size() + ""),
                    stepMeta);
            remarks.add(cr);
        }
        // See if we have input streams leading to this step!
        if (input.length > 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData2"), stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
            remarks.add(cr);
        }
    }

元数据类核心代码2:对应redis数据删除操作,stepnode关键配置信息读取

private void readData(Node stepnode) throws KettleXMLException {
        try {
            this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfieldname");
            this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefieldname");
            this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
            this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
            Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
            int nrservers = XMLHandler.countNodes(serverNodes, "server");
            allocate(nrservers);
            for (int i = 0; i < nrservers; i++) {
                Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
                Map<String, String> hostAndPort = new HashMap<String, String>();
                hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
                hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
                hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
                if (i == 0) {
                    setJedisServer(hostAndPort);
                }
                servers.add(hostAndPort);
            }
        } catch (Exception e) {
            throw new KettleXMLException(BaseMessages.getString(PKG, "RedisDeleteMeta.Exception.UnableToReadStepInfo"),
                    e);
        }
    }

三、使用说明

3.1、Redis Output插件

①Key:动态key字段,从上一步骤数据流动态获取(必选) 
②Value:动态value字段,从上一步骤数据流动态获取(必选) 
③TTL:key超时时间,默认值172800秒,从上一步骤数据流动态获取,可对单行数据做控制(必选) 
④mastername:Redis sentinels哨兵模式集群master名称(必选) 
⑤server ip:服务端ip列表(必选) 
⑥server port:哨兵端口列表,和服务端IP一一对应(必选) 
⑦认证密钥:服务端IP对应鉴权密码(可选)
image.png

3.2、Redis Input插件

①Key Field:动态key字段,从上一步骤数据流动态获取(必选) 
②Key type:key数据类型,从集合列表选择(必选) 
③Hash值:动态hash值字段,从上一步骤数据流动态获取(可选)
④Value Field:动态value字段,从上一步骤数据流动态获取(必选) 
⑤Value type:value数据类型,从集合列表选择(必选) 
⑥mastername:Redis sentinels哨兵模式集群master名称(必选) 
⑦hostname:服务端ip列表(必选) 
⑧host port:哨兵端口列表,和服务端IP一一对应(必选) 
⑨auth:服务端IP对应鉴权密码(可选)
image.png

3.3、Redis Delete插件

①Key:动态key字段,从上一步骤数据流动态获取(必选) 
②输出字段名:delete操作,返回自定义输出字段名称(必选) 
③Value type:value数据类型,从集合列表选择(必选) 
④mastername:Redis sentinels哨兵模式集群master名称(必选) 
⑤server ip:服务端ip列表(必选) 
⑥server port:哨兵端口列表,和服务端IP一一对应(必选) 
⑦认证密钥:服务端IP对应鉴权密码(可选)
image.png

四、总结

如果你需要源码或者了解更多自定义插件及集成方式,抑或有开发过程或者使用过程中的任何疑问或建议,请关注小编"游走在数据之间",回复2查看源代码,回复3获取入门视频。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,548评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,497评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,990评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,618评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,618评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,246评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,819评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,725评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,268评论 1 320
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,356评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,488评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,181评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,862评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,331评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,445评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,897评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,500评论 2 359

推荐阅读更多精彩内容