Springboot2.3整合Zookeeper3.6实现基本操作

x33g5p2x  于2021-08-25 转载在 Zookeeper  
字(4.5k)|赞(0)|评价(0)|浏览(386)

本文主要介绍Springboot3.4整合Zookeeper3.6版本,需提前安装好zookeeper开发环境,有不清楚的小伙伴,请参考Zookeeper3.6搭建单机版和集群版

1. 引入pom依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

2. zookeeper配置文件

@Configuration
public class ZookeeperConfig {

	//zookeeper连接信息
    @Value("${zookeeper.connect.address}")
    private String connectStr;
    //session超时时间(毫秒)
    @Value("${zookeeper.connect.time-out}")
    private int timeOut;
    private CountDownLatch connectLatch = new CountDownLatch(1);

    /** * 初始化zookeeper连接 * @return * @throws IOException * @throws InterruptedException */
    @Bean
    public ZooKeeper getZkClient() throws IOException, InterruptedException {
        ZooKeeper zkClient = new ZooKeeper(connectStr, timeOut, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
            }
        });
        connectLatch.await();
        return zkClient;
    }
}

3. Zookeeper接口封装

public interface ZookeeperService {

    /** * 创建节点 * @param path 节点路径 * @param data 节点数据 * @param perm 节点权限 * @param nodeType 节点类型 * @return * @throws KeeperException * @throws InterruptedException */
    String createNode(String path, String data, ArrayList<ACL> perm, CreateMode nodeType) throws KeeperException, InterruptedException;

    /** * 判断节点是否存在 * @param path 节点路径 * @param nodeWatch 是否复用zookeeper中默认的watch * @return * @throws KeeperException * @throws InterruptedException */
    Stat existNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException;

    /** * 判断节点是否存在 * @param path 节点路径 * @param watcher 监听类型 创建/删除/更新 * @return * @throws KeeperException * @throws InterruptedException */
    Stat existNode(String path, Watcher watcher) throws KeeperException, InterruptedException;

    /** * 修改节点 * @param path 节点路径 * @param data 节点数据 * @return * @throws KeeperException * @throws InterruptedException */
    Stat updateNode(String path, String data) throws KeeperException, InterruptedException;

    /** * 删除节点 * @param path 节点路径 * @throws KeeperException * @throws InterruptedException */
    void deleteNode(String path) throws KeeperException, InterruptedException;

    /** * 获取节点数据 * @param path 节点路径 * @return * @throws KeeperException * @throws InterruptedException */
    String getNodeData(String path) throws KeeperException, InterruptedException;

    /** * 获取子节点 * @param path 节点路径 * @param nodeWatch 是否复用zookeeper中默认的watch * @return * @throws KeeperException * @throws InterruptedException */
    List<String> getChildrenNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException;

    /** * 获取子节点 * @param path 节点路径 * @param watcher 监听类型 创建/删除/更新 * @return * @throws KeeperException * @throws InterruptedException */
    List<String> getChildrenNode(String path, Watcher watcher) throws KeeperException, InterruptedException;
}

4. Zookeeper接口实现

@Service
public class ZookeeperServiceImpl implements ZookeeperService {

    @Autowired
    private ZooKeeper zkClient;

    @Override
    public String createNode(String path, String data, ArrayList<ACL> perm, CreateMode nodeType) throws KeeperException, InterruptedException {
        String nodeStr = zkClient.create(path, data.getBytes(), perm, nodeType);
        return nodeStr;
    }

    @Override
    public Stat existNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists(path, nodeWatch);
        return exists;
    }

    @Override
    public Stat existNode(String path, Watcher watcher) throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists(path, watcher);
        return exists;
    }

    /** * 更新节点 * @param path 节点路径 * @param data 节点数据 * version 版本号,如果为-1,表示忽略版本检查 * @return * @throws KeeperException * @throws InterruptedException */
    @Override
    public Stat updateNode(String path, String data) throws KeeperException, InterruptedException {
        Stat stat = zkClient.setData(path, data.getBytes(), 0);
        return stat;
    }

    @Override
    public void deleteNode(String path) throws KeeperException, InterruptedException {
        zkClient.delete(path, 0);
    }

    @Override
    public String getNodeData(String path) throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] data = zkClient.getData(path, true, stat);
        String dataStr = new String(data);
        return dataStr;
    }

    @Override
    public List<String> getChildrenNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException {
        List<String> childrenList = zkClient.getChildren(path, nodeWatch);
        return childrenList;
    }

    @Override
    public List<String> getChildrenNode(String path, Watcher watcher) throws KeeperException, InterruptedException {
        List<String> childrenList = zkClient.getChildren(path, watcher);
        return childrenList;
    }
}

5. 节点类型

Zookeeper中节点类型主要有两类,即持久节点(Persistent)和临时节点(Ephemeral),每种节点又分为有序和无序
持久无序节点:客户端与Zookeeper断开连接后,该节点依旧存在
持久有序节点:客户端与Zookeeper端口连接后,该节点依旧存在,Zookeeper会给该节点名称进行顺序编号
临时无序节点:客户端与Zookeeper断开连接后,该节点被删除
临时有序节点:客户端与Zookeeper断开连接后,该节点被删除,Zookeeper会给该节点名称进行顺序编号

6. 客户端向服务端写数据流程

写流程之写入请求发送给Leader节点

写入请求发送给Leader节点

写流程之写入请求发送给follower节点

写入请求发送给follower节点

相关文章