IndexR初步体验

IndexR根据网上手册,已经基本完成安装,做了基本的验证,对其有了初步的认识,从官网摘抄了些,加上点自己的东西, 就有了这篇文章。

1、IndexR如何工作

IndexR 严格来说不算是完备的系统,只是Drill一个插件,严重依赖于zookeeper、Drill和hive,对其版本也有严格要求,在安装过程中发现如果hive版本或drill版本有问题,会遇到各种奇怪问题,比如空指针(HIVE版本不对),比如通过drill查询卡死的问题(drill版本问题)。

IndexR和Drill关系
想了解IndexR,需要首先对Drill有所了解,Apache Drill是仿照Google一个一个软件开发的,实时查询工具,主要实现对各种数据源的快速查询,将普通的SQL进行解析优化,转换成具体的查询,可以查询各种数据源数据,比如HBASE、kudu、HIVE、JSON,paquent文件格式。
Drill 是使用内存方式,尽量不对磁盘缓存数据,所以速度比impla等速度要快一些。
本文介绍的IndexR即是Drill的一种数据源,在Drill叫Storage,也可以抽象看就是一种数据格式。

IndexR和HIVE的关系:

HIVE将HQL(类SQL)的查询语句转成MR任务或Spark任务去执行查询。HIVE不是一个查询引擎。
HIVE由于需要将查询转成MapReduce任务去执行,所以一般来说速度是比较慢。
IndexR主要利用HIVE的表分区功能,以插件形式在HIVE中存在,在建表的时候需要在HIVE中建立相应的外表,
存储格式是IndexR格式。

HIVE与DRILL

HIVE的容错性更强,但是也速度更慢更保守。DRILL更激进、更激进地获取资源,更专门地对SQL做优化,而且不需要那么多容错性保证(因为系统出错了大不了重新启动任务,如果整个处理时间更短的话,比如几分钟之内)。
这些系统让用户更快速地处理SQL任务,牺牲了通用性稳定性等特性。如果说MapReduce是大砍刀,砍啥都不怕,那上面Impla、DRILL就是剔骨刀,灵巧锋利,但是不能搞太大太硬的东西。

2、IndexR建表和入数据

1、根据json创建index表:(表里面设置从哪个kafka消费数据、topic、消费标示)

tools.sh -cmd settb -t tt -c example_schema.json
example_schema.json格式如下:

{
    "schema":{
        "columns":
        [
            {"name": "date", "dataType": "int"},
            {"name": "d1", "dataType": "string"},
            {"name": "m1", "dataType": "int"},
            {"name": "m2", "dataType": "bigint"},
            {"name": "m3", "dataType": "float", "default": "-0.1"},
            {"name": "m4", "dataType": "double"}
        ]
    },
    "location": "/indexr/segment/test",
    "mode": "vlt",
    "agg":{
        "grouping": true,
        "dims": [
            "date",
            "d1"
        ],
        "metrics": [
            {"name": "m1", "agg": "sum"},
            {"name": "m2", "agg": "min"},
            {"name": "m3", "agg": "max"},
            {"name": "m4", "agg": "first"}
        ]
    }
}

解释如下:

schema - [required] the schema of table.

  • columns - defines all columns of table.

  • location - (v0.6.0+) the data path of table. It will be set to ${indexr.fs.data.root}/segment/<tableName> if omitted. Node that this option cannot be changed after table creation.

  • mode - [default vlt] the segment mode. Supports basic and vlt.

  • sort.columns - [deprecated, use agg instead]

  • agg - the pre-aggregation setting, default null.

    • grouping - [optional, default false] rollup the events or not. If true, dims and metrics are required.

    Rollup will merge those events with the same dimensions together, works like SQL

     select `date`, d1, sum(m1), min(m2), max(m3), first(m4) from table group by `date`, d1
    
    

    It can greatly reduce data size in OLAP scenario, since we don't care about indivatual event, but the whole pictures on dimension groups.

    • dims - [required if grouping is true] the rollup dimensions.
    • metrics - [required if grouping is true] the rollup metrics. Currently supports aggregation functions:
      • sum
      • min
      • max
      • first
      • last

2、获取创建hive表的sql 去hive创建表

tools.sh -cmd hivesql -t tt -col dt

得到Hive创建表脚本:
CREATE EXTERNAL TABLE IF NOT EXISTS tabtest (
  `date` int,
  `d1` string,
  `m1` int,
  `m2` bigint,
  `m3` float,
  `m4` double
) 
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE 'io.indexr.hive.IndexRSerde' 
STORED AS INPUTFORMAT 'io.indexr.hive.IndexRInputFormat' 
OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat' 
LOCATION '/indexr/segment/tabtest' 
TBLPROPERTIES (
'indexr.segment.mode'='vlt',
'indexr.agg.grouping'='true',
'indexr.agg.dims'='date,d1',
'indexr.agg.metrics'='m1:sum,m2:min,m3:max,m4:first',
'indexr.index.columns'='d1'
) 
;

