扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章将为大家详细讲解有关如何剖析具体实现,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
创新互联坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都网站建设、网站建设、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的平乐网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
一、概述
这里我们从源码角度剖析BypassMergeSortShuffleWriter实现策略的原理和具体的实现细节。
BypassMergeSortShuffleWriter具体的实现都在对应类的write()函数中,我们直接看源码进行剖析
1.先看构造函数初始化
BypassMergeSortShuffleWriter( BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver, BypassMergeSortShuffleHandlehandle, int mapId, TaskContext taskContext, SparkConf conf) { // 获取spark.shuffle.file.buffer参数值,默认32k,这里是一个比较重要的条有参数, // 该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。 // 将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘 //如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数, // 也就可以减少磁盘IO次数,进而提升性能 this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; // 是否采用NIO的从文件到文件流的复制方式,默认值是true 一般不用修改 this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); this.blockManager = blockManager; // 获取shufflehandle中的ShuffleDependency对象,通过该对象得到分区器和分区个数等数据。 final ShuffleDependency dep = handle.dependency(); this.mapId = mapId; this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); //设置序列化工具对象,和shuffleBlockResolver对象, // 该对象用来创建和维护shuffle的数据的逻辑块和物理文件位置之间的映射的对象 this.serializer = dep.serializer(); this.shuffleBlockResolver = shuffleBlockResolver; }
2.再看write()函数,源码如下:
//这里大体意思是 为每个分区在磁盘创建临时文件 并给每一个writer
上面代码的大体思路如下:
a.确定分区数,然后为每个分区创建DiskBlockObjectWriter和临时文件
b.循环将record通过Partitioner进行分区,并写入对应分区临时文件
c. 将分区数据刷到磁盘
d.根据shuffleId和mapId,构建ShuffleDataBlockId,创建合并文件data和合并文件的临时文件,文件格式为:
shuffle_{shuffleId}_{mapId}_{reduceId}.data
e.将分区文件合并到一个总的临时文件,合并后会重命名为最终输出文件名,并返回一个对应分区文件长度的数组
f.创建索引文件index和索引临时文件,每一个分区的长度和offset写入索引文件等;并且重命名临时data文件和临时index文件
g.将一些信息封装到MapStatus返回
存在问题:
这种Writer会为每个分区创建一个临时文件,如果分区过多时,会创建很多的output输出流和临时文件对象,占用资源过多,性能会下降。
重点关注:
参数:spark.shuffle.file.buffer 默认值32k
默认情况下,shuffle的map task,输出到磁盘文件的时候,统一都会先写入到每个task自己关联的一个内存缓冲区,每一次当内存缓冲区满溢后,然后才会进行溢写到磁盘中。如果内存冲突可适当调大这个参数,从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
关于如何剖析具体实现就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流