60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

我是创始人李岩:很抱歉!给自己产品做个广告,点击进来看看。  

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

作者 | Sital Kedia, 王硕杰, Avery Ching

Apache Spark 于 2009 年在加州大学伯克利分校的 AMPLab 由 Matei Zaharia 发起,后来在2013 年贡献给 Apache。它是目前增长最快的数据处理平台之一,由于它能支持流、批量、命令式(RDD)、声明式(SQL)、图数据库和机器学习等用例,而且所有这些都内置在相同的 API 和底层计算引擎中。

— Sital Kedia, 王硕杰, Avery Ching

Facebook 经常使用数据驱动的分析方法来做决策。在过去的几年,用户和产品的增长已经需要我们的分析工程师一次查询就要操作数十 TB 大小的数据集。我们的一些批量分析执行在古老的 Hive[1] 平台( Apache Hive 由 Facebook 贡献于 2009 年)和 Corona[2] 上——这是我们定制的 MapReduce 实现。

Facebook 还不断增加其对 Presto 的用量,用于对几个包括 Hive 在内的内部数据存储的 ANSI-SQL 查询。我们也支持其他分析类型,比如图数据库处理graph processing和机器学习(Apache Giraph[3])和流(例如:Puma[4]、Swift[5] 和 Stylus[6])。

同时 Facebook 的各种产品涵盖了广泛的分析领域,我们与开源社区不断保持沟通,以便共享我们的经验并从其他人那里学习。Apache Spark[7] 于 2009 年在加州大学伯克利分校的 AMPLab 由 Matei Zaharia 发起,后来在2013 年贡献给 Apache。它是目前增长最快的数据处理平台之一,由于它能支持流、批量、命令式(RDD)、声明式(SQL)、图数据库和机器学习等用例,而且所有这些都内置在相同的 API 和底层计算引擎中。

Spark 可以有效地利用更大量级的内存,优化整个流水线pipeline中的代码,并跨任务重用 JVM 以获得更好的性能。最近我们感觉 Spark 已经成熟,我们可以在一些批量处理用例方面把它与 Hive 相比较。在这篇文章其余的部分,我们讲述了在扩展 Spark 来替代我们一个 Hive 工作任务时的所得到经验和学习到的教训。

用例:实体排名的特征准备

Facebook 会以多种方式做实时的实体entity排名。对于一些在线服务平台,原始特征值是由 Hive 线下生成的,然后将数据加载到实时关联查询系统。

我们在几年前建立的基于 Hive 的老式基础设施属于计算资源密集型,且很难维护,因为其流水线被划分成数百个较小的 Hive 任务。为了可以使用更加新的特征数据和提升可管理性,我们拿一个现有的流水线试着将其迁移至 Spark。

以前的Hive实现

基于 Hive 的流水线由三个逻辑阶段stage组成,每个阶段对应由 entity_id 划分的数百个较小的 Hive 作业,因为在每个阶段运行大型 Hive 作业job不太可靠,并受到每个作业的最大任务task数量的限制。

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

这三个逻辑阶段可以总结如下:

  1. 过滤出非产品的特征和噪点。
  2. 在每个(entity_id, target_id)对上进行聚合。
  3. 将表格分割成 N 个分片,并通过自定义二进制文件管理每个分片,以生成用于在线查询的自定义索引文件。

基于 Hive 的流水线建立该索引大概要三天完成。它也难于管理,因为该流水线包含上百个分片的作业,使监控也变得困难。同时也没有好的方法来估算流水线进度或计算剩余时间。考虑到 Hive 流水线的上述限制,我们决定建立一个更快、更易于管理的 Spark 流水线。

Spark实现

全量的调试会很慢,有挑战,而且是资源密集型的。我们从转换基于 Hive 流水线的最资源密集型的第二阶段开始。我们以一个 50GB 的压缩输入例子开始,然后逐渐扩展到 300GB、1TB,然后到 20TB。在每次规模增长时,我们都解决了性能和稳定性问题,但是实验到 20TB 时,我们发现了最大的改善机会。

运行 20TB 的输入时,我们发现,由于大量的任务导致我们生成了太多输出文件(每个大小在 100MB 左右)。在 10 小时的作业运行时中,有三分之一是用在将文件从阶段目录移动到 HDFS 中的最终目录。

起初,我们考虑两个方案:要么改善 HDFS 中的批量重命名来支持我们的用例,或者配置 Spark 生成更少的输出文件(这很难,由于在这一步有大量的任务 — 70000 个)。我们退一步来看这个问题,考虑第三种方案。

由于我们在流水线的第二步中生成的 tmp_table2 表是临时的,仅用于存储流水线的中间输出,所以对于 TB 级数据的单一读取作业任务,我们基本上是在压缩、序列化和复制三个副本。

相反,我们更进一步:移除两个临时表并整合 Hive 过程的所有三个部分到一个单独的 Spark 作业,读取 60TB 的压缩数据然后对 90TB 的数据执行重排shuffle和排序sort。最终的 Spark 作业如下:

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

对于我们的作业如何规划Spark?

当然,为如此大的流水线运行一个单独的 Spark 任务,第一次尝试没有成功,甚至是第十次尝试也没有。

