关注公众号:登峰大数据,阅读Spark实战第二版(完整中文版),系统学习Spark3.0大数据框架!
如果您觉得作者翻译的内容有帮助,请分享给更多人。您的分享,是作者翻译的动力!
本章涵盖了
使用缓存和检查点来提高Spark的性能
选择正确的方法来提高性能
收集性能信息
选择正确的代码位置来使用缓存或检查点
正确使用collect()和collectAsList()
Spark分析是快速的。它可以轻松地跨集群或个人电脑中的多个节点处理数据。Spark非常喜爱内存。这是Spark性能的关键设计。然而,随着数据集从用于开发应用程序的示例增长到生产数据集,您可能会感到性能正在下降。
在这一章中,你将获得一些关于Spark如何使用内存的基础知识。这些知识将帮助优化应用程序。
首先在具有虚拟数据的应用程序中使用缓存和检查点。这一步将帮助您更好地理解用于优化应用程序的各种模式。
然后,切换到一个具有真实数据的真实示例。在第二个实验中,您将对包含巴西经济信息的数据集运行分析操作。
最后,了解优化工作负载时的其他注意事项。我还分享了一些关于提高性能的提示以及进一步提高性能的方法。
实验
本章的例子可以在GitHub上找到:https://github.com/jgperrin/net.jgp.books.spark.ch16
16.1 缓存和检查点可以提高性能
在第一部分中,您将在Apache Spark的上下文中探讨缓存和检查点。然后,在虚拟数据上运行一个实验,用不使用缓存、使用缓存以及使用即时检查点和非即时检查点来执行一个过程。在此过程中,您还将学习如何收集性能数据,并以可视化表示查看收集到的数据。
Apache Spark提供了两种不同的技术来提高性能:
缓存,通过cache() 或者persist(),可以保存数据和数据血缘
检查点,通过checkpoint()来保存数据,而不需要保存数据血缘
数据血缘是数据的时间轴
在我小学的历史课上,老师会让我们在一个历史时间轴上工作,让每个学生在时间轴上写下一个历史事件。
对于数据,情况完全相同:您希望能够跟踪数据的历史和来源。如果数据有问题,您希望能够查明问题的来源。同样,如果您需要将应用程序添加到现有的架构中,数据血缘将帮助您选择最合适的位置插入这个新应用程序。
在一个公司中,数据血缘指示数据的来源、去向,以及在这个过程中要进行哪些修改。数据血缘在现代相对复杂的企业应用程序中是必不可少的,因为它保证了系统中有可信的数据。
在下图中的理论示例中,销售点终端生成数据,这些数据通过Perl应用程序传输到本地Informix数据库。本地数据复制到HQ数据库。在那里,一些应用程序可以使用聚合数据,包括Node.js微服务。有几个客户端正在使用这个微服务。
在Spark上下文中,数据血缘由有向无环图(DAG)表示。你在第4章学过DAG。下图表示第4章中使用的DAG。
在第4章中,您使用了实验#200,其中加载了一个初始数据集,复制了60次,基于表达式(计算平均值)创建了一列,复制了两列,并删除了三列。
数据血缘的示例:数据来自销售点终端,并整合到本地Informix数据库中,该数据库复制到HQ。然后,从这个数据库中提取部分数据,并存储在Node.js微服务使用的MySQL数据库中。
基于第4章实验#200的Spark数据血缘示例:加载数据集,然后添加和删除列。DAG存储了数据转换的每一步(在Catalyst优化有效转换之前)。
用户不能直接访问DAG,除非使用dataframe的explain()方法。它类似于MySQL或SQL Server的SQL EXPLAIN关键字、Oracle上的EXPLAIN PLAN或IBM Informix上的SET EXPLAIN,以显示RDBMS上SQL语句的执行计划。
16.1.1 Spark缓存的用途
让我们从理解缓存提供了什么以及它在Apache Spark环境中的工作方式开始。了解缓存的多个存储级别,从而可以对缓存进行微调。
缓存可以用来提高性能。缓存将把dataframe持久化到内存、磁盘或内存和磁盘的组合中。缓存还将保存数据的血缘。只有在需要从头重新构建数据集时,保存血缘才有用,如果集群中的一个节点发生故障,就会重新构建数据集。
Spark提供了两种缓存方法:cache()和persist()。它们的工作原理相同,不同之处是persist()允许指定希望使用的存储级别。当使用参数时,cache()等价于persist(StorageLevel.MEMORY_ONLY)。没有其他理由使用其中一种而不是另一种。使用persist()方法的可用存储级别如下:
MEMORY_ONLY--这是默认级别。它将把组成dataframe的RDD作为反序列化的Java对象存储在JVM中。如果RDD无法完全放到内存,Spark不会缓存部分分区:Spark将根据需要重新计算。
MEMORY_AND_DISK--与MEMORY_ONLY类似,不同之处是当Spark耗尽内存时,它会将RDD序列化到磁盘上。它比较慢,因为磁盘比较慢,但是性能取决于节点上的存储类(例如NVMe驱动器和机械驱动器)。
MEMORY_ONLY_SER--与MEMORY_ONLY类似,但Java对象是序列化的。这应该占用更少的空间,但是读取将消耗更多的CPU。
MEMORY_AND_DISK_SER--类似于序列化的MEMORY_AND_DISK。
DISK_ONLY--存储构成dataframe的RDD分区到磁盘。
OFF_HEAP--与MEMORY_ONLY_SER类似的行为,但它使用堆外内存。需要激活堆外使用(有关内存管理的更多细节,请参见16.1.3小节)。
MEMORY_AND_DISK_2、MEMORY_AND_DISK_SER_2、MEMORY_ONLY_2和MEMORY_ONLY _SER_2等价于不带_2的分区,但在两个集群节点上添加了对每个分区的复制。当需要复制数据以提高可用性时,可以使用这些设置。
您可以使用unpersist()来释放缓存,以及使用storageLevel()来查询dataframe的当前存储级别。unpersist()方法将清除缓存,无论您是通过cache()还是persist()创建的。当你不再使用这个dataframe时,你会清除缓存,这样你就可以释放内存来处理其他数据集。当dataframe没有被缓存/持久化时,storageLevel()返回StorageLevel.NONE。如果您不手动释放内存,它将在会话结束时被清除,这样会使您的内存无法用于更多的数据或处理。
16.1.2 Spark检查点的有效性
检查点是提高Spark性能的另一种方法。在本小节中,您将了解什么是检查点,可以执行什么样的检查点,以及它与缓存有什么不同。
checkpoint()方法将截断DAG(或逻辑计划),并将DataFrame的内容保存到磁盘。DataFrame保存在检查点目录中。检查点目录没有默认值:它必须使用SparkContext的setCheckpointDir()设置,如清单16.1所示。如果没有设置检查点目录,检查点将失败,应用程序将停止。
检查点可以是即时的,也可以是惰性的。默认将立即创建检查点。如果对checkpoint()方法使用false,则在调用操作时将创建检查点。您的使用将根据目标而有所不同:即时地(或现在)构建检查点需要预先花费时间,但随后您可以以一种更有效的方式使用检查点DataFrame。如果你可以等待,你的检查点将会在一个Action需要时创建;检查点的可用性可能无法预测。
要了解更多关于逻辑计划和优化器的信息,请参见Michael Armbrust等人的“Deep Dive into Spark SQL’s Catalyst optimizer( http://mng.bz/gVMZ ).。
16.1.3 使用缓存和检查点
在本小节中,您将通过一个简单的应用程序来实践缓存和检查点。在这个场景中,您将收集性能度量,并查看性能的不同之处。
在实验#100中,将比较缓存和检查点数据之间的性能差异。为了获得相对精确的度量,您需要使用类似的数据集。做到这一点的最佳方法是使用记录生成器:它将消除文件中可能存在的差异,并且可以更精确地定义关键属性,如记录数量、字段和数据类型。加载后,dataframe将包含书籍、作者、评级、出版年份(过去25年内)和语言。表16.1显示了记录的结构和来自数据生成器的一些随机值。
表16.1包含作者姓名、书名、等级、出版年份、语言的图书记录(查看表图)
首先,你只保留(或筛选)评分为5分的书籍(这是一个很好的提示,你可以在曼宁网站和亚马逊上给这本书打分)。从这个“top books,”dataframe中,你可以计算出每年每种语言的图书数量。图16.1演示了该操作。
不要太兴奋:如果你想知道法国作家得到的五星评分是比巴西作家多还是少,这是生成的数据,所以分布基本相同。如果您想执行更复杂的书籍分析,请查看lab #900和存储库中的Goodreads数据集。
图16.1转换的可视化表示:如果你没有在过滤器之后放一个缓存或检查点,过滤器每次都会被重新计算。
应用程序的结果应该如下所示:
...1995... 1337 ProcessingtimesWithoutcache ............... 3618 msWithcache .................. 2559 msWithcheckpoint ............. 1860 ms With non-eager checkpoint ... 1420 ms
代码入口位于net.jgp.books.spark.ch16.lab100 _cache_checkpoint 包中的CacheCheckpointApp类(提醒一下,这里是100号实验)。
//译者注:在WIndows操作系统中本地运行spark程序写文件操作时,报以下错误:....(null) entryincommandstring:nullchmod0644..(后面是目的目录)下载hadoop.dll文件并拷贝到c:\windows\system32目录中然后重新运行代码程序即可