扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
[TOC]
创新互联建站-专业网站定制、快速模板网站建设、高性价比浦北网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式浦北网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖浦北地区。费用合理售后完善,十余年实体公司更值得信赖。import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//创建spark配置文件对象.设置app名称,master地址,local表示为本地模式。
//如果是提交到集群中,通常不指定。因为可能在多个集群汇上跑,写死不方便
val conf = new SparkConf().setAppName("wordCount")
//创建spark context对象
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile(args(1))
sc.stop()
}
}
核心代码很简单,首先看 textFile这个函数
SparkContext.scala
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
//指定文件路径、输入的格式类为textinputformat,输出的key类型为longwritable,输出的value类型为text
//map(pair => pair._2.toString)取出前面的value,然后将value转为string类型
//最后将处理后的value返回成一个新的list,也就是RDD[String]
//setName(path) 设置该file名字为路径
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
关键性的操作就是:
返回了一个hadoopFile,它有几个参数:
path:文件路径
classOf[TextInputFormat]:这个其实就是输入文件的处理类,也就是我们mr中分析过的TextInputFormat,其实就是直接拿过来的用的,不要怀疑,就是酱紫的
classOf[LongWritable], classOf[Text]:这两个其实可以猜到了,就是输入的key和value的类型。
接着执行了一个map(pair => pair._2.toString),将KV中的value转为string类型
我们接着看看hadoopFile 这个方法
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
//看到这里,最后返回的是一个 HadoopRDD 对象
//指定sc对象,配置文件、输入方法类、KV类型、分区个数
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
最后返回HadoopRDD对象。
接着就是flatMap(.split(" ")) .map((,1)),比较简单
flatMap(_.split(" "))
就是将输入每一行,按照空格切割,然后切割后的元素称为一个新的数组。
然后将每一行生成的数组合并成一个大数组。
map((_,1))
将每个元素进行1的计数,组成KV对,K是元素,V是1
接着看.reduceByKey(_+_)
这个其实就是将同一key的KV进行聚合分组,然后将同一key的value进行相加,最后就得出某个key对应的value,也就是某个单词的个数
看看这个函数
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
这个过程中会分区,默认分区数是2,使用的是HashPartitioner进行分区,可以指定分区的最小个数
图2.1 spark资源调度
1、执行提交命令,会在client客户端启动一个spark-submit进程(用来为Driver申请资源)。
2、为Driver向Master申请资源,在Master的waitingDrivers 集合中添加这个Driver要申请的信息。Master查看works集合,挑选出合适的Work节点。
3、在选中的Work节点中启动Driver进程(Driver进程已经启动了,spark-submit的使命已经完成了,关闭该进程)。所以其实driver也需要资源,也只是跑在executor上的一个线程而已
4、Driver进程为要运行的Application申请资源(这个资源指的是Executor进程)。此时Master的waitingApps 中要添加这个Application申请的资源信息。这时要根据申请资源的要求去计算查看需要用到哪些Worker节点(每一个节点要用多少资源)。在这些节点启动Executor进程。
(注:轮询启动Executor。Executor占用这个节点1G内存和这个Worker所能管理的所有的core)
5、此时Driver就可以分发任务到各个Worker节点的Executor进程中运行了。
Master中的三个集合
val works = new HashSet[WorkInfo]()
works 集合采用HashSet数组存储work的节点信息,可以避免存放重复的work节点。为什么要避免重复?首先我们要知道work节点有可能因为某些原因挂掉,挂掉之后下一次与master通信时会报告给master,这个节点挂掉了,然后master会在works对象里把这个节点去掉,等下次再用到这个节点是时候,再加进来。这样来说,理论上是不会有重复的work节点的。可是有一种特殊情况:work挂掉了,在下一次通信前又自己启动了,这时works里面就会有重复的work信息。
val waitingDrivers = new ArrayBuffer[DriverInfo]()
当客户端向master为Driver申请资源时,会将要申请的Driver的相关信息封装到master节点的DriverInfo这个泛型里,然后添加到waitingDrivers 里。master会监控这个waitingDrivers 对象,当waitingDrivers集合中的元素不为空时,说明有客户端向master申请资源了。此时应该先查看一下works集合,找到符合要求的worker节点,启动Driver。当Driver启动成功后,会把这个申请信息从waitingDrivers 对象中移除。
val waitingApps = new ArrayBuffer[ApplicationInfo]()
Driver启动成功后,会为application向master申请资源,这个申请信息封存到master节点的waitingApps 对象中。同样的,当waitingApps 集合不为空,说明有Driver向Master为当前的Application申请资源。此时查看workers集合,查找到合适的Worker节点启动Executor进程,默认的情况下每一个Worker只是为每一个Application启动一个Executor,这个Executor会使用1G内存和所有的core。启动Executor后把申请信息从waitingApps 对象中移除。
注意点:上面说到master会监控这三个集合,那么到底是怎么监控的呢???
master并不是分出来线程专门的对这三个集合进行监控,相对而言这样是比较浪费资源的。master实际上是‘监控’这三个集合的改变,当这三个集合中的某一个集合发生变化时(新增或者删除),那么就会调用schedule()方法。schedule方法中封装了上面提到的处理逻辑。
1、默认情况下,每一个Worker只会为每一个Application启动一个Executor。每个Executor默认使用1G内存和这个Worker所能管理的所有的core。
2、如果想要在一个Worker上启动多个Executor,在提交Application的时候要指定Executor使用的core数量(避免使用该worker所有的core)。提交命令:spark-submit --executor-cores
3、默认情况下,Executor的启动方式是轮询启动,一定程度上有利于数据的本地化。
什么是轮询启动???为什么要轮训启动呢???
轮询启动:轮询启动就是一个个的启动。例如这里有5个人,每个人要发一个苹果+一个香蕉。轮询启动的分发思路就是:五个人先一人分一个苹果,分发完苹果再分发香蕉。
为什么要使用轮询启动的方式呢???我们做大数据计算首先肯定想的是计算找数据。在数据存放的地方直接计算,而不是把数据搬过来再计算。我们有n台Worker节点,如果只是在数据存放的节点计算。只用了几台Worker去计算,大部分的worker都是闲置的。这种方案肯定不可行。所以我们就使用轮询方式启动Executor,先在每一台节点都允许一个任务。
存放数据的节点由于不需要网络传输数据,所以肯定速度快,执行的task数量就会比较多。这样不会浪费集群资源,也可以在存放数据的节点进行计算,在一定程度上也有利于数据的本地化。
粗粒度(富二代):
在任务执行之前,会先将资源申请完毕,当所有的task执行完毕,才会释放这部分资源。
优点:每一个task执行前。不需要自己去申请资源了,节省启动时间。
缺点:等到所有的task执行完才会释放资源(也就是整个job执行完成),集群的资源就无法充分利用。
这是spark使用的调度粒度,主要是为了让stage,job,task的执行效率高一点
细粒度(穷二代):
Application提交的时候,每一个task自己去申请资源,task申请到资源才会执行,执行完这个task会立刻释放资源。
优点:每一个task执行完毕之后会立刻释放资源,有利于充分利用资源。
缺点:由于需要每一个task自己去申请资源,导致task启动时间过长,进而导致stage、job、application启动时间延长。
我们提交任务时,可以指定一些资源限制的参数:
--executor-cores : 单个executor使用的core数量,不指定的话默认使用该worker所有能调用的core
--executor-memory : 单个executor使用的内存大小,如1G。默认是1G
--total-executor-cores : 整个application最多使用的core数量,防止独占整个集群资源
https://blog.csdn.net/qq_33247435/article/details/83653584#3Spark_51
一个application的调度到完成,需要经过以下阶段:
application-->资源调度-->任务调度(task)-->并行计算-->完成
图3.1 spark调度流程
可以看到,driver启动后,会有下面两个对象:
DAGScheduler:
据RDD的宽窄依赖关系将DAG有向无环图切割成一个个的stage,将stage封装给另一个对象taskSet,taskSet=stage,然后将一个个的taskSet给taskScheduler。
taskScheduler:
taskSeheduler拿倒taskSet之后,会遍历这个taskSet,拿到每一个task,然后去调用HDFS上的方法,获取数据的位置,根据获得的数据位置分发task到响应的Worker节点的Executor进程中的线程池中执行。并且会根据每个task的执行情况监控,等到所有task执行完成后,就告诉master将所哟executor杀死
任务调度中主要涉涉及到以下流程:
1)、DAGScheduler:根据RDD的宽窄依赖关系将DAG有向无环图切割成一个个的stage,将stage封装给另一个对象taskSet,taskSet=stage,然后将一个个的taskSet给taskScheduler。
2)、taskScheduler:taskSeheduler拿倒taskSet之后,会遍历这个taskSet,拿到每一个task,然后去调用HDFS上的方法,获取数据的位置,根据获得的数据位置分发task到响应的Worker节点的Executor进程中的线程池中执行。
3)、taskScheduler:taskScheduler节点会跟踪每一个task的执行情况,若执行失败,TaskScher会尝试重新提交,默认会重试提交三次,如果重试三次依然失败,那么这个task所在的stage失败,此时TaskScheduler向DAGScheduler做汇报。
4)DAGScheduler:接收到stage失败的请求后,,此时DAGSheduler会重新提交这个失败的stage,已经成功的stage不会重复提交,只会重试这个失败的stage。
(注:如果DAGScheduler重试了四次依然失败,那么这个job就失败了,job不会重试
掉队任务的概念:
当所有的task中,75%以上的task都运行成功了,就会每隔一百秒计算一次,计算出目前所有未成功任务执行时间的中位数*1.5,凡是比这个时间长的task都是挣扎的task。
总的调度流程:
=======================================资源调度=========================================
1、启动Master和备用Master(如果是高可用集群需要启动备用Master,否则没有备用Master)。
2、启动Worker节点。Worker节点启动成功后会向Master注册。在works集合中添加自身信息。
3、在客户端提交Application,启动spark-submit进程。伪代码:spark-submit --master --deploy-mode cluster --class jarPath
4、Client向Master为Driver申请资源。申请信息到达Master后在Master的waitingDrivers集合中添加该Driver的申请信息。
5、当waitingDrivers集合不为空,调用schedule()方法,Master查找works集合,在符合条件的Work节点启动Driver。启动Driver成功后,waitingDrivers集合中的该条申请信息移除。Client客户端的spark-submit进程关闭。
(Driver启动成功后,会创建DAGScheduler对象和TaskSchedule对象)
6、当TaskScheduler创建成功后,会向Master会Application申请资源。申请请求发送到Master端后会在waitingApps集合中添加该申请信息。
7、当waitingApps集合中的元素发生改变,会调用schedule()方法。查找works集合,在符合要求的worker节点启动Executor进程。
8、当Executor进程启动成功后会将waitingApps集合中的该申请信息移除。并且向TaskSchedule反向注册。此时TaskSchedule就有一批Executor的列表信息。
=======================================任务调度=========================================
9、根据RDD的宽窄依赖,切割job,划分stage。每一个stage是由一组task组成的。每一个task是一个pipleline计算模式。
10、TaskScheduler会根据数据位置分发task。(taskScheduler是如何拿到数据位置的???TaskSchedule调用HDFS的api,拿到数据的block块以及block块的位置信息)
11、TaskSchedule分发task并且监控task的执行情况。
12、若task执行失败或者挣扎。会重试这个task。默认会重试三次。
13、若重试三次依旧失败。会把这个task返回给DAGScheduler,DAGScheduler会重试这个失败的stage(只重试失败的这个stage)。默认重试四次。
14、告诉master,将集群中的executor杀死,释放资源。
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流