Hadoop Source Code Study: DataNode Startup

Flowchart:

Based on the flowchart, the startup flow can be summarized as follows:

  1. Initialize the DataXceiverServer service
  2. Initialize the HTTP server
  3. Initialize the DataNode's RPC server
  4. Register the DataNode with the NameNode (including sending heartbeats)

Source Code Walkthrough

Program Entry Point

The entry point is the class org.apache.hadoop.hdfs.server.datanode.DataNode.
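In the Hadoop 3.x source, DataNode.main() does little more than handle the help argument before delegating to secureMain(); roughly:

public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  secureMain(args, null);
}

The walkthrough below follows that call into secureMain(), createDataNode(), instantiateDataNode(), makeInstance(), and finally startDataNode().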

  // Called from main(): create the DataNode and wait for it to shut down.
  public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null) {
        datanode.join();
      } else {
        errorCode = 1;
      }
    } catch (Throwable e) {
      LOG.error("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      // We need to terminate the process here because either shutdown was called
      // or some disk related conditions like volumes tolerated or volumes required
      // condition was not met. Also, In secure mode, control will go to Jsvc
      // and Datanode process hangs if it does not exit.
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }

  // createDataNode(): instantiate the DataNode, then start its daemon threads.
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      dn.runDatanodeDaemon();
    }
    return dn;
  }

  // instantiateDataNode(): argument parsing, security login, and DataNode instantiation.
  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();

    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }

    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
    return makeInstance(dataLocations, conf, resources);
  }

  // Following makeInstance() and the DataNode constructor eventually leads to:
  startDataNode(dataDirs, resources);

The startDataNode Method (Core Source)

  void startDataNode(List<StorageLocation> dataDirectories,
                     SecureResources resources
                     ) throws IOException {

    // TODO

    storage = new DataStorage();

    // global DN settings
    registerMXBean();
    // Initialize the DataXceiverServer (streaming block transfer service)
    initDataXceiver();
    // Initialize the HTTP info server
    startInfoServer();
    pauseMonitor = new JvmPauseMonitor();
    pauseMonitor.init(getConf());
    pauseMonitor.start();

    // BlockPoolTokenSecretManager is required to create ipc server.
    this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

    // Login is done by now. Set the DN user name.
    dnUserName = UserGroupInformation.getCurrentUser().getUserName();
    LOG.info("dnUserName = {}", dnUserName);
    LOG.info("supergroup = {}", supergroup);
    // Initialize the DataNode's RPC server
    initIpcServer();

    metrics = DataNodeMetrics.create(getConf(), getDisplayName());
    peerMetrics = dnConf.peerStatsEnabled ?
        DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    ecWorker = new ErasureCodingWorker(getConf(), this);
    blockRecoveryWorker = new BlockRecoveryWorker(this);

    blockPoolManager = new BlockPoolManager(this);
    // Register the DataNode with the NameNode(s) and start sending heartbeats
    blockPoolManager.refreshNamenodes(getConf());

    // Create the ReadaheadPool from the DataNode context so we can
    // exit without having to explicitly shutdown its thread pool.
    readaheadPool = ReadaheadPool.getInstance();
    saslClient = new SaslDataTransferClient(dnConf.getConf(),
        dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
    startMetricsLogger();

    if (dnConf.diskStatsEnabled) {
      diskMetrics = new DataNodeDiskMetrics(this,
          dnConf.outliersReportIntervalMs);
    }
  }

Initializing the DataXceiverServer Service

The DataXceiverServer is the service a DataNode uses to receive data sent by clients and by other DataNodes.
The source is as follows:

  private void initDataXceiver() throws IOException {
    // find free port or use privileged port provided
    TcpPeerServer tcpPeerServer;
    if (secureResources != null) {
      tcpPeerServer = new TcpPeerServer(secureResources);
    } else {
      int backlogLength = getConf().getInt(
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
          DataNode.getStreamingAddr(getConf()), backlogLength);
    }
    if (dnConf.getTransferSocketRecvBufferSize() > 0) {
      tcpPeerServer.setReceiveBufferSize(
          dnConf.getTransferSocketRecvBufferSize());
    }
    streamingAddr = tcpPeerServer.getStreamingAddr();
    LOG.info("Opened streaming server at {}", streamingAddr);
    this.threadGroup = new ThreadGroup("dataXceiverServer");
    xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    this.threadGroup.setDaemon(true); // auto destroy when empty

    if (getConf().getBoolean(
            HdfsClientConfigKeys.Read.ShortCircuit.KEY,
            HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
        getConf().getBoolean(
            HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
            HdfsClientConfigKeys
                .DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
      DomainPeerServer domainPeerServer =
          getDomainPeerServer(getConf(), streamingAddr.getPort());
      if (domainPeerServer != null) {
        this.localDataXceiverServer = new Daemon(threadGroup,
            new DataXceiverServer(domainPeerServer, getConf(), this));
        LOG.info("Listening on UNIX domain socket: {}",
            domainPeerServer.getBindPath());
      }
    }
    this.shortCircuitRegistry = new ShortCircuitRegistry(getConf());
  }
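Stripped of Hadoop's Peer/PeerServer abstractions, the code above amounts to an accept loop running as a daemon that hands each incoming connection to a fresh worker thread (one DataXceiver per connection). A minimal, self-contained sketch of that pattern using plain Java sockets (the class and method names here are illustrative, not the real Hadoop ones):

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Simplified illustration of the DataXceiverServer pattern:
// an accept loop that spawns one worker thread per connection.
public class MiniXceiverServer implements Runnable {
  private final ServerSocket serverSocket;

  public MiniXceiverServer(int port, int backlog) throws IOException {
    this.serverSocket = new ServerSocket(port, backlog);
  }

  @Override
  public void run() {
    while (!serverSocket.isClosed()) {
      try {
        Socket peer = serverSocket.accept();            // block until a client or another DN connects
        Thread worker = new Thread(() -> handle(peer)); // one "DataXceiver" per connection
        worker.setDaemon(true);
        worker.start();
      } catch (IOException e) {
        // The real server logs accept failures and keeps looping
        // unless the DataNode is shutting down.
      }
    }
  }

  private void handle(Socket peer) {
    // The real DataXceiver reads an op code (READ_BLOCK, WRITE_BLOCK, ...) from the
    // stream and dispatches to the corresponding block transfer logic.
    try { peer.close(); } catch (IOException ignored) { }
  }

  public static void main(String[] args) throws IOException {
    // Bind an ephemeral port and run the accept loop in a daemon thread.
    MiniXceiverServer server = new MiniXceiverServer(0, 128);
    Thread t = new Thread(server, "mini-xceiver-server");
    t.setDaemon(true);
    t.start();
    System.out.println("Listening on " + server.serverSocket.getLocalSocketAddress());
  }
}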

Initializing the HTTP Server

The source is as follows:

  private void startInfoServer()
      throws IOException {
    // SecureDataNodeStarter will bind the privileged port to the channel if
    // the DN is started by JSVC, pass it along.
    ServerSocketChannel httpServerChannel = secureResources != null ?
        secureResources.getHttpServerChannel() : null;

    httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
    httpServer.start();
    if (httpServer.getHttpAddress() != null) {
      infoPort = httpServer.getHttpAddress().getPort();
    }
    if (httpServer.getHttpsAddress() != null) {
      infoSecurePort = httpServer.getHttpsAddress().getPort();
    }
  }
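Once startInfoServer() returns, the DataNode web UI and JMX metrics are reachable on the info port (dfs.datanode.http.address, which defaults to port 9864 in Hadoop 3.x). A quick sanity check from Java, assuming a DataNode is running locally on the default port:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class DataNodeInfoCheck {
  public static void main(String[] args) throws Exception {
    // /jmx is served by the DataNode HTTP server and exposes metrics as JSON.
    URL url = new URL("http://localhost:9864/jmx");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    System.out.println("HTTP status: " + conn.getResponseCode());
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      // Print only the first few lines of the JSON payload.
      for (int i = 0; i < 5; i++) {
        String line = in.readLine();
        if (line == null) break;
        System.out.println(line);
      }
    }
  }
}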

Initializing the DataNode RPC Server

The source is as follows:

  private void initIpcServer() throws IOException {
    InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
        getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));

    // Add all the RPC protocols that the Datanode implements
    RPC.setProtocolEngine(getConf(), ClientDatanodeProtocolPB.class,
        ProtobufRpcEngine.class);
    ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
        new ClientDatanodeProtocolServerSideTranslatorPB(this);
    BlockingService service = ClientDatanodeProtocolService
        .newReflectiveBlockingService(clientDatanodeProtocolXlator);
    ipcServer = new RPC.Builder(getConf())
        .setProtocol(ClientDatanodeProtocolPB.class)
        .setInstance(service)
        .setBindAddress(ipcAddr.getHostName())
        .setPort(ipcAddr.getPort())
        .setNumHandlers(
            getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
                DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
        .setSecretManager(blockPoolTokenSecretManager).build();

    ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
        = new ReconfigurationProtocolServerSideTranslatorPB(this);
    service = ReconfigurationProtocolService
        .newReflectiveBlockingService(reconfigurationProtocolXlator);
    DFSUtil.addPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
        ipcServer);

    InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
        new InterDatanodeProtocolServerSideTranslatorPB(this);
    service = InterDatanodeProtocolService
        .newReflectiveBlockingService(interDatanodeProtocolXlator);
    DFSUtil.addPBProtocol(getConf(), InterDatanodeProtocolPB.class, service,
        ipcServer);

    TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
        new TraceAdminProtocolServerSideTranslatorPB(this);
    BlockingService traceAdminService = TraceAdminService
        .newReflectiveBlockingService(traceAdminXlator);
    DFSUtil.addPBProtocol(
        getConf(),
        TraceAdminProtocolPB.class,
        traceAdminService,
        ipcServer);

    LOG.info("Opened IPC server at {}", ipcServer.getListenerAddress());

    // set service-level authorization security policy
    if (getConf().getBoolean(
        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
      ipcServer.refreshServiceAcl(getConf(), new HDFSPolicyProvider());
    }
  }
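The bind address and handler count used above come straight from configuration. A small sketch that reads the same settings outside the DataNode, assuming the usual key names dfs.datanode.ipc.address and dfs.datanode.handler.count and the Hadoop 3.x defaults (port 9867, 10 handlers):

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class DataNodeIpcConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same keys initIpcServer() resolves through DFSConfigKeys.
    String ipcAddr = conf.getTrimmed("dfs.datanode.ipc.address", "0.0.0.0:9867");
    int handlers = conf.getInt("dfs.datanode.handler.count", 10);

    InetSocketAddress addr = NetUtils.createSocketAddr(ipcAddr);
    System.out.println("DataNode IPC bind address: " + addr);
    System.out.println("RPC handler threads: " + handlers);
  }
}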

Registering the DataNode with the NameNode (Including Heartbeats)

The source is as follows:

    // Create one BPOfferService (BPOS) for each newly added nameservice
    for (String nsToAdd : toAdd) {
      Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
      Map<String, InetSocketAddress> nnIdToLifelineAddr =
          lifelineAddrMap.get(nsToAdd);
      ArrayList<InetSocketAddress> addrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      ArrayList<InetSocketAddress> lifelineAddrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      for (String nnId : nnIdToAddr.keySet()) {
        addrs.add(nnIdToAddr.get(nnId));
        lifelineAddrs.add(nnIdToLifelineAddr != null ?
            nnIdToLifelineAddr.get(nnId) : null);
      }
      BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
      bpByNameserviceId.put(nsToAdd, bpos);
      offerServices.add(bpos);
    }
    // TODO
    // startAll() starts one BPServiceActor thread per NameNode;
    // each actor registers with its NameNode and then keeps heartbeating.
    startAll();

  // Following startAll() leads to BPServiceActor.run():
  public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // setup storage
          // Handshake with and register to the NameNode
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }

      runningState = RunningState.RUNNING;
      if (initialRegistrationComplete != null) {
        initialRegistrationComplete.countDown();
      }

      while (shouldRun()) {
        try {
          // Heartbeat / block report loop
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }
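offerService() itself is not shown here; it is the long-running heartbeat loop of a BPServiceActor. A heavily simplified paraphrase of its shape (not the real source: sendHeartbeat(), blockReportDue() and sendBlockReport() are hypothetical stubs standing in for the DatanodeProtocol RPC calls, and the real method also handles NameNode commands, lifeline messages, cache reports, and incremental block reports):

// Simplified paraphrase of the BPServiceActor heartbeat loop (illustrative only).
public class HeartbeatLoopSketch {
  private static final long HEARTBEAT_INTERVAL_MS = 3_000; // dfs.heartbeat.interval default: 3s
  private volatile boolean shouldRun = true;

  public void offerService() throws InterruptedException {
    while (shouldRun) {
      long start = System.currentTimeMillis();
      sendHeartbeat();       // report liveness, capacity and load; receive commands back
      if (blockReportDue(start)) {
        sendBlockReport();   // periodic full block report (dfs.blockreport.intervalMsec, default 6h)
      }
      long elapsed = System.currentTimeMillis() - start;
      Thread.sleep(Math.max(0, HEARTBEAT_INTERVAL_MS - elapsed));
    }
  }

  // Hypothetical stubs for the real RPCs: DatanodeProtocol.sendHeartbeat(...) / blockReport(...).
  private void sendHeartbeat() { }
  private boolean blockReportDue(long now) { return false; }
  private void sendBlockReport() { }

  public static void main(String[] args) throws InterruptedException {
    // Illustration only: run the loop briefly, then stop it.
    HeartbeatLoopSketch sketch = new HeartbeatLoopSketch();
    Thread loop = new Thread(() -> {
      try {
        sketch.offerService();
      } catch (InterruptedException ignored) {
      }
    });
    loop.start();
    Thread.sleep(100);       // let one heartbeat go out
    sketch.shouldRun = false;
    loop.join();
  }
}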