Spark调优指南
spark调优主要从配置、代码、数据格式、数据倾斜倾斜四大部分进行。
1 配置
1.1 资源分配
num-executors:executor的个数
executor-cores:cpu core 的两倍
executor-memory:每个executor的内存大小
driver-memory:driver的内存大小
1.2 并行度
spark.default.parallelism
spark.sql.partitions
repartition(num)
1.3 内存使用
spark.storage.memoryFraction:用于cache的内存比例
spark.shuffle.memoryFraction:shffule阶段的缓存占内存比例
2 代码
2.1 不要重复创建RDD
2.2 重复使用的RDD进行cache
2.3 使用高性能算子
mapPartition代替map
foreachPartition代替foreach
用reduceByKey代替groupByKey
2.4 filter以后使用coalesce减少小任务
2.5 广播大变量:sc.broadcast
3 数据
3.1 序列化
使用KryoSerializer代替Java序列化
3.2 文件格式
使用parquet文件格式,列式存储,读取效率高
4 倾斜
4.1 聚合(xxByKey)
造成倾斜的Key数量小且不重要
抽样+过滤
造成倾斜的Key数量多且重要
增加并行度
局部聚合+全局聚合
给每个Key加上前缀,聚合
对上步聚合结果的Key去前缀,聚合
4.2 连接
4.2.1 小表连接大表
将reduce join 转成map join
使用广播变量将小表数据进行广播
SparkSQL设置spark.sql.autoBroadcastJoinThreshold,默认10m
4.2.2 大表连接大表
造成倾斜的Key不多
对RDD1进行sample找出造成倾斜的Key
分别对RDD1和RDD2进行filter将其分成skewRDD1和commonRDD1以及skewRDD1和commonRDD2
然后对skewRDD1的key添加随机前缀n,对skewRDD2进行n倍扩容,然后join,再对结果的key进行前缀移除得到joinRDD1
将commonRDD1和commonRDD2进行连接,得到joinRDD2
joinRDD1.union(joinRDD2)
4.2.3 造成倾斜的Key多
对RDD1进行随机前缀n的添加
对RDD2进行n倍扩容
然后进行连接
进行随机前缀的移除处理得到结果
具体调优部分可以参考
Spark统一内存管理:UnifiedMemoryManager spark优化
spark executor的个数和并行任务的个数优化 spark优化