Druid 使用Tranquility从kafka实时导入数据

Druid 使用Tranquility从kafka实时导入数据


数据导入方式

通过前面的介绍我们知道在流式处理领域,有两种数据处理模式,一种为Stream Push,另一种为Stream Pull。

  • Stream Pull 如果Druid以Stream Pull方式自主地从外部数据源拉取数据从而生成Indexing Service Tasks,我们则需要建立Real-Time Node。Real-Time Node主要包含两大“工厂”:一个是连接流式数据源、负责数据接入的Firehose(中文翻译为水管,很形象地描述了该组件的职责);另一个是负责Segment发布与转移的Plumber(中文翻译为搬运工,同样也十分形象地描述了该组件的职责)。在Druid源代码中,这两个组件都是抽象工厂方法,使用者可以根据自己的需求创建不同类型的Firehose或者Plumber。Firehose和Plumber给我的感觉,更类似于Kafka_0.9.0版本后发布的Kafka Connect框架,Firehose类似于Kafka Connect Source,定义了数据的入口,但并不关心接入数据源的类型;而Plumber类似于Kafka Connect Sink,定义了数据的出口,也不关心最终输出到哪里。

  • Stream Push 如果采用Stream Push策略,我们需要建立一个“copy service”,负责从数据源中拉取数据并生成Indexing Service Tasks,从而将数据“推入”到Druid中,我们在druid_0.9.1版本之前一直使用的是这种模式,不过这种模式需要外部服务Tranquility,Tranquility是一个发送数据流到Druid的http客户端,Tranquility组件可以连接多种流式数据源,比如Spark-Streaming、Storm以及Kafka等,所以也产生了Tranquility-Storm、Tranquility-Kafka等外部组件。

实时数据流摄入方式

  • Standalone Realtime Node(Streaming pull)

  • Indexing-service + Tranquility(Streaming push)

  • KafkaIndex-indexing-service

Tranquility数据摄入特点

  • 可以视为Druid的客户端

  • 可以作为Jar包,依赖到其他程序中使用,典型的可以嵌入到其他流计算框架中使用,如Flink、Spark-streaming、Samza等

  • 可以作为独立的Java应用部署

  • 管理任务生命周期

  • 实时任务定时提交

  • 任务副本与任务数

  • 实时节点服务发现

  • 消费Kafka数据,通过HTTP服务推送到实时节点上

  • topicPatten:传topic的名字,可以是正则匹配的
    https://github.com/druid-io/tranquility/blob/master/docs/configuration.md

  • 可以通过JS代码对数据进行处理

Tranquility使用例子

我们解析的数据格式如下:

