org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(12.7k)|赞(0)|评价(0)|浏览(174)

本文整理了Java中org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore类的一些代码示例,展示了ZKRMStateStore类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZKRMStateStore类的具体详情如下:
包路径:org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
类名称:ZKRMStateStore

ZKRMStateStore介绍

[英]Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved separately. The currentMasterkey and nextMasterkey have been stored. Also, AMRMToken has been removed from ApplicationAttemptState.
[中]从1.1到1.2的变更:AMRMTokenSecretManager的状态已单独保存,currentMasterkey和nextMasterkey均已存储。此外,AMRMToken已从ApplicationAttemptState中删除。

代码示例

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Persists the serialized {@code CURRENT_VERSION_INFO} at the version znode
 * located under {@code zkRootNodePath}.
 *
 * <p>When the znode is absent it is created as a persistent node with
 * {@code zkAcl}; otherwise its data is overwritten in place.
 *
 * @throws Exception if any of the underlying store calls fails
 */
@Override
protected synchronized void storeVersion() throws Exception {
 final String versionPath = getNodePath(zkRootNodePath, VERSION_NODE);
 final VersionPBImpl currentVersion = (VersionPBImpl) CURRENT_VERSION_INFO;
 final byte[] serializedVersion = currentVersion.getProto().toByteArray();

 if (existsWithRetries(versionPath, false) == null) {
  // Nothing stored yet: create the version znode.
  createWithRetries(versionPath, serializedVersion, zkAcl,
    CreateMode.PERSISTENT);
  return;
 }
 // NOTE(review): -1 is presumably ZooKeeper's "match any version" - confirm.
 setDataWithRetries(versionPath, serializedVersion, -1);
}

代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager

/**
 * Emits a single debug line made of {@code prefix}, every ACL currently set
 * on {@code zkRootNodePath}, and the node's {@code Stat}.
 *
 * @param prefix text placed at the start of the logged message
 * @throws Exception if reading the ACLs fails
 */
