ZooKeeper Java API Operations (5) | curator-recipes


I. Overview

Continuing from the previous article on the Curator client API, this post looks at curator-recipes, the module that wraps common ZooKeeper usage scenarios into ready-made APIs: Cache (NodeCache, PathChildrenCache, TreeCache), Elections, Locks, Barriers, Counters, and Queues.

The Curator basics article in this series:

zookeeper java api 操作(四) | curator_青枫绿屿的博客-CSDN博客

The curator-recipes documentation:

Apache Curator Recipes

II. pom Dependency

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

III. Curator-recipes

1. Cache

Recipes offers three kinds of cache (NodeCache, PathChildrenCache, TreeCache), which wrap ZooKeeper's native watcher mechanism: a native watch fires only once and must be re-registered after every event, whereas a cache maintains a local snapshot of the node data and invokes the registered listener whenever that data changes.
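To see what these caches save us from, here is a minimal sketch (not from this article; the single-server address and the pre-existing node /some-node are assumptions) of the one-shot behavior of a raw ZooKeeper watch — the callback has to re-register itself to keep receiving events:

package com.szwn.curator;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class RawWatcherSketch {
    public static void main(String[] args) throws Exception {
        final ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, null);
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDataChanged) {
                    try {
                        // A watch fires only once: pass this watcher again to re-register it
                        byte[] data = zk.getData(event.getPath(), this, null);
                        System.out.println("new data: " + new String(data));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        // Register the initial watch (assumes /some-node already exists)
        zk.getData("/some-node", watcher, null);
        Thread.sleep(10000);
        zk.close();
    }
}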

a. NodeCache

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class NodeCacheTest {
    static String path = "/zk-curator/nodeCache";
    // Create a CuratorFramework client for the cluster 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    // with a 5000ms session timeout and an ExponentialBackoffRetry policy
    // (up to 3 retries, base sleep 1000ms, increasing between retries)
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183").sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        // Start the connection
        client.start();
        // Create an EPHEMERAL node, creating parent nodes as needed
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        // NodeCache arguments: the Curator client, the path of the node to watch,
        // and dataIsCompressed, which says whether the node data is compressed
        final NodeCache cache = new NodeCache(client, path, false);
        // start(true) performs an initial build before returning,
        // so the cache already holds an initial view of the node
        cache.start(true);
        // Add a listener to the node cache
        cache.getListenable().addListener(new NodeCacheListener() {
            // Called whenever the cached node changes
            public void nodeChanged() throws Exception {
                System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
            }
        });
        client.setData().forPath(path, "update".getBytes());
        Thread.sleep(1000);
        // Note: deleting the node also triggers nodeChanged, but getCurrentData() is then
        // null, so the listener above would throw a NullPointerException; null-check in real code
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(1000);
        // Close the cache and the connection
        cache.close();
        client.close();
    }
}

b. PathChildrenCache

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class PathChildrenCacheTest {
    static String path = "/curator-recipes-pathChildrenCache";
    // Create a CuratorFramework client for the cluster 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    // with a 5000ms session timeout and an ExponentialBackoffRetry policy
    // (up to 3 retries, base sleep 1000ms, increasing between retries)
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(5000).build();

    public static void main(String[] args) throws Exception {
        // Start the connection
        client.start();
        // Create a PathChildrenCache; cacheData = true caches the child data, not just the paths
        PathChildrenCache cache = new PathChildrenCache(client, path, true);
        // Caching only begins once start() is called; POST_INITIALIZED_EVENT also posts
        // an INITIALIZED event after the initial rebuild completes
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED," + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED," + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED," + event.getData().getPath());
                    break;
                case CONNECTION_LOST:
                    // Connection events carry no child data, so don't dereference event.getData() here
                    System.out.println("CONNECTION_LOST");
                    break;
                default:
                    break;
                }
            }
        });
        // Create the persistent parent node
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
        // Create an ephemeral child node
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path + "/c1");
        Thread.sleep(1000);
        // Delete the child node
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
        // Delete the parent node
        client.delete().forPath(path);
        Thread.sleep(1000);
        // Close the cache and the connection
        cache.close();
        client.close();
    }
}

c. TreeCache

package com.szwn.curator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class TreeCacheTest {
    static String path = "/curator-recipes-treeCache";
    // Create a CuratorFramework client for the cluster 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    // with a 5000ms session timeout and an ExponentialBackoffRetry policy
    // (up to 3 retries, base sleep 1000ms, increasing between retries)
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(5000).build();
    static ExecutorService es = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws Exception {
        // Start the connection
        client.start();
        // Create a TreeCache for the whole subtree rooted at path
        TreeCache cache = new TreeCache(client, path);
        // Caching only begins once start() is called
        cache.start();
        // Add a listener whose callbacks run on the thread pool es
        cache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
                switch (event.getType()) {
                case NODE_ADDED:
                    System.out.println("NODE_ADDED," + event.getData().getPath());
                    break;
                case NODE_UPDATED:
                    System.out.println("NODE_UPDATED," + event.getData().getPath());
                    break;
                case NODE_REMOVED:
                    System.out.println("NODE_REMOVED," + event.getData().getPath());
                    break;
                case CONNECTION_LOST:
                    // Connection events carry no node data, so don't dereference event.getData() here
                    System.out.println("CONNECTION_LOST");
                    break;
                default:
                    break;
                }
            }
        }, es);
        // Create the persistent parent node
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
        // Create an ephemeral child node
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path + "/c1");
        Thread.sleep(1000);
        // Delete the child node
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
        // Delete the parent node
        client.delete().forPath(path);
        Thread.sleep(1000);
        // Close the cache, the connection, and the listener thread pool
        cache.close();
        client.close();
        es.shutdown();
    }
}

2. Elections

ZooKeeper has many uses, and one of them is serving as the registry of a distributed architecture and managing a distributed cluster: both Dubbo (open-sourced by Alibaba) and Spring Cloud can use ZooKeeper as their registry. In a distributed cluster, the servers must elect a master to handle the business logic, while the remaining followers keep monitoring the master's state; if the master fails, they re-enter the election so one of them can take over its duties. Recipes provides two election mechanisms, LeaderLatch and LeaderSelector, and we can choose whichever fits our needs.

a. LeaderSelector

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderSelectorTest {
    static String leader_path = "/curator_recipes_leader_path";

    public static void main(String[] args) throws Exception {
        // Simulate 3 servers: each thread creates its own client connection and joins the master election
        Runnable target = new Runnable() {
            @Override
            public void run() {
                CuratorFramework client = newClient();
                client.start();
                // Create a LeaderSelector on /curator_recipes_leader_path and register a listener
                LeaderSelector selector = new LeaderSelector(client, leader_path, new LeaderSelectorListenerAdapter() {
                    // Called when this instance wins the election; leadership is held
                    // only for as long as this method is executing
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        System.out.println(Thread.currentThread().getName() + " became master");
                        // Execute the master's business logic here
                        Thread.sleep(3000);
                        System.out.println(Thread.currentThread().getName() + " finished master work, relinquishing leadership");
                    }
                });
                // autoRequeue() re-enters this client into the election after it relinquishes leadership
                selector.autoRequeue();
                // Start competing for leadership
                selector.start();
            }
        };
		Thread thread1 = new Thread(target);
		Thread thread2 = new Thread(target);
		Thread thread3 = new Thread(target);
		thread1.start();
		thread2.start();
		thread3.start();
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static CuratorFramework newClient() {
        return CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
				.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    }

}

Here we simulate three servers repeatedly competing in the master election, and the output shows them becoming master in turn. The implementation is based on a lock held under the election node: leadership lasts only as long as takeLeadership() is executing, so when the current master returns from that method the lock is released and another contender can acquire it and become the new master.

b. LeaderLatch

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchTest {
    static String leader_path = "/curator_recipes_latch_path";

    public static void main(String[] args) throws Exception {
        // Simulate 3 servers: each thread creates its own client connection and joins the master election
        Runnable target = new Runnable() {
            @Override
            public void run() {
                CuratorFramework client = newClient();
                client.start();
                // Create a LeaderLatch on /curator_recipes_latch_path; the third argument is this participant's id
                final LeaderLatch latch = new LeaderLatch(client, leader_path, Thread.currentThread().getId() + "");
                latch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println(Thread.currentThread().getName() + " became master");
                        // Execute the master's business logic here
                        try {
                            Thread.sleep(3000);
                            // close() releases the latch's resources and deletes its election node
                            latch.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " finished master work, relinquishing leadership");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println(Thread.currentThread().getName() + " is not the master");
                    }
                });
                try {
                    latch.start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
		Thread thread1 = new Thread(target);
		Thread thread2 = new Thread(target);
		Thread thread3 = new Thread(target);
		thread1.start();
		thread2.start();
		thread3.start();
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static CuratorFramework newClient() {
        return CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
				.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    }

}

Again we simulate three servers competing in the election and see them become master in turn. The implementation uses an election node under which every client creates a sequential child node; the client with the smallest sequence number becomes master. Each follower watches the node that precedes its own in the sequence (rather than everyone watching one node, which would cause a herd effect); when the master's LeaderLatch calls close() and its node is deleted, the next client in sequence becomes master.
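Besides the listener style above, LeaderLatch also supports a blocking style. A minimal sketch, assuming the same cluster address and election path as the example:

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchAwaitSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        LeaderLatch latch = new LeaderLatch(client, "/curator_recipes_latch_path");
        latch.start();
        // Blocks until this client holds leadership
        latch.await();
        System.out.println("became master: " + latch.hasLeadership());
        // ... master business logic would run here ...
        // close() deletes our sequential node and hands leadership to the next participant
        latch.close();
        client.close();
    }
}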

3. Locks

The next recipes use case is distributed locks; five lock types are provided (InterProcessMutex, InterProcessSemaphoreMutex, InterProcessReadWriteLock, InterProcessSemaphoreV2, InterProcessMultiLock). Here we simulate 30 threads generating order numbers concurrently, guarded by an InterProcessMutex. The output shows that each thread only generates its order number after the previous holder releases the lock; since the order number is a millisecond timestamp and each holder keeps the lock for one second, the generated numbers cannot collide.

package com.szwn.curator;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LockTest {
    static String lock_path = "/curator_recipes_lock_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        // A distributed mutex backed by the znode lock_path
        final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        // Block on the CountDownLatch so all 30 threads start competing at once
                        down.await();
                        lock.acquire();
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println(Thread.currentThread().getName() + " generated order number: " + orderNo);
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        down.countDown();
    }
}
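Of the other four lock types, InterProcessReadWriteLock is worth a quick sketch (the lock path and timings below are illustrative assumptions): any number of readers may hold the read lock at the same time, while the write lock is exclusive and waits for all readers to release.

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ReadWriteLockSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        final InterProcessReadWriteLock rwLock =
                new InterProcessReadWriteLock(client, "/curator_recipes_rwlock_path");
        // Several readers can hold the read lock concurrently
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        rwLock.readLock().acquire();
                        System.out.println(Thread.currentThread().getName() + " reading");
                        Thread.sleep(1000);
                        rwLock.readLock().release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        Thread.sleep(200); // let the readers acquire first
        // The write lock waits for all readers to release, then is held exclusively
        rwLock.writeLock().acquire();
        System.out.println("writing");
        rwLock.writeLock().release();
        Thread.sleep(1000);
        client.close();
    }
}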

4. Barriers

DistributedBarrier blocks processes on a barrier znode: setBarrier() creates the node, waitOnBarrier() blocks for as long as the node exists, and removeBarrier() deletes it, releasing the waiters. In the example below the main thread sets the barrier and starts five threads that wait on it; once it is removed, each thread in turn re-sets the barrier to hold back the remaining waiters, does its work, and removes it again to release the next one (the serialization is approximate, since a thread that has already passed waitOnBarrier is not blocked again).

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class BarrierTest {

	public static void main(String[] args) throws Exception {
		final String barrier_path = "/curator_recipes_barrier_path";
		// Create a client
		final CuratorFramework client = newClient();
		// Create the barrier
		final DistributedBarrier barrier = new DistributedBarrier(client, barrier_path);
		client.start();
		// Set the barrier, then start 5 threads that wait on it
		barrier.setBarrier();
		for (int i = 0; i < 5; i++) {
			new Thread(new Runnable() {
				public void run() {
					try {
						// Wait until the barrier is removed
						barrier.waitOnBarrier();
						// Re-set the barrier before running this thread's own logic
						barrier.setBarrier();
						System.out.println(Thread.currentThread().getName() + " set the barrier");
						Thread.sleep(1000);
						System.out.println("starting...");
						// Remove the barrier, releasing the next waiter
						barrier.removeBarrier();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}).start();
		}
		// Remove the initial barrier to release the waiting threads
		barrier.removeBarrier();
		Thread.sleep(2000);
		client.close();
	}
	public static CuratorFramework newClient() {
		return CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
				.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
	}
}
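Recipes also provides DistributedDoubleBarrier, which synchronizes both the start and the end of a computation: enter() blocks until the configured number of members have entered, and leave() blocks until all of them have left. A minimal sketch, assuming the path below:

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DoubleBarrierSketch {
    public static void main(String[] args) throws Exception {
        final String path = "/curator_recipes_double_barrier_path"; // illustrative path
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        CuratorFramework client = CuratorFrameworkFactory.builder()
                                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
                        client.start();
                        // 5 is the member count: enter()/leave() block until all 5 arrive
                        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, path, 5);
                        barrier.enter();
                        System.out.println(Thread.currentThread().getName() + " working");
                        barrier.leave();
                        client.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        Thread.sleep(10000);
    }
}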

5. Counters

DistributedAtomicInteger stores an integer in a znode and updates it with an optimistic compare-and-swap against the znode version, retrying under the supplied retry policy. Every operation returns an AtomicValue, and succeeded() must be checked before trusting preValue() and postValue().

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

public class DistributeAtomicInt {
	static String distributeInt = "/curator_recipes_distributedAtoInt";
	static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
			.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

	public static void main(String[] args) throws Exception {
		client.start();
		// A distributed integer counter; RetryNTimes governs retries of the internal compare-and-swap
		DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distributeInt,
				new RetryNTimes(3, 1000));
		// Initialize the counter to 0 (only takes effect if the node does not exist yet)
		atomicInteger.initialize(0);
		// Atomically add 8; always check succeeded() before trusting the values
		AtomicValue<Integer> rc = atomicInteger.add(8);
		System.out.println("succeed: " + rc.succeeded() + ", preValue: " + rc.preValue() + ", postValue: " + rc.postValue());
		// On a fresh counter the value is now 8, so this CAS (which expects 2) fails
		AtomicValue<Integer> rc1 = atomicInteger.compareAndSet(2, 10);
		System.out.println("succeed: " + rc1.succeeded() + ", preValue: " + rc1.preValue() + ", postValue: " + rc1.postValue());
		client.close();
	}
}

6. Queues

QueueBuilder builds several queue variants over the same consumer and serializer: a plain DistributedQueue, a DistributedIdQueue (items removable by id), a DistributedPriorityQueue, and a DistributedDelayQueue. The example below uses the plain queue: ten producer threads put their thread names, and the consumer callback prints each message as it is delivered.

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.DistributedIdQueue;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedQueueTest {
	static String distributePath= "/curator_recipes_distributedQueue";
	static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
			.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

	public static void main(String[] args) throws Exception {
		client.start();
		QueueConsumer<String> consumer = new QueueConsumer<String>() {
			@Override
			public void consumeMessage(String message) throws Exception {
				System.out.println(message);
			}
			@Override
			public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
			}
		};
		QueueSerializer<String> serializer = new QueueSerializer<String>() {
			@Override
			public byte[] serialize(String item) {
				return item.getBytes();
			}
			@Override
			public String deserialize(byte[] bytes) {
				return new String(bytes);
			}
		};
		QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, distributePath);
		// The same builder can also produce the other queue variants:
//		DistributedDelayQueue<String> delayQueue = builder.buildDelayQueue();
//		DistributedIdQueue<String> idQueue = builder.buildIdQueue();
//		DistributedPriorityQueue<String> priorityQueue = builder.buildPriorityQueue(1);
		final DistributedQueue<String> queue = builder.buildQueue();
		// Start the queue. No other methods work until this is called
		queue.start();
		for (int i = 0; i < 10; i++) {
			Thread thread = new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						queue.put(Thread.currentThread().getName());
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
			thread.start();
		}
		// Give the consumer time to drain the queue, then clean up
		Thread.sleep(10000);
		queue.close();
		client.close();
	}
}
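The commented-out builders above hint at the other variants. Here is a minimal sketch of the delay queue (the path and the 5-second delay are assumptions): the second argument to put() is the epoch time in milliseconds at which the item becomes visible to the consumer.

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DelayQueueSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        QueueConsumer<String> consumer = new QueueConsumer<String>() {
            @Override
            public void consumeMessage(String message) {
                System.out.println("consumed at " + System.currentTimeMillis() + ": " + message);
            }
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            }
        };
        QueueSerializer<String> serializer = new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
        DistributedDelayQueue<String> queue = QueueBuilder
                .builder(client, consumer, serializer, "/curator_recipes_delayQueue")
                .buildDelayQueue();
        queue.start();
        // The item becomes visible to the consumer ~5 seconds from now
        queue.put("delayed-task", System.currentTimeMillis() + 5000);
        Thread.sleep(10000);
        queue.close();
        client.close();
    }
}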
