Spark Source Code Study: Application Submission

Related Classes

  • org.apache.spark.deploy.SparkSubmit
  • org.apache.spark.deploy.yarn.YarnClusterApplication
  • org.apache.spark.deploy.yarn.ApplicationMaster
  • org.apache.spark.deploy.yarn.YarnAllocator
  • org.apache.spark.deploy.yarn.ExecutorRunnable
  • org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
  • org.apache.spark.executor.CoarseGrainedExecutorBackend
  • org.apache.spark.rpc.netty.NettyRpcEnv
  • org.apache.spark.rpc.netty.Dispatcher
  • org.apache.spark.rpc.netty.DedicatedMessageLoop
  • org.apache.spark.SparkContext
  • org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
  • org.apache.spark.deploy.yarn.ExecutorLauncher

Flow Diagram

SparkSubmit Class Analysis

The submission parameters for a standard Spark application look like this:

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000

Looking at the shell scripts, we find that what ultimately gets executed is:

java org.apache.spark.deploy.SparkSubmit

So the entry point of a Spark application is org.apache.spark.deploy.SparkSubmit.

The relevant source code:

// Parse the action argument; the default is SUBMIT
def doSubmit(args: Array[String]): Unit = {
val appArgs = parseArguments(args)
// TODO
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}

// submit() eventually calls runMain(), which prepares the execution environment.
// The key step here is determining childMainClass; the main method of that class is later invoked via reflection.
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

// In YARN Cluster mode:
if (isYarnCluster) {
// val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
// TODO
}
// In YARN Client mode:
if (deployMode == CLIENT) {
childMainClass = args.mainClass
// TODO
}

// If childMainClass is a subclass of SparkApplication, instantiate it directly via its constructor; otherwise wrap it in a JavaMainApplication.
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
// Call the start method
app.start(childArgs.toArray, sparkConf)
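
For the non-SparkApplication branch, JavaMainApplication simply locates the static main method of the given class and invokes it via reflection. Below is a minimal, simplified sketch of that idea (not Spark's exact code; the example class name is only an illustration):

import java.lang.reflect.Modifier

// Simplified stand-in for a JavaMainApplication-style wrapper:
// wrap a plain "main class" and invoke its static main(String[]) via reflection.
class ReflectiveMainApp(mainClass: Class[_]) {
  def start(args: Array[String]): Unit = {
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      s"${mainClass.getName}.main must be static")
    // Static method, so the receiver is null; the whole args array is one parameter.
    mainMethod.invoke(null, args)
  }
}

object ReflectiveMainApp {
  def main(args: Array[String]): Unit = {
    // Hypothetical user class; in SparkSubmit this comes from --class.
    val userClass = Class.forName("org.apache.spark.examples.SparkPi")
    new ReflectiveMainApp(userClass).start(args)
  }
}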

YarnClusterApplication Class Analysis

Additional POM Dependency

To browse this class in the IDE, an extra dependency needs to be added:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

Core Source Code

// Enter YarnClusterApplication and find its start method, which executes run().
private[spark] class YarnClusterApplication extends SparkApplication {

override def start(args: Array[String], conf: SparkConf): Unit = {
// TODO
new Client(new ClientArguments(args), conf, null).run()
}
}
// In the run method, the very first line submits the application.
def run(): Unit = {
this.appId = submitApplication()
// TODO
}

// The core operations in the submitApplication method are:
def submitApplication(): ApplicationId = {
ResourceRequestHelper.validateResources(sparkConf)

var appId: ApplicationId = null
try {
// 1. Initialize the yarnClient (this establishes the connection to the ResourceManager)
launcherBackend.connect()
yarnClient.init(hadoopConf)
yarnClient.start()

logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()

// The app staging dir based on the STAGING_DIR configuration if configured
// otherwise based on the users home directory.
val appStagingBaseDir = sparkConf.get(STAGING_DIR)
.map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
Option(appId.toString)).setCurrentContext()

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)

// Set up the appropriate contexts to launch our AM
// 2. Create the launch context for the ApplicationMaster container
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
// 3. Submit the application
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)

appId
} catch {
case e: Throwable =>
if (stagingDirPath != null) {
cleanupStagingDir()
}
throw e
}
}
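
Stripped of the Spark-specific details, the interaction with YARN above follows the standard YarnClient API sequence: create a client, ask the RM for an application id, fill in a submission context, and submit. A bare-bones sketch of just that sequence (assumptions: a reachable YARN cluster, and a placeholder AM command instead of a real ApplicationMaster class):

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object YarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // 1. Ask the ResourceManager for a new application id
    val newApp = yarnClient.createApplication()
    val appId = newApp.getNewApplicationResponse.getApplicationId

    // 2. Fill in the submission context: name, AM container command, AM resources
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("yarn-submit-sketch")
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(java.util.Collections.singletonList(
      "{{JAVA_HOME}}/bin/java SomeApplicationMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"))
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(Resource.newInstance(1024, 1)) // 1 GB, 1 vcore for the AM

    // 3. Submit and let the RM schedule the AM container
    yarnClient.submitApplication(appContext)
    println(s"Submitted application $appId")
  }
}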

How the AM Is Created

// Enter the createContainerLaunchContext method, which sets up the launch of the ApplicationMaster
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Note that the AM is launched via bin/java, i.e. as a separate JVM process.
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++

val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}

ApplicationMaster Class Analysis

// The AM was launched above via bin/java; find its main method and follow the execution.
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}

// 1. runDriver() calls startUserApplication(), which creates the Driver thread (note: a thread, not a process).
// This method loads the user application specified by --class.
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread

// After this call, the AM waits for SparkContext initialization. This is why a Spark program must create its SparkContext object right at the start.
logInfo("Waiting for spark context initialization...")
val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)

// 2. Register the ApplicationMaster
// Once SparkContext is initialized, registerAM() is called to register the ApplicationMaster with the ResourceManager and request resources.
registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

// 3. After the resources are requested, create the allocator and allocate them.
createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

// Create the allocator
allocator = client.createAllocator(
yarnConf,
_sparkConf,
appAttemptId,
driverUrl,
driverRef,
securityMgr,
localResources)

// Allocate resources
allocator.allocateResources()
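
The point that the Driver is a thread rather than a process can be illustrated with a small sketch of what startUserApplication() does conceptually (simplified and hypothetical, not the real implementation): the AM spawns a thread named "Driver" that reflectively runs the user's main method, then blocks until the user code has created its SparkContext.

import java.util.concurrent.{CountDownLatch, TimeUnit}

// Toy version of the startUserApplication() idea: run the user's --class
// in a thread called "Driver" inside the AM JVM, then wait for SparkContext.
object DriverThreadSketch {
  private val sparkContextReady = new CountDownLatch(1)

  // In real Spark this waiting is driven by a Promise fulfilled when the user
  // code reports its SparkContext; here it is just a latch.
  def sparkContextInitialized(): Unit = sparkContextReady.countDown()

  def startUserApplication(userMainClass: String, userArgs: Array[String]): Thread = {
    val userThread = new Thread(() => {
      val mainMethod = Class.forName(userMainClass)
        .getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, userArgs) // the user code creates SparkContext in here
    })
    userThread.setName("Driver") // the "Driver" is just a thread inside the AM process
    userThread.start()
    userThread
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical user class assumed to be on the classpath
    val driver = startUserApplication("com.example.UserApp", args)
    // The AM waits here before registering itself and requesting executors
    sparkContextReady.await(100, TimeUnit.SECONDS)
    driver.join()
  }
}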

YarnAllocator Class Analysis

Once the allocator is created, allocateResources() is called:

// In allocateResources(), allocated containers are handled by handleAllocatedContainers
handleAllocatedContainers(allocatedContainers.asScala)
processCompletedContainers(completedContainers.asScala)

// handleAllocatedContainers() calls runAllocatedContainers(containersToUse), which launches the containers that can be used.
if (launchContainers) {
launcherPool.execute(() => {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
).run()
// TODO

// In the run method, an NMClient is created and the container is started.
def run(): Unit = {
logDebug("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
startContainer()
}

ExecutorRunnable Class Analysis

// In startContainer(), the prepareCommand() method assembles the command that launches a JVM process.
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
"--resourceProfileId", resourceProfileId.toString) ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
commands.map(s => if (s == null) "null" else s).toList

// After prepareCommand(), the container is actually started.
nmClient.startContainer(container.get, ctx)

YarnCoarseGrainedExecutorBackend Class Analysis

Starting a container is, in effect, starting a JVM process.

// Find the main method.
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt, resourceProfile)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
this.getClass.getCanonicalName.stripSuffix("$"))
CoarseGrainedExecutorBackend.run(backendArgs, createFn)
System.exit(0)
}
// Note: this YarnCoarseGrainedExecutorBackend is what we refer to as the Executor.

// The run method is found in the parent class's companion object, CoarseGrainedExecutorBackend.
def run(
arguments: Arguments,
backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend): Unit = {
// TODO
// The YarnCoarseGrainedExecutorBackend endpoint is registered under the name "Executor".
env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
}

onStart()

After run() completes, onStart() is invoked. The Netty-based RPC communication involved here is covered separately in the section below.

override def onStart(): Unit = {
logInfo("Connecting to driver: " + driverUrl)
try {
_resources = parseOrFindResources(resourcesFileOpt)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes, _resources, resourceProfile.id))
}(ThreadUtils.sameThread).onComplete {
case Success(_) =>
self.send(RegisteredExecutor)
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}

This is where the Executor registers itself with the driver.

receive()

After onStart() completes, following the Netty-based RPC message flow, receive() is executed; this is where the Executor compute object is created.

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
driver.get.send(LaunchedExecutor(executorId))
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
// TODO
}

From run() to onStart()

// NettyRpcEnv: setupEndpoint() registers the RPC endpoint.
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}

// Dispatcher: in registerRpcEndpoint(), a MessageLoop is chosen for the endpoint.
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)
sharedLoop
}
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
endpointRefs.remove(endpoint)
throw e
}

// DedicatedMessageLoop: setActive(inbox) is called here, which lets the OnStart message be processed.
private val inbox = new Inbox(name, endpoint)

override protected val threadpool = if (endpoint.threadCount() > 1) {
ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
} else {
ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
}

(1 to endpoint.threadCount()).foreach { _ =>
threadpool.submit(receiveLoopRunnable)
}

// Mark active to handle the OnStart message.
setActive(inbox)
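
The reason onStart() always runs before receive() is that a fresh Inbox is created with an OnStart message already queued; setActive(inbox) simply hands that inbox to the message loop's thread pool. A toy sketch of the mechanism (heavily simplified compared to org.apache.spark.rpc.netty):

import java.util.concurrent.{Executors, LinkedBlockingQueue}

sealed trait InboxMessage
case object OnStart extends InboxMessage
final case class RpcMessage(body: Any) extends InboxMessage

trait ToyEndpoint {
  def onStart(): Unit
  def receive(msg: Any): Unit
}

// The inbox is born with OnStart already enqueued, so the endpoint's onStart()
// is guaranteed to be processed before any later message such as RegisteredExecutor.
final class ToyInbox(endpoint: ToyEndpoint) {
  private val messages = new LinkedBlockingQueue[InboxMessage]()
  messages.put(OnStart)

  def post(msg: InboxMessage): Unit = messages.put(msg)

  def process(): Unit = messages.take() match {
    case OnStart          => endpoint.onStart()
    case RpcMessage(body) => endpoint.receive(body)
  }
}

object ToyMessageLoop {
  def main(args: Array[String]): Unit = {
    val inbox = new ToyInbox(new ToyEndpoint {
      def onStart(): Unit = println("onStart: connect to the driver and send RegisterExecutor")
      def receive(msg: Any): Unit = println(s"receive: $msg")
    })
    inbox.post(RpcMessage("RegisteredExecutor"))

    // "setActive": hand the inbox to a dispatcher thread that drains it in order.
    val pool = Executors.newSingleThreadExecutor()
    pool.submit(new Runnable {
      def run(): Unit = { inbox.process(); inbox.process() }
    })
    pool.shutdown()
  }
}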

Spark on YARN Cluster Execution Flow

Based on the source code above, the Spark on YARN Cluster execution flow can be summarized as follows:
1. Running the submit script actually starts a SparkSubmit JVM process.
2. SparkSubmit's main method instantiates YarnClusterApplication via reflection and calls its start method.
3. YarnClusterApplication creates a YARN client and sends the command bin/java ApplicationMaster to the YARN cluster.
4. On receiving the command, YARN starts the ApplicationMaster on a chosen NodeManager.
5. The ApplicationMaster starts the Driver thread, which runs the user's code and initializes SparkContext.
6. The ApplicationMaster registers with the ResourceManager and requests resources.
7. Once resources are granted, the AM sends the command bin/java YarnCoarseGrainedExecutorBackend to the NodeManagers.
8. Each YarnCoarseGrainedExecutorBackend process receives messages, communicates with the Driver to register the started Executor, then creates the Executor compute object and waits for tasks.
9. The Driver distributes tasks and monitors their execution.

Spark on YARN Client Flow Analysis

As mentioned above, SparkSubmit decides what to run based on mainClass.

// In YARN Client mode, the user's own program is run, and SparkContext is initialized inside it.
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}

// SparkContext: during construction, _schedulerBackend and _taskScheduler are created and then started.
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)

// The createTaskScheduler method
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
sc.deployMode match {
case "cluster" => new YarnClusterScheduler(sc)
case "client" => new YarnScheduler(sc)
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}

// The createSchedulerBackend method
override def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
sc.deployMode match {
case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case _ =>
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}

// YarnClientSchedulerBackend has a start method that creates a Client instance and submits the application via client.submitApplication().
client = new Client(args, conf, sc.env.rpcEnv)
bindToYarn(client.submitApplication(), None)

// When the AM launch context is created, YARN Cluster and Client modes differ slightly.
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}

// ExecutorLauncher essentially just delegates to ApplicationMaster.main().
object ExecutorLauncher {

def main(args: Array[String]): Unit = {
ApplicationMaster.main(args)
}
}

The difference is:
in YARN Cluster mode, runDriver() is executed;
in YARN Client mode, runExecutorLauncher() is executed.

Spark on YARN Client Execution Flow

Based on the source code above, the Spark on YARN Client execution flow can be summarized as follows:
1. Running the submit script actually starts a SparkSubmit JVM process.
2. SparkSubmit's main method invokes the main method of the user code via reflection.
3. The user code runs inside the SparkSubmit process (which acts as the Driver), initializes SparkContext, and creates a YarnClientSchedulerBackend.
4. YarnClientSchedulerBackend sends the command bin/java ExecutorLauncher to the ResourceManager.
5. On receiving the command, YARN starts ExecutorLauncher on a chosen NodeManager (which in turn just calls ApplicationMaster.main).
6. The ApplicationMaster registers with the ResourceManager and requests resources.
7. Once resources are granted, the AM sends the command bin/java YarnCoarseGrainedExecutorBackend to the NodeManagers.
8. Each YarnCoarseGrainedExecutorBackend process receives messages, communicates with the Driver to register the started Executor, then creates the Executor compute object and waits for tasks.
9. The Driver distributes tasks and monitors their execution.

As you can see, the AM entry class differs between cluster and client mode. In cluster mode, the AM process both runs the Driver and handles resource requests and scheduling for the Executors. In client mode, the client-side spark-submit process acts as the Driver, and the AM is only responsible for Executor resource scheduling.
