IndexR根据网上手册,已经基本完成安装,做了基本的验证,对其有了初步的认识,从官网摘抄了些,加上点自己的东西, 就有了这篇文章。
1、IndexR如何工作
IndexR 严格来说不算是完备的系统,只是Drill一个插件,严重依赖于zookeeper、Drill和hive,对其版本也有严格要求,在安装过程中发现如果hive版本或drill版本有问题,会遇到各种奇怪问题,比如空指针(HIVE版本不对),比如通过drill查询卡死的问题(drill版本问题)。
IndexR和Drill关系
想了解IndexR,需要首先对Drill有所了解,Apache Drill是仿照Google一个一个软件开发的,实时查询工具,主要实现对各种数据源的快速查询,将普通的SQL进行解析优化,转换成具体的查询,可以查询各种数据源数据,比如HBASE、kudu、HIVE、JSON,paquent文件格式。
Drill 是使用内存方式,尽量不对磁盘缓存数据,所以速度比impla等速度要快一些。
本文介绍的IndexR即是Drill的一种数据源,在Drill叫Storage,也可以抽象看就是一种数据格式。
IndexR和HIVE的关系:
HIVE将HQL(类SQL)的查询语句转成MR任务或Spark任务去执行查询。HIVE不是一个查询引擎。
HIVE由于需要将查询转成MapReduce任务去执行,所以一般来说速度是比较慢。
IndexR主要利用HIVE的表分区功能,以插件形式在HIVE中存在,在建表的时候需要在HIVE中建立相应的外表,
存储格式是IndexR格式。
HIVE与DRILL
HIVE的容错性更强,但是也速度更慢更保守。DRILL更激进、更激进地获取资源,更专门地对SQL做优化,而且不需要那么多容错性保证(因为系统出错了大不了重新启动任务,如果整个处理时间更短的话,比如几分钟之内)。
这些系统让用户更快速地处理SQL任务,牺牲了通用性稳定性等特性。如果说MapReduce是大砍刀,砍啥都不怕,那上面Impla、DRILL就是剔骨刀,灵巧锋利,但是不能搞太大太硬的东西。
2、IndexR建表和入数据
1、根据json创建index表:(表里面设置从哪个kafka消费数据、topic、消费标示)
tools.sh -cmd settb -t tt -c example_schema.json
example_schema.json格式如下:
{
"schema":{
"columns":
[
{"name": "date", "dataType": "int"},
{"name": "d1", "dataType": "string"},
{"name": "m1", "dataType": "int"},
{"name": "m2", "dataType": "bigint"},
{"name": "m3", "dataType": "float", "default": "-0.1"},
{"name": "m4", "dataType": "double"}
]
},
"location": "/indexr/segment/test",
"mode": "vlt",
"agg":{
"grouping": true,
"dims": [
"date",
"d1"
],
"metrics": [
{"name": "m1", "agg": "sum"},
{"name": "m2", "agg": "min"},
{"name": "m3", "agg": "max"},
{"name": "m4", "agg": "first"}
]
}
}
解释如下:
schema
- [required] the schema of table.
columns
- defines all columns of table.location
- (v0.6.0+) the data path of table. It will be set to${indexr.fs.data.root}/segment/<tableName>
if omitted. Node that this option cannot be changed after table creation.mode
- [defaultvlt
] the segment mode. Supportsbasic
andvlt
.sort.columns
- [deprecated, useagg
instead]-
agg
- the pre-aggregation setting, default null.-
grouping
- [optional, default false] rollup the events or not. If true,dims
andmetrics
are required.
Rollup will merge those events with the same dimensions together, works like SQL
select `date`, d1, sum(m1), min(m2), max(m3), first(m4) from table group by `date`, d1
It can greatly reduce data size in OLAP scenario, since we don't care about indivatual event, but the whole pictures on dimension groups.
-
dims
- [required if grouping istrue
] the rollup dimensions. -
metrics
- [required if grouping istrue
] the rollup metrics. Currently supports aggregation functions:- sum
- min
- max
- first
- last
-
2、获取创建hive表的sql 去hive创建表
tools.sh -cmd hivesql -t tt -col dt
得到Hive创建表脚本:
CREATE EXTERNAL TABLE IF NOT EXISTS tabtest (
`date` int,
`d1` string,
`m1` int,
`m2` bigint,
`m3` float,
`m4` double
)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE 'io.indexr.hive.IndexRSerde'
STORED AS INPUTFORMAT 'io.indexr.hive.IndexRInputFormat'
OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat'
LOCATION '/indexr/segment/tabtest'
TBLPROPERTIES (
'indexr.segment.mode'='vlt',
'indexr.agg.grouping'='true',
'indexr.agg.dims'='date,d1',
'indexr.agg.metrics'='m1:sum,m2:min,m3:max,m4:first',
'indexr.index.columns'='d1'
)
;
3、开启kafka的消费点 ,设置几台就设置几个
IndexR的实时数据可以通过sparksql和kafka、HIVE方式进行入数据,kafa入数据的时候需要定义消费的节点,如下操作即是利用hostA和hostC两个IndexR节点来消费数据。
tools.sh -cmd listnode
$bin/tools.sh -cmd addrtt -t tt-host hostA,hostC
$ bin/tools.sh -cmd rttnode -t tt
hostA
hostC
4、实时获取kafka数据的表配置
{
"schema":{
"columns":
[
{"name": "date", "dataType": "int"},
{"name": "d1", "dataType": "string", "index": true},
{"name": "m1", "dataType": "int"},
{"name": "m2", "dataType": "bigint"},
{"name": "m3", "dataType": "float", "default": "-0.1"},
{"name": "m4", "dataType": "double"}
]
},
"mode": "vlt",
"agg":{
"grouping": true,
"dims": [
"date",
"d1"
],
"metrics": [
{"name": "m1", "agg": "sum"},
{"name": "m2", "agg": "min"},
{"name": "m3", "agg": "max"},
{"name": "m4", "agg": "first"}
]
},
"realtime":{
"name.alias": {
"date": "dt",
"m1": "m1_alias"
},
"tag.setting": {
"tag.field": "_tag_",
"accept.tags": ["a"],
"accept.none": false
},
"save.period.minutes": 20,
"upload.period.minutes": 60,
"max.row.memory": 500000,
"max.row.realtime": 10000000,
"fetcher": {
"type": "kafka-0.8",
"topic": "test_topic",
"number.empty.as.zero": false,
"properties": {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms": "15000",
"zookeeper.session.timeout.ms": "40000",
"zookeeper.sync.time.ms": "5000",
"fetch.message.max.bytes": "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "true",
"auto.commit.interval.ms": "5000",
"group.id": "test_group"
}
}
}
}
5、kafka的数据格式
{"_tag_": "a,b", "date": 20160702, "d1": "mac", "m1": 100, "m2": 48224, "m3": 0.76}
或多条上面格式的数据用逗号分隔。
3、IndexR表管理
表更新
$> cd .../indexr-tool
$> bin/tools.sh -cmd settb -t test -c test_schema.json
描述表
$> bin/tools.sh -cmd dctb -t test
列出可用的IndexR的node
$> bin/tools.sh -cmd listnode
hostA
hostB
hostC
hostD
让特定节点上表开始提取数据
$> bin/tools.sh -cmd addrtt -t test -host hostA,hostC
OK
$> bin/tools.sh -cmd rttnode -t test
hostA
hostC</pre>
更新表模式
Use upcol.sh
in indexr-tool.
Stop any tasks like
rt2his
which could manipulate the data in hive table.Update the table schema in IndexR by editing
test_schema.json
and running scriptbin/tools.sh -cmd settb -t tes -c test_schema.json
. This only effects those updated columns, queries on untouched columned should work normally.Update the hive table schema by
ALTER TABLE <tb> ADD COLUMNS(...)
or simply dropping the old and creating a new one if your table is inEXTERNAL
mode.[optional] If your table have setup realtime ingestion, wait for those old segments to upload into storage system, i.e.
rt
folder in hive table path. The duration depends on how many realtime segments data on local IndexR nodes. Normally 20 minutes is safe enough.-
Update historical segments.
- Add columns
$> bin/upcol.sh -add -t test -col '[{"name": "m5", "dataType": "long", "value": "100"}]'
Here
value
could be a literal value, or a SQL, e.g.if((a > b), a + b, a / c)
. You can specify many columns. You can put the json param in a file and reference it by '@', e.g.-col @add_cols.json
.- Remove columns
$> bin/upcol.sh -del -t test -col m1,m2
- Alter columns
$> bin/upcol.sh -alt -t test -col '[{"name": "m1", "dataType": "string", "value": "cast(m1, string)"}]'
Those scripts run mapreduce jobs, and could take a while to complete, depends on the size of your data.
删除表
- Remove all realtime ingestion nodes of the table.
$> bin/tools.sh -cmd rttnode -t test
test:
-----------
hostA
hostB
$> bin/tools.sh -cmd rmrtt -t test -host hostA,hostB
OK
Wait for the realtime segments uploaded to HDFS. The duration depends on how many realtime segments data on local IndexR nodes. Normally 20 minutes is safe enough.
Remove segment files on HDFS and notify segment update.
$> hdfs dfs -rm -r ${indexr.fs.data.root}/segment/test
$> .../indexr-tool/bin/tools.sh -cmd notifysu -t test
Check if any data left behinded:
select count(*) from test
Remove IndexR table.
$> bin/tools.sh -cmd rmtb -t test
- Remove Hive table.
$> hive (default)> drop table test;
4、IndexR的实时数据搬迁
IndexR的实时数据在HIVE中是无法查询到的,IndexR在HDFS的格式如下:
$> bin/hdfs dfs -ls -R /indexr/segment/test
/indexr/segment/test/__UPDATE__
/indexr/segment/test/dt=20160701/
/indexr/segment/test/dt=20160701/000000_0
/indexr/segment/test/rt/
/indexr/segment/test/rt/rtsg.201607021725.e06ca84e-1d65-4f37-a799-b406d01dd10e.seg
/indexr/segment/test/rt/rtsg.201607021825.fop91s73-822s-nxk1-0091-pa1dcw9s1slk.seg</pre>
这可能会产生问题。 rt不是合法的表分区路径,更糟糕的是,它包含可能属于不同分区的行。 除非将这些数据转换为正确的分区,否则无法管理这些数据。
幸运的是,您可以使用Hive的动态分区插入。
将rt文件夹中的所有片段移至新文件夹,如rt2his。 并通知段更新。
hdfs dfs -mkdir / indexr / segment / test / rt2his
hdfs dfs -mv“/ indexr / segment / test / rt / *”“/ indexr / segment / test / rt2his /”
tools.sh -cmd notifysu -t test
创建另一个Hive表,例如 test_rt2his,位置指向table_path / rt2his路径。
CREATE EXTERNAL TABLE IF NOT EXISTS test_rt2his (
`date` int,
`d1` string,
`m1` int,
`m2` bigint,
`m3` float,
`m4` double
)
ROW FORMAT SERDE 'io.indexr.hive.IndexRSerde'
STORED AS INPUTFORMAT 'io.indexr.hive.IndexRInputFormat'
OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat'
LOCATION '/indexr/segment/test/rt2his'
;
- Insert rows from
test_rt2his
table into the realtest
table. e.g.
insert into table test partition (dt) select *, `date` from test_rt2his;
- Remove segments in
rt2his
folder, and notify updates.
hdfs dfs -rm -r "/indexr/segment/test/rt2his/*"
tools.sh -cmd notifysu -t test
5、 遇到的坑
坑一:数据丢失
测试实时导入数据的时候发现,写入kafka数据量为1千条,结果在drill中只可以查到50条,每增加1千条,drill中只会增加50条,网上问一位大佬,原来是默认的例子里面做了预聚合,就是下面的表结构决定的:
"agg":{
"grouping": true,
"dims": [
"date",
"d1"
],
"metrics": [
{"name": "m1", "agg": "sum"},
{"name": "m2", "agg": "min"},
{"name": "m3", "agg": "max"},
{"name": "m4", "agg": "first"}
]
},
预聚合会直接导致原始数据没有,只有聚合后的数据,你说坑不坑,咋办,解决办法是建普通表,预聚合就建预聚合表,我在想这个原来保存一份数据就行的,因为预聚合就导致了保存多份数据,这个很坑啊。
去掉mertrics配置将group改成false即可不聚合,当然速度可能就没这么快了。
坑二:可以不用Hive
如上文所述,一些实时数据是存在rt目录下面的,这个不符合hive的表结构形式,如果改成hive表结构形式还需要搬迁,整个过程一点都不高效,所以如果不需要通过hive进行管理的话,完全可以不用在hive建表,甚至可以不用hive,官网文档太让人误解了。
坑三:kafka版本问题
记住,无论你用什么kafka版本,IndexR目前只支持通过0.8版本的kafka消费者进行消费。也许以后会增加,至少在2018年6月1日的时候还只能用0.8版本的kafka消费者消费。
坑四:条数有时候存在不正确问题
通过查询发现,有时候表条数统计还存在在错误的情况,我一直入数据的,也未做任何聚合,数据有时候多有时候少,后续就稳定了,可能存在一些bug,如果你对这个数字要求非常严格,可能不要用它了。
6、性能简单测试
集群配置:8cpu、64G内存(drill用了3.5g)、普通磁盘配置
简单测试结果如下: