ZooKeeper必知必会

基本概念

基于ZooKeeper3.5.7

概述

ZooKeeper是一个开源的分布式框架,用于为其它分布式框架(例如Hadoop)提供协调服务。
ZooKeeper = 文件系统 + 通知机制。

特点

  • ZooKeeper集群由一个Leader、多个Follower组成(集群个数必须为2N+1)。
  • 集群中只要有半数以上节点存活,ZooKeeper集群就能正常工作。
  • 全局数据一致,每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
  • 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
  • 数据更新原子性,一次数据更新要么成功,要么失败。
  • 实时性,在一定时间范围内,Client能读到最新数据。

数据结构

与Linux文件系统类似,整体上可以看作是一棵树,每个节点称为ZNode。
每一个ZNode默认最多能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。

本地安装

安装步骤

下载地址:https://archive.apache.org/dist/zookeeper/
选择apache-zookeeper-3.5.7-bin.tar.gz下载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
## 已安装JDK
[root@node02 ~]# java -version
java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
## 解压zk到指定目录
root@node02 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt
## 重命名,方便后续操作
[root@node02 opt]# mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
## 修改配置文件(直接使用模板文件)
[root@node02 zookeeper-3.5.7]# cd /opt/zookeeper-3.5.7/conf/
[root@node02 conf]# mv zoo_sample.cfg zoo.cfg
## 编辑修改(本地安装可忽略,主要是数据存放地址变更)
[root@node02 conf]# vim zoo.cfg
## 启动zk服务端
[root@node02 conf]# cd ../bin/
[root@node02 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
## 启动完成之后查看进程
[root@node02 bin]# jps
1520 Jps
1484 QuorumPeerMain
## 启动客户端进行访问,使用quit退出
[root@node02 bin]# ./zkCli.sh
Connecting to localhost:2181
## 查看目录结构
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]

配置参数详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
## 查看配置文件
[root@node02 conf]# cat zoo.cfg
# The number of milliseconds of each tick
## 通信心跳时间,ZK服务端与客户端心跳时间,单位毫秒
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
## LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的倍数),这里是10*2000毫秒
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
## LF同步通信时限,Leader和Follower之间的通信时间如果超过配置,Leader会认为Follower死掉,从服务器列表中删掉Follower。
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
## 数据文件存放路径,保存zk中的数据
dataDir=/tmp/zookeeper
# the port at which the clients will connect
## 客户端连接端口号,默认2181
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
## 客户端最大连接数
#maxClientCnxns=60

集群安装

节点规划

节点 进程
node03 jdk1.8、zookeeper
node04 jdk1.8、zookeeper
node05 jdk1.8、zookeeper

安装步骤

解压文件

1
2
3
4
## 各节点已安装JDK
## 解压zookeeper到指定路径,并修改文件名
[root@node03 opt]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/
[root@node03 opt]# mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

配置服务器编号

1
2
3
4
5
6
## 新建data文件夹,用于存储数据
[root@node03 zookeeper-3.5.7]# mkdir data
## 新建myid文件,用于配置服务器编号
[root@node03 data]# vi myid
[root@node03 data]# cat myid
3

配置zoo.cfg文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
## 查看配置文件
[root@node03 conf]# cat zoo.cfg
# The number of milliseconds of each tick
## 通信心跳时间,ZK服务端与客户端心跳时间,单位毫秒
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
## LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的倍数),这里是10*2000毫秒
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
## LF同步通信时限,Leader和Follower之间的通信时间如果超过配置,Leader会认为Follower死掉,从服务器列表中删掉Follower。
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
## 数据文件存放路径,保存zookeeper中的数据
dataDir=/opt/zookeeper-3.5.7/data
# the port at which the clients will connect
## 客户端连接端口号,默认2181
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
## 客户端最大连接数
#maxClientCnxns=60

与本地安装不同,集群模式需要额外在zoo.cfg末尾配置以下配信息

1
2
3
server.3 = node03:2888:3888
server.4 = node04:2888:3888
server.5 = node05:2888:3888

这里server.A = B:C:D格式中
A表示一个数字,由myid决定。
B表示这台服务器的地址。
C表示ZooKeeper集群中Leader和Follower服务器交换信息的端口。
D表示万一Leader服务器挂掉之后,用来执行选举时服务器相互通信的端口。

分发到其他服务器

1
2
3
4
## 从node03节点分发到node04节点
[root@node03 opt]# scp -r zookeeper-3.5.7/ root@node04:/opt
## 从node03节点分发到node05节点
[root@node03 opt]# scp -r zookeeper-3.5.7/ root@node05:/opt

注意 zoo.cfg文件中myid变化,分发之后需要进行对应修改,其它保持不变。

启动集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
## 分别启动各节点zkServer
[root@node03 zookeeper-3.5.7]# bin/zkServer.sh start
[root@node04 zookeeper-3.5.7]# bin/zkServer.sh start
[root@node05 zookeeper-3.5.7]# bin/zkServer.sh start

## 查看相关进程
[root@node04 zookeeper-3.5.7]# jps
1633 Jps
1587 QuorumPeerMain

## 查看相关集群角色
[root@node03 zookeeper-3.5.7]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

[root@node04 zookeeper-3.5.7]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

集群角色

在ZooKeeper集群中,一共有三种角色。

Leader :为客户端提供读写服务,并维护集群状态,它是由集群选举所产生的;

Follower :为客户端提供读写服务,并定期向 Leader 汇报自己的节点状态。同时也参与写操作“过半写成功”的策略和 Leader 的选举;

Observer :为客户端提供读写服务,并定期向 Leader 汇报自己的节点状态,但不参与写操作“过半写成功”的策略和 Leader 的选举,因此 Observer 可以在不影响写性能的情况下提升集群的读性能。

客户端命令行操作

节点类型

  • 持久(Persistent):包括持久节点和持久有序节点,客户端和服务器断开连接之后,创建的节点不会被删除。
  • 临时(Ephemeral):包括临时节点和临时有序节点,客户端和服务器断开连接之后,创建的节点会被删除。

有序节点是指在创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
## 启动客户端
[root@node03 zookeeper-3.5.7]# bin/zkCli.sh

## 退出客户端
[zk: localhost:2181(CONNECTED) 4] quit

## 查看节点
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]

## 帮助
[zk: localhost:2181(CONNECTED) 5] help

## 查看节点详细信息
[zk: localhost:2181(CONNECTED) 1] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

## 新建持久节点
[zk: localhost:2181(CONNECTED) 2] create /bigdata 'dashuju'
Created /bigdata
[zk: localhost:2181(CONNECTED) 3] ls /
[bigdata, zookeeper]

## 获取节点值信息
[zk: localhost:2181(CONNECTED) 7] get /bigdata
dashuju

## 新建持久有序节点
[zk: localhost:2181(CONNECTED) 9] create -s /bigdata/flink 'windows'
Created /bigdata/flink0000000000
[zk: localhost:2181(CONNECTED) 10] ls /bigdata
[flink0000000000]

## 再创建一个同名的节点
[zk: localhost:2181(CONNECTED) 13] create -s /bigdata/flink 'count'
Created /bigdata/flink0000000001
[zk: localhost:2181(CONNECTED) 14] ls /bigdata
[flink0000000000, flink0000000001]

## 新建临时节点
[zk: localhost:2181(CONNECTED) 16] create -e /bigdata/spark 'rdd'
Created /bigdata/spark
[zk: localhost:2181(CONNECTED) 17] ls /bigdata
[flink0000000000, flink0000000001, spark]

## 新建临时有序节点
[zk: localhost:2181(CONNECTED) 18] create -e -s /bigdata/hive 'sql'
Created /bigdata/hive0000000003
[zk: localhost:2181(CONNECTED) 19] ls /bigdata
[flink0000000000, flink0000000001, hive0000000003, spark]

## 退出客户端后再次查看
[zk: localhost:2181(CONNECTED) 0] ls /bigdata
[flink0000000000, flink0000000001]

## 修改节点值
[zk: localhost:2181(CONNECTED) 2] set /bigdata/flink0000000000 'test'
[zk: localhost:2181(CONNECTED) 3] get /bigdata/flink0000000000
test

## 删除节点
[zk: localhost:2181(CONNECTED) 9] delete /bigdata/flink0000000000
[zk: localhost:2181(CONNECTED) 10] ls /bigdata
[flink0000000001]

## 递归删除节点
[zk: localhost:2181(CONNECTED) 11] deleteall /bigdata
[zk: localhost:2181(CONNECTED) 12] ls /
[zookeeper]

## 查看节点状态
[zk: localhost:2181(CONNECTED) 13] stat /zookeeper
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2

监听器

原理:在客户端注册监听需要关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除等)是,ZooKeeper会通知客户端。监听机制保证了ZooKeeper保存的任何数据的任何变化都能快速的响应到监听了该节点的应用程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
## node03上创建一个节点
[zk: localhost:2181(CONNECTED) 15] create /bigdata 'test001'
Created /bigdata
## 监听该节点
[zk: localhost:2181(CONNECTED) 16] get -w /bigdata
## node04上变更节点信息
[zk: localhost:2181(CONNECTED) 24] set /bigdata 'test002'
## node03查看监听信息(注册一个监听器,生效一次)
[zk: localhost:2181(CONNECTED) 6]
WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/bigdata

