https://www.freesion.com/article/1176553841/
impala数据血缘与数据地图系列:
---------------------------------解析impala与hive血缘日志-------------------------------------------------------------
IMPALA血缘:
CDH官方文档impala数据血缘:
https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_lineage.html
在CM中找到该参数:
开启impala血缘,以及配置血缘日志路径及文件最大限制。
参数:lineage_event_log_dir
目录:每个impala daemon节点下 /var/log/impalad/lineage
需要注意的是这里只记录执行成功的脚本。
我这里使用的是CDH6.2版本,与CDH5的版本在日志记录的结构上有所区别,但区别不大。
测试:使用impala-shell 指定impala daemon节点启动命令行,执行SQL命令,然后查看该daemon节点最新日志。
impala-shell -i uathd01
这里我创建一个视图:
然后到uathd01的节点看最新血缘日志:
把这段json串拿出来格式化一下看看:
queryText:执行的命令
queryId : impala的执行ID
hash: sql的hash
user:执行该命令的用户
timestamp: 开始时间戳
endTime:结束时间戳
edges: 记录每个source到target的映射关系,edgeType为PREDICATE的部分是所有source字段到所有的target字段id的映射,edgeType为PROJECTION的是每个source字段到每个target字段的映射,这里是多对一的关系,即如果有一个目标字段是由两个源字段处理得来的话,这里的sourceid和targetid就是一个多对一的关系,但如果是一个源字段处理出了两个目标字段,在这里仍旧是两个代码块。
vertices: 该SQL内所有的源和目标字段,与edges中的id一一对应。
{
"queryText":"create view vw_lineage_test5 as select acc.gid,acc.decrypt_name,ind.company_name from dl_nccp.account acc inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted='0' and acc.is_valid='0' limit 100",
"queryId":"1d435f512faba59e:adc0fa8c00000000",
"hash":"f1b6e2813084ca457ebb78292715144c",
"user":"hive@NOAHGROUPTEST.COM.LOCAL",
"timestamp":1586397809,
"endTime":1586397818,
"edges": [
{
"sources": [
1
],
"targets": [
0
],
"edgeType":"PROJECTION"
},
{
"sources": [
3
],
"targets": [
2
],
"edgeType":"PROJECTION"
},
{
"sources": [
5
],
"targets": [
4
],
"edgeType":"PROJECTION"
},
{
"sources": [
1,
6,
7,
8
],
"targets": [
0,
2,
4
],
"edgeType":"PREDICATE"
}
],
"vertices": [
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"default.vw_lineage_test5.gid"
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"dl_nccp.account.gid"
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"default.vw_lineage_test5.decrypt_name"
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"dl_nccp.account.decrypt_name"
},
{
"id":4,
"vertexType":"COLUMN",
"vertexId":"default.vw_lineage_test5.company_name"
},
{
"id":5,
"vertexType":"COLUMN",
"vertexId":"dl_nccp.individual.company_name"
},
{
"id":6,
"vertexType":"COLUMN",
"vertexId":"dl_nccp.account.is_deleted"
},
{
"id":7,
"vertexType":"COLUMN",
"vertexId":"dl_nccp.account.is_valid"
},
{
"id":8,
"vertexType":"COLUMN",
"vertexId":"dl_nccp.individual.gid"
}
]
}
HIVE:
同impala类似,不再赘述,区别仅仅是日志的json格式以及记录的详细程度的区别。
应用:
接下来就是如何使用这些血缘的日志,我们已经分析了impala血缘日志的结构,接下来只要使用日志采集工具filebeat或flume,logstash等工具采集每个impala daemon节点上的日志,然后对每个json串进行解析即可,后面的文章会演示如何实时采集impala血缘到kafka,消费kafka里的血缘数据处理后写入neo4j数据库内进行数据血缘数据地图的展示。
impala数据血缘与数据地图系列:
-----------------------------------------实时采集impala血缘日志推送到kafka-----------------------------------------------------
使用filebeat采集impala的血缘日志并推送到kafka
采用filebeat的主要原因是因为轻量,对impala的血缘日志采集不需要进行数据过滤和格式转换,因此不需要使用flume或logstash这样占用资源较大的工具。
filebeat的安装及使用请参考官方手册:
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html
参数配置:
vim conf/filebeat_impala_lineage.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:
-type:log
# Change to true to enable this input configuration.
enabled:true
# Paths that should be crawled and fetched. Glob based paths.
paths:
#这里指定impala血缘目录,会读取该目录下所有日志
-/var/log/impalad/lineage/*
#============================= Filebeat modules ===============================
filebeat.config.modules:
# Glob pattern for configuration loading
path:${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled:false
# Period on which files under path should be checked for changes
#reload.period: 10s
#===========kafka output===============
output.kafka:
#指定kafka的节点和topic
hosts:["uatka01:9092","uatka02:9092","uatka03:9092"]
topic:wyk_filebeat_impala_lineage_new_demo
required_acks:1
#output.console:
# pretty: true
启动filebeat,注意每个机器上只能启动一个filebeat进程,因此上面的读取日志不要指定文件名。
$FILEBEAT_HOME/filebeat --c $FILEBEAT_HOME/conf/filebeat_impala_lineage.yml -e
启动kafka consumer:
./kafka-console-consumer.sh --bootstrap-server uatka01:9092,uatka02:9092,uatka03:9092 --topic wyk_filebeat_impala_lineage_new_demo --zookeeper uatka01:2181,uatka02:2181,uatka03:2181
启动impala-shell:
impala-shell -i uathd03
1. 在impala-shell内建一个视图:vw_lineage_test11
2.查看impala lineage 日志文件,血缘已记录日志:
3. 查看filebeat控制台,已监听日志文件并写入kafka topic内:
4. 查看kafka consumer是否消费到该血缘记录:
impalaSQL--> impala血缘日志-->Filebeat-->Kafka
完成监控impala脚本并将血缘日志推送到kafka内。
后续只需要实时消费kafka里的信息即可。
impala数据血缘与数据地图系列:
-----------------------------------------实时消费impala血缘数据写入neo4j-----------------------------------------------------
前两篇介绍了如何采集impala和hive的血缘日志以及如何实时将该日志采集到kafka消息队列中,今天来介绍如何实时消费血缘日志并写入neo4j图数据库进行血缘的展现。
首先要保证filebeat采集的日志记录不会丢数据,因此需要在filebeat监控impala日志的yml配置文件中指定acks=-1,该参数保证kafka的Leader 在返回确认或错误响应之前,会等待所有同步副本都收到消息,因此吞吐量会降低,但血缘监控impala里的执行日志,一般来说我们在impala或hive中的脚本执行也不会有那么高频的查询提交。
filebeat.yml [kafka参数部分]
#===========kafka output===============
output.kafka:
hosts:["uatka01:9092","uatka02:9092","uatka03:9092"]
topic:wyk_filebeat_impala_lineage_new_demo
required_acks:-1
消费的时候也要保证每条数据都准确的被消费到而不是在某一条失败后仍提交offset。因此我们需要在消费kafka的时候配置参数:
enable_auto_commit=false
以及消费每条记录成功之后执行
consumer.commit()
这里我使用"库名.表名"作为表的节点唯一标识,"库名.表名.列名"作为列的节点唯一标识。
使用如下命令创建neo4j唯一约束:
CREATECONSTRAINTON(n:IMPALA_TABLE) ASSERT n.nameISUNIQUE;
CREATECONSTRAINTON(n:IMPALA_TABLE_COLUMN) ASSERT n.nameISUNIQUE;
开启kafka server -->开启filebeat--> 运行消费者程序 -->开启impala命令行 -->执行DML -->验证效果 -->执行DML -->验证效果
(在kafka篇章里有具体介绍)
$KAFKA_HOME/bin/kafka-server-start.sh -daemon$KAFKA_HOME/config/server.properties
(参数请参考前面两篇文章):
nohup$FILEBEAT_HOME/filebeat --c$FILEBEAT_HOME/conf/filebeat_impala_lineage_prod.yml -e >$FILEBEAT_HOME/nohup_out.file 2>&1 &
执行消费程序.实时解析kafka的数据并写入neo4j.
这里是演示,所以我指定其中一台impala节点,同样在第二步里的filebeat也是在该impala daemon节点执行的
impala-shell -i uathd03
impala-shell -i 指定filebeat监控的impala节点
vw_lineage_test,逻辑来源于表dl_nccp.account以及dl_nccp.individual表,字段有account表的gid和decrypt_name,branch_name以及individual表的company_name。
createviewvw_lineage_testas
selectacc.gid,acc.decrypt_name,ind.company_name ,acc.branch_name
fromdl_nccp.accountacc
innerjoindl_nccp.individual indonacc.gid=ind.gidandacc.is_deleted='0'andacc.is_valid='0';
使用match命令查看所有到节点'vw_lineage_test'的关系。
MATCHp=()-[r]->(d:IMPALA_TABLE)whered.name='default.vw_lineage_test'RETURNp
新增一个来源表dl_nccp.contact以及该表的字段telephone。
alterviewvw_lineage_test2as
selectacc.gid,acc.decrypt_name,ind.company_name,acc.branch_name,c.telephone
fromdl_nccp.accountacc
innerjoindl_nccp.individual indonacc.gid=ind.gidandacc.is_deleted='0'andacc.is_valid='0'
innerjoindl_nccp.contact conacc.gid = c.gid
可以看到新增的contact表以及telephone字段的关系已经实时更新到neo4j内
MATCHp=()-[r]->(d:IMPALA_TABLE)whered.name='default.vw_lineage_test'RETURNp
实现字段粒度的血缘分析以及表粒度血缘和业务来源库表的血缘 以及元数据的实时采集。
数据地图:
元数据管理:
架构图:
如果想了解如何实现请参照前面几篇文章:
impala数据血缘与数据地图系列:
1. 解析impala与hive的血缘日志
2. 实时采集impala血缘日志推送到kafka
3. 实时消费血缘记录写入neo4j并验证
---------------------------------Impala血缘 架构图-----------------------------------------------------------
红色部分是用户会接触到的部分,绿色部分对于用户无感知。
解读:
1. impala是无主的MPP架构,因此用户每次SQL指定的impala节点就是主节点,当用户通过SQL或jdbc/odbc接口查询impala时,SQL命令首先 会发送到impala daemon节点,由该节点的QueryPlanner解析SQL成执行计划后发送给其他daemon节点分别计算各自的数据然后返回给该impala daemon节点。 所以我们只要在每台impala daemon节点部署filebeat并监控血缘日志即可。
2. 使用Filebeat监控impala血缘日志后发送到kafka集群指定的topic中;
3. 解析kafka内的血缘日志,将元数据(user,timestamp,id等信息),实体(表,字段),关系(表到表,字段到字段,字段到表)识别出来;
4. 将第三步里的结果存储进Neo4J;
5. 用户可以使用CQL或封装的接口对Neo4J里存储的impala血缘进行实时的查询;
功能介绍:
实时血缘:
建视图:逻辑如下
create view vw_lineage_test as
select acc.gid,acc.decrypt_name,ind.company_name ,acc.branch_name
from dl_nccp.account acc
inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted='0' and acc.is_valid='0';
修改视图逻辑:新增一个来源表contract以及该表的telephone字段
alter view vw_lineage_test as
select acc.gid,acc.decrypt_name,ind.company_name,acc.branch_name,c.telephone
from dl_nccp.account acc
inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted='0' and acc.is_valid='0'
inner join dl_nccp.contact c on acc.gid = c.gid
全类型血缘:
目前已实现字段到表,字段到字段,表到表,表到库级别的全类型血缘关系:
技术元数据管理:
实时更新数据字典、ETL任务元数据:
影响分析:
指定节点向后进行影响分析:
血缘分析:
指定节点向前进行血缘分析:
深度查询:
可指定血缘的查询深度: