基于Structured Streaming的实时流ETL处理(Apache Spark 2.1)

我们正进入大数据时代,组织不断收集大量数据。 但是,这种数据洪峰的价值取决于及时提取可采取行动的见解的能力。 因此,越来越需要能够从海量数据摄取管道中获取实时可操作见解的连续应用程序。
但是,由于开发人员需要克服许多障碍,因此构建生产级连续应用程序可能会面临挑战,其中包括:
提供端到端的可靠性和正确性保证–长期运行的数据处理系统必须通过确保输出与批量处理的结果一致来抵抗故障。 此外,必须持续监控并自动缓解异常活动(例如上游组件的故障,流量高峰等),以确保实时提供高度可用的洞察力
执行复杂的转换–数据以多种格式(CSV,JSON,Avro等)到达,在使用之前,通常必须对其进行重组,转换和扩充。 这样的重组要求批处理系统中的所有传统工具都可用,且不会因为它们带来常见的额外延迟。
处理延迟或混乱的数据–处理物理世界时,延迟或混乱的数据是生活中不可或缺的事实。 因此,随着新信息的到来,聚合和其他复杂的计算必须不断地(准确地)进行修改。
与其他系统集成–信息来自各种来源(Kafka,HDFS,S3等),必须对其进行集成才能查看原貌。
Apache Spark中的Structured Streaming基于Spark SQL的强大基础,利用其强大的API提供无缝的查询接口,同时优化其执行引擎以实现低延迟,不断更新的答案。 这篇博客文章开始了一系列文章,其中我们将探索如何使用Apache Spark 2.1的新功能来克服上述挑战并建立我们自己的生产管道。
在第一篇文章中,我们将重点关注ETL管道,该管道可将原始AWS CloudTrail审计日志转换为JIT数据仓库,以实现更快的临时查询。 我们将展示如何轻松进行现有的批量ETL作业,然后使用Databricks中的结构化流将其作为实时流管道进行生产。 使用此管道,我们已将包含79亿条记录的380万个JSON文件转换为Parquet表,这使我们能够对更新后的Parquet表进行临时查询,其速度比原始JSON文件快10倍。
流式ETL的需求
提取,转换和加载(ETL)管道将原始的,非结构化的数据准备成可以轻松高效地查询的形式。 具体来说,他们需要能够执行以下操作:
过滤,转换和清理数据–原始数据自然是混乱的,需要清理以适合定义良好的结构化格式。 例如,将时间戳字符串解析为日期/时间类型以进行更快的比较,过滤损坏的数据,嵌套/取消嵌套/展平复杂结构以更好地组织重要的列等。
转换为更有效的存储格式–文本,JSON和CSV数据易于生成并且易于阅读,但查询成本很高。 将其转换为Parquet,Avro或ORC等更有效的格式可以减小文件大小并提高处理速度。
按重要列对数据进行分区–通过根据一个或多个列的值对数据进行分区,可以通过仅读取总数据集的相关部分来更有效地应对常见查询

传统上,ETL是作为定期批处理作业执行的。例如,实时转储原始数据,然后每隔几个小时将其转换为结构化形式以启用有效查询。我们最初是用这种方式设置系统的,但是这种技术导致了高延迟。我们不得不等待几个小时才能获得任何见解。在许多用例中,这种延迟是不可接受的。当帐户中发生可疑事件时,我们需要能够立即提出问题。等待几分钟到几小时可能会导致对事件的响应出现不合理的延迟。

幸运的是,结构化流可以轻松地将这些定期批处理作业转换为实时数据管道。流作业使用与批处理数据相同的API表示。此外,该引擎提供与定期批处理作业相同的容错和数据一致性保证,同时提供低得多的端到端延迟。

在其余的文章中,我们将详细介绍如何将AWS CloudTrail审计日志转换为有效的,分区的,镶木地板的数据仓库。 AWS CloudTrail通过将压缩的JSON日志文件交付到S3存储桶,使我们能够跟踪在各种AWS账户中执行的所有操作。这些文件可启用各种业务和任务关键型智能,例如成本归因和安全监控。但是,以其原始形式,即使具有Apache Spark的功能,它们的查询也非常昂贵。为了实现快速洞察,我们运行一个Continuous Application,将原始JSON日志文件转换为优化的Parquet表。让我们深入研究一下如何编写此管道。如果您想查看完整的代码,这里是Scala和Python笔记本。将它们导入Databricks并自行运行。
使用结构化流转换原始日志
我们首先根据CloudTrail文档定义JSON记录的架构。

val cloudTrailSchema = new StructType()
  .add("Records", ArrayType(new StructType()
    .add("additionalEventData", StringType)
    .add("apiVersion", StringType)
    .add("awsRegion", StringType)
    // ...

有关完整架构,请参见随附的笔记本。 这样,我们可以定义一个流数据框架,该数据框架表示正在写入S3存储桶的CloudTrail文件中的数据流。

val rawRecords = spark.readStream
  .schema(cloudTrailSchema)
  .json("s3n://mybucket/AWSLogs/*/CloudTrail/*/2017/*/*")

了解此rawRecords DataFrame表示的一个好方法是首先了解结构化流编程模型。 关键思想是将任何数据流都视为无界表:添加到该流中的新记录就像将行追加到表中一样。


image.png

这使我们可以将批处理和流数据都视为表。 由于表和DataFrame / Dataset在语义上是同义词,因此相同的类似批处理的DataFrame / Dataset查询可以应用于批处理和流数据。 在这种情况下,我们将转换原始的JSON数据,以便使用Spark SQL的内置支持来处理复杂的嵌套模式,从而更易于查询。 这是转换的精简版本。

val cloudtrailEvents = rawRecords 
  .select(explode($"records") as 'record)
  .select(
    unix_timestamp(
      $"record.eventTime", 
      "yyyy-MM-dd'T'hh:mm:ss").cast("timestamp") as 'timestamp, $"record.*")

在这里,我们将从每个文件加载的记录数组分解(拆分)为单独的记录。 我们还将每个记录中的字符串事件时间字符串解析为Spark的时间戳类型,并展平嵌套列以方便查询。 请注意,如果cloudtrailEvents是一组固定文件上的批处理DataFrame,则我们将编写相同的查询,并且仅将结果作为parsed.write.parquet(“ / cloudtrail”)编写一次。 相反,我们将启动一个StreamingQuery,该查询将连续运行以在新数据到达时对其进行转换。

val streamingETLQuery = cloudtrailEvents
  .withColumn("date", $"timestamp".cast("date") // derive the date
  .writeStream
  .trigger(ProcessingTime("10 seconds")) // check for files every 10s
  .format("parquet") // write as Parquet partitioned by date
  .partitionBy("date")
  .option("path", "/cloudtrail")
  .option("checkpointLocation", "/cloudtrail.checkpoint/")
  .start()

在这里,我们在开始之前为StreamingQuery指定以下配置。

从时间戳列导出日期
每10秒检查一次新文件(即触发间隔)
将经过解析的DataFrame转换后的数据作为Parquet格式的表写入路径/ cloudtrail。
按日期对Parquet表进行分区,以便以后可以高效地查询数据的时间片; 监视应用程序中的关键要求。
将检查点信息保存在路径/ checkpoints / cloudtrail中以实现容错(稍后在博客中进行说明)
就结构化流模型而言,这就是执行此查询的方式。


image.png

从概念上讲,rawRecords数据帧是仅追加的输入表,而cloudtrailEvents数据帧是转换后的结果表。换句话说,将新行追加到输入(rawRecords)时,结果表(cloudtrailEvents)将具有新的转换行。在这种情况下,Spark SQL引擎每10秒触发一次对新文件的检查。当发现新数据(即输入表中的新行)时,它将转换数据以在结果表中生成新行,然后将其写为Parquet文件。

此外,在运行此流查询时,您可以使用Spark SQL同时查询Parquet表。流查询以事务方式写入Parquet数据,以便并发交互式查询处理将始终看到最新数据的一致视图。这种有力的保证称为前缀完整性,它使结构化流传输管道与较大的持续应用程序很好地集成在一起。

您可以在我们以前的博客中阅读有关结构化流模型及其相对于其他流引擎的优势的更多详细信息。

解决生产挑战
先前,我们强调了在生产中运行流式ETL管道必须解决的许多挑战。 让我们看看在Databricks平台上运行的结构化流如何解决这些问题。

从故障中恢复以获得一次精确的容错保证
长时间运行的管道必须能够承受机器故障。 使用结构化流,实现容错就像为查询指定检查点位置一样容易。 在先前的代码段中,我们在下面的代码行中进行了此操作。

.option("checkpointLocation", "/cloudtrail.checkpoint/")

该检查点目录是针对每个查询的,并且在查询处于活动状态时,Spark会不断将已处理数据的元数据写入检查点目录。即使整个集群发生故障,也可以使用相同的检查点目录在新集群上重新启动查询,并持续进行恢复。更具体地说,在新集群上,Spark使用元数据在失败的查询停止的地方开始新查询,从而确保端到端的一次准确保证和数据一致性(请参见我们先前博客的“故障恢复”部分)。

此外,只要输入源和输出模式保持相同,此相同的机制就可以让您在重新启动之间升级查询。从Spark 2.1开始,我们将检查点数据编码为JSON,以确保将来的兼容性。因此,即使在更新Spark版本后,您也可以重新启动查询。在所有情况下,您将获得相同的容错和一致性保证。

请注意,Databricks使设置自动恢复非常容易,这将在下一部分中显示。

监控,警报和升级
为了使连续应用程序平稳运行,它必须对单个计算机甚至整个集群故障都具有鲁棒性。在Databricks中,我们与结构化流开发紧密集成,使我们能够连续监视流查询中的故障(并自动重新启动它们。您要做的就是创建一个新的Job,并配置Job retry策略。您还可以配置该Job发送电子邮件以通知您失败。


image.png

通过更新代码和/或Spark版本,然后重新启动Job,可以轻松进行应用程序升级。有关更多详细信息,请参见我们的在生产中运行结构化流的指南。

机器故障并不是确保稳健处理所需的唯一情况。在本系列的后面,我们将详细讨论如何监视流量高峰和上游故障。

将实时数据与历史/批次数据结合
许多应用程序要求将历史/批处理数据与实时数据结合在一起。例如,除了传入的审核日志外,我们可能已经有大量待转换的日志积压。理想情况下,我们希望既要实现这两个目标,又要尽快以交互方式查询最新数据,还可以访问历史数据以进行将来的分析。使用大多数现有系统来建立这样的管道通常很复杂,因为您将必须设置多个流程:批处理作业以转换历史数据,流传输管道以转换实时数据,也许还有另一步骤来组合结果。

结构化流技术消除了这一挑战。您可以配置上述查询,以便在使用空间集群功能处理旧文件时优先处理新数据文件到达时的处理。首先,我们将文件源的选项lastFirst设置为true,以便首先处理新文件。然后,我们设置maxFilesPerTrigger来限制每次要处理多少个文件。这将调整查询以更频繁地更新下游数据仓库,以便使最新数据可用于尽快查询。我们可以一起定义rawLogs DataFrame,如下所示:

val rawJson = spark.readStream
  .schema(cloudTrailSchema)
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", "20")
  .json("s3n://mybucket/AWSLogs/*/CloudTrail/*/2017/01/*")

这样,我们可以编写一个查询,轻松地将实时数据与历史数据结合在一起,同时确保低延迟,高效和数据一致性。

结论
Apache Spark中的结构化流是编写流式ETL管道的最佳框架,如上文所述,Databricks使在大规模生产中轻松运行它们成为可能。 我们共享了有关步骤的高层概述,包括提取,转换,加载和最终查询,以建立您的流式ETL生产管道。 我们还讨论并演示了结构化流技术如何克服在生产中解决和建立高容量和低延迟流技术管道的挑战。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容