Zookeeper节点知识点整理

x33g5p2x  于2021-10-14 转载在 Zookeeper  
字(12.7k)|赞(0)|评价(0)|浏览(723)

zookeeper选举机制

Zookeeper选举机制——第一次启动

(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为 LOOKING;

(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1) 大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING

(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING; LOOKING

4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;

(5)服务器5启动,同4一样当小弟。

Zookeeper选举机制——非第一次启动

(1)当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:

  • 服务器初始化启动。
  • 服务器运行期间无法和Leader保持连接。

(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:

  • 集群中本来就已经存在一个Leader。

对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。

  • 集群中确实不存在Leader。

假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。

选举Leader规则:****①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出

ZK 集群启动停止脚本

如果不是使用Docker搭建的zookeeper集群,那么每次启动zookeeper集群会非常繁琐,因此一般使用启停脚本,快速完成zookeeper集群的启停

1)在 hadoop102 的/home/atguigu/bin 目录下创建脚本

vim zk.sh

在脚本中编写如下内容

#!/bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
 echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh 
start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
 echo ---------- zookeeper $i 停止 ------------ 
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh 
stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
 echo ---------- zookeeper $i 状态 ------------ 
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh 
status"
done
};;
esac

2)增加脚本执行权限

chmod u+x zk.sh

3)Zookeeper 集群启动脚本

zk.sh start

4)Zookeeper 集群停止脚本

zk.sh stop

客户端命令行操作

命令行语法

1)启动客户端

bin/zkCli.sh -server hadoop102:2181

2)显示所有操作命令

help

znode 节点数据信息

1)查看当前znode中所包含的内容

ls /

默认顶层有一个zookeeper节点

2)查看当前节点详细数据

ls / -s
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

(1)czxid:创建节点的事务 zxid

每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所
有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之
前发生。

(2)ctime:znode 被创建的毫秒数(从 1970 年开始)
(3)mzxid:znode 最后更新的事务 zxid
(4)mtime:znode 最后修改的毫秒数(从 1970 年开始)
(5)pZxid:znode 最后更新的子节点 zxid
(6)cversion:znode 子节点变化号,znode 子节点修改次数
(7)dataversion:znode 数据变化号
(8)aclVersion:znode 访问控制列表的变化号
(9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是
临时节点则是 0。
(10)dataLength:znode 的数据长度
(11)numChildren:znode 子节点数量

节点类型(持久/短暂/有序号/无序号)

1)分别创建2个普通节点(永久节点 + 不带序号)

create /sanguo "diaochan"
Created /sanguo

create /sanguo/shuguo "liubei"
Created /sanguo/shuguo

注意:创建节点时,要赋值

2)获得节点的值

get  /sanguo -s
diaochan
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000003
mtime = Wed Aug 29 00:03:23 CST 2018
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 1
get /sanguo/shuguo -s
liubei
cZxid = 0x100000004
ctime = Wed Aug 29 00:04:35 CST 2018
mZxid = 0x100000004
mtime = Wed Aug 29 00:04:35 CST 2018
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

3)创建带序号的节点(永久节点 + 带序号)

(1)先创建一个普通的根节点/sanguo/weiguo

create /sanguo/weiguo "caocao"
Created /sanguo/weiguo

(2)创建带序号的节点

create -s  /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000000

create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000001

create -s /sanguo/weiguo/xuchu "xuchu"
Created /sanguo/weiguo/xuchu0000000002

如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。

4)创建短暂节点(短暂节点 + 不带序号 or 带序号)

(1)创建短暂的不带序号的节点

create -e /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo

(2)创建短暂的带序号的节点

create -e -s /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo0000000001

(3)在当前客户端是能查看到的

ls /sanguo 
[wuguo, wuguo0000000001, shuguo]

(4)退出当前客户端然后再重启客户端

quit
 bin/zkCli.sh

(5)再次查看根目录下短暂节点已经删除

ls /sanguo
[shuguo]

5)修改节点数据值

set /sanguo/weiguo "simayi"

监听原理

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。

1、监听原理详解

1)首先要有一个main()线程

2)在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。

3)通过connect线程将注册的监听事件发送给Zookeeper。

4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。

5)Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。

6)listener线程内部调用了process()方法。

2.常见的监听

1)节点的值变化监听

