Druid 使用Tranquility从kafka实时导入数据
数据导入方式
通过前面的介绍我们知道在流式处理领域,有两种数据处理模式,一种为Stream Push,另一种为Stream Pull。
Stream Pull 如果Druid以Stream Pull方式自主地从外部数据源拉取数据从而生成Indexing Service Tasks,我们则需要建立Real-Time Node。Real-Time Node主要包含两大“工厂”:一个是连接流式数据源、负责数据接入的Firehose(中文翻译为水管,很形象地描述了该组件的职责);另一个是负责Segment发布与转移的Plumber(中文翻译为搬运工,同样也十分形象地描述了该组件的职责)。在Druid源代码中,这两个组件都是抽象工厂方法,使用者可以根据自己的需求创建不同类型的Firehose或者Plumber。Firehose和Plumber给我的感觉,更类似于Kafka_0.9.0版本后发布的Kafka Connect框架,Firehose类似于Kafka Connect Source,定义了数据的入口,但并不关心接入数据源的类型;而Plumber类似于Kafka Connect Sink,定义了数据的出口,也不关心最终输出到哪里。
Stream Push 如果采用Stream Push策略,我们需要建立一个“copy service”,负责从数据源中拉取数据并生成Indexing Service Tasks,从而将数据“推入”到Druid中,我们在druid_0.9.1版本之前一直使用的是这种模式,不过这种模式需要外部服务Tranquility,Tranquility是一个发送数据流到Druid的http客户端,Tranquility组件可以连接多种流式数据源,比如Spark-Streaming、Storm以及Kafka等,所以也产生了Tranquility-Storm、Tranquility-Kafka等外部组件。
实时数据流摄入方式
Standalone Realtime Node(Streaming pull)
Indexing-service + Tranquility(Streaming push)
KafkaIndex-indexing-service
Tranquility数据摄入特点
可以视为Druid的客户端
可以作为Jar包,依赖到其他程序中使用,典型的可以嵌入到其他流计算框架中使用,如Flink、Spark-streaming、Samza等
可以作为独立的Java应用部署
管理任务生命周期
实时任务定时提交
任务副本与任务数
实时节点服务发现
消费Kafka数据,通过HTTP服务推送到实时节点上
topicPatten:传topic的名字,可以是正则匹配的
https://github.com/druid-io/tranquility/blob/master/docs/configuration.md可以通过JS代码对数据进行处理
Tranquility使用例子
我们解析的数据格式如下:
[192.168.11.11] [11/Dec/2018:20:59:18 +0800] [GET /log/xxad?userAgent=Mozilla/5.0%20(%20CPU%20iPhone%20OS%2012_1%20like%20Mac%20OS%20X)%20AppleWebKit/605.1.15%20(KHTML,%20like%20Gecko)%20Version/12.0%20Mobile/15E148%20Safari/604.1&os=2&networkId=17&logType=1&jsCodeId=790431196 HTTP/1.1] [-] [Mozilla/5.0 (iPhone; CPU iPhone OS 12_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1] [https://xx.com/question/72aef851c9b495fe218d579ce4db682b.html] 204
\t 分割的
这里给出一个json文件伪代码,仅供参考
{
"dataSources" : {
"xx_ad" : {
"spec" : {
"dataSchema" : {
"dataSource" : "xx_ad",
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"column" : "req_time",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"dimensionsSpec" : {
"dimensions" : ["jsCodeId","userAgent"]
},
"function" : "function(str) {
省略部分js代码
var req=0,resp=0,show=0;if(logType==\"1\"){req=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"2\"){resp=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"3\"){show=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}}",
"format" : "javascript"
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "hour",
"queryGranularity" : "hour"
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
},{
"name" : "req_sum",
"type" : "longSum",
"fieldName" : "req"
},{
"name" : "resp_sum",
"type" : "longSum",
"fieldName" : "resp"
},{
"name" : "show_sum",
"type" : "longSum",
"fieldName" : "show"
}
]
},
"ioConfig" : {
"type" : "realtime"
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT15M",
"windowPeriod" : "PT4H"
}
},
"properties" : {
"task.partitions" : "1",
"task.replicants" : "1",
"topicPattern" : "xxad"
}
}
},
"properties" : {
"zookeeper.connect" : "192.168.11.21:2181",
"druid.discovery.curator.path" : "/druid/discovery",
"druid.selectors.indexing.serviceName" : "druid/overlord",
"commit.periodMillis" : "15000",
"consumer.numThreads" : "2",
"kafka.zookeeper.connect" : "192.168.48.11:2181,192.168.48.12:2181,192.168.48.13:2181",
"kafka.group.id" : "tranquility-xx-ad"
}
}
js代码做的事情是根据logType类型来计算请求量、展现量和点击量,维度是jsCodeId和userAgent,
由于时间列式 [11/Dec/2018:20:59:18 +0800] 是这种格式,首先设置时间格式
"timestampSpec" : {
"column" : "req_time",
"format" : "dd/MMM/yyyyHH:mm:ss"
}
发现时间并不能解析,我们通过官网发现
http://druid.io/docs/latest/ingestion/ingestion-spec.html
Joda time是java里的,通过一段java
DateTime end_date = DateTime.parse("20-12-2018:20:20:20 +0800", DateTimeFormat.forPattern("dd-MM-yyyy:HH:mm:ss +0800"));
System.out.println("end_date:" + end_date);
打印
start_date:2018-12-11T20:59:18.000+08:00
end_date:2018-12-20T20:20:20.000+08:00
dt5:2012-05-20T13:14:00.000+08:00
如果是下面这样
DateTime end_date = DateTime.parse("20-Dec-2018:20:20:20 +0800", DateTimeFormat.forPattern("dd-MM-yyyy:HH:mm:ss +0800"));
System.out.println("end_date:" + end_date);
是解析不了的
Exception in thread "main" java.lang.IllegalArgumentException: Invalid format: "20-Dec-2018:20:20:20 +0800" is malformed at "Dec-2018:20:20:20 +0800"
时间解析时要注意时间格式符合joda-time要求的格式
要引入joda-time
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.7</version>
</dependency>
js代码解析并没有结束,我们还遇到了有一个问题,我们队[Get ....] 请求做切分才能拿到请求参数
[GET /log/xxad?userAgent=Mozilla/5.0%20(%20CPU%20iPhone%20OS%2012_1%20like%20Mac%20OS%20X)%20AppleWebKit/605.1.15%20(KHTML,%20like%20Gecko)%20Version/12.0%20Mobile/15E148%20Safari/604.1&os=2&networkId=17&logType=1&jsCodeId=790431196 HTTP/1.1] [-] [Mozilla/5.0 (iPhone; CPU iPhone OS 12_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1]
开始时,我这里用的是Map,js伪代码如下:
var queryParam = param.split("&");
var queryMap = new Map();
for(var i=0;i< queryParam.length;i++){
var arr = queryParam[i].split("=");
queryMap.set(arr[0],arr[1]);
}
然后我们从中取出logType
var logType = queryMap.get("logType");
之后我们启动这个json文件,发现数据并未解析成功
2018-12-14 20:15:03,842 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4438, sentCount=0, droppedCount=0, unparseableCount=4438}} pending messages in 0ms and committed offsets in 47ms.
2018-12-14 20:15:18,887 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4544, sentCount=0, droppedCount=0, unparseableCount=4544}} pending messages in 0ms and committed offsets in 44ms.
unparseableCount 表示未解析的数据条数
通过排查我们发现是Map在druid的js代码块中识别不了,于是我们采用下面的写法
for(var i=0;i< queryParam.length;i++){var arr = queryParam[i].split(\"=\");
if(arr[0]==\"logType\"){
logType=arr[1];
}
if(arr[0]==\"jsCodeId\"){
jsCodeId=arr[1];
}
if(arr[0]==\"userAgent\"){
userAgent=arr[1];
}
}
在运行这个json文件
2018-12-14 21:23:23,136 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4180, sentCount=4180, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 35ms.
2018-12-14 21:23:38,168 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4261, sentCount=4261, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 31ms.
通过日志发现这次解析没有问题
总结:
Tranquility方式虽然能引入js做数据解析,但不是所有的js用法在druid中都能使用,这个坑要慢慢去试了。
附上json文件
xx_ad.json
{
"dataSources" : {
"xx_ad" : {
"spec" : {
"dataSchema" : {
"dataSource" : "xx_ad",
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"column" : "req_time",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"dimensionsSpec" : {
"dimensions" : ["jsCodeId","userAgent"]
},
"function" : "function(str) {var infos = str.split(\"\t\");var time = infos[1].replace(\"[\",\"\").replace(\"]\",\"\");var tmp = time.split(\" \");var tmpTime = tmp[0].split(\":\");
var hhmmss= tmpTime[1] + \":\" + tmpTime[2] + \":\" +tmpTime[3];
var month = new Array();month[\"Jan\"] = 01;month[\"Feb\"] = 02;month[\"Mar\"] = 03;month[\"Apr\"] = 04;month[\"May\"] = 05;month[\"Jan\"] = 06;month[\"Jul\"] = 07;month[\"Aug\"] = 08;month[\"Sep\"] = 09;month[\"Oct\"] = 10;month[\"Nov\"] = 11;month[\"Dec\"] = 12;var yyyymmdd = tmpTime[0];
var yyyymmddStr = yyyymmdd.split(\"/\");var req_time = yyyymmddStr[2] + \"-\" + month[yyyymmddStr[1]] + \"-\" + yyyymmddStr[0] + \" \" + hhmmss;var firstIndex = infos[2].indexOf(\"[\");var firstInfo = infos[2].substring(firstIndex+1,infos[2].length);var lastIndex = firstInfo.indexOf(\"]\");var queryStr = firstInfo.substring(0,lastIndex);var queryPart = queryStr.split(\" \");var queryUrl = queryPart[1];var index = queryUrl.indexOf(\"?\");var param=queryUrl.substring(index + 1);var queryParam = param.split(\"&\");
var logType=0,jsCodeId=\"\",userAgent=\"\";
for(var i=0;i< queryParam.length;i++){var arr = queryParam[i].split(\"=\");
if(arr[0]==\"logType\"){
logType=arr[1];
}
if(arr[0]==\"jsCodeId\"){
jsCodeId=arr[1];
}
if(arr[0]==\"userAgent\"){
userAgent=arr[1];
}
}
var req=0,resp=0,show=0;if(logType==\"1\"){req=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"2\"){resp=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"3\"){show=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}}",
"format" : "javascript"
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "hour",
"queryGranularity" : "hour"
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
},{
"name" : "req_sum",
"type" : "longSum",
"fieldName" : "req"
},{
"name" : "resp_sum",
"type" : "longSum",
"fieldName" : "resp"
},{
"name" : "show_sum",
"type" : "longSum",
"fieldName" : "show"
}
]
},
"ioConfig" : {
"type" : "realtime"
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT15M",
"windowPeriod" : "PT4H"
}
},
"properties" : {
"task.partitions" : "1",
"task.replicants" : "1",
"topicPattern" : "xxad"
}
}
},
"properties" : {
"zookeeper.connect" : "192.168.11.21:2181",
"druid.discovery.curator.path" : "/druid/discovery",
"druid.selectors.indexing.serviceName" : "druid/overlord",
"commit.periodMillis" : "15000",
"consumer.numThreads" : "2",
"kafka.zookeeper.connect" : "192.168.48.11:2181,192.168.48.12:2181,192.168.48.13:2181",
"kafka.group.id" : "tranquility-offline-qb-ad"
}
}
参考: