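This article explains how to write a Druid ingestion spec and points out the key rules and pitfalls to watch for during development, so that newcomers to Druid can get started quickly. It also serves as a reference for my own later use.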
-
Environment setup
This article assumes you already have a local Druid environment. All the steps below were performed on Druid 0.12.3.
-
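Data preparation
① Use Kafka to produce simulated data
ts: event time
startIP: sender IP
startPort: sender port
endIP: receiver IP
endPort: receiver port
protocol: IP protocol
packets: number of packets
bytes: number of bytes transferred
costTime: elapsed time
② Sample data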
{"ts":"2019-01-18T01:01:35Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":10, "bytes":1000, "costTime": 1.4}
{"ts":"2019-01-18T01:01:51Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":20, "bytes":2000, "costTime": 3.1}
{"ts":"2019-01-18T01:01:59Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":30, "bytes":3000, "costTime": 0.4}
{"ts":"2019-01-18T01:02:14Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":40, "bytes":4000, "costTime": 7.9}
{"ts":"2019-01-18T01:02:29Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":50, "bytes":5000, "costTime": 10.2}
{"ts":"2019-01-18T01:03:29Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":60, "bytes":6000, "costTime": 4.3}
{"ts":"2019-01-18T02:33:14Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":100, "bytes":10000, "costTime": 22.4}
{"ts":"2019-01-18T02:33:45Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":200, "bytes":20000, "costTime": 34.5}
{"ts":"2019-01-18T02:35:45Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":300, "bytes":30000, "costTime": 46.3}
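Each Kafka message should carry exactly one of these JSON objects. The following is a minimal Python sketch of turning records into message values, assuming compact UTF-8 JSON; the helper `to_kafka_values` is hypothetical, and the Kafka client itself is deliberately left out.

```python
import json


def to_kafka_values(records):
    """Serialize each record as compact UTF-8 JSON: one record per Kafka message value."""
    return [json.dumps(record, separators=(",", ":")).encode("utf-8") for record in records]


# The first two sample records from above.
records = [
    {"ts": "2019-01-18T01:01:35Z", "startIP": "1.1.1.1", "endIP": "2.2.2.2", "startPort": 2000,
     "endPort": 3000, "protocol": 6, "packets": 10, "bytes": 1000, "costTime": 1.4},
    {"ts": "2019-01-18T01:01:51Z", "startIP": "1.1.1.1", "endIP": "2.2.2.2", "startPort": 2000,
     "endPort": 3000, "protocol": 6, "packets": 20, "bytes": 2000, "costTime": 3.1},
]

values = to_kafka_values(records)
for value in values:
    print(value)
```

Each byte string can then be passed as a message value to whichever Kafka producer client you use.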
-
Druid ingestion spec
Defining the schema: the core of a Druid ingestion spec is the dataSchema, which defines how the input data is parsed and how it is stored in Druid.
1.dataSchema
First, create a JSON file named kafka-index-day-roll-up.json and add an empty dataSchema to it:
"dataSchema" : {}
2.DataSource name
The datasource name is set by the dataSource parameter of the dataSchema. Here we call it realtime_kafka_to_druid; you can think of it as the name of a database table.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid" }
3.parser
The dataSchema contains a parser field, which configures how the input data is interpreted. The sample data above consists of JSON strings, so we use a string parser with a JSON parseSpec.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json" } } }
4.Time column
The parser needs to know the main timestamp of each row, and that timestamp is defined in the timestampSpec. The ts column in our data is the timestamp we need, so we add a timestampSpec carrying that information to the parseSpec.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" } } } }
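To make the timestampSpec concrete, here is a rough Python approximation of what `"format" : "auto"` accepts: ISO 8601 strings or epoch milliseconds (numbers or all-digit strings), with UTC assumed when a string carries no zone. The helper `parse_auto_timestamp` is hypothetical and is not Druid's actual parser.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_auto_timestamp(value):
    """Approximate timestampSpec format "auto": return epoch milliseconds."""
    # Numbers and all-digit strings are treated as epoch milliseconds.
    if isinstance(value, (int, float)):
        return int(value)
    if value.isdigit():
        return int(value)
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on,
    # so normalize it to an explicit UTC offset first.
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: zone-less strings are UTC
    # Exact integer milliseconds, avoiding float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(parse_auto_timestamp("2019-01-18T01:01:35Z"))
```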
5.Column types
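With the time column defined, we now turn to the types of the other columns. Druid supports the following column types: String, Long, Float, and Double. The following sections show how to use them, but before defining the non-timestamp columns, let us first discuss rollup.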
6.Rollup
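With rollup enabled, Druid pre-aggregates the raw data at ingestion time. Rollup is a first level of aggregation performed before the data is stored in segments.
① If rollup is set to true, the input columns must be split into two categories: dimensions and metrics. Dimensions are the columns used for grouping, and metrics are the columns that get aggregated.
② If rollup is set to false, all input columns are treated as dimensions and no pre-aggregation takes place.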
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" } } }, "granularitySpec" : { "rollup" : true } }
7.Choosing dimensions and metrics
① In the dataset above, the dimensions and metrics are easy to tell apart:
Dimensions: startIP | startPort | endIP | endPort | protocol
Metrics: packets | bytes | costTime
② How do we define these dimension and metric columns in the ingestion spec? Dimensions are specified with a dimensionsSpec inside the parseSpec:
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "granularitySpec" : { "rollup" : true } }
Note: each dimension has a name and a type, where the type can be "long", "float", "double", or "string". Notice that startIP, a "string" dimension, only needs its name.
③ In Druid, string is the default dimension type. Note also that protocol holds numeric values in the input, yet we define it as a string dimension; Druid converts those values to strings. Metrics are specified with a metricsSpec inside the dataSchema:
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "rollup" : true } }
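Note: when defining a metric, you must specify the aggregation performed on that column during rollup. Here packets and bytes use longSum and costTime uses doubleSum. Unlike dimensionsSpec, metricsSpec is not nested inside parseSpec; it sits directly under dataSchema. We also define a count aggregator, which records how many input rows were combined into each rolled-up row. For the full list of supported aggregators, see the Aggregations page of the Druid documentation.
8.Without rollup
If rollup is not used (rollup set to false), all input columns are treated as dimensions and there is no longer any distinction between dimensions and metrics: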
"dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" }, { "name" : "packets", "type" : "long" }, { "name" : "bytes", "type" : "long" }, { "name" : "costTime", "type" : "double" } ] }
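The no-rollup dimension list can be derived mechanically from the rollup version: each metric's source column becomes a dimension whose type follows its aggregator, and count disappears because it has no source column. A minimal Python sketch; the mapping covers only the longSum and doubleSum aggregators used in this article, and `no_rollup_dimensions` is a hypothetical helper.

```python
# Aggregator type -> dimension type when the same column is ingested without rollup.
# Assumption: only the aggregator types used in this article are mapped.
AGG_TO_DIM_TYPE = {"longSum": "long", "doubleSum": "double"}


def no_rollup_dimensions(dimensions, metrics_spec):
    """Append each metric's source column as a typed dimension; drop count."""
    result = list(dimensions)
    for metric in metrics_spec:
        if metric["type"] == "count":
            continue  # count has no fieldName, so there is no column to keep
        result.append({"name": metric["fieldName"], "type": AGG_TO_DIM_TYPE[metric["type"]]})
    return result


ROLLUP_DIMENSIONS = [
    "startIP",
    {"name": "startPort", "type": "long"},
    {"name": "endIP", "type": "string"},
    {"name": "endPort", "type": "long"},
    {"name": "protocol", "type": "string"},
]
METRICS_SPEC = [
    {"type": "count", "name": "count"},
    {"type": "longSum", "name": "packets", "fieldName": "packets"},
    {"type": "longSum", "name": "bytes", "fieldName": "bytes"},
    {"type": "doubleSum", "name": "costTime", "fieldName": "costTime"},
]

dims = no_rollup_dimensions(ROLLUP_DIMENSIONS, METRICS_SPEC)
print(dims)
```

Deriving the list this way also guards against copy-paste slips such as repeating a dimension name with a different type.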
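9.Defining granularities
A few more properties need to be set in the granularitySpec. granularitySpec supports two types: uniform and arbitrary. Here we use uniform, which gives every segment the same interval size (for example, each segment holds one hour of data).
① segmentGranularity specifies how large a time interval a single segment covers, for example DAY, WEEK, HOUR, or MINUTE. Here we set the segment granularity to HOUR.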
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "rollup" : true } }
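Segment granularity decides which hourly time chunk a row belongs to. The Python sketch below buckets the sample timestamps by hour; it models only the time chunks, and a real Kafka task may still write several segment partitions for the same hour. `hour_segment` is a hypothetical helper.

```python
from collections import Counter
from datetime import datetime, timedelta


def hour_segment(ts):
    """Return the [start, end) interval of the HOUR chunk holding an ISO 8601 UTC timestamp."""
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    start = dt.replace(minute=0, second=0, microsecond=0)
    return (start.isoformat(), (start + timedelta(hours=1)).isoformat())


# Timestamps of the nine sample messages.
SAMPLE_TS = [
    "2019-01-18T01:01:35Z", "2019-01-18T01:01:51Z", "2019-01-18T01:01:59Z",
    "2019-01-18T01:02:14Z", "2019-01-18T01:02:29Z", "2019-01-18T01:03:29Z",
    "2019-01-18T02:33:14Z", "2019-01-18T02:33:45Z", "2019-01-18T02:35:45Z",
]

segments = Counter(hour_segment(ts) for ts in SAMPLE_TS)
for interval, rows in sorted(segments.items()):
    print(interval, rows)
```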
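② queryGranularity: the query granularity is set with queryGranularity in the granularitySpec. Here we use MINUTE. Timestamps are truncated to this granularity at ingestion, so with rollup enabled, rows from the same minute that share all dimension values are combined into one row.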
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "rollup" : true } }
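③ Define an interval: data outside this time interval will not be processed. Note that this setting only applies to batch ingestion. The intervals are specified in the granularitySpec, and the end of each interval is exclusive.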
"granularitySpec" : { "intervals" : ["2019-01-18/2019-01-19"] }
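Because the end of an ISO 8601 interval is exclusive, an interval has to end after the last day of data. A minimal Python check, with hypothetical helpers and UTC assumed: for the sample data dated 2019-01-18, `2019-01-18/2019-01-19` covers every row, whereas `2019-01-17/2019-01-18` covers none.

```python
from datetime import datetime, timezone


def parse_utc(text):
    """Parse '2019-01-18' or '2019-01-18T01:01:35Z' as a UTC datetime."""
    if "T" not in text:
        text += "T00:00:00"  # a bare date means midnight
    dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


def in_interval(ts, interval):
    """ISO 8601 'start/end' interval: start is inclusive, end is exclusive."""
    start, end = (parse_utc(part) for part in interval.split("/"))
    return start <= parse_utc(ts) < end


print(in_interval("2019-01-18T01:01:35Z", "2019-01-17/2019-01-18"))
print(in_interval("2019-01-18T01:01:35Z", "2019-01-18/2019-01-19"))
```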
10.Defining the input source
Where the input data comes from is specified in the ioConfig, and each task type has its own ioConfig. This article reads data from Kafka, which uses a supervisor spec: its top-level type is kafka, and dataSchema, tuningConfig, and ioConfig sit side by side at the top level. The ioConfig is configured as follows:
{ "type" : "kafka", "dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "rollup" : true } }, "ioConfig": { "topic": "druid-topic-book", "replicas": 1, "taskDuration": "PT5M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "host1:9092,host2:9092,host3:9092" } } }
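11.tuningConfig: additional settings
Every ingestion task has a tuningConfig section that developers can use to tune ingestion. Here the tuningConfig is set up for the Kafka input. type is the indexing task type, kafka in this case. reportParseExceptions defaults to false, in which case unparseable rows are skipped; if it is set to true, any unparseable data encountered during ingestion halts ingestion.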
"tuningConfig": { "type": "kafka", "reportParseExceptions": false }
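The effect of reportParseExceptions can be pictured with a toy model in Python. This is not Druid code; `ingest` is a hypothetical function that only mimics the documented behavior: with false, unparseable messages are skipped, and with true, the first one raises and stops ingestion.

```python
import json


def ingest(messages, report_parse_exceptions=False):
    """Toy model: parse JSON messages, skipping or raising on bad ones."""
    rows, skipped = [], 0
    for raw in messages:
        try:
            rows.append(json.loads(raw))
        except json.JSONDecodeError:
            if report_parse_exceptions:
                raise  # like reportParseExceptions=true: ingestion halts here
            skipped += 1  # like reportParseExceptions=false: the row is dropped
    return rows, skipped


rows, skipped = ingest(['{"ts": "2019-01-18T01:01:35Z"}', "not json"])
print(len(rows), skipped)
```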
12.The complete ingestion spec:
{ "type" : "kafka", "dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "rollup" : true } }, "tuningConfig": { "type": "kafka", "reportParseExceptions": false }, "ioConfig": { "topic": "druid-topic-book", "replicas": 1, "taskDuration": "PT5M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "host1:9092,host2:9092,host3:9092" } } }
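Before submitting, a few structural checks can catch mistakes such as a duplicated dimension name or a metric that reads a field absent from the data. Below is a hypothetical Python helper, `check_kafka_supervisor_spec`, covering only these checks. It assumes the Kafka supervisor layout (top-level type kafka with dataSchema, tuningConfig, and ioConfig side by side) and is no substitute for Druid's own validation.

```python
def check_kafka_supervisor_spec(spec, sample_record):
    """Return a list of problems found in a Kafka supervisor spec (empty if none)."""
    problems = []
    if spec.get("type") != "kafka":
        problems.append('top-level "type" should be "kafka"')
    for key in ("dataSchema", "tuningConfig", "ioConfig"):
        if key not in spec:
            problems.append(f"missing top-level {key}")
    schema = spec.get("dataSchema", {})
    parse_spec = schema.get("parser", {}).get("parseSpec", {})
    dims = parse_spec.get("dimensionsSpec", {}).get("dimensions", [])
    # A dimension is either a bare name (string type) or an object with a name.
    names = [d if isinstance(d, str) else d["name"] for d in dims]
    dupes = sorted({n for n in names if names.count(n) > 1})
    if dupes:
        problems.append(f"duplicate dimension names: {dupes}")
    for name in names:
        if name not in sample_record:
            problems.append(f"dimension {name} not present in sample record")
    for metric in schema.get("metricsSpec", []):
        field = metric.get("fieldName")  # count has no fieldName
        if field is not None and field not in sample_record:
            problems.append(f"metric {metric['name']} reads missing field {field}")
    if "topic" not in spec.get("ioConfig", {}):
        problems.append("ioConfig.topic is missing")
    return problems
```

For example, load the spec file with `json.load`, pass it together with one sample record as a dict, and treat an empty list as "none of these checks fired".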
13.For details on the Kafka tuningConfig and ioConfig, see:
http://druid.io/docs/0.12.3/development/extensions-core/kafka-ingestion.html
-
Submitting the task and querying the data
1.Submit the spec to the Overlord node (host1:8090 here):
curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/kafka-druid/kafka-index-day-roll-up.json http://host1:8090/druid/indexer/v1/supervisor
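The same submission can be made from Python using only the standard library. This sketch assumes the Overlord address from the curl command. The helper names are hypothetical: `build_supervisor_request` only builds the request so it can be inspected, and `send` actually posts it.

```python
import json
import urllib.request


def build_supervisor_request(spec, overlord="http://host1:8090"):
    """Build (without sending) the same POST that the curl command performs."""
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        overlord.rstrip("/") + "/druid/indexer/v1/supervisor",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=30):
    """Send the request and return the Overlord's reply parsed as JSON."""
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

To use it, load kafka-index-day-roll-up.json with `json.load` into `spec`, then call `send(build_supervisor_request(spec))`.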
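2.Now start a producer that sends the data to the Kafka topic druid-topic-book; that producer code is not the focus of this article.
3.Once the steps above are done, we can look at the data Druid has stored. The query is sent to the Broker node (host2:8082 here).
① Contents of rollup-select-sql.json (note the datasource name in the query):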
{ "query":"select * from \"realtime_kafka_to_druid\"" }
② Run:
curl -X 'POST' -H 'Content-Type:application/json' -d @rollup-select-sql.json http://host2:8082/druid/v2/sql
③ The data finally stored in Druid (nine input messages rolled up into five rows):
[
{ "__time": "2019-01-18T01:01:00.000Z", "bytes": 6000, "costTime": 4.9, "count": 3, "endIP": "2.2.2.2", "endPort": 3000, "packets": 60, "protocol": "6", "startIP": "1.1.1.1", "startPort": 2000 },
{ "__time": "2019-01-18T01:02:00.000Z", "bytes": 9000, "costTime": 18.1, "count": 2, "endIP": "2.2.2.2", "endPort": 7000, "packets": 90, "protocol": "6", "startIP": "1.1.1.1", "startPort": 5000 },
{ "__time": "2019-01-18T01:03:00.000Z", "bytes": 6000, "costTime": 4.3, "count": 1, "endIP": "2.2.2.2", "endPort": 7000, "packets": 60, "protocol": "6", "startIP": "1.1.1.1", "startPort": 5000 },
{ "__time": "2019-01-18T02:33:00.000Z", "bytes": 30000, "costTime": 56.9, "count": 2, "endIP": "8.8.8.8", "endPort": 5000, "packets": 300, "protocol": "17", "startIP": "7.7.7.7", "startPort": 4000 },
{ "__time": "2019-01-18T02:35:00.000Z", "bytes": 30000, "costTime": 46.3, "count": 1, "endIP": "8.8.8.8", "endPort": 5000, "packets": 300, "protocol": "17", "startIP": "7.7.7.7", "startPort": 4000 }
]
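To see how nine input messages became these five rows, the rollup can be replayed offline. Below is a minimal Python sketch that assumes the MINUTE queryGranularity and the count/longSum/doubleSum metrics defined above, and assumes every ts has the exact `YYYY-MM-DDTHH:MM:SSZ` shape of the sample data. It models only the aggregation. Kafka (streaming) ingestion performs best-effort rollup, so a live cluster can return more, partially rolled-up rows than this.

```python
# The nine sample messages, as tuples in FIELDS order.
FIELDS = ("ts", "startIP", "endIP", "startPort", "endPort", "protocol", "packets", "bytes", "costTime")
ROWS = [
    ("2019-01-18T01:01:35Z", "1.1.1.1", "2.2.2.2", 2000, 3000, 6, 10, 1000, 1.4),
    ("2019-01-18T01:01:51Z", "1.1.1.1", "2.2.2.2", 2000, 3000, 6, 20, 2000, 3.1),
    ("2019-01-18T01:01:59Z", "1.1.1.1", "2.2.2.2", 2000, 3000, 6, 30, 3000, 0.4),
    ("2019-01-18T01:02:14Z", "1.1.1.1", "2.2.2.2", 5000, 7000, 6, 40, 4000, 7.9),
    ("2019-01-18T01:02:29Z", "1.1.1.1", "2.2.2.2", 5000, 7000, 6, 50, 5000, 10.2),
    ("2019-01-18T01:03:29Z", "1.1.1.1", "2.2.2.2", 5000, 7000, 6, 60, 6000, 4.3),
    ("2019-01-18T02:33:14Z", "7.7.7.7", "8.8.8.8", 4000, 5000, 17, 100, 10000, 22.4),
    ("2019-01-18T02:33:45Z", "7.7.7.7", "8.8.8.8", 4000, 5000, 17, 200, 20000, 34.5),
    ("2019-01-18T02:35:45Z", "7.7.7.7", "8.8.8.8", 4000, 5000, 17, 300, 30000, 46.3),
]


def rollup(rows):
    """Group by (minute-truncated ts, dimensions) and apply count/longSum/doubleSum."""
    out = {}
    for values in rows:
        r = dict(zip(FIELDS, values))
        # queryGranularity MINUTE: keep "YYYY-MM-DDTHH:MM" and zero the seconds.
        minute = r["ts"][:16] + ":00.000Z"
        # protocol is a string dimension in the spec, so group on its string form.
        key = (minute, r["startIP"], r["startPort"], r["endIP"], r["endPort"], str(r["protocol"]))
        agg = out.setdefault(key, {"count": 0, "packets": 0, "bytes": 0, "costTime": 0.0})
        agg["count"] += 1                 # count
        agg["packets"] += r["packets"]    # longSum
        agg["bytes"] += r["bytes"]        # longSum
        agg["costTime"] += r["costTime"]  # doubleSum
    return out


result = rollup(ROWS)
for key, metrics in result.items():
    print(key, metrics)
```

For example, the 01:01 group combines three messages: count 3, packets 10 + 20 + 30 = 60, and costTime 1.4 + 3.1 + 0.4 = 4.9, matching the first row of the query result.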