(1)在 hadoop104 主机上注册监听/sanguo 节点数据变化

get -w /sanguo

(2)在 hadoop103 主机上修改/sanguo 节点的数据

set /sanguo "xisi"

(3)观察 hadoop104 主机收到数据变化的监听

WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged 
path:/sanguo

注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册 一次,只能监听一次。想再次监听,需要再次注册。

2)节点的子节点变化监听(路径变化)

(1)在 hadoop104 主机上注册监听/sanguo 节点的子节点变化

ls -w /sanguo
[shuguo, weiguo]

(2)在 hadoop103 主机/sanguo 节点上创建子节点

create /sanguo/jin "simayi"
Created /sanguo/jin

(3)观察 hadoop104 主机收到子节点变化的监听

WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged 
path:/sanguo

注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

3.节点删除与查看

1)删除节点

delete /sanguo/jin

2)递归删除节点

deleteall /sanguo/shuguo

3)查看节点状态

stat /sanguo

cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000011
mtime = Wed Aug 29 00:21:23 CST 2018
pZxid = 0x100000014
cversion = 9
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 1

客户端 API 操作

前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动

IDEA 环境搭建

1)创建一个工程:zookeeper

2)添加pom文件

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>

3)拷贝log4j.properties文件到项目根目录

需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n 
log4j.appender.logfile=org.apache.log4j.FileAppender 
log4j.appender.logfile.File=target/spring.log 
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n

4)创建包名com.dhy.zk

5)创建类名称zkClient

创建 ZooKeeper 客户端

