Druid数据摄入规范

本篇文章主要是向读者介绍如何制定Druid摄入数据的规范,指出开发过程中需要注意的关键事项和规则,方便刚接触Druid的同学快速入门。同时方便自己后期翻阅。

  • 环境准备

    默认同学们本地已经有Druid的环境,以下操作都是基于0.12.3版本的Druid操作的。

  • 数据准备

    ①使用kafka产生模拟数据
    ts:时间
    startIP: 发送发IP
    startPort: 发送方IP端口
    endIP: 接收方IP
    endPort: 接收方IP端口
    protocol:IP协议
    packets: packets
    bytes: 传输多少bytes
    costTime: 耗时
    
    ②样例数据
    {"ts":"2019-01-18T01:01:35Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":10, "bytes":1000, "costTime": 1.4}
    {"ts":"2019-01-18T01:01:51Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":20, "bytes":2000, "costTime": 3.1}
    {"ts":"2019-01-18T01:01:59Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":30, "bytes":3000, "costTime": 0.4}
    {"ts":"2019-01-18T01:02:14Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":40, "bytes":4000, "costTime": 7.9}
    {"ts":"2019-01-18T01:02:29Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":50, "bytes":5000, "costTime": 10.2}
    {"ts":"2019-01-18T01:03:29Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":60, "bytes":6000, "costTime": 4.3}
    {"ts":"2019-01-18T02:33:14Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":100, "bytes":10000, "costTime": 22.4}
    {"ts":"2019-01-18T02:33:45Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":200, "bytes":20000, "costTime": 34.5}
    {"ts":"2019-01-18T02:35:45Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":300, "bytes":30000, "costTime": 46.3}
    
  • Druid摄入数据规范

    Schema的定义,Druid摄入数据规范的核心是dataSchema,dataSchema定义了如何解析输入的数据,并将数据存储到Druid中。

    1.dataSchema

    首先我们创建一个json的文件:kafka-index-day-roll-up.json,在该文件中添加空dataSchema;
    "dataSchema" : {}
    

    2.DataSource name

    DataSource name指定,数据源名称由dataSchema中的datasource参数指定,在这里我们叫做realtime_kafka_to_druid,可以看作是数据库的表名;
    "dataSchema" : {
      "dataSource" : "realtime_kafka_to_druid",
    }
    

    3.parser-解释器

    dataSchema中有一个parser这个字段,它是解释输入数据的解析器,上面的案例中我们使用的是JSON格式的字符串,因此我们使用JSON格式的字符串解释器解析数据。
    "dataSchema" : {
        "dataSource" : "realtime_kafka_to_druid",
        "parser" : {
          "type" : "string",
          "parseSpec" : {
             "format" : "json"
          }
        }
      }
    

    4.Time column - 时间列

    解释器parser需要知道数据中每条数据的产生时间(main timestamp),这个时间戳需要定义在 timestampSpec中。数据中有一列ts就是我们所需要的timestamp,因此我们将带有该信息的timestampSpec 添加到parseSpec中。
    "dataSchema" : {
        "dataSource" : "realtime_kafka_to_druid",
        "parser" : {
          "type" : "string",
          "parseSpec" : {
            "format" : "json",
            "timestampSpec" : {
              "format" : "auto",
              "column" : "ts"
            }
          }
        }
      }
    

    5.Column types

    上面我们已经定义了time的列,接下来我们定义其它列的类型。Druid支持的column types: String, Long, Float, Double.我们将在接下来的小节中讨论以及如何使用它们。在我们去定义非时间序列之前,我们首先来讨论一下rollup。

    6.Rollup

    druid在通过roll-up处理后,会将原始数据在注入的时候就开始进行汇总处理。roll-up是在数据存储到segment之前进行的第一层聚合操作。

    ①如果rollup设置成true,这个时候就需要我们把输入的columns进行分为两类,维度(dimensions)和度量(metrics).dimensions是我们进行group的时候需要的列,metrics是我们进行聚合时需要的列。
    ②如果rollup设置成false,这个时候我们会将输入的所有columns当做dimensions处理,并且没有预聚合的发生。
    "dataSchema" : {
        "dataSource" : "realtime_kafka_to_druid",
        "parser" : {
          "type" : "string",
          "parseSpec" : {
            "format" : "json",
            "timestampSpec" : {
              "format" : "auto",
              "column" : "ts"
            }
          }
        },
        "granularitySpec" : {
          "rollup" : true
        }
      }
    

    7.选择dimension和metrics

    ①在上面给到的数据集中,很明显的就可以区分开 dimensions 和 metrics。
    Dimensions: startIP |  startPort | endIP  | endPort | protocol
    Metrics: packets | bytes | costTime
    
    ②接下来我们如何在摄入数据规范中定义这些 dimensions列 和 metrics列呢?Dimensions:使用dimensionsSpec在parseSpec中指定。
    "dataSchema" : {
          "dataSource" : "realtime_kafka_to_druid",
          "parser" : {
          "type" : "string",
          "parseSpec" : {
              "format" : "json",
              "timestampSpec" : {
                   "format" : "auto",
                   "column" : "ts"
              },
           "dimensionsSpec" : {
                  "dimensions": [
                       "startIP",
                       { "name" : "startPort", "type" : "long" },
                       { "name" : "endIP", "type" : "string" },
                       { "name" : "endPort", "type" : "long" },
                       { "name" : "protocol", "type" : "string" }
                  ]
                }   
               }
              },
           "metricsSpec" : [
                  { "type" : "count", "name" : "count" },
                  { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
                  { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
                  { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }
          ],
           "granularitySpec" : {
                  "rollup" : true
              }
          }
    
    注:每个维度都有一个name 和 type,type的类型可能是:"long", "float", "double", "string"。我们注意到startIP这个"string"类型的维度,它仅仅只需要指定名字就可以了。
    ③.在druid中,string 类型是默认的。除此之外,我们注意一下protocol是一个数值型的。但是我们定义的时候将其定义为 string。Druid会强制将该类型进行转换。Metrics:使用metricsSpec 在dataSchema中指定。
    "dataSchema" : {
                "dataSource" : "realtime_kafka_to_druid",
                "parser" : {
                  "type" : "string",
                  "parseSpec" : {
                    "format" : "json",
                    "timestampSpec" : {
                      "format" : "auto",
                      "column" : "ts"
                    },
                    "dimensionsSpec" : {
                      "dimensions": [
                        "startIP",
                        { "name" : "startPort", "type" : "long" },
                        { "name" : "endIP", "type" : "string" },
                        { "name" : "endPort", "type" : "long" },
                        { "name" : "protocol", "type" : "string" }
                      ]
                    }   
                  }
                },
                "metricsSpec" : [
                  { "type" : "count", "name" : "count" },
                  { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
                  { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
                  { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }
                ],
                "granularitySpec" : {
                  "rollup" : true
                }
          }
    
    注:当我们定义metric时,有必要指定在rollup期间对该列执行的聚合类型。我们将packets和bytes定义成long sum聚合操作,costTime定义成double sum聚合操作。 metricsSpec的嵌套级别与dimensionSpec或parseSpec不同,它和dataSchema属于同一嵌套级别。除此,我们还定义了一个count聚合操作器,它会在rollup过程中,记录输入的数据量总共有多少。支持的聚合器类型详情点击link

    8.不使用rollup

    如果不适用roolup所有输入的colums都被当做"dimensions",不再区分"dimensions" 和"metrics"。
    "dimensionsSpec" : {
      "dimensions": [
          "startIP",
          { "name" : "startPort", "type" : "long" },
          { "name" : "endIP", "type" : "string" },
          { "name" : "endPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" },
          { "name" : "packets", "type" : "long" },
          { "name" : "bytes", "type" : "long" },
          { "name" : "startPort", "type" : "double" }
      ]
    }
    

    9.Define Granularities-粒度的定义。

    接下来还有一些其他的属性需要在granularitySpec中设置,granularitySpec支持2中类型(type):uniform和arbitrary。在这里,我们使用uniform这种类型,这会使所有的segment都有统一的间隔大小(比如:每个segment都保存一个小时内的值)。

    ①segment granularity这个属性是指一个segment应该包含多大时间间隔的数据,可以是: DAY, WEEK,HOUR , MINUTE...... 在这里,我们制定segment的粒度是HOUR。
    "dataSchema" : {
            "dataSource" : "realtime_kafka_to_druid",
            "parser" : {
              "type" : "string",
              "parseSpec" : {
                "format" : "json",
                "timestampSpec" : {
                  "format" : "auto",
                  "column" : "ts"
                },
                "dimensionsSpec" : {
                  "dimensions": [
                    "startIP",
                    { "name" : "startPort", "type" : "long" },
                    { "name" : "endIP", "type" : "string" },
                    { "name" : "endPort", "type" : "long" },
                    { "name" : "protocol", "type" : "string" }
                  ]
                }      
              }
            },
            "metricsSpec" : [
              { "type" : "count", "name" : "count" },
              { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
              { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
              { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }
            ],
            "granularitySpec" : {
              "type" : "uniform",
              "segmentGranularity" : "HOUR",
              "rollup" : true
            }
          }
    
    ②.query granularity:查询的粒度通过queryGranularity配置在granularitySpec中,在这里我们使用minute粒度。
    "dataSchema" : {
            "dataSource" : "realtime_kafka_to_druid",
            "parser" : {
               "type" : "string",
               "parseSpec" : {
                   "format" : "json",
                   "timestampSpec" : {
                    "format" : "auto",
                    "column" : "ts"
                  },
                "dimensionsSpec" : {
                  "dimensions": [
                    "startIP",
                    { "name" : "startPort", "type" : "long" },
                    { "name" : "endIP", "type" : "string" },
                    { "name" : "endPort", "type" : "long" },
                    { "name" : "protocol", "type" : "string" }
                  ]
                }      
              }
            },
            "metricsSpec" : [
              { "type" : "count", "name" : "count" },
              { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
              { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
              { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }
            ],
            "granularitySpec" : {
              "type" : "uniform",
              "segmentGranularity" : "HOUR",
              "queryGranularity" : "MINUTE"
              "rollup" : true
            }
          }
    
    ③.Define an interval:定义时间间隔,在这个时间间隔之外的数据将不会被处理。注意,这个参数设置只在批处理中(batch)。interval需要在 granularitySpec中指定。
    "granularitySpec" : {
       "intervals" : ["2019-01-17/2019-01-18"]
    }
    

    10.定义输入数据的数据源

    输入数据的数据源在ioConfig中指定,每个任务类型都有它自己的ioConfig。本文采用从kafka中获取数据,ioConfig配置如下:
    {
        "type" : "index",
        "spec" : {
          "dataSchema" : {
            "dataSource" : "realtime_kafka_to_druid",
            "parser" : {
              "type" : "string",
              "parseSpec" : {
                "format" : "json",
                "timestampSpec" : {
                  "format" : "auto",
                  "column" : "ts"
                },
                "dimensionsSpec" : {
                  "dimensions": [
                    "startIP",
                    { "name" : "startPort", "type" : "long" },
                    { "name" : "endIP", "type" : "string" },
                    { "name" : "endPort", "type" : "long" },
                    { "name" : "protocol", "type" : "string" }
                  ]
                }      
              }
            },
            "metricsSpec" : [
              { "type" : "count", "name" : "count" },
              { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
              { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
              { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }
            ],
            "granularitySpec" : {
              "type" : "uniform",
              "segmentGranularity" : "HOUR",
              "queryGranularity" : "MINUTE",
              "rollup" : true
            }
          },
          "ioConfig": {
              "topic": "druid-topic-book",
              "replicas": 1,
              "taskDuration": "PT5M",
              "completionTimeout": "PT20M",
              "consumerProperties": {
              "bootstrap.servers": "host1:9092,host2:9092,host3:9092"
              }
          }
        }
    }
    

    11.tuningConfig-额外的配置

    每个摄入任务都有一个tuningConfig部分,让开发人员自行配置。在这里根据输入的数据源kafka来进行配置tuningConfig。type索引任务类型,此处是kafka 。reportParseExceptions默认是false,如果开启这个功能,当摄入数据过程中出现数据异常将会导致摄入数据停止。
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    }
    

    12.下面是我们设置的摄入数据的规范。

    {
        "type" : "index",
        "spec" : {
          "dataSchema" : {
            "dataSource" : "realtime_kafka_to_druid",
            "parser" : {
              "type" : "string",
              "parseSpec" : {
                "format" : "json",
                "timestampSpec" : {
                  "format" : "auto",
                  "column" : "ts"
                },
                "dimensionsSpec" : {
                  "dimensions": [
                    "startIP",
                    { "name" : "startPort", "type" : "long" },
                    { "name" : "endIP", "type" : "string" },
                    { "name" : "endPort", "type" : "long" },
                    { "name" : "protocol", "type" : "string" }
                  ]
                }      
              }
            },
            "metricsSpec" : [
              { "type" : "count", "name" : "count" },
              { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
              { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
              { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }
            ],
            "granularitySpec" : {
              "type" : "uniform",
              "segmentGranularity" : "HOUR",
              "queryGranularity" : "MINUTE",
              "rollup" : true
            }
          },
          "tuningConfig": {
              "type": "kafka",
              "reportParseExceptions": false
          },
          "ioConfig": {
              "topic": "druid-topic-book",
              "replicas": 1,
              "taskDuration": "PT5M",
              "completionTimeout": "PT20M",
              "consumerProperties": {
              "bootstrap.servers": "host1:9092,host2:9092,host3:9092"
              }
          }
        }
    }
    

    13.kafka的TuningConfig和IOConfig配置详情可以参考:

    http://druid.io/docs/0.12.3/development/extensions-core/kafka-ingestion.html
    
  • 提交我们的task,然后查询数据。

    1.需要在Overlord节点执行:
    curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/kafka-druid/kafka-index-day-roll-up.json http://host1:8090/druid/indexer/v1/supervisor
    
    2.此刻开启程序,往kafka的topic=druid-topic-book中发送数据,此代码不做重点。
    3.上面的步骤执行完之后,我们可以查看druid最终存入的数据。需要在broker节点执行。
    ①.rollup-select-sql.json内容,注意查询的DataSource名称
    {
       "query":"select * from \"realtime_kafka_to_druid\""
    }
    
    ② 执行
     curl -X 'POST' -H 'Content-Type:application/json' -d @rollup-select-sql.json http://host2:8082/druid/v2/sql
    
    ③最终存入druid中的数据:
     [
        {
              "__time": "2019-01-18T01:01:00.000Z",
              "bytes": 6000,
              "costTime": 4.9,
              "count": 3,
              "endIP": "2.2.2.2",
              "endPort": 3000,
              "packets": 60,
              "protocol": "6",
              "startIP": "1.1.1.1",
              "startPort": 2000
            },
            {
              "__time": "2019-01-18T01:02:00.000Z",
              "bytes": 9000,
              "costTime": 18.1,
              "count": 2,
              "endIP": "2.2.2.2",
              "endPort": 7000,
              "packets": 90,
              "protocol": "6",
              "startIP": "1.1.1.1",
              "startPort": 5000
            },
            {
              "__time": "2019-01-18T01:03:00.000Z",
              "bytes": 6000,
              "costTime": 4.3,
              "count": 1,
              "endIP": "2.2.2.2",
              "endPort": 7000,
              "packets": 60,
              "protocol": "6",
              "startIP": "1.1.1.1",
              "startPort": 5000
            },
            {
              "__time": "2019-01-18T02:33:00.000Z",
              "bytes": 30000,
              "costTime": 56.9,
              "count": 2,
              "endIP": "8.8.8.8",
              "endPort": 5000,
              "packets": 300,
              "protocol": "17",
              "startIP": "7.7.7.7",
              "startPort": 4000
            },
            {
              "__time": "2019-01-18T02:35:00.000Z",
              "bytes": 30000,
              "costTime": 46.3,
              "count": 1,
              "endIP": "8.8.8.8",
              "endPort": 5000,
              "packets": 300,
              "protocol": "17",
              "startIP": "7.7.7.7",
              "startPort": 4000
            }
          ]
    

>>初次记录学习过程,文章中如有错误或不妥之处,请留言!<<
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容