Hadoop源码学习:NameNode启动

启动流程

流程图

根据流程图,总结流程:

  1. 启动9870端口服务
  2. 加载镜像文件和编辑日志
  3. 开启别名映射服务(如有必要)
  4. 初始化NameNode的RPC服务端
  5. 启动公共服务(NameNode启动资源检查、NameNode对心跳超时判断、安全模式)

源码解析

程序入口

找到程序入口:org.apache.hadoop.hdfs.server.namenode.NameNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
  // 进入main方法
/**
 * Command-line entry point of the NameNode process.
 *
 * <p>Calls {@code System.exit(0)} when {@code DFSUtil.parseHelpArgument}
 * returns {@code true}. Otherwise it logs the startup/shutdown banner, builds
 * the NameNode through {@code createNameNode} with a {@code null}
 * configuration and, if an instance comes back, calls {@code join()} on it.
 * Any {@link Throwable} raised inside that sequence is logged and passed to
 * {@code ExitUtil.terminate} with status 1.
 *
 * @param argv raw command-line arguments
 * @throws Exception declared by the signature
 */
public static void main(String[] argv) throws Exception {
  if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
    System.exit(0);
  }

  try {
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // A null configuration makes createNameNode build its own.
    final Configuration noConf = null;
    final NameNode server = createNameNode(argv, noConf);
    if (server == null) {
      return;
    }
    server.join();
  } catch (Throwable failure) {
    LOG.error("Failed to start namenode.", failure);
    ExitUtil.terminate(1, failure);
  }
}

/**
 * Parses the command line and, for a regular start, instantiates the NameNode.
 *
 * <p>A {@code null} {@code conf} is replaced by a fresh
 * {@code HdfsConfiguration}. Generic options are consumed by
 * {@code GenericOptionsParser}; the remaining arguments are turned into a
 * {@code StartupOption} and stored in the configuration.
 *
 * @param argv command-line arguments
 * @param conf configuration to use, or {@code null} to create a default one
 * @return the new NameNode, or {@code null} when the arguments could not be
 *         parsed (usage is printed to stderr) or after the FORMAT branch
 * @throws IOException declared by the signature
 */
public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null) {
    conf = new HdfsConfiguration();
  }

  // Parse out some generic args into Configuration.
  final GenericOptionsParser genericParser =
      new GenericOptionsParser(conf, argv);
  argv = genericParser.getRemainingArgs();

  // Parse the rest, NN specific args.
  final StartupOption option = parseArguments(argv);
  if (option == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, option);

  switch (option) {
    case FORMAT: {
      final boolean aborted = format(conf, option.getForceFormat(),
          option.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid javac warning
    }
    // NOTE(review): the excerpt has a bare TODO at this point, presumably
    // standing in for other startup options left out of the article; confirm
    // against the full source.
    default:
      DefaultMetricsSystem.initialize("NameNode");
      // The walkthrough continues inside the NameNode constructor.
      return new NameNode(conf);
  }
}

initialize(getConf())方法(核心源码)

进入new NameNode(conf),找到initialize(getConf())方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Initializes this NameNode from the given configuration.
*
* In the order the code performs them: seeds the user/group metrics percentile
* intervals from the DFS metrics key when unset, configures
* UserGroupInformation and calls loginAsNameNodeUser, initializes metrics and
* registers the startup-progress metrics, starts the JVM pause monitor, starts
* the HTTP server (NAMENODE role only), loads the namesystem, starts the alias
* map server if necessary, creates the RPC server, and finally starts the
* common services and the metrics logger.
*
* @param conf configuration to initialize from
* @throws IOException declared by the signature
*/
protected void initialize(Configuration conf) throws IOException {
// Copy the DFS percentile intervals into the user/group metrics key, but only
// when that key is unset and the DFS key actually has a value.
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}

UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf);

NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);

// Create, init and start the JVM pause monitor, then attach it to the JVM metrics.
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

if (NamenodeRole.NAMENODE == role) {
// Start the HTTP server (per the article: HttpServer2 listening on port 9870).
startHttpServer(conf);
}

// Load the namesystem (per the article: the fsimage and the edit log).
loadNamesystem(conf);
// Start the alias map server, if necessary.
startAliasMapServerIfNecessary(conf);
// Create the RPC server.
rpcServer = createRpcServer(conf);

initReconfigurableBackoffKey();

if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(getNameNodeAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
// The HTTP server only exists for the NAMENODE role (see above), so it is
// only given the address and FSImage in that case.
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}

// Start the common services (per the article: resource check, heartbeat
// timeout detection, safe mode).
startCommonServices(conf);
startMetricsLogger(conf);
}

启动9870端口服务

源码如下:

1
2
3
4
5
6
7
/**
 * Creates the NameNode HTTP server, starts it and hands it the startup
 * progress tracker.
 *
 * <p>The bind address is whatever {@code getHttpServerBindAddress(conf)}
 * returns. The article states that this resolves to port 9870 by default; the
 * value itself is not visible in this excerpt.
 *
 * @param conf configuration used to build the server and resolve its address
 * @throws IOException declared by the signature
 */
private void startHttpServer(final Configuration conf) throws IOException {
  final NameNodeHttpServer server =
      new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  // Publish to the field before starting, exactly as the original ordering does.
  httpServer = server;
  server.start();
  server.setStartupProgress(startupProgress);
}

加载镜像文件和编辑日志

源码如下:

1
2
3
/**
 * Loads the namesystem through {@code FSNamesystem.loadFromDisk} and stores
 * it in {@code this.namesystem}.
 *
 * @param conf configuration passed to the loader
 * @throws IOException declared by the signature
 */
protected void loadNamesystem(Configuration conf) throws IOException {
  final FSNamesystem loaded = FSNamesystem.loadFromDisk(conf);
  this.namesystem = loaded;
}

开启别名映射服务

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Starts the in-memory LevelDB alias map server when both the
 * provided-storage flag and the in-memory alias map flag are enabled in the
 * configuration; otherwise does nothing.
 *
 * @param conf configuration holding the two feature flags
 * @throws IOException declared by the signature
 */
private void startAliasMapServerIfNecessary(Configuration conf)
    throws IOException {
  // The second flag is only read when the first one is on, as in the
  // short-circuit && form.
  if (!conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED,
      DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT)) {
    return;
  }
  if (!conf.getBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED,
      DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT)) {
    return;
  }

  final InMemoryLevelDBAliasMapServer server =
      new InMemoryLevelDBAliasMapServer(
          InMemoryAliasMap::init, namesystem.getBlockPoolId());
  levelDBAliasMapServer = server;
  server.setConf(conf);
  server.start();
}

初始化NameNode的RPC服务端

源码如下:

1
2
3
4
5
6
7
8
/**
 * Builds the RPC server implementation for this NameNode. Kept as a separate
 * protected method so that it can act as an extension point for the
 * BackupNode.
 *
 * @param conf configuration handed to the RPC server
 * @return a new {@code NameNodeRpcServer} bound to this NameNode
 * @throws IOException declared by the signature
 */
protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  final NameNodeRpcServer server = new NameNodeRpcServer(conf, this);
  return server;
}

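启动公共服务(资源检测、心跳超时判断、安全模式)

NameNode.startCommonServices(conf)内部会调用namesystem.startCommonServices(conf, haContext),下面展示的就是FSNamesystem中的这个方法。源码如下: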

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Starts the services of the namesystem that are common to its states.
*
* Under the write lock it creates the resource checker, runs the resource
* check, opens the SAFEMODE startup phase with the total of complete blocks and
* activates the block manager. After the lock is released it registers the
* MXBean and metrics source, starts the inode attribute provider when one is
* configured, registers the snapshot manager MXBean and records the service
* host name.
*
* @param conf configuration used for the resource checker, the block manager
*        and the service address lookup
* @param haContext HA context stored in this.haContext
* @throws IOException declared by the signature
*/
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
// Check the available resources.
checkAvailableResources();
assert !blockManager.isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
long completeBlocksTotal = getCompleteBlocksTotal();
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
completeBlocksTotal);
// Activate the block manager (per the article: heartbeat timeout detection
// and safe mode).
blockManager.activate(conf, completeBlocksTotal);
} finally {
// Always release the write lock, even if one of the steps above throws.
writeUnlock("startCommonServices");
}

registerMXBean();
DefaultMetricsSystem.instance().register(this);
if (inodeAttributeProvider != null) {
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);
}
snapshotManager.registerMXBean();
InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
// Fall back to an empty host name when no service address is available.
this.nameNodeHostName = (serviceAddress != null) ?
serviceAddress.getHostName() : "";
}
NameNode启动资源检测

NameNode元数据存储目录默认至少需要保留100MB可用空间(配置项dfs.namenode.resource.du.reserved,默认值104857600字节),低于该值即认为资源不足。
源码如下:

1
2
3
4
5
6
7
8
9
10
11
/**
 * Performs the resource check, caches its outcome in
 * {@code hasResourcesAvailable} and reports how long the check took to the
 * NameNode metrics.
 *
 * <p>The timer starts before the precondition check, so the reported duration
 * covers it as well. A missing {@code nnResourceChecker} is rejected through
 * {@code Preconditions.checkState}.
 */
void checkAvailableResources() {
  final long startedAt = monotonicNow();
  Preconditions.checkState(nnResourceChecker != null,
      "nnResourceChecker not initialized");
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
  final long elapsed = monotonicNow() - startedAt;
  NameNode.getNameNodeMetrics().addResourceCheckTime(elapsed);
}
Block块管理:blockManager

这里包括:
NameNode对心跳超时判断
安全模式

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Activates the block manager: starts pending reconstruction, activates the
* datanode manager, names and starts the redundancy and storage-info threads,
* starts the block report thread, registers the "BlockStats" MBean and finally
* activates safe mode with the given block total.
*
* @param conf configuration passed on to the datanode manager
* @param blockTotal block total passed on to the safe-mode activation
*/
public void activate(Configuration conf, long blockTotal) {
pendingReconstruction.start();
// Heartbeat timeout detection (per the article) is started from here.
datanodeManager.activate(conf);
this.redundancyThread.setName("RedundancyMonitor");
this.redundancyThread.start();
storageInfoDefragmenterThread.setName("StorageInfoMonitor");
storageInfoDefragmenterThread.start();
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
// Safe mode.
bmSafeMode.activate(blockTotal);
}
NameNode对心跳超时判断

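判断DataNode心跳超时的默认时间为10min+30s,即 2 × dfs.namenode.heartbeat.recheck-interval(默认5分钟)+ 10 × dfs.heartbeat.interval(默认3秒)。下面是HeartbeatManager内部监控线程的run方法,每5秒循环一次,每隔一个recheck间隔执行一次heartbeatCheck()。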
源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Monitor loop that runs for as long as namesystem.isRunning() is true.
*
* Each iteration restarts the heartbeat stop watch, runs heartbeatCheck() once
* more than heartbeatRecheckInterval has passed since the last check, flags
* every datanode for a block key update when the block manager asks for one,
* and then sleeps for five seconds.
*/
@Override
public void run() {
while(namesystem.isRunning()) {
restartHeartbeatStopWatch();
try {
final long now = Time.monotonicNow();
// Run the heartbeat check only after the recheck interval has elapsed.
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
heartbeatCheck();
lastHeartbeatCheck = now;
}
if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
// Iterate over the datanodes while holding the enclosing
// HeartbeatManager's monitor.
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.setNeedKeyUpdate(true);
}
}
lastBlockKeyUpdate = now;
}
} catch (Exception e) {
// Log and keep looping; a failure in one pass does not stop the monitor.
LOG.error("Exception while checking heartbeat", e);
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ignored) {
// NOTE(review): the interrupt is swallowed without restoring the flag, so
// the loop only ends when namesystem.isRunning() turns false.
}
// avoid declaring nodes dead for another cycle if a GC pause lasts
// longer than the node recheck interval
if (shouldAbortHeartbeatCheck(-5000)) {
LOG.warn("Skipping next heartbeat scan due to excessive pause");
lastHeartbeatCheck = Time.monotonicNow();
}
}
}
安全模式

阈值:0.999(配置项dfs.namenode.safemode.threshold-pct的默认值,即已上报的数据块比例达到99.9%才满足退出安全模式的条件)
源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Activates safe-mode tracking with the given block total.
 *
 * <p>Asserts that the namesystem write lock is held and that the current
 * status is {@code OFF}. It then records the start time and the block total.
 * If the thresholds are already met, safe mode is left immediately and that
 * exit must succeed. Otherwise the status becomes {@code PENDING_THRESHOLD},
 * the replication queues are initialized if necessary and the "Safe mode ON"
 * status is reported.
 *
 * @param total block total passed to {@code setBlockTotal}
 */
void activate(long total) {
  assert namesystem.hasWriteLock();
  assert status == BMSafeModeStatus.OFF;

  startTime = monotonicNow();
  setBlockTotal(total);

  if (!areThresholdsMet()) {
    // enter safe mode
    status = BMSafeModeStatus.PENDING_THRESHOLD;
    initializeReplQueuesIfNecessary();
    reportStatus("STATE* Safe mode ON.", true);
    lastStatusReport = monotonicNow();
    return;
  }

  // Thresholds already satisfied: leave safe mode right away.
  final boolean left = leaveSafeMode(false);
  Preconditions.checkState(left, "Failed to leave safe mode.");
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信