@Before
    public void init() throws Exception {
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            //zookeeper注册一次,就监听一次
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 收到事件通知后的回调函数(用户的业务逻辑)
                System.out.println(watchedEvent.getType() + "--"
                        + watchedEvent.getPath());
                // 再次启动监听
                List<String> children = null;
                try {
                    children = zooKeeper.getChildren("/",true);
                    for (String child : children) {
                        System.out.println(child);
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

6)创建子节点

@Test
    public void create() throws InterruptedException, KeeperException {
        //第三个参数是权限
        //OPEN_ACL_UNSAFE:任何人都可以访问
        //第四个参数: 创建持久节点
        // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
        zooKeeper.create("/dhy","xpylikedhy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

7) 获取子节点并监听节点变化

// 获取子节点
    @Test
    public void getChildren() throws Exception {
        //获取某个路径下的孩子,是否开启监听
        //注册一次监听,只会生效一次
        List<String> children = zooKeeper.getChildren("/",true);
        for (String child : children) {
            System.out.println(child);
        }
        // 延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

8) 判断 Znode 是否存在

// 判断 znode 是否存在
    @Test
    public void exist() throws Exception {
        //是否存在/dhy节点,不进行监听
        Stat stat = zooKeeper.exists("/dhy", false);
        System.out.println(stat == null ? "not exist" : "exist");
    }

如何在本机做pi和域名的映射,设置host文件

C:\Windows\System32\drivers\etc

IDEA操作zk完整代码

public class zkClient
{
    //这里不能加空格
    //必须在本机的host文件中做了域名和ip的映射
    //否则这里不能使用zoo1代替19.168.112.11的实际ip
   private static String connectString="192.168.112.128:2181";
    private int sessionTimeout=5000;
    private ZooKeeper zooKeeper;
    @Before
    public void init() throws Exception {
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            //zookeeper注册一次,就监听一次
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 收到事件通知后的回调函数(用户的业务逻辑)
                System.out.println(watchedEvent.getType() + "--"
                        + watchedEvent.getPath());
                // 再次启动监听
                List<String> children = null;
                try {
                    children = zooKeeper.getChildren("/",true);
                    for (String child : children) {
                        System.out.println(child);
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Test
    public void create() throws InterruptedException, KeeperException {
        //第三个参数是权限
        //OPEN_ACL_UNSAFE:任何人都可以访问
        //第四个参数: 创建持久节点
        // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
        String nodeCreate = zooKeeper.create("/dhy", "xpylikedhy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    // 获取子节点
    @Test
    public void getChildren() throws Exception {
        //获取某个路径下的孩子,是否开启监听
        //注册一次监听,只会生效一次
        List<String> children = zooKeeper.getChildren("/",true);
        for (String child : children) {
            System.out.println(child);
        }
        // 延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    // 判断 znode 是否存在
    @Test
    public void exist() throws Exception {
        //是否存在/dhy节点,不进行监听
        Stat stat = zooKeeper.exists("/dhy", false);
        System.out.println(stat == null ? "not exist" : "exist");
    }
}

客户端向服务端写数据流程

写流程之写入请求直接发送给Leader节点

写流程之写入请求发送给follower节点

总结

1、首先客户端向zookeeper注册中心已经在监听的服务器写数据
2、如果此时该客户端所连接的服务器不是leader,那么接收到数据的server就会将该请求转发给集群中的leader,而担当leader的服务器又会进一步将数据广播给集群中的其他follower,让所有的follower都将数据写入自己的服务器中
3、每台follower写成功后就会通知给leader
4、如果leader接收到了半数以上的通知时,就表示数据已经写成功了,这时leader就会通知给原本给他转发数据的那台服务器:数据已经写成功了
5、服务器又会将成功通知发送给客户端,整个过程也就结束了。

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

需求分析

服务器去zk里面创建节点,并存储数据,数据记录自己的主机名,当前连接数等等信息

客户端获取节点,如果节点不存在,表名对应的服务器下线

具体实现

(1)先在集群上创建/servers 节点

create /servers "servers"
 Created /servers

(2)在 Idea 中创建包名:com.dhy.zkcase1

(3)服务器端向 Zookeeper 注册代码

向zk中注册一台服务器,其实就是创建一个节点,然后传入主机名作为节点保存的数据,这个节点通常是临时带序号的

每启动一台服务器就传入对应的主机名称,向服务器中进行注册

public class DistributeServer {
    private static String connectString =
            "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";
    // 创建zk连接
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new
                Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                    }
                });
    }
    // 注册服务器
    public void registServer(String hostname) throws Exception{
        String create = zk.create(
                parentNode + "/"+hostname,,
                hostname.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                //创建临时有序号的节点
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname +" is online "+ create);
    }
    // 业务功能
    public void business(String hostname) throws Exception{
        System.out.println(hostname + " is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
        // 1 获取 zk 连接
        DistributeServer server = new DistributeServer();
        server.getConnect();
       // 2 利用 zk 连接注册服务器信息
        server.registServer(args[0]);
        // 3 启动业务功能
        server.business(args[0]);
    }
}

(3)客户端代码

public class DistributeClient {
    private static String connectString =
            "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";

    // 创建到 zk 的客户端连接
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new
                Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                     // 再次启动监听
                        try {
                            getServerList();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
    }

    // 获取服务器列表信息
    public void getServerList() throws Exception {
        // 1 获取服务器子节点信息,并且对父节点进行监听
        //这里如果自己new一个watcher,那么就会走自己new的watcher,而不是getConnect()里面的watcher了
        //如果是true,则表明使用getConnect()方法里面的watcher,这样可以实现持续监听
        List<String> children = zk.getChildren(parentNode, true);
        // 2 存储服务器信息列表
        ArrayList<String> servers = new ArrayList<>();
        // 3 遍历所有节点,获取节点中的主机名称信息
        for (String child : children) {
            //不进行监听
            byte[] data = zk.getData(parentNode + "/" + child,
                    false, null);
            servers.add(new String(data));
        }
        // 4 打印服务器列表信息
        System.out.println(servers);
    }

    // 业务功能
    public void business() throws Exception {
        System.out.println("client is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
        DistributeClient client = new DistributeClient();
        client.getConnect();
// 2 获取 servers 的子节点信息,从中获取服务器信息列表
        client.getServerList();
// 3 业务进程启动
        client.business();
    }
}

测试

1)在 Linux 命令行上操作增加减少服务器

(1)启动 DistributeClient 客户端

(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1] create -e -s  /servers/hadoop102 "hadoop102"
[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"

(3)观察 Idea 控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8] delete 
/servers/hadoop1020000000000

(5)观察 Idea 控制台变化

[hadoop103]

2)在 Idea 上操作增加减少服务器

(1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动 DistributeServer 服务

①点击 Edit Configurations…

②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102

③回到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run “DistributeServer.main()”

④观察 DistributeServer 控制台,提示 hadoop102 is working

⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线

相关文章

微信公众号

最新文章

更多

目录