spark常用性能调优 spark性能优化实战

spark的优化其实说起来很多,那常用的部分主要由哪些呢。

下面是实际项目优化的记录:资源分配、重构RDD架构以及RDD持久化、gc OOM异常、Shuffle调优等

在实际项目中分配更多资源

分配更多资源是性能调优的王道,就是增加和分配更多的资源,性能和速度上的提升是显而易见的;基本上,在一定范围内,增加资源和性能的提升是成正比的;写完一个spark程序后,进行性能调优的时候,首先第一步,就是调节最优的性能配置;在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限;那时候才是考虑去做后面的这些性能调优的点。

分配哪些资源?

executor、cpu per executor、memory per executor、driver memory

在哪里分配这些资源?

在提交spark作业时,调整对应参数:

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \  配置executor的数量
--driver-memory 100m \  配置driver的内存(影响不大)
--executor-memory 100m \  配置每个executor的内存大小
--executor-cores 3 \  配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

调节到多大,才算是最大?

第一种

Spark standalone,公司集群上,搭建了一套spark集群,我们应该清楚的知道每台机器的配置。我们在设置的时候,就根据实际的cpu、内存情况,去调节每个spark节点的资源分配。
例如:有20台机器,每台有2个cpu和4G内存,那么如果配置20个executor,那么每个executor内存分配4G和2个cpu。

第二种

yarn,应该去查看spark作业提交到的资源队列大概有多少资源。
比如:如果有500G内存和100个cpu,那么如果分配50个executor,那么每个executor分配的cpu core为2个。

为什么调节了资源之后,性能可以提升?

增加executor的数量:

如果executor的数量比较少,那么能够并行执行的task数量就比较少,就意味着我们的application并行执行能力很弱。
​比如:有3个executor,每个executor有2个cpu core,它同时能够并行执行6个task。如果增加executor的数量到10个、50个甚至100个,那并行能力就变成了原先的数倍、数十倍。相应的,性能(执行的速度)也就提升了数倍~数十倍。

增加每个executor的内存量:

1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写磁盘,减少了磁盘IO。
2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不足,也会写入磁盘。这时如果有充足的内存,就会有更少的数据写入磁盘,甚至不需要写入磁盘,减少了磁盘IO,提升了性能。
3、对于task的执行,可能会创建很多的对象,如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC、垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,速度变快了。

增加每个executor的cpu core:

增加cpu core,也是增加了执行的并行能力。原本20个executor,每个executor2个cpu core。能够执行的task数量只有40个。但是如果将每个executor的cpu数量增加到5个,能够并行的task数量,就是100个。执行的速度提升了2.5倍。

在实际项目中调整并行度

spark application提交之后,会生成一个或者多个job(每个action操作都会生成一个job),每个job会生成一些stage(发生shuffle的时候会拆分出来),每个stage会产生多个task,这些task会在executor上面执行。​

并行度

并行度其实就是指,spark作业中,各个stage的task数量,也就代表了spark作业在各个stage(阶段)中的并行度。
配置参数:spark.default.parallelism

调整原则:

1、task数量,至少设置成与spark application和总cpu core数量相同。
例如:总共有150个cpu core,那么最少需要设置150个task,避免有cpu core空闲。
2、官网建议task数量是cpu core的2~3倍,按上面的例子也就是要设置300~500。
由于在实际情况中,每个task运行的时间是不一样的,有的可能10s就完成了,有的需要1分钟才完成,这时候就会有大量的cpu core闲着,等那些执行慢的task,导致了资源的浪费。如果设置成2~3倍,那先执行完成的cpu core就会去执行其他的,就尽量让cpu core不要空闲,提升性能。而且task数量多,每个task处理的数据量就会变小。

重构RDD架构以及RDD持久化

RDD架构重构和优化:

尽量去复用RDD,差不多的RDD,可以抽取成一个共同的RDD,供后面的RDD计算时,反复使用。

公共的RDD一定要实现持久化:

对于多次计算和使用的公共的RDD,一定要进行持久化。

持久化是可以序列化的:

持久化方式的选择:首先使用纯内存的持久化方式;如果出现OOM内存溢出,则采用纯内存+序列化的方法;如果依然存在OOM异常,则使用内存+磁盘的方式(无序列化);再其次使用,内存+磁盘+序列化的方式;最次的是仅使用磁盘

数据高可靠性

为了数据的高可靠性,如果内存充足,可以使用双副本机制进行持久化。
也就是后缀带_2的策略,如:StorageLevel.MEMORY_ONLY_2()

广播大变量

没有广播变量的缺点:

默认情况下,task使用到了外部变量,那么每个task都会获取一份外部变量的副本,会占用不必要的内存消耗,导致在Rdd持久化时不能写入到内存,只能持久化到磁盘,增加了IO读写操作。
同时,task创建对象时,内存不足,进行频繁的GC操作,降低效率。

使用广播变量的好处:

1、广播变量不是每个task保存一份,而是每个executor保存一份。这样的话,就可以让变量产生的副本大大减少。
2、广播变量初始化时,在Driver上生成一份副本,task运行时需要用广播变量中的数据,首先会在本地的Executor对应的BlockManager中尝试获取变量副本;如果本地没有,那么就会从Driver远程拉取变量副本,并保存在本地的BlockManager中;以后这个Executor中的task使用到的数据都从本地的BlockManager中直接获取。
3、Executor中的BlockManager除了从远程的Driver中拉取变量副本,也可以从其他节点的BlockManager中拉取数据,距离越近越好。

使用Kryo序列化

使用Kryo序列化的好处:

1、 默认情况,spark使用的是java的序列化机制,ObjecOutputStream/ObjectInputStream,对象输入输出流机制,来进行序列化。
2、该序列化的好处是使用方便,但是必须实现Serializable,缺点是效率低,速度慢,序列化后占用空间大。
3、KryoSerilizable序列化机制,效率高,速度快,占用空间小(只有java序列化的1/10),可以减少网络传输。

Kryo序列化生效为止及效果:

1、算子函数中使用到的外部变量:使用kryo后,可以优化网络传输的性能,可以优化集群中内存的占用和消耗。
2、持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。
3、shuffle:可以优化网络传输的性能。

使用方法:

//配置使用kryo进行序列化
conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
//注册自定义的类型使用KryoSerializer序列化
.registerKryoClasses(new Class[]{CategorySortKey.class})

使用fastutil优化数据格式

fastutil是什么:

Fastutil扩展了java标准的集合框架,占用内存更小,存取速度更快,还提供了双向迭代器,并对引用类型使用等号(=)进行比较。

使用场景:

1、fastutil尽量提供了在任何情况下都是速度最快的集合框架。
2、如果算子使用了外部变量;那么第一,你可以使用Broadcast广播变量优化;第二,可以使用Kryo序列化,提升序列化性能和效率;第三,如果外部变量是某种比较大的集合,可以考虑使用fastutil改写外部变量,首先从源头上就减少内存的占用,通过广播变量进一步减少内存占用,再通过Kryo序列化进一步减少内存占用。
3、在你的算子函数里,也就是task要执行的计算逻辑里面,如果有逻辑中出现要创建比较大的Map、List集合,可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作;那么此时,可以考虑使用fastutil类库进行重写,使用fastutil在一定程度上可以减少task创建出来的集合类型的内存占用。避免executor内存频繁占满,频繁GC,导致性能下降。

调节数据本地化等待时长

本地化级别:

PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好
NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输
NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分
RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输
ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

为什么要调整本地化时长?

