扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
创新互联建站专注于镇原网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供镇原营销型网站建设,镇原网站制作、镇原网页设计、镇原网站官网定制、小程序制作服务,打造镇原网络公司原创品牌,更为您提供镇原网站排名全网营销落地服务。
当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:
进入其构造函数中:
可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。
LiveListenerBus:
BlockManagerMaster:
通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法
private def initializeEventProcessActor() { // blocking the thread until supervisor is started, which ensures eventProcessActor is // not null before any job is submitted // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null implicit val timeout = Timeout(30 seconds) val initEventActorReply = dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) eventProcessActor = Await.result(initEventActorReply, timeout.duration). asInstanceOf[ActorRef] } initializeEventProcessActor()
DAGSchedulerEventProcessActor:
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) extends Actor with Logging { override def preStart() { // set DAGScheduler for taskScheduler to ensure eventProcessActor is always // valid when the messages arrive // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候 // eventProcessActor已经准备就绪 dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) } /** * The main event loop of the DAG scheduler. */ def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId, fetchFailed = false) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => dagScheduler.handleTaskSetFailed(taskSet, reason) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() } override def postStop() { // Cancel any active jobs in postStop hook dagScheduler.cleanUpAfterSchedulerStop() } }
可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。
关于如何进行DAGScheduler源码解读问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注创新互联行业资讯频道了解更多相关知识。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流