本文整理了Java中org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
类的一些代码示例,展示了ZKRMStateStore
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZKRMStateStore
类的具体详情如下:
包路径:org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
类名称:ZKRMStateStore
[英]Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved separately. The currentMasterkey and nextMasterkey have been stored. Also, AMRMToken has been removed from ApplicationAttemptState.
[中]从1.1更改为1.2,AMRMTokenSecretManager状态已单独保存。已存储currentMasterkey和nextMasterkey。此外,AMRMToken已从ApplicationAttentive状态中删除。
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
@Override
protected synchronized void storeVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (existsWithRetries(versionNodePath, false) != null) {
setDataWithRetries(versionNodePath, data, -1);
} else {
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
}
}
代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager
private void logRootNodeAcls(String prefix) throws Exception {
Stat getStat = new Stat();
List<ACL> getAcls = getACLWithRetries(zkRootNodePath, getStat);
StringBuilder builder = new StringBuilder();
builder.append(prefix);
for (ACL acl : getAcls) {
builder.append(acl.toString());
}
builder.append(getStat.toString());
LOG.debug(builder.toString());
}
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
@Override
public synchronized void startInternal() throws Exception {
// createConnection for future API calls
createConnection();
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
createRootDir(zkRootNodePath);
setRootNodeAcls();
deleteFencingNodePath();
if (HAUtil.isHAEnabled(getConfig())){
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
createRootDir(rmAppRoot);
createRootDir(rmDTSecretManagerRoot);
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
createRootDir(amrmTokenSecretManagerRoot);
syncInternal(zkRootNodePath);
}
代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager
@Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
// recover AMRMTokenSecretManager
loadAMRMTokenSecretManagerState(rmState);
// recover reservation state
loadReservationSystemState(rmState);
return rmState;
}
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
@Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
// recover AMRMTokenSecretManager
loadAMRMTokenSecretManagerState(rmState);
return rmState;
}
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
private void setRootNodeAcls() throws Exception {
if (LOG.isTraceEnabled()) {
logRootNodeAcls("Before fencing\n");
}
if (HAUtil.isHAEnabled(getConfig())) {
setAcl(zkRootNodePath, zkRootNodeAcl);
} else {
setAcl(zkRootNodePath, zkAcl);
}
if (LOG.isTraceEnabled()) {
logRootNodeAcls("After fencing\n");
}
}
代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager
private void loadApplicationAttemptState(ApplicationStateData appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
List<String> attempts = getChildrenWithRetries(appPath, false);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getDataWithRetries(attemptPath, false);
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(attemptData));
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
LOG.debug("Done loading applications from ZK state store");
}
代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager
@Override
public synchronized void startInternal() throws Exception {
// ensure root dirs exist
zkManager.createRootDirRecursively(znodeWorkingPath, zkAcl);
create(zkRootNodePath);
setRootNodeAcls();
delete(fencingNodePath);
if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
.isAutomaticFailoverEnabled(getConfig())) {
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
create(rmAppRoot);
create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
create(rmAppRootHierarchies.get(splitIndex));
}
create(rmDTSecretManagerRoot);
create(dtMasterKeysRootPath);
create(delegationTokensRootPath);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
create(rmDelegationTokenHierarchies.get(splitIndex));
}
create(dtSequenceNumberPath);
create(amrmTokenSecretManagerRoot);
create(reservationRoot);
}
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = getChildrenWithRetries(rmAppRoot, false);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, false);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
if (!appId.equals(
appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
}
rmState.appState.put(appId, appState);
loadApplicationAttemptState(appState, appId);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
}
}
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "newBytes".getBytes(), 0);
byte[] ret = null;
try {
ret = store.getDataWithRetries(path, true);
} catch (Exception e) {
String error = "ZKRMStateStore Session restore failed";
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
createWithRetries(nodeCreatePath, appStateData, zkAcl,
CreateMode.PERSISTENT);
}
代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager
@Override
protected synchronized void removeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeRemovePath =
getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
if (existsWithRetries(nodeRemovePath, false) != null) {
doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
} else {
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
}
代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-server-resourcemanager
@Override
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
ArrayList<Op> opList = new ArrayList<Op>();
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (existsWithRetries(nodeRemovePath, false) == null) {
// in case znode doesn't exist
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
// in case znode exists
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
}
doStoreMultiWithRetries(opList);
}
代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager
private void loadApplicationAttemptState(ApplicationStateData appState,
String appPath) throws Exception {
List<String> attempts = getChildren(appPath);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getData(attemptPath);
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(attemptData));
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
LOG.debug("Done loading applications from ZK state store");
}
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
@Override
protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
if (existsWithRetries(versionNodePath, false) != null) {
byte[] data = getDataWithRetries(versionNodePath, false);
Version version =
new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
return null;
}
代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager
String appIdRemovePath = getLeafAppIdNodePath(removeAppId, false);
int splitIndex = appIdNodeSplitIndex;
if (!exists(appIdRemovePath)) {
ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(removeAppId);
if (alternatePathInfo != null) {
appIdRemovePath = alternatePathInfo.path;
for (ApplicationAttemptId attemptId : attempts) {
String attemptRemovePath =
getNodePath(appIdRemovePath, attemptId.toString());
zkManager.safeDelete(attemptRemovePath, zkAcl, fencingNodePath);
checkRemoveParentZnode(appIdRemovePath, splitIndex);
代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES);
rmAppRootHierarchies = new HashMap<>(5);
rmAppRootHierarchies.put(0, rmAppRoot);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
rmAppRootHierarchies.put(splitIndex,
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
rmDelegationTokenHierarchies = new HashMap<>(5);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
rmDelegationTokenHierarchies.put(splitIndex,
getNodePath(delegationTokensRootPath, Integer.toString(splitIndex)));
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager
public void run() {
try {
while (true) {
if(isFencedState()) {
break;
}
doStoreMultiWithRetries(emptyOpList);
Thread.sleep(zkSessionTimeout);
}
} catch (InterruptedException ie) {
LOG.info(VerifyActiveStatusThread.class.getName() + " thread " +
"interrupted! Exiting!");
} catch (Exception e) {
notifyStoreOperationFailed(new StoreFencedException());
}
}
}
代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager
@Override
protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
if (exists(versionNodePath)) {
byte[] data = getData(versionNodePath);
return new VersionPBImpl(VersionProto.parseFrom(data));
}
return null;
}
代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager
@Override
public synchronized long getAndIncrementEpoch() throws Exception {
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
long currentEpoch = baseEpoch;
if (exists(epochNodePath)) {
// load current epoch
byte[] data = getData(epochNodePath);
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
fencingNodePath);
} else {
// initialize epoch node with 1 for the next time.
byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeCreate(epochNodePath, storeData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
}
return currentEpoch;
}
内容来源于网络,如有侵权,请联系作者删除!