Spark源码学习:通信环境

Spark源码学习:通信环境

通信主要包括:

  • Driver-Executor
  • Executor-Driver
  • Executor-Executor

在通信过程中,涉及到的基本概念包括:

  • RpcEndpoint:RPC通信终端。
  • RpcEnv:RPC 上下文环境。
  • Dispatcher:消息调度(分发)器,负责将 RPC 终端需要发送的远程消息,以及从远程 RPC 接收到的消息,分发至对应的指令收件箱(发件箱)。
  • Inbox:指令消息收件箱。
  • Outbox:指令消息发件箱。
  • TransportClient:Netty 通信客户端。
  • TransportServer:Netty 通信服务端。

org.apache.spark.SparkContext

SparkContext 中有一个用于获取 SparkEnv(Spark 执行环境)的方法:

// Accessor for this SparkContext's SparkEnv; it simply returns the `_env` field.
private[spark] def env: SparkEnv = _env

创建DriverEnv

    // Step 1: during initialization SparkContext builds the driver's execution environment
// with createSparkEnv and then passes it to SparkEnv.set.
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

// Step 1 (cont.): createSparkEnv is a thin wrapper around SparkEnv.createDriverEnv that adds
// the driver core count computed by SparkContext.numDriverCores(master, conf).
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}

// Step 2: SparkEnv.createDriverEnv reads the driver's network settings and then delegates to
// the general-purpose `create` method (the same one the executor path calls).
/**
* Create a SparkEnv for the driver.
*
* Asserts that DRIVER_HOST_ADDRESS and DRIVER_PORT are set in `conf`, reads the bind address,
* the advertised address and the port, creates an IO encryption key only when
* IO_ENCRYPTION_ENABLED is true, and forwards everything to `create` with
* SparkContext.DRIVER_IDENTIFIER as the id.
*
* @param conf Spark configuration; must contain DRIVER_HOST_ADDRESS and DRIVER_PORT
* @param isLocal forwarded to `create`
* @param listenerBus forwarded to `create`
* @param numCores forwarded to `create`
* @param mockOutputCommitCoordinator forwarded to `create`; defaults to None
* @return the SparkEnv built by `create`
*/
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the driver!")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get(DRIVER_PORT)
// An IO encryption key is generated only when IO encryption is enabled.
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
None
}
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
// The driver always supplies a port here; the executor path passes None instead.
Option(port),
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}

// Step 3 (excerpt of SparkEnv.create; only the relevant parts are shown): the key step is
// creating an RpcEnv. A missing port (None) is passed as -1, and the last argument, which is
// RpcEnv.create's `clientMode`, is `!isDriver`. As a result, a server is started for this
// RpcEnv only when isDriver is true.
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)

// Assemble the SparkEnv from the RpcEnv and the other components (their construction is
// omitted from this excerpt).
val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)

// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
envInstance.driverTmpDir = Some(sparkFilesDir)
}

// Final expression of the excerpt: the SparkEnv handed back to the caller.
envInstance

// Step 4: RpcEnv.create packs its arguments into an RpcEnvConfig and builds the environment with
// NettyRpcEnvFactory, so the RpcEnv it returns is actually a NettyRpcEnv.
/**
* Creates an RpcEnv through NettyRpcEnvFactory.
*
* @param name system name; passed to Utils.startServiceOnPort when a server is started
* @param bindAddress address the server binds to, if a server is started
* @param advertiseAddress address given to the NettyRpcEnv constructor
* @param port port passed to Utils.startServiceOnPort (SparkEnv.create passes -1 when it has
*             no port)
* @param conf Spark configuration
* @param securityManager determines whether the server installs an authentication bootstrap
* @param numUsableCores passed to the NettyRpcEnv constructor
* @param clientMode when true, no server is started
* @return the created RpcEnv
*/
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}

// Step 5: NettyRpcEnvFactory.create constructs the NettyRpcEnv and, unless the config is in
// client mode, starts its server through Utils.startServiceOnPort.
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
// Only a non-client RpcEnv listens on a port; in client mode nettyEnv is returned without a
// server.
if (!config.clientMode) {
// Callback given to startServiceOnPort: it starts the server on the port it is handed and
// reports the port the env actually ended up with (nettyEnv.address.port).
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
// The returned env is discarded; nettyEnv itself is returned below.
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
// If the server cannot be started, shut the env down before rethrowing.
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}

// Step 5-1: NettyRpcEnv.startServer. It is invoked from the startNettyRpcEnv callback in
// NettyRpcEnvFactory.create with the port handed to that callback (actualPort).
/**
* Creates the TransportServer for this env and registers the RpcEndpointVerifier endpoint.
*
* @param bindAddress address the server binds to
* @param port port the server binds to
*/
def startServer(bindAddress: String, port: Int): Unit = {
// Install an authentication bootstrap only when authentication is enabled.
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
server = transportContext.createServer(bindAddress, port, bootstraps)
// After the server exists, register the verifier endpoint with the dispatcher.
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

// Step 5-1-1: TransportContext.createServer simply constructs a new TransportServer.
/**
 * Creates a TransportServer for this context using this context's rpcHandler.
 *
 * @param host host to bind to
 * @param port port to bind to
 * @param bootstraps server bootstraps passed to the TransportServer
 * @return the newly constructed TransportServer
 */
public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, rpcHandler, bootstraps);
}

// Step 5-1-2: the TransportServer constructor chooses a pooled ByteBuf allocator and then
// calls init(), where the Netty server is set up.
public TransportServer(
TransportContext context,
String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps) {
this.context = context;
this.conf = context.getConf();
this.appRpcHandler = appRpcHandler;
// Reuse a shared pooled allocator when configured; otherwise create a dedicated one.
if (conf.sharedByteBufAllocators()) {
this.pooledAllocator = NettyUtils.getSharedPooledByteBufAllocator(
conf.preferDirectBufsForSharedByteBufAllocators(), true /* allowCache */);
} else {
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
}
// bootstraps must be non-null; a defensive copy is stored.
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

// If init() throws, close this partially constructed server quietly before the
// exception propagates.
boolean shouldClose = true;
try {
init(hostToBind, portToBind);
shouldClose = false;
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(this);
}
}
}

// Step 5-1-3: init builds the Netty ServerBootstrap; the server channel class comes from
// NettyUtils.getServerChannelClass(ioMode).
private void init(String hostToBind, int portToBind) {

// The IO mode (e.g. NIO or EPOLL) is read from the transport configuration.
IOMode ioMode = IOMode.valueOf(conf.ioMode());
// Boss group is created with 1 (presumably the thread count) and worker group with
// conf.serverThreads(); both are named after the module.
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
conf.getModuleName() + "-boss");
EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
conf.getModuleName() + "-server");

// option(...) configures the listening channel and childOption(...) the accepted channels;
// SO_REUSEADDR is enabled everywhere except Windows.
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, pooledAllocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.childOption(ChannelOption.ALLOCATOR, pooledAllocator);

// TODO: the remainder of init is omitted from this excerpt.
}

// Step 5-1-4: getServerChannelClass switches on the IO mode and returns the matching Netty
// server channel class. The article notes that Linux's AIO support is limited, so epoll is
// commonly used there instead; note that epoll provides readiness notification for
// non-blocking I/O rather than true asynchronous I/O.
/**
 * Returns the Netty server channel class for the given IO mode.
 *
 * @param mode IO mode to map
 * @return NioServerSocketChannel.class for NIO, EpollServerSocketChannel.class for EPOLL
 * @throws IllegalArgumentException for any other mode
 */
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
switch (mode) {
case NIO:
return NioServerSocketChannel.class;
case EPOLL:
return EpollServerSocketChannel.class;
default:
throw new IllegalArgumentException("Unknown io mode: " + mode);
}
}

// Step 5-2: after creating the server, nettyEnv.startServer registers the RpcEndpointVerifier
// through Dispatcher.registerRpcEndpoint.
/**
* Builds an endpoint reference for `name` at this env's address.
*
* @param name endpoint name, combined with nettyEnv.address into an RpcEndpointAddress
* @param endpoint the endpoint being registered (used by the omitted registration logic)
* @return a NettyRpcEndpointRef pointing at the new endpoint's address
*/
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
// TODO: the actual registration logic is omitted from this excerpt.
endpointRef
}

创建ExecutorEnv

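整个过程和创建 DriverEnv 基本一样,但有几处区别:Executor 调用 create 时不传入端口(port 为 None,最终以 -1 传给 RpcEnv.create);SparkEnv.set 由 createExecutorEnv 自己调用,而不是由 SparkContext 调用。另外,SparkEnv.create 把 !isDriver 作为 clientMode 传给 RpcEnv.create,而 NettyRpcEnvFactory.create 只在非 clientMode 下启动 TransportServer。因此对 Executor 而言(isDriver 应为 false,需在 SparkEnv.create 中确认),这条路径不会启动服务端。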

// Step 1: SparkEnv.createExecutorEnv (overload without a bind address) uses `hostname` as both
// the bind address and the advertised hostname.
/**
* Create a SparkEnv for an executor, binding to `hostname`.
*
* @return the SparkEnv produced by the full createExecutorEnv overload
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
createExecutorEnv(conf, executorId, hostname,
hostname, numCores, ioEncryptionKey, isLocal)
}

// Step 2: the full createExecutorEnv overload calls the same `create` method the driver uses.
/**
* Create a SparkEnv for an executor and pass it to SparkEnv.set.
*
* @param bindAddress address to bind to
* @param hostname advertised hostname
* @return the created SparkEnv
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
bindAddress: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
bindAddress,
hostname,
// No port is supplied for executors (the driver passes Option(port)).
None,
isLocal,
numCores,
ioEncryptionKey
)
// Unlike the driver, where SparkContext calls SparkEnv.set, the executor path sets it here.
SparkEnv.set(env)
env
}
// Step 3: the remaining steps follow the driver path. One difference: `create` passes
// `!isDriver` as clientMode to RpcEnv.create. Presumably isDriver is false for an executor id
// (confirm in SparkEnv.create); if so, NettyRpcEnvFactory.create starts no server here.
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信