spark 应用程序性能优化
写在前面
本系列是综合了自己在学习spark过程中的理解记录 + 对参考文章中的一些理解 + 个人实践spark过程中的一些心得而来。写这样一个系列仅仅是为了梳理个人学习spark的笔记记录,所以一切以能够理解为主,没有必要的细节就不会记录了,而且文中有时候会出现英文原版文档,只要不影响理解,都不翻译了。若想深入了解,最好阅读参考文章和官方文档。
其次,本系列是基于目前最新的 spark 1.6.0 系列开始的,spark 目前的更新速度很快,记录一下版本号还是必要的。
1. 优化? Why? How? When? What?
在做 spark 应用程序的优化的时候,从下面几个点出发就够了:
- 为什么:因为你的资源有限,因为你的应用上生产环境了会有很多不稳定的因素,在上生产前做好优化和测试是唯一一个降低不稳定因素影响的办法;
- 怎么做:web ui + log 是做优化的倚天剑和屠龙刀,能掌握好这两点就可以了;
- 何时做:应用开发成熟时,满足业务要求时,就可以根据需求和时间安排开始做了;
- 做什么:一般来说,spark 应用程序 80% 的优化,都是集中在三个地方:内存,磁盘io,网络io。再细点说,就是 driver,executor 的内存,shuffle 的设置,文件系统的配置,集群的搭建,集群和文件系统的搭建[e.g 尽量让文件系统和集群都在一个局域网内,网络更快;如果可以,可以让 driver 和 集群也在一个局域网内,因为有时候需要从 worker 返回数据到 driver]
备注:千万不要一心想着优化都从程序本身入手,虽然大多数时候都是程序自己的原因,但在入手检查程序之前最好先确认所有的 worker 机器情况都正常哦。比如说机器负载,网络情况。
下面这张图来自 databricks 的一个分享 Tuning and Debugging Apache Spark,很有意思,说得非常对啊,哈哈。
2. repartition and coalesce
原文:
Spark provides the `repartition()` function, which shuffles the data across the network to create a new set of partitions. Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of `repartition()` called `coalesce()` that allows avoiding data movement, but only if you are decreasing the number of RDD partitions. To know whether you can safely call coalesce(), you can check the size of the RDD using `rdd.partitions.size()` in Java/Scala and `rdd.getNumPartitions()` in Python and make sure that you are coalescing it to fewer partitions than it currently has.
总结:当要对 rdd 进行重新分片时,如果目标片区数量小于当前片区数量,那么用 coalesce,不要用 repartition。关于 partition 的更多优化细节,参考 chapter 4 of Learning Spark
3. Passing Functions to Spark
In Python, we have three options for passing functions into Spark.
One issue to watch out for when passing functions is inadvertently serializing the object containing the function. When you pass a function that is the member of an object, or contains references to fields in an object (e.g., self.field), Spark sends the entire object to worker nodes, which can be much larger than the bit of information you need. Sometimes this can also cause your program to fail, if your class contains objects that Python can’t figure out how to pickle.
4. worker 的资源分配:cpu, memroy, executors
这个话题比较深,而且在不同的部署模式也不一样 [standalone, yarn, mesos],这里给不了什么建议。唯一的一个宗旨是,不要一昧考虑把所有资源都独立给到 spark 来用,要考虑到机器本身的一些进程,spark 依赖的一些进程,网络情况,任务情况 [计算密集,IO密集,long-live task]等。
这里只能推荐一些 video,slide 和 blog,具体情况具体分析,以后我遇到资源调优的时候再把实际案例发出来。
5. shuffle block size limitation
No Spark shuffle block can be greater than 2 GB — spark shuffle 里的 block size 不能大于 2g。
partition 的数量,partition 越多,分布到每个 partition 上的数据越少,越不容易导致 shuffle 数据过大;
数据分布不均匀,一般是 groupByKey 后,存在某几个 key 包含的数据过大,导致该 key 所在的 partition 上数据过大,有可能触发后期 shuflle block 大于 2g;
一般解决这类办法都是增加 partition 的数量,Top 5 Mistakes When Writing Spark Applications 这里说可以预计让每个 partition 上的数据为 128MB 左右,仅供参考,还是需要具体场景具体分析,这里只把原理讲清楚就行了,并没有一个完美的规范。
- sc.textfile 时指定一个比较大的 partition number
- spark.sql.shuffle.partitions
- rdd.repartition
- rdd.coalesce
TIPS:
在 partition 小于 2000 和大于 2000 的两种场景下,Spark 使用不同的数据结构来在 shuffle 时记录相关信息,在 partition 大于 2000 时,会有另一种更高效 [压缩] 的数据结构来存储信息。所以如果你的 partition 没到 2000,但是很接近 2000,可以放心的把 partition 设置为 2000 以上。
6. level of parallel – partition
先来看看一个 stage 里所有 task 运行的一些性能指标,其中的一些说明:
- Scheduler Delay: spark 分配 task 所花费的时间
- Executor Computing Time: executor 执行 task 所花费的时间
- Getting Result Time: 获取 task 执行结果所花费的时间
- Result Serialization Time: task 执行结果序列化时间
- Task Deserialization Time: task 反序列化时间
- Shuffle Write Time: shuffle 写数据时间
- Shuffle Read Time: shuffle 读数据所花费时间
partition 过小[容易引入 data skew 问题]
- Scheduler Delay: 无明显变化
- Executor Computing Time: 不稳定,有大有小,但平均下来比较大
- Getting Result Time: 不稳定,有大有小,但平均下来比较大
- Result Serialization Time: 不稳定,有大有小,但平均下来比较大
- Task Deserialization Time: 不稳定,有大有小,但平均下来比较大
- Shuffle Write Time: 不稳定,有大有小,但平均下来比较大
- Shuffle Read Time: 不稳定,有大有小,但平均下来比较大
partition 过大
- Scheduler Delay: 无明显变化
- Executor Computing Time: 比较稳定,平均下来比较小
- Getting Result Time: 比较稳定,平均下来比较小
- Result Serialization Time: 比较稳定,平均下来比较小
- Task Deserialization Time: 比较稳定,平均下来比较小
- Shuffle Write Time: 比较稳定,平均下来比较小
- Shuffle Read Time: 比较稳定,平均下来比较小
那应该怎么设置 partition 的数量呢?这里同样也没有专门的公式和规范,一般都在尝试几次后有一个比较优化的结果。但宗旨是:尽量不要导致 data skew 问题,尽量让每一个 task 执行的时间在一段变化不大的区间之内。
7. data skew
大多数时候,我们希望的分布式计算带来的好处应该是像下图这样的效果:
- Top 5 Mistakes When Writing Spark Applications
- Sparkling: Speculative Partition of Data for Spark Applications – Peilong Li
8. avoid cartesian operation
rdd.cartesian 操作很耗时,特别是当数据集很大的时候,cartesian 的数量级都是平方级增长的,既耗时也耗空间。
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
9. avoid shuffle when possible
10. use reduceByKey instead of GroupByKey when possible
11. use treeReduce instead of reduce when possible
12. use Kryo serializer
spark 应用程序中,在对 RDD 进行 shuffle 和 cache 时,数据都是需要被序列化才可以存储的,此时除了 IO 外,数据序列化也可能是应用程序的瓶颈。这里推荐使用 kryo 序列库,在数据序列化时能保证较高的序列化效率。
sc_conf = SparkConf()
sc_conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
13. Next
这些都是一些实际实践中的经验和对一些高质量分享的总结[大多数是来自那些高质量分享],里面可能有说得不完全正确的地方,在未来亲自实践,调试过后会再有一篇性能调试的 blog 的,本篇仅供参考哦。下一次,我们来看看怎么统一部署和配置 spark 的 cluster,那的确几乎来自个人实践经验了。
via:Spark技术日报
End.