3、开启kafka的消费点 ,设置几台就设置几个
IndexR的实时数据可以通过sparksql和kafka、HIVE方式进行入数据,kafa入数据的时候需要定义消费的节点,如下操作即是利用hostA和hostC两个IndexR节点来消费数据。

tools.sh -cmd listnode
$bin/tools.sh -cmd addrtt -t tt-host hostA,hostC
$ bin/tools.sh -cmd rttnode -t tt
hostA
hostC

4、实时获取kafka数据的表配置

{
    "schema":{
        "columns":
        [
            {"name": "date", "dataType": "int"},
            {"name": "d1", "dataType": "string", "index": true},
            {"name": "m1", "dataType": "int"},
            {"name": "m2", "dataType": "bigint"},
            {"name": "m3", "dataType": "float", "default": "-0.1"},
            {"name": "m4", "dataType": "double"}
        ]
    },
    "mode": "vlt",
    "agg":{
        "grouping": true,
        "dims": [
            "date",
            "d1"
        ],
        "metrics": [
            {"name": "m1", "agg": "sum"},
            {"name": "m2", "agg": "min"},
            {"name": "m3", "agg": "max"},
            {"name": "m4", "agg": "first"}
        ]
    },
    "realtime":{
        "name.alias": {
            "date": "dt",
            "m1": "m1_alias"
        },
        "tag.setting": {
            "tag.field": "_tag_",
            "accept.tags": ["a"],
            "accept.none": false
        },
        "save.period.minutes": 20,
        "upload.period.minutes": 60,
        "max.row.memory": 500000,
        "max.row.realtime": 10000000,
        "fetcher": {
            "type": "kafka-0.8",
            "topic": "test_topic",
            "number.empty.as.zero": false,
            "properties": {
                "zookeeper.connect": "localhost:2181",
                "zookeeper.connection.timeout.ms": "15000",
                "zookeeper.session.timeout.ms": "40000",
                "zookeeper.sync.time.ms": "5000",
                "fetch.message.max.bytes": "1048586",
                "auto.offset.reset": "largest",
                "auto.commit.enable": "true",
                "auto.commit.interval.ms": "5000",
                "group.id": "test_group"
            }
        }
    }
} 

5、kafka的数据格式

{"_tag_": "a,b", "date": 20160702, "d1": "mac", "m1": 100, "m2": 48224, "m3": 0.76}

或多条上面格式的数据用逗号分隔。

3、IndexR表管理

表更新

$> cd .../indexr-tool
$> bin/tools.sh -cmd settb -t test -c test_schema.json

描述表

$> bin/tools.sh -cmd dctb -t test

列出可用的IndexR的node

$> bin/tools.sh -cmd listnode
hostA
hostB
hostC
hostD

让特定节点上表开始提取数据

$> bin/tools.sh -cmd addrtt -t test -host hostA,hostC
OK
$> bin/tools.sh -cmd rttnode -t test
hostA
hostC</pre>

更新表模式

Use upcol.sh in indexr-tool.

  • Stop any tasks like rt2his which could manipulate the data in hive table.

  • Update the table schema in IndexR by editing test_schema.json and running script bin/tools.sh -cmd settb -t tes -c test_schema.json. This only effects those updated columns, queries on untouched columned should work normally.

  • Update the hive table schema by ALTER TABLE <tb> ADD COLUMNS(...) or simply dropping the old and creating a new one if your table is in EXTERNAL mode.

  • [optional] If your table have setup realtime ingestion, wait for those old segments to upload into storage system, i.e. rt folder in hive table path. The duration depends on how many realtime segments data on local IndexR nodes. Normally 20 minutes is safe enough.

  • Update historical segments.

    • Add columns
     $> bin/upcol.sh -add -t test -col '[{"name": "m5", "dataType": "long", "value": "100"}]'
    

    Here value could be a literal value, or a SQL, e.g. if((a > b), a + b, a / c). You can specify many columns. You can put the json param in a file and reference it by '@', e.g. -col @add_cols.json.

    • Remove columns
     $> bin/upcol.sh -del -t test -col m1,m2
    
    • Alter columns
     $> bin/upcol.sh -alt -t test -col '[{"name": "m1", "dataType": "string", "value": "cast(m1, string)"}]'
    

    Those scripts run mapreduce jobs, and could take a while to complete, depends on the size of your data.

删除表

  • Remove all realtime ingestion nodes of the table.