[192.168.11.11]    [11/Dec/2018:20:59:18 +0800]    [GET /log/xxad?userAgent=Mozilla/5.0%20(%20CPU%20iPhone%20OS%2012_1%20like%20Mac%20OS%20X)%20AppleWebKit/605.1.15%20(KHTML,%20like%20Gecko)%20Version/12.0%20Mobile/15E148%20Safari/604.1&os=2&networkId=17&logType=1&jsCodeId=790431196 HTTP/1.1] [-] [Mozilla/5.0 (iPhone; CPU iPhone OS 12_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1]   [https://xx.com/question/72aef851c9b495fe218d579ce4db682b.html] 204

\t 分割的

这里给出一个json文件伪代码,仅供参考

{
    "dataSources" : {
        "xx_ad" : {
            "spec" : {
                "dataSchema" : {
                    "dataSource" : "xx_ad",
                    "parser" : {
                        "type" : "string",
                        "parseSpec" : {
                            "timestampSpec" : {
                                "column" : "req_time",
                                "format" : "yyyy-MM-dd HH:mm:ss"
                            },
                            "dimensionsSpec" : {
                                "dimensions" : ["jsCodeId","userAgent"]
                            },
                         "function" : "function(str) {
                            省略部分js代码
                        var req=0,resp=0,show=0;if(logType==\"1\"){req=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"2\"){resp=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"3\"){show=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}}",
"format" : "javascript"
                        }
                    },
                    "granularitySpec" : {
                        "type" : "uniform",
                        "segmentGranularity" : "hour",
                        "queryGranularity" : "hour"
                    },
                    "metricsSpec" : [{
                            "type" : "count",
                            "name" : "count"
                        },{
                            "name" : "req_sum",
                            "type" : "longSum",
                            "fieldName" : "req"
                        },{
                            "name" : "resp_sum",
                            "type" : "longSum",
                            "fieldName" : "resp"
                        },{
                            "name" : "show_sum",
                            "type" : "longSum",
                            "fieldName" : "show"
                        }
                    ]
                },
                "ioConfig" : {
                    "type" : "realtime"
                },
                "tuningConfig" : {
                    "type" : "realtime",
                    "maxRowsInMemory" : "100000",
                    "intermediatePersistPeriod" : "PT15M",
                    "windowPeriod" : "PT4H"
                }
            },
            "properties" : {
                "task.partitions" : "1",
                "task.replicants" : "1",
                "topicPattern" : "xxad"
            }
        }
    },
    "properties" : {
        "zookeeper.connect" : "192.168.11.21:2181",
        "druid.discovery.curator.path" : "/druid/discovery",
        "druid.selectors.indexing.serviceName" : "druid/overlord",
        "commit.periodMillis" : "15000",
        "consumer.numThreads" : "2",
        "kafka.zookeeper.connect" : "192.168.48.11:2181,192.168.48.12:2181,192.168.48.13:2181",
        "kafka.group.id" : "tranquility-xx-ad"
    }
}

js代码做的事情是根据logType类型来计算请求量、展现量和点击量,维度是jsCodeId和userAgent,
由于时间列式 [11/Dec/2018:20:59:18 +0800] 是这种格式,首先设置时间格式

"timestampSpec" : {
    "column" : "req_time",
    "format" : "dd/MMM/yyyyHH:mm:ss"
    }

发现时间并不能解析,我们通过官网发现
http://druid.io/docs/latest/ingestion/ingestion-spec.html

timestampSpec

Joda time

Joda time是java里的,通过一段java

        DateTime end_date = DateTime.parse("20-12-2018:20:20:20 +0800", DateTimeFormat.forPattern("dd-MM-yyyy:HH:mm:ss +0800"));
        System.out.println("end_date:" + end_date);

打印

start_date:2018-12-11T20:59:18.000+08:00
end_date:2018-12-20T20:20:20.000+08:00
dt5:2012-05-20T13:14:00.000+08:00

如果是下面这样

 DateTime end_date = DateTime.parse("20-Dec-2018:20:20:20 +0800", DateTimeFormat.forPattern("dd-MM-yyyy:HH:mm:ss +0800"));
        System.out.println("end_date:" + end_date);

是解析不了的

Exception in thread "main" java.lang.IllegalArgumentException: Invalid format: "20-Dec-2018:20:20:20 +0800" is malformed at "Dec-2018:20:20:20 +0800"

时间解析时要注意时间格式符合joda-time要求的格式

要引入joda-time

 <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.7</version>
    </dependency>

js代码解析并没有结束,我们还遇到了有一个问题,我们队[Get ....] 请求做切分才能拿到请求参数

[GET /log/xxad?userAgent=Mozilla/5.0%20(%20CPU%20iPhone%20OS%2012_1%20like%20Mac%20OS%20X)%20AppleWebKit/605.1.15%20(KHTML,%20like%20Gecko)%20Version/12.0%20Mobile/15E148%20Safari/604.1&os=2&networkId=17&logType=1&jsCodeId=790431196 HTTP/1.1] [-] [Mozilla/5.0 (iPhone; CPU iPhone OS 12_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1]

开始时,我这里用的是Map,js伪代码如下:

var queryParam = param.split("&");
    var queryMap = new Map();
for(var i=0;i< queryParam.length;i++){
    var arr = queryParam[i].split("=");
    queryMap.set(arr[0],arr[1]);
}

然后我们从中取出logType

var logType = queryMap.get("logType");

之后我们启动这个json文件,发现数据并未解析成功

2018-12-14 20:15:03,842 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4438, sentCount=0, droppedCount=0, unparseableCount=4438}} pending messages in 0ms and committed offsets in 47ms.
2018-12-14 20:15:18,887 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4544, sentCount=0, droppedCount=0, unparseableCount=4544}} pending messages in 0ms and committed offsets in 44ms.

unparseableCount 表示未解析的数据条数

