背景:
公司要启动项目要做一个实时数据系统,去替代现在在oracle上计算的那套东西全部搬到Hadoop平台上;调研了Druid、Hbase、Kudu,最后定下来方案使用Kudu。下面是基本
1. 公司的业务源端数据库是Oracle,实时同步数据的工具就是GoldenGate;
2. 资源的限制,Kafka集群没有挂nas盘,所以暂时我们用的是单机的,没办法,穷只有一个字我只说一次。
项目诉求:
1. 自适应:确保源端表在发生变化的时候,需要通过数据包字段来识别表结构是否发生变化,然后触发目标表变化和原来表数据备份;
2. 自动识别离线数据是否完成:因为我们这边离线数据通过调度平台来完成,不在实时这部分;
3. 源端ORACLE保持一致性
系统架构:
系统机构分离线同步和实时同步两部分:由于KAFKA分区的原因会造成少量的数据错乱,所以需要每天凌晨会把T+1离线数据同步过来覆盖掉原来的数据(以防有错)。
1. 离线部分:从已有的离线数据表,将关系型转成非关系型批量插入KUDU中;
2. 实时部分:在Oracle 源端装GoldenGate 的源端,目的端在一台装有KAFKA的机器上(图方便),KAFKA的CONSUMER直接消费OGG目的端过来的数据,需要做到检查、解析、新增/更新等操作;
2.1 OGG到KAFKA安装教程具体见(https://blog.csdn.net/clerk0324/article/details/73498266):
中间也踩了不少的坑,导致目的端的replicat启动不起来,这个时候需要检查OGG对应KAFKA配置是否配对了(KAFKA配的lib安装路、LD_LIBRARY_PATH、JAVA_HOME这些是否都存在);
2.2 KUDU 非关系型数据库特点(Kudu 自带的API ,研究的是这个文档 :https://github.com/apache/kudu/blob/master/python/kudu/client.pyx)
a. Kudu数据库所有的表必须有主键,且主键的类型只能是字符型(源端建表的习惯不好,主键中有int类型、还有字段不设置默认值,让后续代码加了很多逻辑去弥补这一麻烦,所以规范很重要!);
b. Kudu 在插入数据的时候需要所有数据项都要有值且非空;
c. 在更新Kudu 数据的时候需要按照主键去更新;
d. 为什么不用impala,其实刚开始使用的就是impala,因为实时解析部分是要一直插入、更新的,impala在大概第五分钟的时候就已经操作不了数据(进入假死状态),而且发现在进行复杂的SQL的时候,还没执行完就报错了,KUDU是支持高并发的但是impala不行,所以在解析程序中使用的都是Kudu 自带的API ;
e. 默认会在impala上建一个对应的外部表指向Kudu的同名表,impala应该是通过Kudu表的表名去链的,因为我要备份建新表的时候不重新指向一遍它是找不到新表的;
2.2 KAFKA 消费OGG目的端数据解析程序(本系统的最重要部分之一,还有一个最重要是离线批量数据由Hive->Kudu )
该段核心承担了系统实时部分主要功能:
a. 解析字段部分:这是最基本的功能,我们选择同步的字段类型是json,而不是用分隔符隔开的一个字符串,首先方便校验原表字段是否做了变化,可以让触发自适应去更新Kudu表结构;
b.过来的key是否在对应同步表的表结构中,若不在触发备份 - > 更新表结构 操作;
c. 判断离线数据是否同步成功:这里有两个最重要的逻辑:1. 如何判断离线数据是否同步成功;2. 离线数据同步是需要时间的,假设离线数据同步完花了T分钟,此时如果直接去覆盖实时表部分必然会将T分钟内更新的数据给覆盖掉;这两个问题解决,实时的数据才能和源端保持一致性;
这就是该项目大概梳理的逻辑部分;欢迎大家和我交流,也希望能有很多不同的新思路;