分布式文件系统——HDFS

分布式文件系统——HDFS

基本概念

HDFS (Hadoop Distributed File System)是 Hadoop 下的分布式文件系统,具有高容错、高吞吐量等特性,可以部署在低成本的硬件上。

HDFS架构

HDFS 遵循主/从架构,由单个 NameNode(NN) 和多个 DataNode(DN) 组成。

NameNode

NameNode是整个文件系统的管理节点。它维护着文件系统树及整棵树内所有的文件和目录。这些信息以两个文件形式永久保存在本地磁盘上:fsimage(元数据镜像文件)和edits log(操作日志文件)。同时,NameNode也负责处理客户端的读写请求。

DataNode

存储实际的数据块Block,执行数据块的读写操作。

数据存储单元:Block

文件被切分成固定大小的数据块。默认为数据块大小为128M,可配置。
若文件大小不到128M,则单独存成一个Block。

思考:为什么块的大小不能设置为太小,也不能设置为太大?
设置太小为增加寻址时间,设置太大会导致程序在处理这块数据时,非常慢。因此,HDFS块的大小设置主要取决于磁盘传输速率。

副本放置策略

  • 第一个副本:如果是集群内提交,放置在上传文件的DataNode的节点上;如果是集群外提交,则随机挑选一台磁盘不太满,CPU不太忙的节点。

  • 第二个副本:放置在与第一个副本不同的机架的节点上

  • 第三个副本:与第二个副本相同机架的不同节点上

  • 更多副本:随机节点

Client

也就是客户端,主要负责:

  • 文件切分,文件上传到HDFS时,Client将文件切分成一个一个的Block,然后进行上传
  • 与NameNode交互,获取文件的位置信息
  • 与DataNode交互,读写数据
  • Client提供一些命令来管理HDFS,例如NameNode格式化
  • Client可以通过一些命令来访问HDFS,例如对HDFS进行增删操作。

Secondary NameNode

它不是NameNode的备份(但可以作为备份),它的主要工作是帮助NameNode合并edits log,减少NameNode启动时间。

Secondary NameNode执行合并时机

  • 根据配置文件设置的时间间隔fs.checkpoint.period 默认为3600秒
  • 根据配置文件设置edits log大小,fs.checkpoint.size规定edits文件的最大值默认是64M

默认安装在NameNode节点上,但这样,不安全。生产环境通常使用NameNode HA。

Secondary NameNode的工作流程

  1. SecondaryNameNode通知NameNode切换edits文件
  2. SecondaryNameNode从NameNode中获得fsimage和edits(通过HTTP)
  3. SecondaryNameNode将fsimage载入内存,然后开始合并edits
  4. SecondaryNameNode将新的fsimage发回给NameNode
  5. NameNode用新的fsimage替换旧的fsimage

常用Shell命令

实际上,大部分和Linux Shell命令类似。只不过在使用的时候需要加上hadoop fs或者hdfs dfs前缀。下面是具体示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
## 帮助命令
hadoop fs -help

## 显示当前文件目录结构
hadoop fs -ls <path>

## 递归显示当前文件目录结构
hadoop fs -ls -R <path>

## 查看文件内容
hadoop fs -cat <path>
## 或者使用text,将源文件输出为文本格式
hadoop fs -text <path>

## 查看文件头部1K字节
hadoop fs -head <path> 

## 查看文件尾部1K字节
hadoop fs -tail <path> 
# 和Linux下一样,会持续监听文件内容变化 并显示文件的最后一千字节
hadoop fs -tail -f <path> 

## 新建文件目录
hadoop fs -mkdir <path>

## 递归新建文件目录
hadoop fs -mkdir -P <path>

## 删除文件目录
hadoop fs -rm <path>

## 递归删除文件目录
hadoop fs -rm -R <path>

## 移动文件
hadoop fs -mv [src] [dst] 

## 拷贝文件 
hadoop fs -cp [src] [dst]

## 从本地拷贝文件到HDFS
hadoop fs -put [localsrc] [dst]
## 或者使用copyFromLocal
hadoop fs - copyFromLocal [localsrc] [dst] 

## 从HDFS拷贝文件到本地
hadoop fs -get [dst] [localsrc]
## 或者使用copyToLocal 
hadoop fs -copyToLocal [dst] [localsrc]

## 合并下载多个文件
hadoop fs -getmerge [-nl] <src> <localdst>
# 示例 将HDFS上的file1.txt和file2.txt文件合并后下载到本地的/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt
## 可选选项:
## * -nl 在每个文件的末尾添加换行符(LF)
## * -skip-empty-file 跳过空文件