private void logRootNodeAcls(String prefix) throws Exception {
 final Stat rootStat = new Stat();
 final List<ACL> rootAcls = getACLWithRetries(zkRootNodePath, rootStat);

 final StringBuilder message = new StringBuilder().append(prefix);
 for (ACL entry : rootAcls) {
  message.append(entry.toString());
 }
 // rootStat is the Stat instance handed to getACLWithRetries above.
 message.append(rootStat.toString());

 LOG.debug(message.toString());
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Starts the store: opens the connection, prepares the root znode (creation,
 * ACLs, fencing-node cleanup), optionally starts the active-status
 * verification thread, creates the per-area root znodes and finally syncs
 * on the root path.
 *
 * @throws Exception if any of the invoked store operations fails
 */
@Override
public synchronized void startInternal() throws Exception {
 // createConnection for future API calls
 createConnection();
 // ensure root dirs exist
 createRootDirRecursively(znodeWorkingPath);
 createRootDir(zkRootNodePath);
 // Apply the root-node ACLs, then remove the fencing node path.
 setRootNodeAcls();
 deleteFencingNodePath();
 // The verification thread is only started when HA is enabled.
 if (HAUtil.isHAEnabled(getConfig())){
  verifyActiveStatusThread = new VerifyActiveStatusThread();
  verifyActiveStatusThread.start();
 }
 // Root znodes for applications, delegation-token secret manager data
 // (master keys, tokens, sequence number) and AMRM token secret manager.
 createRootDir(rmAppRoot);
 createRootDir(rmDTSecretManagerRoot);
 createRootDir(dtMasterKeysRootPath);
 createRootDir(delegationTokensRootPath);
 createRootDir(dtSequenceNumberPath);
 createRootDir(amrmTokenSecretManagerRoot);
 syncInternal(zkRootNodePath);
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Builds a fresh {@code RMState} and fills it, in order, from the
 * delegation-token secret manager data, the applications, the AMRM token
 * secret manager data and the reservation system data.
 *
 * @return the populated state object
 * @throws Exception if any of the load steps fails
 */
@Override
public synchronized RMState loadState() throws Exception {
 final RMState recovered = new RMState();

 loadRMDTSecretManagerState(recovered);   // delegation tokens and keys
 loadRMAppState(recovered);               // applications
 loadAMRMTokenSecretManagerState(recovered); // AMRM token secret manager
 loadReservationSystemState(recovered);   // reservations

 return recovered;
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Builds a fresh {@code RMState} and fills it, in order, from the
 * delegation-token secret manager data, the applications and the AMRM token
 * secret manager data.
 *
 * @return the populated state object
 * @throws Exception if any of the load steps fails
 */
@Override
public synchronized RMState loadState() throws Exception {
 final RMState recovered = new RMState();

 loadRMDTSecretManagerState(recovered);      // delegation tokens and keys
 loadRMAppState(recovered);                  // applications
 loadAMRMTokenSecretManagerState(recovered); // AMRM token secret manager

 return recovered;
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Sets the ACL of {@code zkRootNodePath}: {@code zkRootNodeAcl} when HA is
 * enabled, {@code zkAcl} otherwise. With trace logging enabled, the root
 * node ACLs are logged before and after the change.
 *
 * @throws Exception if reading or setting the ACLs fails
 */
private void setRootNodeAcls() throws Exception {
 if (LOG.isTraceEnabled()) {
  logRootNodeAcls("Before fencing\n");
 }

 setAcl(zkRootNodePath,
   HAUtil.isHAEnabled(getConfig()) ? zkRootNodeAcl : zkAcl);

 if (LOG.isTraceEnabled()) {
  logRootNodeAcls("After fencing\n");
 }
}

代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager

/**
 * Reads every attempt znode stored beneath the application's znode and
 * registers the parsed attempt state in {@code appState.attempts}.
 *
 * <p>Children whose name does not start with
 * {@code ApplicationAttemptId.appAttemptIdStrPrefix} are ignored.
 *
 * @param appState application state that receives the loaded attempts
 * @param appId id of the application; its string form names the app znode
 * @throws Exception if reading or parsing a znode fails
 */
private void loadApplicationAttemptState(ApplicationStateData appState,
  ApplicationId appId)
  throws Exception {
 final String appNodePath = getNodePath(rmAppRoot, appId.toString());
 for (String childName : getChildrenWithRetries(appNodePath, false)) {
  if (!childName.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
   continue;
  }
  final String attemptNodePath = getNodePath(appNodePath, childName);
  final byte[] rawAttempt = getDataWithRetries(attemptNodePath, false);
  final ApplicationAttemptStateDataProto attemptProto =
    ApplicationAttemptStateDataProto.parseFrom(rawAttempt);
  final ApplicationAttemptStateDataPBImpl loadedAttempt =
    new ApplicationAttemptStateDataPBImpl(attemptProto);
  appState.attempts.put(loadedAttempt.getAttemptId(), loadedAttempt);
 }
 LOG.debug("Done loading applications from ZK state store");
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Starts the store: prepares the root znode (creation, ACLs, fencing-node
 * removal), optionally starts the active-status verification thread, and
 * creates the root znodes for applications, delegation tokens, the AMRM
 * token secret manager and reservations.
 *
 * @throws Exception if any of the invoked store operations fails
 */
@Override
public synchronized void startInternal() throws Exception {
 // ensure root dirs exist
 zkManager.createRootDirRecursively(znodeWorkingPath, zkAcl);
 create(zkRootNodePath);
 // Apply the root-node ACLs, then delete the fencing node.
 setRootNodeAcls();
 delete(fencingNodePath);
 // The verification thread is started only when HA is enabled and
 // automatic failover is not.
 if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
   .isAutomaticFailoverEnabled(getConfig())) {
  verifyActiveStatusThread = new VerifyActiveStatusThread();
  verifyActiveStatusThread.start();
 }
 // Application root, its hierarchies parent, and hierarchy nodes 1..4.
 create(rmAppRoot);
 create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
 for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
  create(rmAppRootHierarchies.get(splitIndex));
 }
 // Delegation-token secret manager roots and token hierarchy nodes 1..4.
 create(rmDTSecretManagerRoot);
 create(dtMasterKeysRootPath);
 create(delegationTokensRootPath);
 for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
  create(rmDelegationTokenHierarchies.get(splitIndex));
 }
 create(dtSequenceNumberPath);
 create(amrmTokenSecretManagerRoot);
 create(reservationRoot);
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Loads every application stored directly under {@code rmAppRoot} into
 * {@code rmState.appState}, together with the application's attempts.
 *
 * <p>Children whose name does not start with
 * {@code ApplicationId.appIdStrPrefix} are reported at info level and
 * skipped.
 *
 * @param rmState state object that receives the loaded applications
 * @throws YarnRuntimeException if a znode's name does not match the
 *     application id found in its stored submission context
 * @throws Exception if reading or parsing a znode fails
 */
private synchronized void loadRMAppState(RMState rmState) throws Exception {
 for (String nodeName : getChildrenWithRetries(rmAppRoot, false)) {
  final String nodePath = getNodePath(rmAppRoot, nodeName);
  // The data is fetched for every child, before the name is inspected.
  final byte[] nodeData = getDataWithRetries(nodePath, false);

  if (!nodeName.startsWith(ApplicationId.appIdStrPrefix)) {
   LOG.info("Unknown child node with name: " + nodeName);
   continue;
  }

  // application
  if (LOG.isDebugEnabled()) {
   LOG.debug("Loading application from znode: " + nodeName);
  }
  final ApplicationId parsedId = ConverterUtils.toApplicationId(nodeName);
  final ApplicationStateDataPBImpl loadedApp =
    new ApplicationStateDataPBImpl(
      ApplicationStateDataProto.parseFrom(nodeData));
  final ApplicationId storedId =
    loadedApp.getApplicationSubmissionContext().getApplicationId();
  if (!parsedId.equals(storedId)) {
   throw new YarnRuntimeException(
     "The child node name is different from the application id");
  }
  rmState.appState.put(parsedId, loadedApp);
  loadApplicationAttemptState(loadedApp, parsedId);
 }
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "newBytes".getBytes(), 0);
byte[] ret = null;
try {
 ret = store.getDataWithRetries(path, true);
} catch (Exception e) {
 String error = "ZKRMStateStore Session restore failed";

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Creates a persistent znode for the application under {@code rmAppRoot}
 * holding the serialized application state.
 *
 * @param appId id of the application; its string form names the znode
 * @param appStateDataPB application state to serialize and store
 * @throws Exception if creating the znode fails
 */
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
  ApplicationStateData appStateDataPB) throws Exception {
 final String appNodePath = getNodePath(rmAppRoot, appId.toString());

 if (LOG.isDebugEnabled()) {
  LOG.debug("Storing info for app: " + appId + " at: " + appNodePath);
 }

 final byte[] serializedState = appStateDataPB.getProto().toByteArray();
 createWithRetries(appNodePath, serializedState, zkAcl,
  CreateMode.PERSISTENT);
}

代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager

/**
 * Deletes the znode of the given delegation master key from under
 * {@code dtMasterKeysRootPath}. If the znode does not exist, only a debug
 * message is logged.
 *
 * @param delegationKey key whose id identifies the znode to remove
 * @throws Exception if the existence check or the delete fails
 */
@Override
protected synchronized void removeRMDTMasterKeyState(
  DelegationKey delegationKey) throws Exception {
 final String keyNodePath = getNodePath(dtMasterKeysRootPath,
   DELEGATION_KEY_PREFIX + delegationKey.getKeyId());

 if (LOG.isDebugEnabled()) {
  LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
 }

 if (existsWithRetries(keyNodePath, false) == null) {
  LOG.debug("Attempted to delete a non-existing znode " + keyNodePath);
  return;
 }
 doDeleteMultiWithRetries(Op.delete(keyNodePath, -1));
}

代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager

/**
 * Stores the given delegation token and renew date. The final boolean
 * passed to {@code addStoreOrUpdateOps} is {@code true} when the token's
 * znode already exists and {@code false} otherwise; a missing znode is
 * also reported at debug level.
 *
 * @param rmDTIdentifier token identifier; its sequence number names the znode
 * @param renewDate renew date handed on to {@code addStoreOrUpdateOps}
 * @throws Exception if the existence check or the multi-op store fails
 */
@Override
protected synchronized void updateRMDelegationTokenState(
  RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
  throws Exception {
 final ArrayList<Op> pendingOps = new ArrayList<Op>();
 final String tokenNodePath = getNodePath(delegationTokensRootPath,
   DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());

 final boolean tokenNodeExists =
   existsWithRetries(tokenNodePath, false) != null;
 addStoreOrUpdateOps(pendingOps, rmDTIdentifier, renewDate, tokenNodeExists);
 if (!tokenNodeExists) {
  LOG.debug("Attempted to update a non-existing znode " + tokenNodePath);
 }

 doStoreMultiWithRetries(pendingOps);
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Reads every attempt znode stored beneath {@code appPath} and registers
 * the parsed attempt state in {@code appState.attempts}.
 *
 * <p>Children whose name does not start with
 * {@code ApplicationAttemptId.appAttemptIdStrPrefix} are ignored.
 *
 * @param appState application state that receives the loaded attempts
 * @param appPath path of the application's znode
 * @throws Exception if reading or parsing a znode fails
 */
private void loadApplicationAttemptState(ApplicationStateData appState,
  String appPath) throws Exception {
 for (String childName : getChildren(appPath)) {
  if (!childName.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
   continue;
  }
  final byte[] rawAttempt = getData(getNodePath(appPath, childName));
  final ApplicationAttemptStateDataPBImpl loadedAttempt =
    new ApplicationAttemptStateDataPBImpl(
      ApplicationAttemptStateDataProto.parseFrom(rawAttempt));
  appState.attempts.put(loadedAttempt.getAttemptId(), loadedAttempt);
 }
 LOG.debug("Done loading applications from ZK state store");
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Reads the version stored at the version znode under
 * {@code zkRootNodePath}.
 *
 * @return the parsed version, or {@code null} when the znode does not exist
 * @throws Exception if reading or parsing the znode fails
 */
@Override
protected synchronized Version loadVersion() throws Exception {
 final String versionPath = getNodePath(zkRootNodePath, VERSION_NODE);
 if (existsWithRetries(versionPath, false) == null) {
  return null;
 }
 final byte[] rawVersion = getDataWithRetries(versionPath, false);
 return new VersionPBImpl(VersionProto.parseFrom(rawVersion));
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

String appIdRemovePath = getLeafAppIdNodePath(removeAppId, false);
int splitIndex = appIdNodeSplitIndex;
if (!exists(appIdRemovePath)) {
 ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(removeAppId);
 if (alternatePathInfo != null) {
  appIdRemovePath = alternatePathInfo.path;
  for (ApplicationAttemptId attemptId : attempts) {
   String attemptRemovePath =
     getNodePath(appIdRemovePath, attemptId.toString());
   zkManager.safeDelete(attemptRemovePath, zkAcl, fencingNodePath);
checkRemoveParentZnode(appIdRemovePath, splitIndex);

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
    YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES);
rmAppRootHierarchies = new HashMap<>(5);
rmAppRootHierarchies.put(0, rmAppRoot);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
 rmAppRootHierarchies.put(splitIndex,
   getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
  YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
  zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
  getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
  RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
  RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
rmDelegationTokenHierarchies = new HashMap<>(5);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
 rmDelegationTokenHierarchies.put(splitIndex,
   getNodePath(delegationTokensRootPath, Integer.toString(splitIndex)));
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Loops until {@code isFencedState()} reports true: each round issues a
 * multi-op store with {@code emptyOpList} and then sleeps for
 * {@code zkSessionTimeout}. An interrupt ends the loop with an info log;
 * any other exception is reported as a {@code StoreFencedException}.
 */
public void run() {
  try {
   while (!isFencedState()) {
    doStoreMultiWithRetries(emptyOpList);
    Thread.sleep(zkSessionTimeout);
   }
  } catch (InterruptedException interrupted) {
   final String threadName = VerifyActiveStatusThread.class.getName();
   LOG.info(threadName + " thread interrupted! Exiting!");
  } catch (Exception failure) {
   notifyStoreOperationFailed(new StoreFencedException());
  }
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Reads the version stored at the version znode under
 * {@code zkRootNodePath}.
 *
 * @return the parsed version, or {@code null} when the znode does not exist
 * @throws Exception if reading or parsing the znode fails
 */
@Override
protected synchronized Version loadVersion() throws Exception {
 final String versionPath = getNodePath(zkRootNodePath, VERSION_NODE);
 if (!exists(versionPath)) {
  return null;
 }
 final VersionProto storedProto = VersionProto.parseFrom(getData(versionPath));
 return new VersionPBImpl(storedProto);
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Returns the current epoch and stores its successor in the epoch znode.
 *
 * <p>If the epoch znode exists, the stored epoch is returned and the znode
 * is overwritten with {@code nextEpoch(stored)}. Otherwise
 * {@code baseEpoch} is returned and the znode is created holding
 * {@code nextEpoch(baseEpoch)}.
 *
 * @return the epoch in effect before the increment
 * @throws Exception if reading, parsing or writing the znode fails
 */
@Override
public synchronized long getAndIncrementEpoch() throws Exception {
 final String epochPath = getNodePath(zkRootNodePath, EPOCH_NODE);
 long epochToReturn = baseEpoch;

 final boolean epochNodePresent = exists(epochPath);
 if (epochNodePresent) {
  // load current epoch
  final Epoch stored = new EpochPBImpl(EpochProto.parseFrom(getData(epochPath)));
  epochToReturn = stored.getEpoch();
 }

 // The value written is always the successor of the value returned.
 final byte[] nextEpochBytes =
   Epoch.newInstance(nextEpoch(epochToReturn)).getProto().toByteArray();

 if (epochNodePresent) {
  zkManager.safeSetData(epochPath, nextEpochBytes, -1, zkAcl,
    fencingNodePath);
 } else {
  zkManager.safeCreate(epochPath, nextEpochBytes, zkAcl,
    CreateMode.PERSISTENT, zkAcl, fencingNodePath);
 }
 return epochToReturn;
}

相关文章

微信公众号

最新文章

更多