Spark Source Code Study: Application Execution

Background

An application can be broken down into four levels:
Application: initializing a SparkContext creates one Application.
Job: each action operator triggers one Job.
Stage: the number of stages equals the number of wide (shuffle) dependencies + 1.
Task: within a stage, the number of partitions of the last RDD is the number of Tasks (see the sketch below).
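As an illustration, here is a minimal sketch of how these levels map onto a small program. The input path input.txt and the exact counts are assumptions for illustration; the partition hint passed to textFile is a minimum, so the real numbers shown in the Spark UI may differ slightly.

import org.apache.spark.sql.SparkSession

object CountingDemo {
  def main(args: Array[String]): Unit = {
    // One SparkContext => one Application
    val spark = SparkSession.builder().master("local[2]").appName("counting-demo").getOrCreate()
    val sc = spark.sparkContext

    val counts = sc.textFile("input.txt", 4)   // roughly 4 partitions (minPartitions hint)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _, 2)                   // one shuffle dependency

    // One action => one Job; 1 shuffle dependency => 2 stages;
    // tasks ≈ 4 (ShuffleMapStage) + 2 (ResultStage) = 6
    counts.collect().foreach(println)

    spark.stop()
  }
}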

Executing an application involves the following parts:
RDD dependencies
Stage division
Task splitting
Task scheduling
Task execution

RDD Dependencies

org.apache.spark.Dependency

Dependency defines two kinds of dependencies: wide (shuffle) dependencies and narrow dependencies.
Narrow dependencies in turn come in two flavors: OneToOneDependency and RangeDependency.

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {
  // TODO
}
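These dependency types can be observed directly from user code through RDD.dependencies. A minimal sketch, assuming a running SparkContext named sc (the exact class names printed may vary slightly by Spark version):

val a = sc.parallelize(1 to 10, 2)
val b = sc.parallelize(11 to 20, 2)

// map keeps a 1:1 partition mapping -> OneToOneDependency
println(a.map(_ * 2).dependencies.head.getClass.getSimpleName)

// union stitches parent partitions into ranges -> RangeDependency (one per parent)
println(a.union(b).dependencies.map(_.getClass.getSimpleName))

// reduceByKey repartitions by key -> ShuffleDependency
println(a.map((_, 1)).reduceByKey(_ + _).dependencies.head.getClass.getSimpleName)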

Stage Division

In Spark, the number of stages equals the number of shuffle dependencies + 1.

org.apache.spark.scheduler.DAGScheduler

Starting from an RDD action operator and following the runJob call chain, we eventually reach the post call on the DAGScheduler's event loop:

eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  Utils.cloneProperties(properties)))

The posted event is received through onReceive; pattern matching then dispatches it to the key method, handleJobSubmitted:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

The concrete stage division happens inside this method.

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

// How is finalStage created? Stepping into createResultStage, we find a call to getOrCreateParentStages(rdd, jobId).
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

getOrCreateParentStages(rdd, jobId) in turn builds the parent stages: one ShuffleMapStage per shuffle dependency of the RDD.

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

If there is no upstream shuffle dependency, the job consists of just the ResultStage. Each upstream shuffle dependency nests one more ShuffleMapStage in front of it.
This is why the number of stages in Spark equals the number of shuffle dependencies + 1 (see the sketch below).
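This rule is easy to check from the shell with RDD.toDebugString, which prints the lineage and indents at each shuffle boundary. A minimal sketch, assuming a SparkContext named sc:

val lineage = sc.parallelize(1 to 100, 4)
  .map(x => (x % 10, x))
  .reduceByKey(_ + _)                  // shuffle dependency #1
  .map { case (k, v) => (v % 3, k) }
  .groupByKey()                        // shuffle dependency #2
  .toDebugString

// Two shuffle dependencies => 3 stages when an action runs on this RDD.
println(lineage)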

Task Splitting

The total number of tasks is the sum, over all stages, of the partition count of each stage's last RDD.
Once the stages above have been created, the final step is a call to submitStage(finalStage); stepping into it:

val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
  logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
  submitMissingTasks(stage, jobId.get)
} else {
  for (parent <- missing) {
    submitStage(parent)
  }
  waitingStages += stage
}

Once a stage has no missing parents, submitMissingTasks creates one task per partition to compute. The key value to watch here is partitionsToCompute.

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }
}

partitionsToCompute is produced by an abstract method, findMissingPartitions(); we look at its implementation in ShuffleMapStage.

// findMissingPartitions() is an abstract method; we look for its implementations.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

// The ShuffleMapStage implementation shows that, in the end, the partitioning is driven by numPartitions.
override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}
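In other words, the task count of a stage follows the partition count of its last RDD. A minimal sketch, assuming a SparkContext named sc (the task counts below are what the Spark UI would be expected to report for the resulting job):

val pairs = sc.parallelize(1 to 1000, 8)   // last RDD of the ShuffleMapStage: 8 partitions -> 8 ShuffleMapTasks
  .map(x => (x % 5, 1))
  .reduceByKey(_ + _, 3)                   // last RDD of the ResultStage: 3 partitions -> 3 ResultTasks

pairs.collect()                            // 1 job, 2 stages, 8 + 3 = 11 tasks in total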

Task Scheduling

After the tasks have been created, submitTasks is called; the tasks are wrapped into a TaskSet.

if (tasks.nonEmpty) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
}

Stepping into submitTasks, we find it is an abstract method; the concrete implementation is org.apache.spark.scheduler.TaskSchedulerImpl:

this.synchronized {
  val manager = createTaskSetManager(taskSet, maxTaskFailures)
  val stage = taskSet.stageId
  val stageTaskSets =
    taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])

org.apache.spark.scheduler.SchedulableBuilder

addTaskSetManager is again declared on an abstraction; looking at its implementations, there are two schedulable builders, FIFOSchedulableBuilder and FairSchedulableBuilder, corresponding to the two scheduling modes (see the configuration sketch below).

schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
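Which builder gets used is controlled by the spark.scheduler.mode configuration (FIFO by default, FAIR optionally). A minimal sketch of switching to fair scheduling at application start; the app name and master are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("fair-scheduling-demo")
  .master("local[4]")
  .config("spark.scheduler.mode", "FAIR")  // default FIFO -> FIFOSchedulableBuilder
  .getOrCreate()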

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

After the task set has been handed to the scheduler, execution continues to backend.reviveOffers(), again an abstract method; the corresponding implementation class is CoarseGrainedSchedulerBackend.
Its reviveOffers sends a ReviveOffers message; tracing where ReviveOffers is handled leads to the concrete method.

override def reviveOffers(): Unit = {
  driverEndpoint.send(ReviveOffers)
}

// The message is handled by makeOffers:
case ReviveOffers =>
  makeOffers()

This is where tasks get launched:

private def makeOffers(): Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort),
          executorData.resourcesInfo.map { case (rName, rInfo) =>
            (rName, rInfo.availableAddrs.toBuffer)
          })
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

Next, the launchTasks method:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    // The TaskDescription is serialized (encoded) before being sent.
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      // TODO
    }
    else {
      // TODO

      // The send goes through Spark's Netty-based RPC layer.
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
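The size check against maxRpcMessageSize guards against tasks whose serialized form is too large to ship over RPC. The limit comes from the spark.rpc.message.maxSize setting (in MiB, 128 by default); if large closures trip this check, the limit can be raised. A hypothetical tuning sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rpc-size-demo")
  .config("spark.rpc.message.maxSize", "256")  // raise the RPC message limit to 256 MiB
  .getOrCreate()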

Task Execution

org.apache.spark.executor.CoarseGrainedExecutorBackend

Find the corresponding receive method:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
        resources = _resources)
      driver.get.send(LaunchedExecutor(executorId))
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
  // TODO
}
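The branch that matters for task execution is the LaunchTask case elided above. Roughly (paraphrased from memory; exact details vary between Spark versions), it decodes the TaskDescription and hands it to the executor:

case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Decode the serialized TaskDescription sent by the driver, then launch it.
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskDesc)
  }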

Following launchTask into the Executor, we can see that each task is ultimately wrapped in a TaskRunner and executed on a thread pool.

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
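The pattern itself is plain java.util.concurrent: keep a thread pool, register the running task, and submit one Runnable per task. A simplified, self-contained sketch of that pattern (MiniExecutor and MiniTask are made-up names for illustration, not Spark classes):

import java.util.concurrent.{ConcurrentHashMap, Executors}

// Hypothetical stand-in for a decoded TaskDescription.
final case class MiniTask(taskId: Long, body: () => Unit)

class MiniExecutor {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  def launchTask(task: MiniTask): Unit = {
    // Wrap the task in a Runnable (Spark's equivalent is TaskRunner) and hand it to the pool.
    val runner: Runnable = () => {
      try task.body()
      finally runningTasks.remove(task.taskId)   // deregister once the task finishes
    }
    runningTasks.put(task.taskId, runner)
    threadPool.execute(runner)
  }

  def shutdown(): Unit = threadPool.shutdown()
}

// Usage:
// val exec = new MiniExecutor
// exec.launchTask(MiniTask(1L, () => println("running task 1")))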