IMPALA&HIVE大数据平台数据血缘与数据地图

https://www.freesion.com/article/1176553841/


impala数据血缘与数据地图系列:

1. 解析impala与hive的血缘日志

2. 实时采集impala血缘日志推送到kafka

---------------------------------解析impala与hive血缘日志-------------------------------------------------------------

IMPALA血缘:

CDH官方文档impala数据血缘:

https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_lineage.html

在CM中找到该参数:

开启impala血缘,以及配置血缘日志路径及文件最大限制。

参数:lineage_event_log_dir

目录:每个impala daemon节点下 /var/log/impalad/lineage

需要注意的是这里只记录执行成功的脚本。

我这里使用的是CDH6.2版本,与CDH5的版本在日志记录的结构上有所区别,但区别不大。 




测试:使用impala-shell 指定impala daemon节点启动命令行,执行SQL命令,然后查看该daemon节点最新日志。

impala-shell -i uathd01

这里我创建一个视图:

然后到uathd01的节点看最新血缘日志:

把这段json串拿出来格式化一下看看:

queryText:执行的命令

queryId : impala的执行ID

hash: sql的hash

user:执行该命令的用户

timestamp: 开始时间戳

endTime:结束时间戳

edges: 记录每个source到target的映射关系,edgeType为PREDICATE的部分是所有source字段到所有的target字段id的映射,edgeType为PROJECTION的是每个source字段到每个target字段的映射,这里是多对一的关系,即如果有一个目标字段是由两个源字段处理得来的话,这里的sourceid和targetid就是一个多对一的关系,但如果是一个源字段处理出了两个目标字段,在这里仍旧是两个代码块。

vertices: 该SQL内所有的源和目标字段,与edges中的id一一对应。

{

"queryText":"create view vw_lineage_test5 as select acc.gid,acc.decrypt_name,ind.company_name from dl_nccp.account acc inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted='0' and acc.is_valid='0' limit 100",

"queryId":"1d435f512faba59e:adc0fa8c00000000",

"hash":"f1b6e2813084ca457ebb78292715144c",

"user":"hive@NOAHGROUPTEST.COM.LOCAL",

"timestamp":1586397809,

"endTime":1586397818,

"edges": [

{

"sources": [

1

],

"targets": [

0

],

"edgeType":"PROJECTION"

},

{

"sources": [

3

],

"targets": [

2

],

"edgeType":"PROJECTION"

},

{

"sources": [

5

],

"targets": [

4

],

"edgeType":"PROJECTION"

},

{

"sources": [

1,

6,

7,

8

],

"targets": [

0,

2,

4

],

"edgeType":"PREDICATE"

}

],

"vertices": [

{

"id":0,

"vertexType":"COLUMN",

"vertexId":"default.vw_lineage_test5.gid"

},

{

"id":1,

"vertexType":"COLUMN",

"vertexId":"dl_nccp.account.gid"

},

{

"id":2,

"vertexType":"COLUMN",

"vertexId":"default.vw_lineage_test5.decrypt_name"

},

{

"id":3,

"vertexType":"COLUMN",

"vertexId":"dl_nccp.account.decrypt_name"

},

{

"id":4,

"vertexType":"COLUMN",

"vertexId":"default.vw_lineage_test5.company_name"

},

{

"id":5,

"vertexType":"COLUMN",

"vertexId":"dl_nccp.individual.company_name"

},

{

"id":6,

"vertexType":"COLUMN",

"vertexId":"dl_nccp.account.is_deleted"

},

{

"id":7,

"vertexType":"COLUMN",

"vertexId":"dl_nccp.account.is_valid"

},

{

"id":8,

"vertexType":"COLUMN",

"vertexId":"dl_nccp.individual.gid"

}

]

}

HIVE:

 同impala类似,不再赘述,区别仅仅是日志的json格式以及记录的详细程度的区别。

 应用:

接下来就是如何使用这些血缘的日志,我们已经分析了impala血缘日志的结构,接下来只要使用日志采集工具filebeat或flume,logstash等工具采集每个impala daemon节点上的日志,然后对每个json串进行解析即可,后面的文章会演示如何实时采集impala血缘到kafka,消费kafka里的血缘数据处理后写入neo4j数据库内进行数据血缘数据地图的展示。


impala数据血缘与数据地图系列:

1. 解析impala与hive的血缘日志

2. 实时采集impala血缘日志推送到kafka