## 更改文件副本系数
hadoop fs -setrep [-R] [-w] <numReplicas> <path>
# 示例
hadoop fs -setrep -w 3 /user/hadoop/test
## 可选选项:
## * -R:用于递归改变目录下所有文件的副本系数。
## * -w: 请求命令是否等待复制完成,可能需要等待一段时间。

##统计当前目录下各文件大小
hadoop fs -du <path>

##汇总统计
hadoop fs -du -s <path>

## 汇总统计并以更友好的方式显示文件大小 
hadoop fs -du -s -h <path>

## 统计文件系统可用空间
hadoop fs -df -h

## 文件检测
hadoop fs -test -[defswrz] URI
## 可选选项:
## * -d:如果路径是目录,返回 0。
## * -e:如果路径存在,则返回 0。
## * -f:如果路径是文件,则返回 0。
## * -s:如果路径不为空,则返回 0。
## * -r:如果路径存在且授予读权限,则返回 0。
## * -w:如果路径存在且授予写入权限,则返回 0。
## * -z:如果文件长度为零,则返回 0。

# 变更文件或目录的所属群组。 用户必须是文件的所有者或超级用户。
hadoop fs -chgrp [-R] GROUP URI [URI ...]

# 修改文件或目录的访问权限 用户必须是文件的所有者或超级用户。
hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]

# 修改文件的拥有者 用户必须是超级用户。
hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]

HDFS客户端API操作

pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>

demo代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class HdfsClientTest {
// HDFS管理界面可以查看到具体的端口号。Overview 'node03:9000' (active)
private String HDFS_PATH = "hdfs://node03:9000";
private URI uri;
private Configuration conf;
private FileSystem fs;

/**
* 客户端资源初始化
*/
@Before
public void init() throws Exception {
// 获取集群NameNode的URI
uri = new URI(HDFS_PATH);
// 获取相关配置信息
conf = new Configuration();
// 可以设置相关配置
conf.set("dfs.replication", "2");
// 获取客户端对象,这里采用root用户登录
fs = FileSystem.get(uri, conf,"root");
}

/**
* 客户端资源关闭
*/
@After
public void close() throws IOException {
// 关闭资源
fs.close();
}

/**
* Java API相关操作
*/
@Test
public void test() throws Exception {
// 创建文件夹
fs.mkdirs(new Path("/test"));

// 上传文件
fs.copyFromLocalFile(new Path("D://abc.txt"), new Path("/test"));

// 判断文件或文件夹是否存在
boolean direExists = fs.exists(new Path("/test"));
boolean fileExists = fs.exists(new Path("/test/bcd.txt"));
System.out.println("文件夹是否存在:" + direExists);
System.out.println("文件是否存在:" + fileExists);

// 文件重命名
fs.rename(new Path("/test/abc.txt"),new Path("/test/bcd.txt"));

// 删除文件或文件夹。如果是是递归删除且第二个参数为false,那么会报错。
fs.delete(new Path("/test/bcd.txt"), true);

// 获取文件或文件夹详细信息
FileStatus[] fileStatuses = fs.listStatus(new Path("/test/abc.txt"));
for (FileStatus status : fileStatuses) {
System.out.println(status.toString());
}
}

}

HDFS读写流程

读流程

  1. 使用HDFS提供的客户端开发库,向远程的Namenode发起RPC请求;
  2. Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的datanode地址;
  3. 客户端开发库会选取离客户端最接近的datanode来读取block;
  4. 读取完当前block的数据后,关闭与当前的datanode连接,并为读取下一个block寻找最佳的datanode;
  5. 当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。
  6. 读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续读。

写流程

  1. 使用HDFS提供的客户端开发库,向远程的Namenode发起RPC请求;
  2. Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;
  3. 当客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以”data queue(数据队列)”的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表,列表的大小根据在Namenode中对replication的设置而定。
  4. 开始以pipeline(管道)的形式将packet写入所有的replicas中。开发库把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此pipeline中的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线的形式。
  5. 最后一个datanode成功存储之后会返回一个ack packet(确认队列),在pipeline里传递至客户端,在客户端的开发库内部维护着”ack queue”,成功收到datanode返回的ack packet后会从”ack queue”移除相应的packet。
  6. 如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的pipeline中移除,剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的datanode,保持replicas设定的数量。
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信