## 监听子节点目录(路径变化)
## node03上监听bigdata节点
[zk: localhost:2181(CONNECTED) 6] ls -w /bigdata
[]
## node04上新增bigdata子节点
[zk: localhost:2181(CONNECTED) 25] create /bigdata/flink 'test'
Created /bigdata/flink
## node03上查看监听信息(注册一个监听器,生效一次)
[zk: localhost:2181(CONNECTED) 7]
WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/bigdata

客户端API操作

注意:必须保证ZooKeeper集群服务端已启动。

官方API

pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>

demo代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* ZooKeeper客户端API相关操作
* 原生API
*/
public class ZkClient {

private ZooKeeper zkClient = null;
private String connectString = "node03:2181,node04:2181,node05:2181";
private int sessionTimeout = 2000;


/**
* 客户端初始化
*/
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {

}
});
}

/**
* 关闭客户端
*/
@After
public void close() throws InterruptedException {
if (zkClient != null) {
zkClient.close();
}
}

/**
* 创建持久节点
* @throws Exception
*/
@Test
public void create()throws Exception{
zkClient.create("/bigdata", "dashuju".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

/**
* 获取节点值
*
* @throws Exception
*/
public void getNode() throws Exception {
byte[] data = zkClient.getData("/bigdata", false, null);
String str = new String(data, "UTF-8");
System.out.println("节点值:" + str);
}

/**
* 更新节点值
*
* @throws Exception
*/
public void updateNode() throws Exception {
// 版本号必须一致,否则无法更新。-1表示任何版本。
zkClient.setData("/bigdata", "bigdata".getBytes(), -1);
}

/**
* 删除节点
*
* @throws Exception
*/
public void deleteNode() throws Exception {
// 版本号必须一致,否则无法删除。-1表示任何版本。
zkClient.delete("/bigdata/spark", -1);
}

/**
* 判断节点是否存在
*
* @throws Exception
*/
public void existNode() throws Exception {
// 如果stat为null,说明节点不存在
Stat stat = zkClient.exists("/bigdata/spark", false);
System.out.println("节点是否存在:" + !(stat == null));
}

/**
* 获取子节点列表
*
* @throws Exception
*/
public void getChildNode() throws Exception {
List<String> childrenList = zkClient.getChildren("/bigdata", false);
for (String str : childrenList) {
System.out.println("子节点:" + str);
}
}
}

Apache Curator(推荐)

pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<dependencies>
<!--Curator 相关依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>

demo代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* ZooKeeper客户端API相关操作
* 使用Apache Curator框架
*/
public class ZkClient {

private CuratorFramework zkClient = null;
private String connectString = "node03:2181,node04:2181,node05:2181";
private int sessionTimeout = 2000;


/**
* 客户端初始化
*/
@Before
public void init() {
// 重试策略
RetryNTimes retryPolicy = new RetryNTimes(3, 5000);
zkClient = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
}

/**
* 关闭客户端
*/
@After
public void close() {
if (zkClient != null) {
zkClient.close();
}
}

/**
* 创建持久节点
*
* @throws Exception
*/
public void create() throws Exception {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/bigdata", "dashuju".getBytes());
}

/**
* 获取节点信息
*
* @throws Exception
*/
public void getNode() throws Exception {
Stat stat = new Stat();
byte[] data = zkClient.getData().storingStatIn(stat).forPath("/bigdata");
System.out.println("节点值:" + new String(data, "UTF-8"));
System.out.println("节点信息:" + stat.toString());
}

/**
* 更新节点值
*
* @throws Exception
*/
public void updateNode() throws Exception {
// 版本号必须一致,否则无法更新。-1表示任何版本。版本号可以传入也可以不传入。
zkClient.setData()
.withVersion(-1)
.forPath("/bigdata", "bigdata".getBytes());
}

/**
* 删除节点
*
* @throws Exception
*/
public void deleteNode() throws Exception {
// 版本号必须一致,否则无法删除。-1表示任何版本。版本号可以传入也可以不传入。
zkClient.delete()
.withVersion(-1)
.forPath("/bigdata");
}

/**
* 判断节点是否存在
*
* @throws Exception
*/
public void existNode() throws Exception {
// 如果stat为null,说明节点不存在
Stat stat = zkClient.checkExists().forPath("/bigdata");
System.out.println("节点是否存在:" + !(stat == null));
}

/**
* 获取子节点列表
*
* @throws Exception
*/
@Test
public void getChildNode() throws Exception {
List<String> childrenList = zkClient.getChildren().forPath("/bigdata");
for (String str : childrenList) {
System.out.println("子节点:" + str);
}
}
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信