-----------------------------------------实时采集impala血缘日志推送到kafka-----------------------------------------------------

使用filebeat采集impala的血缘日志并推送到kafka

 采用filebeat的主要原因是因为轻量,对impala的血缘日志采集不需要进行数据过滤和格式转换,因此不需要使用flume或logstash这样占用资源较大的工具。

filebeat的安装及使用请参考官方手册:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html

参数配置:

vim conf/filebeat_impala_lineage.yml

#=========================== Filebeat inputs =============================

filebeat.inputs:

-type:log

# Change to true to enable this input configuration.

enabled:true

# Paths that should be crawled and fetched. Glob based paths.

paths:

#这里指定impala血缘目录,会读取该目录下所有日志

-/var/log/impalad/lineage/*

#============================= Filebeat modules ===============================

filebeat.config.modules:

# Glob pattern for configuration loading

path:${path.config}/modules.d/*.yml

# Set to true to enable config reloading

reload.enabled:false

# Period on which files under path should be checked for changes

#reload.period: 10s

#===========kafka output===============

output.kafka:

#指定kafka的节点和topic

hosts:["uatka01:9092","uatka02:9092","uatka03:9092"]

topic:wyk_filebeat_impala_lineage_new_demo

required_acks:1

#output.console:

#  pretty: true

DEMO:

启动filebeat,注意每个机器上只能启动一个filebeat进程,因此上面的读取日志不要指定文件名。

$FILEBEAT_HOME/filebeat --c $FILEBEAT_HOME/conf/filebeat_impala_lineage.yml -e




启动kafka consumer:

./kafka-console-consumer.sh --bootstrap-server uatka01:9092,uatka02:9092,uatka03:9092 --topic wyk_filebeat_impala_lineage_new_demo --zookeeper uatka01:2181,uatka02:2181,uatka03:2181

启动impala-shell:

impala-shell -i uathd03

1. 在impala-shell内建一个视图:vw_lineage_test11

 2.查看impala lineage 日志文件,血缘已记录日志:

3. 查看filebeat控制台,已监听日志文件并写入kafka topic内:

4. 查看kafka consumer是否消费到该血缘记录:

流程结束:

impalaSQL--> impala血缘日志-->Filebeat-->Kafka

完成监控impala脚本并将血缘日志推送到kafka内。

后续只需要实时消费kafka里的信息即可。


impala数据血缘与数据地图系列:

1. 解析impala与hive的血缘日志

2. 实时采集impala血缘日志推送到kafka

3. 实时消费血缘记录写入neo4j并验证

-----------------------------------------实时消费impala血缘数据写入neo4j-----------------------------------------------------

前两篇介绍了如何采集impala和hive的血缘日志以及如何实时将该日志采集到kafka消息队列中,今天来介绍如何实时消费血缘日志并写入neo4j图数据库进行血缘的展现。

血缘记录Exactly Once

首先要保证filebeat采集的日志记录不会丢数据,因此需要在filebeat监控impala日志的yml配置文件中指定acks=-1,该参数保证kafka的Leader 在返回确认或错误响应之前,会等待所有同步副本都收到消息,因此吞吐量会降低,但血缘监控impala里的执行日志,一般来说我们在impala或hive中的脚本执行也不会有那么高频的查询提交。

filebeat.yml  [kafka参数部分]

#===========kafka output===============

output.kafka:

hosts:["uatka01:9092","uatka02:9092","uatka03:9092"]

topic:wyk_filebeat_impala_lineage_new_demo

required_acks:-1

消费的时候也要保证每条数据都准确的被消费到而不是在某一条失败后仍提交offset。因此我们需要在消费kafka的时候配置参数:

enable_auto_commit=false

以及消费每条记录成功之后执行

consumer.commit()

在neo4j中创建唯一约束

这里我使用"库名.表名"作为表的节点唯一标识,"库名.表名.列名"作为列的节点唯一标识。

使用如下命令创建neo4j唯一约束:

CREATECONSTRAINTON(n:IMPALA_TABLE) ASSERT n.nameISUNIQUE;

CREATECONSTRAINTON(n:IMPALA_TABLE_COLUMN) ASSERT n.nameISUNIQUE;

效果验证

开启kafka server -->开启filebeat--> 运行消费者程序 -->开启impala命令行 -->执行DML -->验证效果 -->执行DML -->验证效果

1. 开启kafka sever:

(在kafka篇章里有具体介绍)

$KAFKA_HOME/bin/kafka-server-start.sh -daemon$KAFKA_HOME/config/server.properties

2. 启动filebeat

(参数请参考前面两篇文章):

nohup$FILEBEAT_HOME/filebeat --c$FILEBEAT_HOME/conf/filebeat_impala_lineage_prod.yml -e >$FILEBEAT_HOME/nohup_out.file 2>&1 &

3. 运行消费者程序

执行消费程序.实时解析kafka的数据并写入neo4j.

4. 进入impala命令行

这里是演示,所以我指定其中一台impala节点,同样在第二步里的filebeat也是在该impala daemon节点执行的

impala-shell -i uathd03

impala-shell -i 指定filebeat监控的impala节点

5.执行DML 创建视图

vw_lineage_test,逻辑来源于表dl_nccp.account以及dl_nccp.individual表,字段有account表的gid和decrypt_name,branch_name以及individual表的company_name。

createviewvw_lineage_testas

selectacc.gid,acc.decrypt_name,ind.company_name ,acc.branch_name

fromdl_nccp.accountacc

innerjoindl_nccp.individual indonacc.gid=ind.gidandacc.is_deleted='0'andacc.is_valid='0';

6. 验证效果

使用match命令查看所有到节点'vw_lineage_test'的关系。

MATCHp=()-[r]->(d:IMPALA_TABLE)whered.name='default.vw_lineage_test'RETURNp

7.执行DML 修改视图逻辑

新增一个来源表dl_nccp.contact以及该表的字段telephone。

alterviewvw_lineage_test2as

selectacc.gid,acc.decrypt_name,ind.company_name,acc.branch_name,c.telephone

fromdl_nccp.accountacc

innerjoindl_nccp.individual indonacc.gid=ind.gidandacc.is_deleted='0'andacc.is_valid='0'

innerjoindl_nccp.contact conacc.gid = c.gid

7. 验证效果

可以看到新增的contact表以及telephone字段的关系已经实时更新到neo4j内

MATCHp=()-[r]->(d:IMPALA_TABLE)whered.name='default.vw_lineage_test'RETURNp

8. 最终效果

实现字段粒度的血缘分析以及表粒度血缘和业务来源库表的血缘 以及元数据的实时采集。

数据地图:

 元数据管理:



架构图:

如果想了解如何实现请参照前面几篇文章:

impala数据血缘与数据地图系列:

1. 解析impala与hive的血缘日志

2. 实时采集impala血缘日志推送到kafka

3. 实时消费血缘记录写入neo4j并验证

---------------------------------Impala血缘 架构图-----------------------------------------------------------

红色部分是用户会接触到的部分,绿色部分对于用户无感知。

解读:

1. impala是无主的MPP架构,因此用户每次SQL指定的impala节点就是主节点,当用户通过SQL或jdbc/odbc接口查询impala时,SQL命令首先 会发送到impala daemon节点,由该节点的QueryPlanner解析SQL成执行计划后发送给其他daemon节点分别计算各自的数据然后返回给该impala daemon节点。 所以我们只要在每台impala daemon节点部署filebeat并监控血缘日志即可。

2. 使用Filebeat监控impala血缘日志后发送到kafka集群指定的topic中;

3. 解析kafka内的血缘日志,将元数据(user,timestamp,id等信息),实体(表,字段),关系(表到表,字段到字段,字段到表)识别出来;

4. 将第三步里的结果存储进Neo4J;

5. 用户可以使用CQL或封装的接口对Neo4J里存储的impala血缘进行实时的查询;


功能介绍:

实时血缘:

建视图:逻辑如下

create view vw_lineage_test as

select acc.gid,acc.decrypt_name,ind.company_name ,acc.branch_name

from dl_nccp.account acc

inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted='0' and acc.is_valid='0';

修改视图逻辑:新增一个来源表contract以及该表的telephone字段

alter view vw_lineage_test as

select acc.gid,acc.decrypt_name,ind.company_name,acc.branch_name,c.telephone

from dl_nccp.account acc

inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted='0' and acc.is_valid='0'

inner join dl_nccp.contact c on acc.gid = c.gid

全类型血缘:

目前已实现字段到表,字段到字段,表到表,表到库级别的全类型血缘关系:

技术元数据管理:

实时更新数据字典、ETL任务元数据:

影响分析:

指定节点向后进行影响分析:

血缘分析:

指定节点向前进行血缘分析:

深度查询:

可指定血缘的查询深度:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351