Spark在Driver上,对Application的每一个stage的task,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;Spark的task分配算法优先会希望每个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据;
但是呢,通常来说,有时,事与愿违,可能task没有机会分配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以呢,这种时候,通常来说,Spark会等待一段时间,默认情况下是3s钟(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,比如说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,然后进行计算。
但是对于第二种情况,通常来说,肯定是要发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现自己本地没有数据,会通过一个getRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,通过网络传输回task所在节点。
对于我们来说,当然不希望是类似于第二种情况的了。最好的,当然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;如果要通过网络传输数据的话,那么实在是,性能肯定会下降的,大量网络传输,以及磁盘IO,都是性能的杀手。

具体参数设置

new SparkConf().set("spark.locality.wait", "10")

spark Shuffle调优

合并map端输出文件

开启map端文件合并机制参数设置

conf.set(“spark.shuffle.consolidateFiles”,” true”)

开启了合并机制后,运行流程:

第一个stage,同时可以运行cpu core个task,比如cpu core是2个,并行运行2个task;每个task都创建下一个stage的task数量个文件;
第一个stage,并行运行的2个task执行完以后;就会执行另外两个task;另外2个task不会再重新创建输出文件;而是复用之前的task创建的map端输出文件,将数据写入上一批task的输出文件中。
第二个stage,task在拉取数据的时候,就不会去拉取上一个stage每一个task为自己创建的那份输出文件了;而是拉取少量的输出文件,每个输出文件中,可能包含了多个task给自己的map端输出。

使用效果:

实际在生产环境中,使用了spark.shuffle.consolidateFiles机制以后,实际的性能调优的效果:对于上述的这种生产环境的配置,性能的提升,还是相当的客观的。spark作业,5个小时 -> 2~3个小时。

调节map端内存缓冲区大小和reduce端内存缓冲区内存占比

spark.shuffle.file.buffer,默认32k

在map task处理的数据量比较大的情况下,而你的task的内存缓冲默认是比较小的,32kb。可能会造成多次的map端往磁盘文件的spill溢写操作,发生大量的磁盘IO,从而降低性能。
调优原则:
spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64,128;

spark.shuffle.memoryFraction,0.2

reduce端聚合内存,占比。默认是0.2。如果数据量比较大,reduce task拉取过来的数据很多,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操作,溢写到磁盘上去。而且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操作的时候,很可能会多次读取磁盘中的数据,进行聚合。
调优原则:
spark.shuffle.memoryFraction,每次提高0.1,看看效果。

HashShuffleManager和SortShuffleManager

两者区别:
1)、SortShuffleManager会对每个reduce task要处理的数据,进行排序(默认的)。
2)、SortShuffleManager会避免像HashShuffleManager那样,默认就去创建多份磁盘文件。一个task,只会写入一个磁盘文件,不同reduce task的数据,用offset来划分界定。

三、troubleshooting处理

调节reduce端缓冲区大小避免OOM异常

问题描述:

对于map端不断产生的数据,reduce端会不断拉取一部分数据放入到缓冲区,进行聚合处理;
当map端数据特别大时,reduce端的task拉取数据是可能将全部的缓冲区都填满的,此时进行reduce聚合处理时创建大量的对象,导致OOM异常。

解决方案:

当由于以上原因导致OOM的异常出现时,可以通过减小reduce端缓冲区大小来避免OOM异常的出现。(由于内存不足,所以减少每次reduce端的拉取数据量,比如:可以调整为12M)
但是在内存充足的情况下,也可以适当增大reduce端缓冲区的大小,从而减少reduce端拉取数据的次数,提高性能。(由于我们内存充足,所以将参数设置成96M,以减少reduce端数据拉取次数,提高性能)

参数设置:

名称:spark.reducer.maxSizeInFlight 默认值 48M
参数含义:决定每个reducer从远端executor请求的数据量。​

解决JVM GC导致的shuffle文件拉取失败

问题描述:

下一个stage的task去拉取上一个stage的task的输出文件时,如果正好上一个stage正处在full gc的情况下(所有线程都停止运行),它们之间是通过netty通信的,就会出现很长时间拉取不到时间,此时就会报shuffle file not found的错误;但是下一个stage又重新提交task就不会出现问题了。

解决方案:

调节最大尝试拉取次数:
参数名称:spark.shuffle.io.maxRetries
参数含义:shuffle文件拉取的时候,如果没有拉取到(拉取失败),最多重试次数,默认为3次。
调节每次拉取最大的等待时长:
参数名称:spark.shuffle.io.retryWait
参数含义:每一次拉取文件的时间间隔,默认为5秒。默认情况下,重试所导致的最大延迟为15秒,计算公式spark.shuffle.io.maxRetries*spark.shuffle.io.retryWait

Yarn队列资源不足导致的application直接失败

问题描述:

如果yarn上的spark作业已经消耗了一部分资源,如果现在再提交一个spark作业,可能会出现两种情况:
第一、发现yarn资源不足,直接打印fail的log,直接就失败。
第二、发现yarn资源不足,该作业就一直处于等待状态,等待分配资源执行。

解决方案:

如果发生了上面的第一种问题,可以通过以下方法解决
方法一、在代码中限制同一时间内只有一个spark作业提交到yarn上,确保spark作业的资源是充足的(调节同一时间内每个spark能充分使用yarn的最大资源)。
方法二、将长时间的spark作业和短时间的spark作业分别提交到不同的队列中(通过线程池的方式实现)。

各种序列化导致的报错

问题描述:

用client模式去提交spark作业,观察本地打印出来的log。如果出现了类似于Serializable、Serialize等等字眼,报错的log,那么恭喜大家,就碰到了序列化问题导致的报错。

解决方案:

