Feed实时数仓

实时数仓

背景

在早期数仓建设中,大多是以批处理的方式为基线进行开发,随着业务发展,需求对于时效性和准确性的要求越来越高,为能满足日益强烈的诉求,并且兼容批处理本身已有的基础建设,本文结合当前infra team所能支撑的前提下,对实时数仓构建的一些想法

现有方式的一些不足

lambda

概述

lambda

作为最为经典和广泛应用的设计,抽象如下:

  • 所有的数据需要分别写入批处理层和流处理层
  • 批处理持久化数据,并做预计算
  • 流处理作为速度层,对实时数据计算近似的 real-time view,作为高延迟 batch view 的快速补偿视图
  • 所有查询需要合并batch view 和 real-time view

优势

  • 在实时性和准确性上达到均衡,从而满足业务诉求
  • 批处理持久化存储,数据可重放,可扩展性和容错性较好
  • 流处理产出近似结果,时效性上得到满足

不足

  • 数据从源头上区分传输和存储,存在数据一致性问题
  • 两套计算逻辑,开发维护成本较大
  • 因为框架的天然劣势,实时只能是近似计算,需要T+1的批处理进行修正

kappa

概述
核心点在于去掉批处理的数据流向,全部用流处理代替,近些年也有依赖同DB的方式,即兼容OLAP和OLTP,统一数据流向

kappa

优势

  • 数据源头不一致问题得到改善,或者全部持久化到消息中间件(kafka为代表),或者consume消息中间件,持久化到兼容DB(Tidb为代表)
  • 服务层不一致问题得到改善,通过重放持久化的消息中间件或兼容DB,实现容错或可扩展目的

不足

  • 无法解决存储成本,以kafka为例,kafka的压缩能力有限,存储成本较大
  • 计算资源开销随着窗口步长的增加而飙升。例如计算月活或年报统计时,涉及到大状态的缓存,无论是持久化到kafka或兼容DB,可重放性极差,存在消费雪崩甚至无法消费的情况

kappa+

概述

核心点在于统一计算引擎,同套计算逻辑消费不同流向的数据,以flink为代表,无论是datastream/dataset,还是力推的flink sql,皆以此为目的

kappa+

优势

  • 计算过程不一致问题得到改善,以flink sql为例,流或批处理时,无需维护两套代码,由框架本身承接场景切换时的兼容逻辑

不足

  • 不能完全兼容hive sql/spark sql,已经成熟的数仓体系下,切换有较大成本
  • 流批sql同个function实际表达的功能可能不同
  • 在流join或大数据量去重上,开销或实现成本仍较大或较为困难

总结

可以看到,痛点主要有以下三点:

  • 数据源的流向和存储不同,从源头上不能一致
  • 计算逻辑或框架不同,计算过程不能一致
  • 计算结果需要合并,服务层不能一致

因此,迭代方案主要围绕以上三点进行优化

设计思路

我们假设数据源只包含业务库(以MySQL为代表)和用户埋点(Nginx Log)

如图所示,实时数仓的建设思路中,大量参考和绑定离线数仓的构建,包括数据链路,加工流程,交互的Services。重点强调三个”Same“

lambda+

Same Source

即流或批处理的源数据流向统一由consume kafka获得,通过at least once的机制,确保数据在消费阶段不会丢失

same source
  • At least once导致的消费重复,流、批作业需根据场景选择性的处理,比如ETL process的DB支持幂等,Application不需要任何处理。下游DB是append-only的,则在层级流转时,做额外的去重、排序处理
  • Producer的ack也是需要根据场景灵活调整。例如cdc类绝不允许丢失的情况(kafka极端丢失的case除外),可以给-1。针对埋点类数据量庞大的,为了保证吞吐,容忍一部分潜在的丢失风险,可以给1
  • Same source只确保流、批作业的original datasource一致,避免传统lambda的方式中,两套作业是完全独立的路径,为之后的数据一致性、补偿机制上带来大量的工作量
  • 针对cdc类数据,批处理则需要额外的row number来维护T+1的状态

Same SQL

即无论是流、批处理,所处理的(submit application)的引擎不同,语义可能也不同,但需要确保业务(加工)逻辑一致

same sql
  • 在主流引擎中,流、批处理统一用一套框架不是很现实。flink对批处理的兼容程度(例如大量的Hive/Spark built-in func不支持,对复杂数据类型支持较弱,读Hive表数据略复杂)不够。Spark对流处理,在state和event time上没有Flink优秀,因此还是lambda的方式,两套引擎对应两种场景
  • 尽量避免两套代码的开发,最大程度上直接复用/移植批处理的代码,即Spark sql = Flink sql[+udf]
  • 对于窗口聚合等语义不一致的点,需要确保业务逻辑的代码保持一致

Same DB[option]

即OLAP,OLTP两种模式的兼容

  • 终极目标是一套DB可以兼容。既能支持事务,有类cdc的通知机制,亦能支持多范围、多层次、大时间粒度的统计。例如OceanBase、TiDB
  • 在实际场景中,大多是数仓ODS或DWD层的DB统一。特点是开发或迁徙成本低,ETL process不复杂,强贴合Hive或Hadoop套件。例如Hudi,或流式写Hive(+presto/impala)
  • 实时数仓的DWS因为业务场景、业务需求的不同,几乎都是case by case的方案,很难做到一套框架+DB的统一

框架选择

根据上文提到的三个“Same”,框架选择则如下图所示

  • 业务库和埋点数据ingest到kafka
  • 计算统一使用Flink,且以SQL为主
框架选择

数仓分层

数仓分层

关键计算逻辑描述

计算UV

流内构建

bitmap

