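I. Background
1.1 Introduction to Redis
As the martial-arts saying goes, speed beats everything. Redis is one of the fastest NoSQL databases and was built with high availability, high performance, and scalability in mind. Its main characteristics are:
- Written in ANSI C and open source under the BSD license. It is a networked, in-memory key-value store with optional log-based persistence, and client APIs are available for many languages
- Supports standalone, master-replica, Sentinel, and Cluster deployments; the Sentinel and Cluster modes are the ones that provide high availability
- Runs in memory, which is the basis of its high performance
- Supports distributed deployment: in Cluster mode, keys are sharded across nodes, so capacity scales horizontally
Compared with other kinds of databases, Redis also has the following features:
- Client/server communication model
- Single-process, single-threaded command execution
- Rich data types
- Atomic operations
- Persistence
- High-concurrency reads and writes
- Lua scripting support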
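1.2 Redis Data Types
Recent versions of Redis provide eight main native data types: strings, lists, sets, sorted sets, hashes, bit arrays (bitmaps), probabilistic data structures (such as HyperLogLog), and streams.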
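1.3 Redis Data Structures
This section covers the underlying data structures of the five most commonly used data types, as shown in the figure below:
Notes on some of the structures in the figure:
- The linked list used by Redis is a doubly linked list: every node holds pointers to both its predecessor and its successor, and the list itself keeps head and tail pointers, so either end can be reached in O(1). The list is not circular; the first and last nodes are not linked to each other
- The ziplist (compressed list) is one of the underlying implementations of list keys and hash keys. When a list key holds only a few items, and each item is either a small integer or a short string, Redis uses a ziplist as the underlying implementation of that list key (since Redis 3.2, lists are stored as a quicklist, which is a linked list of ziplists)
- The intset (integer set) is one of the underlying implementations of set keys. When a set contains only integer elements, and not many of them, Redis uses an intset as the underlying implementation of that set key
- The skip list is an ordered data structure made of a linked list plus multiple index levels. Each node keeps several pointers to other nodes, which makes node lookup fast (O(log N) on average)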
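To make the "linked list plus index levels" idea concrete, here is a minimal skip list sketch in Java. It is an illustration only, not the structure Redis actually ships: the class name `SkipListSketch`, the `MAX_LEVEL` of 16, and the coin-flip probability of 1/2 are assumptions chosen for brevity, and the sketch stores plain unique `int` keys rather than scored members.

```java
import java.util.Random;

/** Minimal skip list over int keys: a sorted linked list plus randomly built index levels. */
final class SkipListSketch {
    private static final int MAX_LEVEL = 16; // assumption: an arbitrary cap for this sketch

    private static final class Node {
        final int key;
        final Node[] forward; // forward[i] is the next node on level i

        Node(int key, int level) {
            this.key = key;
            this.forward = new Node[level];
        }
    }

    private final Node head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
    private final Random random = new Random();
    private int levels = 1; // number of levels currently in use

    /** Flip a coin per level, so roughly half of the nodes reach each higher level. */
    private int randomLevel() {
        int level = 1;
        while (level < MAX_LEVEL && random.nextBoolean()) {
            level++;
        }
        return level;
    }

    void insert(int key) {
        Node[] update = new Node[MAX_LEVEL];
        Node current = head;
        // Walk from the top level down, remembering the last node before `key` on each level
        for (int i = levels - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }
        if (current.forward[0] != null && current.forward[0].key == key) {
            return; // already present; this sketch keeps keys unique
        }
        int level = randomLevel();
        for (int i = levels; i < level; i++) {
            update[i] = head; // newly opened levels start at the head node
        }
        levels = Math.max(levels, level);
        Node node = new Node(key, level);
        for (int i = 0; i < level; i++) {
            node.forward[i] = update[i].forward[i];
            update[i].forward[i] = node;
        }
    }

    boolean contains(int key) {
        Node current = head;
        // Higher levels skip over many nodes; level 0 is the full sorted list
        for (int i = levels - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
        }
        Node candidate = current.forward[0];
        return candidate != null && candidate.key == key;
    }

    public static void main(String[] args) {
        SkipListSketch list = new SkipListSketch();
        for (int key : new int[] {5, 1, 9, 3, 7}) {
            list.insert(key);
        }
        System.out.println(list.contains(3));
        System.out.println(list.contains(4));
    }
}
```

The search starts on the sparsest level and drops down one level each time the next key would overshoot, which is where the average O(log N) lookup comes from.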
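1.4 Use Cases
- Distributed locks (shared sessions, unique ID generators, flash-sale systems)
- Caching ("hot" data: read often, written rarely)
- Rate limiters and counters
- Message queues
- Leaderboards, social networks, and real-time systems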
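II. Code Walkthrough
With the basic features, data types, and data structures of Redis covered, we can get to the main topic: how to build Redis read and write plugins on the Kettle platform.
2.1 Development Environment
- JDK 1.7/1.8
- Kettle 6.1.0.1/7.0.0.0
- Redis 5.x
- Jedis 2.9.0
2.2 Code Structure
2.3 Features
This plugin set implements three step plugins for Redis data:
- Writing data (RedisOutput)
- Reading data (RedisInput)
- Deleting data (RedisDelete)
2.4 Plugin Code
All three plugins operate against a server-side Redis deployment that is set up as a highly available cluster in Sentinel mode.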
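2.4.1 Write Plugin
Step class, core code 1: initializing the Redis connection configuration
// Do not declare the pool as a static variable: a static pool is shared by every step instance, and once one step has finished with it and closed it, the pool becomes unusable for the others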
private JedisSentinelPool pool = null;
ExecutorService service = Executors.newFixedThreadPool(12);
@Override
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
long start = System.currentTimeMillis();
if (super.init(smi, sdi)) {
try {
// Create client and connect to redis server(s)
Set<Map<String, String>> jedisClusterNodes = ((RedisOutputMeta) smi).getServers();
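// Build the connection pool configuration
JedisPoolConfig config = new JedisPoolConfig();
// Maximum number of connections in the pool
config.setMaxTotal(10000);
// Maximum time to block while waiting for a connection, in milliseconds
config.setMaxWaitMillis(10000);
// Maximum number of idle connections
config.setMaxIdle(300);
// Validate each connection when it is borrowed from the pool
config.setTestOnBorrow(true);
// Whether to validate (ping()) a connection when it is returned to the pool
// config.setTestOnReturn(true);
// Validate idle connections in the pool
config.setTestWhileIdle(true);
// Read the Redis password and the Sentinel addresses, then create the pool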
String password = null;
int timeout = 3000;
String masterName = ((RedisOutputMeta) smi).getMasterName();
Set<String> sentinels = new HashSet<String>();
Iterator<Map<String, String>> it = jedisClusterNodes.iterator();
while (it.hasNext()) {
Map<String, String> hostAndPort = it.next();
password = hostAndPort.get("auth");
sentinels.add(hostAndPort.get("hostname") + ":" + hostAndPort.get("port"));
}
pool = new JedisSentinelPool(masterName, sentinels, config, timeout, password);
long end = System.currentTimeMillis();
logBasic("Connection pool created in " + (end - start) + " ms");
return true;
} catch (Exception e) {
logError(BaseMessages.getString(PKG, "RedisInput.Error.ConnectError"), e);
return false;
}
} else {
return false;
}
}
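The loop in `init` turns the configured server entries into the `host:port` strings that `JedisSentinelPool` takes. That conversion can be isolated and checked without a Redis server. The sketch below assumes the same map keys as the code above (`hostname`, `port`, `auth`); the class and method names are hypothetical, and skipping incomplete entries and taking the first non-empty password are choices made for this sketch (the original keeps whichever password it sees last).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: prepares the arguments a Sentinel-aware pool needs. */
final class SentinelAddressSketch {

    /** Builds "host:port" strings, skipping entries with a missing host or port. */
    static Set<String> toSentinelAddresses(Collection<Map<String, String>> servers) {
        Set<String> sentinels = new LinkedHashSet<String>();
        for (Map<String, String> server : servers) {
            String host = server.get("hostname");
            String port = server.get("port");
            if (host == null || host.trim().isEmpty() || port == null || port.trim().isEmpty()) {
                continue; // incomplete entry; nothing usable to connect to
            }
            sentinels.add(host.trim() + ":" + port.trim());
        }
        return sentinels;
    }

    /** Returns the first non-empty "auth" value, or null when no password is configured. */
    static String firstPassword(Collection<Map<String, String>> servers) {
        for (Map<String, String> server : servers) {
            String auth = server.get("auth");
            if (auth != null && !auth.isEmpty()) {
                return auth;
            }
        }
        return null;
    }

    /** Small convenience for building a server entry in examples. */
    static Map<String, String> server(String host, String port, String auth) {
        Map<String, String> entry = new HashMap<String, String>();
        entry.put("hostname", host);
        entry.put("port", port);
        entry.put("auth", auth);
        return entry;
    }

    public static void main(String[] args) {
        List<Map<String, String>> servers = new ArrayList<Map<String, String>>(Arrays.asList(
                server("10.0.0.1", "26379", "secret"),
                server("10.0.0.2", "26379", null)));
        System.out.println(toSentinelAddresses(servers));
        System.out.println(firstPassword(servers));
    }
}
```

The two results would then be passed to the pool constructor together with the master name, the pool configuration, and the timeout, as in the `init` method above.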
Step class, core code 2: writing data to Redis
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (RedisOutputMeta) smi;
data = (RedisOutputData) sdi;
Object[] r = getRow(); // get row, set busy!
// If no more input to be expected, stop
if (r == null) {
setOutputDone();
return false;
}
// Borrow a connection only once a row is available, so that none is leaked when the input ends
Jedis jedis = pool.getResource();
if (first) {
first = false;
// clone input row meta for now, we will change it (add or set inline) later
data.outputRowMeta = getInputRowMeta().clone();
// Get output field types
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
keyfieldname = meta.getKeyFieldName();
valuefieldname = meta.getValueFieldName();
ttlfieldname = meta.getTtlFieldName();
logBasic("keyfieldname:" + keyfieldname);
logBasic("valuefieldname:" + valuefieldname);
logBasic("ttlfieldname:" + ttlfieldname);
}
RedisOutputThread thread = new RedisOutputThread(this, jedis, r);
service.submit(thread);
if (checkFeedback(getLinesRead())) {
if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "RedisOutput.Log.LineNumber") + getLinesRead());
}
}
return true;
}
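`processRow` hands every row to the thread pool and returns immediately, so when `getRow()` finally returns null some writes may still be queued. In the code shown here nothing appears to wait for them before `setOutputDone()` is called. One way to close that gap is to drain the executor first. The following is a minimal sketch using only `java.util.concurrent`; the class name, the `drain` helper, and the timeout value are assumptions, and where to call it in the step (before `setOutputDone()` or in `dispose`) is left open.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: waits for queued row writes before the step reports completion. */
final class DrainWorkersSketch {

    /** Stops accepting new tasks and waits for queued ones; returns true if all finished in time. */
    static boolean drain(ExecutorService service, long timeoutSeconds) {
        service.shutdown(); // already submitted tasks still run
        try {
            if (service.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        service.shutdownNow(); // give up on whatever is still pending
        return false;
    }

    /** Simulates writing `rows` rows on a small pool and returns how many writes completed. */
    static int writeAll(int rows) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        final AtomicInteger written = new AtomicInteger();
        for (int i = 0; i < rows; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    written.incrementAndGet(); // stands in for the Redis write
                }
            });
        }
        drain(service, 30);
        return written.get();
    }

    public static void main(String[] args) {
        System.out.println(writeAll(100));
    }
}
```

Because `shutdown()` lets already submitted tasks run to completion, the count read after `drain` reflects every row that was handed to the pool, provided the timeout is not hit.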
Metadata class, core code 1: checking the user's key configuration
@Override
public void check(List<CheckResultInterface> remarks, TransMeta transMeta,
StepMeta stepMeta, RowMetaInterface prev, String[] input,
String[] output, RowMetaInterface info) {
CheckResult cr;
if (prev == null || prev.size() == 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
BaseMessages.getString(PKG,
"RedisOutputMeta.CheckResult.NotReceivingFields"),
stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG,
"RedisOutputMeta.CheckResult.StepRecevingData",
prev.size() + ""), stepMeta);
remarks.add(cr);
}
// See if we have input streams leading to this step!
if (input.length > 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG,
"RedisOutputMeta.CheckResult.StepRecevingData2"),
stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(
CheckResultInterface.TYPE_RESULT_ERROR,
BaseMessages
.getString(PKG,
"RedisOutputMeta.CheckResult.NoInputReceivedFromOtherSteps"),
stepMeta);
remarks.add(cr);
}
}
Metadata class, core code 2: reading the key settings for the Redis write operation from the stepnode
private void readData(Node stepnode) throws KettleXMLException {
try {
this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
this.valueFieldName = XMLHandler
.getTagValue(stepnode, "valuefield");
this.ttlFieldName = XMLHandler.getTagValue(stepnode, "ttlfield");
this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
int nrservers = XMLHandler.countNodes(serverNodes, "server");
allocate(nrservers);
for (int i = 0; i < nrservers; i++) {
Node fnode = XMLHandler
.getSubNodeByNr(serverNodes, "server", i);
Map<String, String> hostAndPort = new HashMap<String, String>();
hostAndPort.put("hostname",
XMLHandler.getTagValue(fnode, "hostname"));
hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
if (i == 0) {
setJedisServer(hostAndPort);
}
servers.add(hostAndPort);
}
} catch (Exception e) {
throw new KettleXMLException(BaseMessages.getString(PKG,
"RedisOutputMeta.Exception.UnableToReadStepInfo"), e);
}
}
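The tag names read by `readData` imply a particular XML layout for the step. To show that layout without the Kettle classes, the sketch below parses an equivalent fragment with the JDK's built-in DOM parser instead of `XMLHandler`. The `<step>` root element and the sample values are assumptions for illustration; only the tag names `servers`, `server`, `hostname`, `port`, and `auth` come from the code above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical stand-in for readData(), using the JDK DOM parser rather than Kettle's XMLHandler. */
final class StepXmlSketch {

    /** Sample step XML; the values are made up, the tag names follow readData(). */
    static final String SAMPLE =
            "<step><keyfield>k</keyfield><valuefield>v</valuefield><ttlfield>ttl</ttlfield>"
            + "<mastername>mymaster</mastername><servers>"
            + "<server><hostname>10.0.0.1</hostname><port>26379</port><auth>secret</auth></server>"
            + "<server><hostname>10.0.0.2</hostname><port>26379</port></server>"
            + "</servers></step>";

    /** Reads every <server> element into a hostname/port/auth map, in document order. */
    static List<Map<String, String>> readServers(String stepXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(stepXml.getBytes(StandardCharsets.UTF_8)));
            NodeList serverNodes = doc.getElementsByTagName("server");
            List<Map<String, String>> servers = new ArrayList<Map<String, String>>();
            for (int i = 0; i < serverNodes.getLength(); i++) {
                Element server = (Element) serverNodes.item(i);
                Map<String, String> hostAndPort = new HashMap<String, String>();
                for (String tag : new String[] {"hostname", "port", "auth"}) {
                    hostAndPort.put(tag, childText(server, tag));
                }
                servers.add(hostAndPort);
            }
            return servers;
        } catch (Exception e) {
            throw new IllegalArgumentException("Unable to read step XML", e);
        }
    }

    /** Text of the first descendant element with the given tag, or null when it is absent. */
    static String childText(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
    }

    public static void main(String[] args) {
        for (Map<String, String> server : readServers(SAMPLE)) {
            System.out.println(server.get("hostname") + ":" + server.get("port"));
        }
    }
}
```

A missing `<auth>` element simply yields a null password for that server, which matches the "optional" status of the authentication setting.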
Worker thread class RedisOutputThread
@Override
public void run() {
long start = System.currentTimeMillis();
// Locate the key, value and TTL fields in the incoming row
int keyFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
redisOutput.meta.getKeyFieldName());
Object key = r[keyFieldIndex];
int valueFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
redisOutput.meta.getValueFieldName());
Object value = r[valueFieldIndex];
int ttlFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
redisOutput.meta.getTtlFieldName());
Object ttl = r[ttlFieldIndex];
// Convert null-safely: calling toString() on a null field would throw a NullPointerException
String keyString = key == null ? null : key.toString();
String valueString = value == null ? null : value.toString();
String ttlString = ttl == null ? null : ttl.toString();
try {
if (keyString != null && !StringUtil.isEmpty(keyString) && valueString != null) {
String getKeyValue = jedis.get(keyString);
boolean existsKey = jedis.exists(keyString);
// If the key already exists, only update its value; otherwise write the value and set the key's expiration time (in seconds).
// Note: a plain SET also clears any TTL the existing key had.
if (existsKey) {
jedis.set(keyString, valueString);
redisOutput
.logBasic(" This key already exists, so only the corresponding value= "
+ getKeyValue + " is updated.");
} else {
if (ttlString != null && !StringUtil.isEmpty(ttlString)) {
jedis.set(keyString, valueString, "NX", "EX",
Integer.parseInt(ttlString));
} else {
// No TTL supplied: default to 2 days (172800 seconds)
jedis.set(keyString, valueString, "NX", "EX",
2 * 24 * 60 * 60);
}
}
}
redisOutput.putRow(redisOutput.data.outputRowMeta, r);
} catch (KettleStepException e) {
redisOutput.logError("Unable to pass the row to the next step", e);
} finally {
// Always return the connection to the pool, even when a Redis call fails
jedis.close();
}
long end = System.currentTimeMillis();
redisOutput.logBasic("Redis_Key:" + keyString + " ,Redis_Value:"
+ valueString + " ,Redis_TTL:" + ttlString + " ,processRow "
+ (end - start) + " milliseconds .");
}
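The TTL handling in `run()` boils down to "use the row's TTL if there is one, otherwise fall back to two days". Pulling that rule into a small function makes it easy to test and also gives a place to handle values that are not valid numbers. This is a sketch under assumptions: the class and method names are hypothetical, and falling back to the default for non-numeric or non-positive input is a choice made here, whereas the code above would throw a `NumberFormatException` on non-numeric input.

```java
/** Hypothetical helper: resolves the expiration time for a row, in seconds. */
final class TtlSketch {

    /** Same default as the plugin: 2 days. */
    static final int DEFAULT_TTL_SECONDS = 2 * 24 * 60 * 60;

    /** Returns the TTL carried by the row field, or the default when it is missing or unusable. */
    static int resolveTtlSeconds(Object ttlField) {
        if (ttlField == null) {
            return DEFAULT_TTL_SECONDS;
        }
        String text = ttlField.toString().trim();
        if (text.isEmpty()) {
            return DEFAULT_TTL_SECONDS;
        }
        try {
            int ttl = Integer.parseInt(text);
            // An expiration of zero or less is not a usable TTL, so fall back as well
            return ttl > 0 ? ttl : DEFAULT_TTL_SECONDS;
        } catch (NumberFormatException e) {
            return DEFAULT_TTL_SECONDS; // e.g. "abc" or "30.0"
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveTtlSeconds("3600"));
        System.out.println(resolveTtlSeconds(null));
        System.out.println(resolveTtlSeconds("not-a-number"));
    }
}
```

With such a helper, the two `jedis.set(..., "NX", "EX", ...)` branches above could collapse into a single call that passes the resolved value.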
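2.4.2 Read Plugin
Step class, core code 1: initializing the Redis connection configuration (same as above)
Step class, core code 2: choosing the matching Redis API at run time, based on the data type the user configured, to read the value of the given key from Redis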
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (RedisInputMeta) smi;
data = (RedisInputData) sdi;
Object[] r = getRow(); // get row, set busy!
// If no more input to be expected, stop
if (r == null) {
setOutputDone();
return false;
}
if (first) {
first = false;
// clone input row meta for now, we will change it (add or set inline) later
data.outputRowMeta = getInputRowMeta().clone();
// Get output field types
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
keytype = meta.getKeyTypeFieldName();
logBasic("keytype:" + keytype);
valuetype = meta.getValueTypeName();
logBasic("valuetype:" + valuetype);
mastername = meta.getMasterName();
logBasic("mastername:" + mastername);
}
// Get value from redis, don't cast now, be lazy. TODO change this?
int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
if (keyFieldIndex < 0) {
throw new KettleException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.KeyFieldNameNotFound"));
}
int key2Index = -1;
if (keytype.equals("hash")) {
key2Index = getInputRowMeta().indexOfValue(meta.getKey2FieldName());
if (key2Index < 0) {
throw new KettleException(
BaseMessages.getString(PKG, "RedisOutputMeta.Exception.Key2FieldNameNotFound"));
}
}
StringBuffer fetchedValue = new StringBuffer("");
Jedis jedis = pool.getResource();
try {
if (keytype.equals("string")) {
String value =jedis.get((String) (r[keyFieldIndex]));
fetchedValue.append(value).append("|");
} else if (keytype.equals("hash")) {
String res = jedis.hget((String) r[keyFieldIndex], (String) (r[key2Index]));
fetchedValue.append(res + "|");
} else if (keytype.equals("hashall")) {
Map<String, String> map = jedis.hgetAll((String) r[keyFieldIndex]);
for (Map.Entry<String, String> entry : map.entrySet()) {
fetchedValue.append(entry.getKey() + ":" + entry.getValue() + "|");
}
} else if (keytype.equals("list")) {
List<String> list = jedis.lrange((String) r[keyFieldIndex], 0, -1);
for (String s : list) {
fetchedValue.append(s).append("|");
}
} else if (keytype.equals("set")) {
Set<String> set = jedis.smembers((String) r[keyFieldIndex]);
for (String s : set) {
fetchedValue.append(s).append("|");
}
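} else if (keytype.equals("zset")) {
// ZRANGE with indexes 0..-1 returns every member; ZRANGEBYSCORE with min 0 and max -1 would match nothing
Set<String> set = jedis.zrange((String) r[keyFieldIndex], 0, -1);
for (String s : set) {
fetchedValue.append(s).append("|");
}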
} else if (keytype.equals("keys")) {
Set<String> set = jedis.keys((String) r[keyFieldIndex]);
for (String s : set) {
fetchedValue.append(s).append("|");
}
}
} finally {
jedis.close();
}
String output;
// Strip the trailing "|" separator, if there is one
if (fetchedValue.length() > 0)
output = fetchedValue.substring(0, fetchedValue.length() - 1);
else
output = fetchedValue.toString();
// Add Value data name to output, or set value data if already exists
//logBasic("output:" + output);
Object[] outputRowData = r;
int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
if (valueFieldIndex < 0 || valueFieldIndex >= outputRowData.length) {
// Not found so add it
outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), output);
} else {
// Update value in place
outputRowData[valueFieldIndex] = output;
}
putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
if (checkFeedback(getLinesRead())) {
if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "RedisInput.Log.LineNumber") + getLinesRead());
}
}
return true;
}
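Every branch of the read step flattens its result into one pipe-separated string and then trims the trailing separator. An alternative is to place the separator only between elements, so there is nothing to trim afterwards. The sketch below shows that approach with plain JDK classes; the class and method names are hypothetical, and rendering a null element as an empty string is a choice made for this sketch (the code above would append the text "null").

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: flattens Redis results into the "a|b|c" form the read step outputs. */
final class PipeJoinSketch {

    /** Joins values with "|", writing the separator only between elements. */
    static String joinValues(Iterable<String> values) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (String value : values) {
            if (!first) {
                joined.append('|');
            }
            joined.append(value == null ? "" : value); // assumption: null becomes an empty string
            first = false;
        }
        return joined.toString();
    }

    /** Joins hash entries as "field:value" pairs separated by "|", as in the hashall branch. */
    static String joinHash(Map<String, String> hash) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (Map.Entry<String, String> entry : hash.entrySet()) {
            if (!first) {
                joined.append('|');
            }
            joined.append(entry.getKey()).append(':').append(entry.getValue());
            first = false;
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        System.out.println(joinValues(Arrays.asList("a", "b", "c")));
        System.out.println(joinValues(Collections.<String>emptyList()).isEmpty());
        Map<String, String> hash = new LinkedHashMap<String, String>();
        hash.put("name", "kettle");
        hash.put("type", "etl");
        System.out.println(joinHash(hash));
    }
}
```

Note that a value containing "|" or ":" cannot be told apart from a separator downstream; that limitation applies to the original format as well.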
Metadata class, core code 1: checking the user's key configuration
@Override
public void check(List<CheckResultInterface> remarks,
TransMeta transMeta, StepMeta stepMeta,
RowMetaInterface prev,
String[] input, String[] output,
RowMetaInterface info) {
CheckResult cr;
if (prev == null || prev.size() == 0) {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.NotReceivingFields"), stepMeta);
remarks.add(cr);
} else {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.StepRecevingData", prev.size() + ""), stepMeta);
remarks.add(cr);
}
// See if we have input streams leading to this step!
if (input.length > 0) {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.StepRecevingData2"), stepMeta);
remarks.add(cr);
} else {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
remarks.add(cr);
}
}
Metadata class, core code 2: reading the key settings for the Redis read operation from the stepnode
private void readData(Node stepnode) throws KettleXMLException {
try {
this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
this.keyTypeFieldName = XMLHandler.getTagValue(stepnode, "keytypefield");
this.key2FieldName = XMLHandler.getTagValue(stepnode, "key2field");
this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefield");
this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
int nrservers = XMLHandler.countNodes(serverNodes, "server");
allocate(nrservers);
for (int i = 0; i < nrservers; i++) {
Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
Map<String,String> hostAndPort = new HashMap<String,String>();
hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
if (i == 0) {
setJedisServer(hostAndPort);
}
servers.add(hostAndPort);
}
} catch (Exception e) {
throw new KettleXMLException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.UnableToReadStepInfo"),
e);
}
}
2.4.3 Delete Plugin
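- RedisDelete: step class; initializes the Redis connection configuration, processes the rows of the data stream, and performs the delete
- RedisDeleteData: data class; holds the RowMetaInterface of the Kettle delete plugin
- RedisDeleteDialog: dialog class; defines the "redis find del" configuration dialog of the Kettle delete plugin (Kettle step dialogs are built with SWT rather than Swing)
- RedisDeleteMeta: metadata class; holds the configuration that drives the Redis operation and defines how the Kettle delete plugin is presented
Step class, core code 1: initializing the Redis connection configuration (same as above)
Step class, core code 2: deleting data from Redis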
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (RedisDeleteMeta) smi;
data = (RedisDeleteData) sdi;
Object[] r = getRow(); // get row, set busy!
// If no more input to be expected, stop
if (r == null) {
setOutputDone();
return false;
}
if (first) {
first = false;
// clone input row meta for now, we will change it (add or set inline) later
data.outputRowMeta = getInputRowMeta().clone();
// Get output field types
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
keyfieldname = meta.getKeyFieldName();
logBasic("keyfieldname:" + keyfieldname);
valuefieldname = meta.getValueFieldName();
logBasic("valueFieldName:" + valuefieldname);
valuetype = meta.getValueTypeName();
logBasic("valuetype:" + valuetype);
mastername = meta.getMasterName();
logBasic("mastername:" + mastername);
}
// Get value from redis, don't cast now, be lazy. TODO change this?
int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
if (keyFieldIndex < 0) {
throw new KettleException(BaseMessages.getString(PKG, "RedisDeletetMeta.Exception.KeyFieldNameNotFound"));
}
Jedis jedis = pool.getResource();
String outPutValue = "";
String[] delKeys=r[keyFieldIndex].toString().split(",");
try {
Long delete_flag = jedis.del(delKeys);
outPutValue = String.valueOf(delete_flag);
} finally {
jedis.close();
}
// logBasic("output:" + output);
Object[] outputRowData = r;
int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
if (valueFieldIndex < 0 || valueFieldIndex >= outputRowData.length) {
// Not found so add it
outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), outPutValue);
} else {
// Update value in place
outputRowData[valueFieldIndex] = outPutValue;
}
putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
if (checkFeedback(getLinesRead())) {
if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "RedisDelete.Log.LineNumber") + getLinesRead());
}
}
return true;
}
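The delete step treats the key field as a comma-separated list and passes the pieces straight to `jedis.del`. A plain `split(",")` keeps surrounding spaces and can yield blank entries, so input such as "a, b" would try to delete a key named " b". A minimal sketch of a more forgiving parser follows; the class and method names are hypothetical, and trimming and dropping blank entries are choices made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: turns the key field of a row into the keys to delete. */
final class DeleteKeysSketch {

    /** Splits on commas, trims each piece, and drops blanks; never returns null. */
    static String[] parseKeys(Object keyField) {
        if (keyField == null) {
            return new String[0];
        }
        List<String> keys = new ArrayList<String>();
        for (String part : keyField.toString().split(",")) {
            String key = part.trim();
            if (!key.isEmpty()) {
                keys.add(key);
            }
        }
        return keys.toArray(new String[0]);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parseKeys("user:1, user:2,,user:3 ")));
        System.out.println(parseKeys(null).length);
    }
}
```

When the result is empty, the caller should skip the `del` call altogether, because Redis rejects DEL without at least one key. Keys that themselves contain a comma cannot be expressed with this convention.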
Metadata class, core code 1: checking the user's key configuration
@Override
public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev,
String[] input, String[] output, RowMetaInterface info) {
CheckResult cr;
if (prev == null || prev.size() == 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NotReceivingFields"), stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData", prev.size() + ""),
stepMeta);
remarks.add(cr);
}
// See if we have input streams leading to this step!
if (input.length > 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData2"), stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
remarks.add(cr);
}
}
Metadata class, core code 2: reading the key settings for the Redis delete operation from the stepnode
private void readData(Node stepnode) throws KettleXMLException {
try {
this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfieldname");
this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefieldname");
this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
int nrservers = XMLHandler.countNodes(serverNodes, "server");
allocate(nrservers);
for (int i = 0; i < nrservers; i++) {
Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
Map<String, String> hostAndPort = new HashMap<String, String>();
hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
if (i == 0) {
setJedisServer(hostAndPort);
}
servers.add(hostAndPort);
}
} catch (Exception e) {
throw new KettleXMLException(BaseMessages.getString(PKG, "RedisDeleteMeta.Exception.UnableToReadStepInfo"),
e);
}
}
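III. Usage
3.1 Redis Output Plugin
① Key: the key field, read dynamically from the previous step's data stream (required)
② Value: the value field, read dynamically from the previous step's data stream (required)
③ TTL: the key's expiration time in seconds, read dynamically from the previous step's data stream so that it can be controlled per row; when the field value is empty, the default of 172800 seconds (2 days) is used (required)
④ mastername: the master name of the Redis Sentinel deployment (required)
⑤ server ip: list of server IPs (required)
⑥ server port: list of Sentinel ports, one for each server IP (required)
⑦ Auth key: the authentication password for the corresponding server IP (optional)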
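3.2 Redis Input Plugin
① Key Field: the key field, read dynamically from the previous step's data stream (required)
② Key type: the data type of the key, selected from a drop-down list (required)
③ Hash field: the field within the hash, read dynamically from the previous step's data stream; used when the key type is hash (optional)
④ Value Field: the field that receives the value read from Redis; if a field with this name already exists in the incoming row it is overwritten, otherwise it is appended (required)
⑤ Value type: the data type of the value, selected from a drop-down list (required)
⑥ mastername: the master name of the Redis Sentinel deployment (required)
⑦ hostname: list of server IPs (required)
⑧ host port: list of Sentinel ports, one for each server IP (required)
⑨ auth: the authentication password for the corresponding server IP (optional)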
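3.3 Redis Delete Plugin
① Key: the key field, read dynamically from the previous step's data stream; several keys can be given in one value, separated by commas (required)
② Output field name: the user-defined name of the field that receives the result of the delete operation, that is, the number of keys removed (required)
③ Value type: the data type of the value, selected from a drop-down list (required)
④ mastername: the master name of the Redis Sentinel deployment (required)
⑤ server ip: list of server IPs (required)
⑥ server port: list of Sentinel ports, one for each server IP (required)
⑦ Auth key: the authentication password for the corresponding server IP (optional)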
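IV. Summary
If you need the source code, want to learn more about custom plugins and how to integrate them, or have any questions or suggestions about development or usage, follow the author's account "游走在数据之间": reply 2 to view the source code, or reply 3 to get the introductory video.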