org.apache.zookeeper.ZooKeeper类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(12.5k)|赞(0)|评价(0)|浏览(225)

本文整理了Java中org.apache.zookeeper.ZooKeeper类的一些代码示例,展示了ZooKeeper类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZooKeeper类的具体详情如下:
包路径:org.apache.zookeeper.ZooKeeper
类名称:ZooKeeper

ZooKeeper介绍

[英]This is the main class of ZooKeeper client library. To use a ZooKeeper service, an application must first instantiate an object of ZooKeeper class. All the iterations will be done by calling the methods of ZooKeeper class. The methods of this class are thread-safe unless otherwise noted.

Once a connection to a server is established, a session ID is assigned to the client. The client will send heart beats to the server periodically to keep the session valid.

The application can call ZooKeeper APIs through a client as long as the session ID of the client remains valid.

If for some reason, the client fails to send heart beats to the server for a prolonged period of time (exceeding the sessionTimeout value, for instance), the server will expire the session, and the session ID will become invalid. The client object will no longer be usable. To make ZooKeeper API calls, the application must create a new client object.

If the ZooKeeper server the client currently connects to fails or otherwise does not respond, the client will automatically try to connect to another server before its session ID expires. If successful, the application can continue to use the client.

The ZooKeeper API methods are either synchronous or asynchronous. Synchronous methods blocks until the server has responded. Asynchronous methods just queue the request for sending and return immediately. They take a callback object that will be executed either on successful execution of the request or on error with an appropriate return code (rc) indicating the error.

Some successful ZooKeeper API calls can leave watches on the "data nodes" in the ZooKeeper server. Other successful ZooKeeper API calls can trigger those watches. Once a watch is triggered, an event will be delivered to the client which left the watch at the first place. Each watch can be triggered only once. Thus, up to one event will be delivered to a client for every watch it leaves.

A client needs an object of a class implementing Watcher interface for processing the events delivered to the client. When a client drops the current connection and re-connects to a server, all the existing watches are considered as being triggered but the undelivered events are lost. To emulate this, the client will generate a special event to tell the event handler a connection has been dropped. This special event has EventType None and KeeperState Disconnected.
[中]

代码示例

代码示例来源:origin: apache/zookeeper

private void utestGet(int port)
  throws IOException, InterruptedException, KeeperException
{
  ZooKeeper zk =
    new ZooKeeper("127.0.0.1:" + port, CONNECTION_TIMEOUT, this);
  for (int i = 0; i < 10000; i++) {
    Stat stat = new Stat();
    zk.getData("/" + i, true, stat);
  }
  zk.close();
}

代码示例来源:origin: knightliao/disconf

Stat stat = zk.exists(path, false);
  zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  zk.setData(path, value.getBytes(CHARSET), stat.getVersion());
LOGGER.warn("write connect lost... will retry " + retries + "\t" + e.toString());

代码示例来源:origin: apache/zookeeper

private void utestChildren(int port)
  throws IOException, InterruptedException, KeeperException
{
  ZooKeeper zk =
    new ZooKeeper("127.0.0.1:" + port, CONNECTION_TIMEOUT, this);
  for (int i = 0; i < 10000; i++) {
    zk.getChildren("/" + i, true);
  }
  zk.close();
}

代码示例来源:origin: apache/hbase

private ZooKeeper getZk() throws IOException {
 // may be closed when session expired
 if (zookeeper == null || !zookeeper.getState().isAlive()) {
  zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {});
 }
 return zookeeper;
}

代码示例来源:origin: luxiaoxun/NettyRpc