据我们所知,从重排shuffle的数据大小来说,这是现实世界最大的 Spark 作业(Databrick 的 PB 级排序[8]是以合成数据来说)。我们对核心 Spark 基础架构和我们的应用程序进行了许多改进和优化使这个作业得以运行。这种努力的优势在于,许多这些改进适用于 Spark 的其他大型作业任务,我们将所有的工作回馈给开源 Apache Spark 项目 – 有关详细信息请参阅 JIRA。

下面,我们将重点讲述将实体排名流水线之一部署到生产环境所做的重大改进。

可靠性修复

处理频繁的节点重启

为了可靠地执行长时间运行作业,我们希望系统能够容错并可以从故障中恢复(主要是由于平时的维护或软件错误导致的机器重启所引发的)。虽然 Spark 设计为可以容忍机器重启,但我们发现它在足够强健到可以处理常见故障之前还有各种错误/问题需要解决。

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

文件。最重要的是,我们在 Spark 驱动程序中实现了一项功能,可以暂停执行任务调度,所以不会由于集群重启而导致的过多的任务失败,从而导致作业失败。

其他的可靠性修复

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

任务是很困难的。Spark 执行程序会内存溢出,因为排序程序(sorter)中存在导致无限增长的指针数组的漏洞。当不再有可用的内存用于指针数组增长时,我们通过强制将数据溢出到磁盘来修复问题。因此,现在我们可以每主机运行 24 个任务,而不会内存溢出。

性能改进

在实施上述可靠性改进后,我们能够可靠地运行 Spark 作业了。基于这一点,我们将精力转向与性能相关的项目,以充分发挥 Spark 的作用。我们使用 Spark 的指标和几个分析器来查找一些性能瓶颈。

我们用来查找性能瓶颈的工具

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

性能优化

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

在所有这些可靠性和性能改进之后,我们很高兴地报告,我们为我们的实体排名系统之一构建和部署了一个更快、更易于管理的流水线,并且我们提供了在 Spark 中运行其他类似作业的能力。

Spark流水线与Hive流水线性能对比

我们使用以下性能指标来比较 Spark 流水线与 Hive 流水线。请注意,这些数字并不是在查询或作业级别的直接比较 Spark 与 Hive ,而是比较使用灵活的计算引擎(例如 Spark)构建优化的流水线,而不是比较仅在查询/作业级别(如 Hive)操作的计算引擎。

CPU 时间:这是从系统角度看 CPU 使用。例如,你在一个 32 核机器上使用 50% 的 CPU 10 秒运行一个单进程任务,然后你的 CPU 时间应该是 32 * 0.5 * 10 = 160 CPU 秒。

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

CPU 预留时间:这是从资源管理框架的角度来看 CPU 预留。例如,如果我们保留 32 位机器 10 秒钟来运行作业,则CPU 预留时间为 32 * 10 = 320 CPU 秒。CPU 时间与 CPU 预留时间的比率反映了我们如何在集群上利用预留的CPU 资源。

当准确时,与 CPU 时间相比,预留时间在运行相同工作负载时可以更好地比较执行引擎。例如,如果一个进程需要 1 个 CPU 的时间才能运行,但是必须保留 100 个 CPU 秒,则该指标的效率要低于需要 10 个 CPU 秒而仅保留 10 个 CPU 秒来执行相同的工作量的进程。

我们还计算内存预留时间,但不包括在这里,因为其数字类似于 CPU 预留时间,因为在同一硬件上运行实验,而在 Spark 和 Hive 的情况下,我们不会将数据缓存在内存中。Spark 有能力在内存中缓存数据,但是由于我们的集群内存限制,我们决定类似与 Hive 一样工作在核心外部。

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

等待时间:端到端的工作流失时间。

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的-36大数据

结论和未来工作

Facebook 的性能和可扩展的分析在产品开发中给予了协助。Apache Spark 提供了将各种分析用例统一为单一 API 和高效计算引擎的独特功能。

我们挑战了 Spark,来将一个分解成数百个 Hive 作业的流水线替换成一个 Spark 作业。通过一系列的性能和可靠性改进之后,我们可以将 Spark 扩大到处理我们在生产中的实体排名数据处理用例之一。

在这个特殊用例中,我们展示了 Spark 可以可靠地重排和排序 90 TB+ 的中间数据,并在一个单一作业中运行了 25 万个任务。 与旧的基于 Hive 的流水线相比,基于 Spark 的流水线产生了显着的性能改进(4.5-6 倍 CPU,3-4 倍资源预留和大约 5 倍的延迟),并且已经投入使用了几个月。

虽然本文详细介绍了我们 Spark 最具挑战性的用例,越来越多的客户团队已将 Spark 工作负载部署到生产中。 性能 、可维护性和灵活性是继续推动更多用例到 Spark 的优势。 Facebook 很高兴成为 Spark 开源社区的一部分,并将共同开发 Spark 充分发挥其潜力。

End.

转载请注明来自36大数据(36dsj.com): 36大数据 » 60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的

随意打赏

facebook messengerhadoop 大数据 sparkspark streamingfacebook数据中心apache sparkhadoop spark如何使用facebookfacebook最新数据facebook用户数据facebook 大数据
提交建议
微信扫一扫,分享给好友吧。