1、你的算子函数里面,如果使用到了外部的自定义类型的变量,那么此时,就要求你的自定义类型,必须是可序列化的。
2、如果要将自定义的类型,作为RDD的元素类型,那么自定义的类型也必须是可以序列化的。
3、不能在上述两种情况下,去使用一些第三方的,不支持序列化的类型。

解决算子函数返回Null导致的问题

问题描述:

1、如果碰到某些值,不想要有返回值的时候

解决方案:

1、在返回的时候,返回一些特殊的值,不要返回null,比如“-999”
2、在通过算子获取到了一个RDD之后,可以对这个RDD执行filter操作,进行数据过滤。filter内,可以对数据进行判定,如果是-999,那么就返回false,给过滤掉就可以了。
3、大家不要忘了,之前咱们讲过的那个算子调优里面的coalesce算子,在filter之后,可以使用coalesce算子压缩一下RDD的partition的数量,让各个partition的数据比较紧凑一些。也能提升一些性能。

解决yarn-client模式导致的网卡流量激增问题

问题描述:

由于咱们的driver是启动在本地机器的,而且driver是全权负责所有的任务的调度的,也就是说要跟yarn集群上运行的多个executor进行频繁的通信(中间有task的启动消息、task的执行统计消息、task的运行状态、shuffle的输出结果)。
在整个spark运行的生命周期内,都会频繁的去进行通信和调度。所有这一切通信和调度都是从你的本地机器上发出去的,和接收到的。那么此时,你的本地机器的网络通信负载是非常非常高的。会导致你的本地机器的网卡流量会激增!!!

解决方案:

yarn-client模式,通常咱们就只会使用在测试环境中。实际上线了以后,在生产环境中,都得用yarn-cluster模式,去提交你的spark作业。
yarn-cluster模式,就跟你的本地机器引起的网卡流量激增的问题,就没有关系了。使用了yarn-cluster模式以后,就不是你的本地机器运行Driver,进行task调度了。是yarn集群中,某个节点会运行driver进程,负责task调度。

yarn-cluster模式的JVM栈内存溢出问题

问题描述:

实践经验,碰到的yarn-cluster的问题:
有的时候,运行一些包含了spark sql的spark作业,可能会碰到yarn-client模式下,可以正常提交运行;yarn-cluster模式下,可能是无法提交运行的,会报出JVM的PermGen(永久代)的内存溢出,OOM。​

问题原因:

yarn-client模式下,driver是运行在本地机器上的,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客户端是默认有配置的),JVM的永久代的大小是128M,这个是没有问题的;但是呢,在yarn-cluster模式下,driver是运行在yarn集群的某个节点上的,使用的是没有经过配置的默认设置(PermGen永久代大小),82M。
spark-sql,它的内部是要进行很复杂的SQL的语义解析、语法树的转换等等,特别复杂,在这种复杂的情况下,如果说你的sql本身特别复杂的话,很可能会比较导致性能的消耗,内存的消耗。可能对PermGen永久代的占用会比较大。会报出PermGen Out of Memory error log。

解决方案:

既然是JVM的PermGen永久代内存溢出,那么就是内存不够用。咱们呢,就给yarn-cluster模式下的,driver的PermGen多设置一些。
spark-submit脚本中,加入以下配置即可:
–conf spark.driver.extraJavaOptions=”-XX:PermSize=128M -XX:MaxPermSize=256M”
这个就设置了driver永久代的大小,默认是128M,最大是256M。那么,这样的话,就可以基本保证你的spark作业不会出现上述的yarn-cluster模式导致的永久代内存溢出的问题。

错误的持久化方式以及checkpoint的使用

checkpoint的作用:

默认持久化的Rdd会保存到内存或磁盘中,下次使用该Rdd时直接冲缓存中获取,不需要重新计算;如果内存或者磁盘中文件丢失,再次使用该Rdd时需要重新进行。
如果将持久化的Rdd进行checkpoint处理,会把内存写入到hdfs文件系统中,此时如果再次使用持久化的Rdd,但文件丢失后,会从hdfs中获取Rdd并重新进行缓存。

正确使用:

首先设置checkpoint目录

javaSparkContext.checkpointFile("hdfs://hadoopName:8020/user/yanglin/spark/checkpoint/UserVisitSessionAnalyzeSpark");

将缓存后的Rdd进行checkpoint处理

sessionRowPairRdd.checkpoint();
赞(2) 打赏
特别声明:除特殊标注,本站文章均为原创,遵循CC BY-NC 3.0,转载请注明出处。三伏磨 » spark常用性能调优 spark性能优化实战

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