通过排查我们发现是Map在druid的js代码块中识别不了,于是我们采用下面的写法

 for(var i=0;i< queryParam.length;i++){var arr = queryParam[i].split(\"=\");
                                if(arr[0]==\"logType\"){
                                    logType=arr[1];
                                }
                                if(arr[0]==\"jsCodeId\"){
                                    jsCodeId=arr[1];
                                }
                                if(arr[0]==\"userAgent\"){
                                    userAgent=arr[1];
                                }

                        }

在运行这个json文件

2018-12-14 21:23:23,136 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4180, sentCount=4180, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 35ms.
2018-12-14 21:23:38,168 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Flushed {qbad={receivedCount=4261, sentCount=4261, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 31ms.

通过日志发现这次解析没有问题

总结:
Tranquility方式虽然能引入js做数据解析,但不是所有的js用法在druid中都能使用,这个坑要慢慢去试了。

附上json文件

xx_ad.json
{
    "dataSources" : {
        "xx_ad" : {
            "spec" : {
                "dataSchema" : {
                    "dataSource" : "xx_ad",
                    "parser" : {
                        "type" : "string",
                        "parseSpec" : {
                            "timestampSpec" : {
                                "column" : "req_time",
                                "format" : "yyyy-MM-dd HH:mm:ss"
                            },
                            "dimensionsSpec" : {
                                "dimensions" : ["jsCodeId","userAgent"]
                            },
                         "function" : "function(str) {var infos = str.split(\"\t\");var time = infos[1].replace(\"[\",\"\").replace(\"]\",\"\");var tmp = time.split(\" \");var tmpTime = tmp[0].split(\":\");
                         var hhmmss= tmpTime[1] + \":\" + tmpTime[2] + \":\" +tmpTime[3];
                         var month = new Array();month[\"Jan\"] = 01;month[\"Feb\"] = 02;month[\"Mar\"] = 03;month[\"Apr\"] = 04;month[\"May\"] = 05;month[\"Jan\"] = 06;month[\"Jul\"] = 07;month[\"Aug\"] = 08;month[\"Sep\"] = 09;month[\"Oct\"] = 10;month[\"Nov\"] = 11;month[\"Dec\"] = 12;var yyyymmdd = tmpTime[0];
                         var yyyymmddStr = yyyymmdd.split(\"/\");var req_time = yyyymmddStr[2] + \"-\" + month[yyyymmddStr[1]] + \"-\" + yyyymmddStr[0] + \" \" + hhmmss;var firstIndex = infos[2].indexOf(\"[\");var firstInfo = infos[2].substring(firstIndex+1,infos[2].length);var lastIndex = firstInfo.indexOf(\"]\");var queryStr = firstInfo.substring(0,lastIndex);var queryPart = queryStr.split(\" \");var queryUrl = queryPart[1];var index = queryUrl.indexOf(\"?\");var param=queryUrl.substring(index + 1);var queryParam = param.split(\"&\");
                            var logType=0,jsCodeId=\"\",userAgent=\"\";
                            for(var i=0;i< queryParam.length;i++){var arr = queryParam[i].split(\"=\");
                                if(arr[0]==\"logType\"){
                                    logType=arr[1];
                                }
                                if(arr[0]==\"jsCodeId\"){
                                    jsCodeId=arr[1];
                                }
                                if(arr[0]==\"userAgent\"){
                                    userAgent=arr[1];
                                }

                        }
                        var req=0,resp=0,show=0;if(logType==\"1\"){req=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"2\"){resp=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}else if(logType==\"3\"){show=1;return {req_time:req_time,jsCodeId:jsCodeId,userAgent:userAgent,req:req,resp:resp,show:show}}}",
"format" : "javascript"
                        }
                    },
                    "granularitySpec" : {
                        "type" : "uniform",
                        "segmentGranularity" : "hour",
                        "queryGranularity" : "hour"
                    },
                    "metricsSpec" : [{
                            "type" : "count",
                            "name" : "count"
                        },{
                            "name" : "req_sum",
                            "type" : "longSum",
                            "fieldName" : "req"
                        },{
                            "name" : "resp_sum",
                            "type" : "longSum",
                            "fieldName" : "resp"
                        },{
                            "name" : "show_sum",
                            "type" : "longSum",
                            "fieldName" : "show"
                        }
                    ]
                },
                "ioConfig" : {
                    "type" : "realtime"
                },
                "tuningConfig" : {
                    "type" : "realtime",
                    "maxRowsInMemory" : "100000",
                    "intermediatePersistPeriod" : "PT15M",
                    "windowPeriod" : "PT4H"
                }
            },
            "properties" : {
                "task.partitions" : "1",
                "task.replicants" : "1",
                "topicPattern" : "xxad"
            }
        }
    },
    "properties" : {
        "zookeeper.connect" : "192.168.11.21:2181",
        "druid.discovery.curator.path" : "/druid/discovery",
        "druid.selectors.indexing.serviceName" : "druid/overlord",
        "commit.periodMillis" : "15000",
        "consumer.numThreads" : "2",
        "kafka.zookeeper.connect" : "192.168.48.11:2181,192.168.48.12:2181,192.168.48.13:2181",
        "kafka.group.id" : "tranquility-offline-qb-ad"
    }
}

参考:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容

  • Druid基本概念及架构介绍 1.什么是Druid Druid是一个专为大型数据集上的高性能切片和OLAP分析而设...
    it_zzy阅读 53,118评论 0 32
  • Druid.io(以下简称Druid)是面向海量数据的、用于实时查询与分析的OLAP存储系统。Druid的四大关键...
    大诗兄_zl阅读 6,459评论 0 9
  • Kafka设计解析(七)- Kafka Stream 原创文章,转载请务必将下面这段话置于文章开头处。本文转发自技...
    小小少年Boy阅读 5,248评论 0 32
  • 本文介绍在Kafka和Druid整合使用中遇到的问题和解决方法。 1. 基本配置 Druid使用Kafka作为数据...
    MeazZa阅读 1,832评论 1 0
  • Quickstart单机测试 http://druid.io/docs/0.10.1/tutorials/quic...
    大诗兄_zl阅读 1,253评论 0 0