$> bin/tools.sh -cmd rttnode -t test
test:
-----------
hostA
hostB
$> bin/tools.sh -cmd rmrtt -t test -host hostA,hostB
OK

  • Wait for the realtime segments uploaded to HDFS. The duration depends on how many realtime segments data on local IndexR nodes. Normally 20 minutes is safe enough.

  • Remove segment files on HDFS and notify segment update.

$> hdfs dfs -rm -r ${indexr.fs.data.root}/segment/test
$> .../indexr-tool/bin/tools.sh -cmd notifysu -t test

  • Check if any data left behinded: select count(*) from test

  • Remove IndexR table.

$> bin/tools.sh -cmd rmtb -t test
  • Remove Hive table.
$> hive (default)> drop table test;

4、IndexR的实时数据搬迁

IndexR的实时数据在HIVE中是无法查询到的,IndexR在HDFS的格式如下:

$> bin/hdfs dfs -ls -R /indexr/segment/test
/indexr/segment/test/__UPDATE__
/indexr/segment/test/dt=20160701/
/indexr/segment/test/dt=20160701/000000_0
/indexr/segment/test/rt/
/indexr/segment/test/rt/rtsg.201607021725.e06ca84e-1d65-4f37-a799-b406d01dd10e.seg
/indexr/segment/test/rt/rtsg.201607021825.fop91s73-822s-nxk1-0091-pa1dcw9s1slk.seg</pre>

这可能会产生问题。 rt不是合法的表分区路径,更糟糕的是,它包含可能属于不同分区的行。 除非将这些数据转换为正确的分区,否则无法管理这些数据。

幸运的是,您可以使用Hive的动态分区插入。

将rt文件夹中的所有片段移至新文件夹,如rt2his。 并通知段更新。
hdfs dfs -mkdir / indexr / segment / test / rt2his
hdfs dfs -mv“/ indexr / segment / test / rt / *”“/ indexr / segment / test / rt2his /”
tools.sh -cmd notifysu -t test

创建另一个Hive表,例如 test_rt2his,位置指向table_path / rt2his路径。

CREATE EXTERNAL TABLE IF NOT EXISTS test_rt2his (
  `date` int,
  `d1` string,
  `m1` int,
  `m2` bigint,
  `m3` float,
  `m4` double
)
ROW FORMAT SERDE 'io.indexr.hive.IndexRSerde' 
STORED AS INPUTFORMAT 'io.indexr.hive.IndexRInputFormat' 
OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat' 
LOCATION '/indexr/segment/test/rt2his'
;
  • Insert rows from test_rt2his table into the real test table. e.g.
insert into table test partition (dt) select *, `date` from test_rt2his;

  • Remove segments in rt2his folder, and notify updates.
hdfs dfs -rm -r "/indexr/segment/test/rt2his/*"
tools.sh -cmd notifysu -t test

5、 遇到的坑

坑一:数据丢失

测试实时导入数据的时候发现,写入kafka数据量为1千条,结果在drill中只可以查到50条,每增加1千条,drill中只会增加50条,网上问一位大佬,原来是默认的例子里面做了预聚合,就是下面的表结构决定的:

 "agg":{
        "grouping": true,
        "dims": [
            "date",
            "d1"
        ],
        "metrics": [
            {"name": "m1", "agg": "sum"},
            {"name": "m2", "agg": "min"},
            {"name": "m3", "agg": "max"},
            {"name": "m4", "agg": "first"}
        ]
    },

预聚合会直接导致原始数据没有,只有聚合后的数据,你说坑不坑,咋办,解决办法是建普通表,预聚合就建预聚合表,我在想这个原来保存一份数据就行的,因为预聚合就导致了保存多份数据,这个很坑啊。
去掉mertrics配置将group改成false即可不聚合,当然速度可能就没这么快了。

坑二:可以不用Hive

如上文所述,一些实时数据是存在rt目录下面的,这个不符合hive的表结构形式,如果改成hive表结构形式还需要搬迁,整个过程一点都不高效,所以如果不需要通过hive进行管理的话,完全可以不用在hive建表,甚至可以不用hive,官网文档太让人误解了。

坑三:kafka版本问题

记住,无论你用什么kafka版本,IndexR目前只支持通过0.8版本的kafka消费者进行消费。也许以后会增加,至少在2018年6月1日的时候还只能用0.8版本的kafka消费者消费。

坑四:条数有时候存在不正确问题

通过查询发现,有时候表条数统计还存在在错误的情况,我一直入数据的,也未做任何聚合,数据有时候多有时候少,后续就稳定了,可能存在一些bug,如果你对这个数字要求非常严格,可能不要用它了。

6、性能简单测试

集群配置:8cpu、64G内存(drill用了3.5g)、普通磁盘配置
简单测试结果如下:


image.png
image.png
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容