即在流内通过状态维护bitmap,相对于HyperLogLog可以做到精准去重,且资源开销在可接受范围内。如以下的示例:

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(2))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
 
 
ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor(
        "Roaring64Bitmap",
        TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
        }));
 
 
 
bitmapDescriptor.enableTimeToLive(ttlConfig);
 
 
 
// ...
if (!bitmap.contains(uid)) {
    bitmap.addLong(uid);
    bitmapState.update(bitmap);
}

缺陷在于

  1. 纯flink sql的count distinct,state的实现与之不同,需要额外的udf
  2. 对于多个key而言,累计开销巨大。例如通常我们计算uv的维度组合不止一个
  3. 基于第2点,通常业务需求的时间范围或跨度都较大,例如当日实时uv,累计uv,新用户数等。state在不设置ttl的情况下,会导致cp时间过长,背压,甚至流量雪崩的情况
  4. 强依赖于state

将count distinct 转化为 count(1)

借助外部存储

通俗而言,就是不借助flink本身的state,而是靠外存来维护bitmap或其他结构,在query层完成最终的统计(或同时在外存中维护统计值)


bitmap sink hbase
api merge

好处在于flink除IO外存时,没有额外的开销,state也不大,并且因为外存的原因,保证at least once即可

缺陷在于

  1. 外存序列化/反序列化的IO开销
  2. 同样受多个key的限制
  3. 写压力 -> 读压力的转移,query的性能和并发受较大影响

query下沉

流任务只承担ETL的功能,保证数据不丢失。明细数据灌入例如doris/hudi等组件,在query层完成最终的聚合统计。优势在于解放流任务,将压力最终转移到query。缺陷依然是在大数据量、高并发的情况下,开销和查询性能的影响

宽表构建

流JOIN

flink joining,即在流中完成join操作。Flink具有多样的join语义,且易用性很高。主要的问题点在于,流内join时(无论是事实join事实,还是事实join维表,都在变化之中),无法判断数据是否全部到达。因为生产环境数据量的原因,不可能做全关联。窗口join则需要考虑数据延迟、是否回补,retrigger的机制等,开发和维护成本急剧上升。因此大多用于对时效性要求较高,一致性要求不强,业务逻辑不复杂的场景

流join
  • 全关联需要考虑大状态的资源开销、故障恢复的成本
  • 窗口join需要考虑数据延迟带来的额外成本,比如窗口等待的时长和时效性的取舍,迟到数据的回补等
  • 侧流数据没有解决迟到数据的回补,从另一种意义上而言,还会导致额外的数据重复(sink不幂等的话)

Source Union

多流union,按主键更新不同列,变相实现join的功能。优势在于,流处理仅仅是ETL,逻辑复杂度低。靠sink DB支持upsert和按列更新(或更新非空列)完成功能。但缺陷在于,依然没有解决迟到数据的retrigger(即什么时候算数据全部到达,补充完成,下游可以重新计算),并且一定程度上牺牲了时效性保障一致性


source union

Sink Union

分两种情况

1.事实流union,在DB不支持按列更新的前提下,在聚合前union case when 的方式聚合计算

sink union 1

2.事实流关联维表,在聚合完成后,query查询前,点查维表去format维度属性

sink union 2

与离线数仓联动和差异

数据补偿

在业务需求中,并不全是只依赖或计算增量数据,例如最近7天的订单数,支付金额等,又或者在流任务故障,需要从离线数仓回补时,都需要与存量数据交互。存量数据初始化或数据补偿的流程如图所示

利用Hudi提供的bulk_insert的write.operation(类似hbase的bulkload)将Hive表中的数据load到Hudi
任务结束后,增量任务调起。两者之间的records有一定堆叠,避免数据丢失


数据补偿

共享ODS、DIM[option]

将原本小时级/天级的ODS ETL任务改由流式sink,将速度提升至分钟/小时级

  • 加速原本T+1才能产出的业务需求
  • 确保流和批的数据源一致,除却加工逻辑和语义的不同外,不会有源数据所带来的数据不一致

数据延迟、乱序

数据丢失

维表JOIN

对于迟到数据的trigger

具体实施

主旨思想

  1. 对本身有窗口要求的业务需求,对实时敏感程度一般,要求一致性的业务需求,从流 -> 批转化
    例如每15分钟/每小时的当天uv,pv,gmv
  2. 对原本的T+1输出的业务需求,从天 -> 小时的转化,即流式写ODS,给下游层级做加速
  3. 对于无法规避的业务需求,例如对实时敏感程度高,即时trigger类需求,抽象流kafka(dwd逻辑表)
  4. 数据流向不宜过长,通常为ODS -> [DWD] -> DWS
  5. dws因业务需求各异,没有统一的方案输出

流DW

退维

ETL

公共逻辑下沉

Hudi的一些建议

index.type的选择

index type

table.type的选择

image.png

index.bootstrap.enabled的潜在问题

  • 参数为是否将已存在表中的记录的index load到flink state,默认为false
  • 索引导入是一个阻塞过程,在这期间无法完成cp
  • index bootstrap由输入数据触发,需要确保每个分区中至少有一条记录
  • index bootstrap是并发执行的,可以在日志文件中通过finish loading the index under partition以及Load record form file观察index bootstrap的进度
  • 第一个成功的checkpoint表明index bootstrap已完成。 从checkpoint恢复时,不需要再次加载索引
  • sink 多个hudi表时,需要将write.index_bootstrap.tasks set 为1,否则会出现部分index丢失的情况。sink 单hudi表时没有这个限制
  • exist hudi 表数据量极大时,index导入会失败

其他

  • cow表强依赖于flink cp,因为写放大的问题,通常建议调大cp的间隔和timeout,避免因cp失败导致的数据无法更新
  • flink on hudi不支持schema evolution,字段变更会频繁的变动任务
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容