private void AddRootNode(ZooKeeper zk){
  try {
    Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
    if (s == null) {
      zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
  } catch (KeeperException e) {
    logger.error(e.toString());
  } catch (InterruptedException e) {
    logger.error(e.toString());
  }
}

代码示例来源:origin: apache/hbase

public synchronized void reconnectAfterExpiration()
   throws IOException, KeeperException, InterruptedException {
 if (zk != null) {
  LOG.info("Closing dead ZooKeeper connection, session" +
   " was: 0x"+Long.toHexString(zk.getSessionId()));
  zk.close();
  // reset the ZooKeeper connection
  zk = null;
 }
 checkZk();
 LOG.info("Recreated a ZooKeeper, session" +
  " is: 0x"+Long.toHexString(zk.getSessionId()));
}

代码示例来源:origin: twitter/distributedlog

@Test(timeout = 60000)
public void testNamespaceListener() throws Exception {
  URI uri = createDLMURI("/" + runtime.getMethodName());
  zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
      .conf(conf).uri(uri).build();
  final AtomicReference<Collection<String>> receivedStreams = new AtomicReference<Collection<String>>(null);
  namespace.registerNamespaceListener(new NamespaceListener() {
    @Override
  latches[2].await();
  assertEquals(0, numFailures.get());
  assertNotNull(receivedStreams.get());
  Set<String> streamSet = new HashSet<String>();
  streamSet.addAll(receivedStreams.get());
  assertEquals(2, receivedStreams.get().size());
  assertEquals(2, streamSet.size());

代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server

@Override
public byte[] call() throws KeeperException, InterruptedException {
  ZooKeeper zkHandle = zk.get();
  if (null == zkHandle) {
    return ZooKeeperClient.super.getData(path, watch, stat);
  }
  return zkHandle.getData(path, watch, stat);
}

代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server

@Override
void zkRun() {
  ZooKeeper zkHandle = zk.get();
  if (null == zkHandle) {
    ZooKeeperClient.super.exists(path, watcher, stCb, worker);
  } else {
    zkHandle.exists(path, watcher, stCb, worker);
  }
}

代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server

private void closeZkHandle() throws InterruptedException {
  ZooKeeper zkHandle = zk.get();
  if (null == zkHandle) {
    super.close();
  } else {
    zkHandle.close();
  }
}

代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server

@Override
public States getState() {
  ZooKeeper zkHandle = zk.get();
  if (null == zkHandle) {
    return ZooKeeperClient.super.getState();
  } else {
    return zkHandle.getState();
  }
}

代码示例来源:origin: apache/zookeeper

private void delete_create_get_set_test_1() throws
    IOException, InterruptedException, KeeperException {
 checkRoot();
 ZooKeeper zk = new ZooKeeper(hostPort, 10000, this);
 String parentName = testDirOnZK;
 String nodeName = parentName + "/benwashere";
 try {
  zk.delete(nodeName, -1);
 } catch (KeeperException ke) {
  Code code = ke.code();
  boolean valid = code == KeeperException.Code.NONODE
    || code == KeeperException.Code.NOTEMPTY;
  if (!valid) {
   Assert.fail("Unexpected exception code for delete: " + ke.getMessage());
  zk.create(nodeName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 } catch (KeeperException ke) {
  Code code = ke.code();
  boolean valid = code == KeeperException.Code.NODEEXISTS;
  if (!valid) {
  zk.setData(nodeName, "hi".getBytes(), 5700);
  Assert.fail("Should have gotten BadVersion exception");
 } catch (KeeperException ke) {
 zk.setData(nodeName, "hi".getBytes(), -1);
 Stat st = new Stat();
 byte[] bytes = zk.getData(nodeName, false, st);
 String retrieved = new String(bytes);
 if (!"hi".equals(retrieved)) {

代码示例来源:origin: apache/zookeeper

public void run() {
  try {
    Stat stat = new Stat();
    String path = zk.create("/hammers/hammer-", new byte[0],
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    byte tag[] = (path + " was here!").getBytes();
      String startPath = "/hammers/start";
      System.out.println("Waiting for " + startPath);
      while (zk.exists(startPath, true) == null) {
        wait();
        System.out.print(i + "\r");
        List<String> childs =
          zk.getChildren("/hammers", false);
        Collections.shuffle(childs);
        for (String s : childs) {
          if (s.startsWith("hammer-")) {
            s = "/hammers/" + s;
            zk.setData(s, tag, -1);
            for (int j = 0; j < readsPerWrite; j++) {
              zk.getData(s, false, stat);
        e.printStackTrace();
    zk.close();
  } catch (RuntimeException e) {
    e.printStackTrace();

代码示例来源:origin: apache/zookeeper

zk = createClient();
try {
  zk.addAuthInfo("digest", "ben:passwd".getBytes());
  zk.create("/ben", new byte[0], Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, this, results);
  zk.create("/ben/2", new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT, this, results);
  zk.delete("/ben", -1, this, results);
  zk.create("/ben2", new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT, this, results);
  zk.getData("/ben2", false, this, results);
  synchronized (results) {
    while (results.size() < 5) {
  Assert.assertEquals(Code.NOAUTH, Code.get(results.get(1)));
  Assert.assertEquals(0, (int) results.get(2));
  Assert.assertEquals(0, (int) results.get(3));
  Assert.assertEquals(0, (int) results.get(4));
} finally {
  zk.close();
  zk.addAuthInfo("digest", "ben:passwd2".getBytes());
  try {
    zk.getData("/ben2", false, new Stat());
    Assert.fail("Should have received a permission error");
  } catch (KeeperException e) {
    Assert.assertEquals(Code.NOAUTH, e.code());
  zk.close();
  zk.addAuthInfo("digest", "ben:passwd".getBytes());
  zk.getData("/ben2", false, new Stat());

代码示例来源:origin: apache/zookeeper

try {
  zk = createClient();
  zk.addAuthInfo("digest", "ben:passwd".getBytes());
  ArrayList<ACL> testACL = new ArrayList<ACL>();
  testACL.add(new ACL(Perms.ALL, new Id("auth", null)));
  zk.create("/acltest", new byte[0], testACL, CreateMode.PERSISTENT);
  zk.close();
  zk = createClient();
  zk.addAuthInfo("digest", "ben:passwd2".getBytes());
  if (skipACL) {
    try {
      zk.getData("/acltest", false, null);
    } catch (KeeperException e) {
      Assert.fail("Badauth reads should succeed with skipACL.");
      zk.getData("/acltest", false, null);
      Assert.fail("Should have received a permission error");
    } catch (KeeperException e) {
      Assert.assertEquals(Code.NOAUTH, e.code());
  zk.addAuthInfo("digest", "ben:passwd".getBytes());
  zk.getData("/acltest", false, null);
  zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1);
  zk.close();
  zk = createClient();
  zk.getData("/acltest", false, null);
  List<ACL> acls = zk.getACL("/acltest", new Stat());
  Assert.assertEquals(1, acls.size());
  Assert.assertEquals(Ids.OPEN_ACL_UNSAFE, acls);

代码示例来源:origin: apache/nifi

@Override
public StateMap getState(final String componentId) throws IOException {
  verifyEnabled();
  try {
    final Stat stat = new Stat();
    final String path = getComponentPath(componentId);
    final byte[] data = getZooKeeper().getData(path, false, stat);
    final StateMap stateMap = deserialize(data, stat.getVersion(), componentId);
    return stateMap;
  } catch (final InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + ", due to interruption", e);
  } catch (final KeeperException ke) {
    final Code exceptionCode = ke.code();
    if (Code.NONODE == exceptionCode) {
      return new StandardStateMap(null, -1L);
    }
    if (Code.SESSIONEXPIRED == exceptionCode) {
      invalidateClient();
      return getState(componentId);
    }
    throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + " with exception code " + exceptionCode, ke);
  } catch (final IOException ioe) {
    // provide more context in the error message
    throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId, ioe);
  }
}

代码示例来源:origin: apache/zookeeper

private void utestPrep(int port)
  throws IOException, InterruptedException, KeeperException
{
  ZooKeeper zk =
    new ZooKeeper("127.0.0.1:" + port, CONNECTION_TIMEOUT, this);
  for (int i = 0; i < 10000; i++) {
    zk.create("/" + i, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }
  zk.close();
}

代码示例来源:origin: apache/zookeeper

private Stat createWithStatVerifyResult(String newName)
    throws KeeperException, InterruptedException {
  Assert.assertNull("Node existed before created", zk.exists(newName, false));
  Stat stat = new Stat();
  zk.create(newName, newName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER, stat);
  validateCreateStat(stat, newName);
  Stat referenceStat = zk.exists(newName, false);
  Assert.assertNotNull("Node was not created as expected", referenceStat);
  Assert.assertEquals(referenceStat, stat);
  return stat;
}

代码示例来源:origin: apache/zookeeper

private void utestExists(int port)
  throws IOException, InterruptedException, KeeperException
{
  ZooKeeper zk =
    new ZooKeeper("127.0.0.1:" + port, CONNECTION_TIMEOUT, this);
  for (int i = 0; i < 10000; i++) {
    zk.exists("/this/path/doesnt_exist!", true);
  }
  zk.close();
}

代码示例来源:origin: apache/zookeeper

@Test
public void testDeleteWithChildren() throws Exception {
  ZooKeeper zk = createClient();
  zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  try {
    zk.delete("/parent", -1);
    Assert.fail("Should have received a not equals message");
  } catch (KeeperException e) {
    Assert.assertEquals(KeeperException.Code.NOTEMPTY, e.code());
  }
  zk.delete("/parent/child", -1);
  zk.delete("/parent", -1);
  zk.close();
}

相关文章