org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(12.6k)|赞(0)|评价(0)|浏览(215)

本文整理了Java中org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore类的一些代码示例,展示了YarnConfigurationStore类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。YarnConfigurationStore类的具体详情如下:
包路径:org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore
类名称:YarnConfigurationStore

YarnConfigurationStore介绍

[英]YarnConfigurationStore exposes the methods needed for retrieving and persisting CapacityScheduler configuration via key-value using write-ahead logging. When configuration mutation is requested, caller should first log it with logMutation, which persists this pending mutation. This mutation is merged to the persisted configuration only after confirmMutation is called. On startup/recovery, caller should call retrieve to get all confirmed mutations, then get pending mutations which were not confirmed via getPendingMutations, and replay/confirm them via confirmMutation as in the normal case.
[中]YarnConfiguration Store公开了通过使用预写日志记录的键值检索和持久化CapacityScheduler配置所需的方法。当请求配置变异时,调用者应该首先用logMutation记录它,logMutation会持续这个待处理的变异。只有在调用confirmMutation后,此变异才会合并到持久化配置。在启动/恢复时,调用方应调用retrieve获取所有已确认的突变,然后获取未通过GetPending突变确认的待定突变,并像正常情况一样通过confirm突变重播/确认它们。

代码示例

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Override
public void close() throws IOException {
 confStore.close();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Override
public void confirmPendingMutation(boolean isValid) throws Exception {
 confStore.confirmMutation(isValid);
 if (!isValid) {
  schedConf = oldConf;
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

public void checkVersion() throws Exception {
 // TODO this was taken from RMStateStore. Should probably refactor
 Version loadedVersion = getConfStoreVersion();
 LOG.info("Loaded configuration store version info " + loadedVersion);
 if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
  return;
 }
 // if there is no version info, treat it as CURRENT_VERSION_INFO;
 if (loadedVersion == null) {
  loadedVersion = getCurrentVersion();
 }
 if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
  LOG.info("Storing configuration store version info "
    + getCurrentVersion());
  storeVersion();
 } else {
  throw new RMStateVersionIncompatibleException(
    "Expecting configuration store version " + getCurrentVersion()
      + ", but loading version " + loadedVersion);
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
 public void testNullConfigurationUpdate() throws Exception {
  schedConf.set("key", "val");
  confStore.initialize(conf, schedConf, rmContext);
  assertEquals("val", confStore.retrieve().get("key"));

  Map<String, String> update = new HashMap<>();
  update.put("key", null);
  YarnConfigurationStore.LogMutation mutation =
    new YarnConfigurationStore.LogMutation(update, TEST_USER);
  confStore.logMutation(mutation);
  confStore.confirmMutation(true);
  assertNull(confStore.retrieve().get("key"));
  confStore.close();
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testVersioning() throws Exception {
 confStore.initialize(conf, schedConf, rmContext);
 assertNull(confStore.getConfStoreVersion());
 confStore.checkVersion();
 assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
   confStore.getConfStoreVersion());
 confStore.close();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testVersioning() throws Exception {
 confStore.initialize(conf, schedConf, rmContext);
 assertNull(confStore.getConfStoreVersion());
 confStore.checkVersion();
 assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
   confStore.getConfStoreVersion());
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testPersistConfiguration() throws Exception {
 schedConf.set("key", "val");
 confStore.initialize(conf, schedConf, rmContext);
 assertEquals("val", confStore.retrieve().get("key"));
 confStore.close();
 // Create a new configuration store, and check for old configuration
 confStore = createConfStore();
 schedConf.set("key", "badVal");
 // Should ignore passed-in scheduler configuration.
 confStore.initialize(conf, schedConf, rmContext);
 assertEquals("val", confStore.retrieve().get("key"));
 confStore.close();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

confStore.initialize(config, schedConf, rmContext);
 confStore.checkVersion();
} catch (Exception e) {
 throw new IOException(e);
schedConf = confStore.retrieve();
this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
  .getPolicy(config);

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testMaxLogs() throws Exception {
 conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
 confStore.initialize(conf, schedConf, rmContext);
 LinkedList<YarnConfigurationStore.LogMutation> logs =
   ((ZKConfigurationStore) confStore).getLogs();
 YarnConfigurationStore.LogMutation mutation =
   new YarnConfigurationStore.LogMutation(update1, TEST_USER);
 confStore.logMutation(mutation);
 logs = ((ZKConfigurationStore) confStore).getLogs();
 assertEquals(1, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 confStore.confirmMutation(true);
 assertEquals(1, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 update2.put("key2", "val2");
 mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
 confStore.logMutation(mutation);
 logs = ((ZKConfigurationStore) confStore).getLogs();
 assertEquals(2, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 assertEquals("val2", logs.get(1).getUpdates().get("key2"));
 confStore.confirmMutation(true);
 assertEquals(2, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 update3.put("key3", "val3");
 mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testPersistConfiguration() throws Exception {
 schedConf.set("key", "val");
 confStore.initialize(conf, schedConf, rmContext);
 assertEquals("val", confStore.retrieve().get("key"));
 // Create a new configuration store, and check for old configuration
 confStore = createConfStore();
 schedConf.set("key", "badVal");
 // Should ignore passed-in scheduler configuration.
 confStore.initialize(conf, schedConf, rmContext);
 assertEquals("val", confStore.retrieve().get("key"));
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Override
public void reloadConfigurationFromStore() throws Exception {
 schedConf = confStore.retrieve();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Override
public void logAndApplyMutation(UserGroupInformation user,
  SchedConfUpdateInfo confUpdate) throws Exception {
 oldConf = new Configuration(schedConf);
 Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
 LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
 confStore.logMutation(log);
 for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
  if (kv.getValue() == null) {
   schedConf.unset(kv.getKey());
  } else {
   schedConf.set(kv.getKey(), kv.getValue());
  }
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testPersistUpdatedConfiguration() throws Exception {
 confStore.initialize(conf, schedConf, rmContext);
 assertNull(confStore.retrieve().get("key"));
 Map<String, String> update = new HashMap<>();
 update.put("key", "val");
 YarnConfigurationStore.LogMutation mutation =
   new YarnConfigurationStore.LogMutation(update, TEST_USER);
 confStore.logMutation(mutation);
 confStore.confirmMutation(true);
 assertEquals("val", confStore.retrieve().get("key"));
 confStore.close();
 // Create a new configuration store, and check for updated configuration
 confStore = createConfStore();
 schedConf.set("key", "badVal");
 // Should ignore passed-in scheduler configuration.
 confStore.initialize(conf, schedConf, rmContext);
 assertEquals("val", confStore.retrieve().get("key"));
 confStore.close();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

confProvider.confirmPendingMutation(true);
assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
  .getConfStore().retrieve().get("key"));
assertEquals("val", ((MutableCSConfigurationProvider) (
  (CapacityScheduler) rm2.getResourceScheduler())
  .getMutableConfProvider()).getConfStore().retrieve().get("key"));
assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
  .getConfiguration().get("key"));

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testConfigurationUpdate() throws Exception {
 schedConf.set("key1", "val1");
 confStore.initialize(conf, schedConf, rmContext);
 assertEquals("val1", confStore.retrieve().get("key1"));
 Map<String, String> update1 = new HashMap<>();
 update1.put("keyUpdate1", "valUpdate1");
 YarnConfigurationStore.LogMutation mutation1 =
   new YarnConfigurationStore.LogMutation(update1, TEST_USER);
 confStore.logMutation(mutation1);
 confStore.confirmMutation(true);
 assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
 Map<String, String> update2 = new HashMap<>();
 update2.put("keyUpdate2", "valUpdate2");
 YarnConfigurationStore.LogMutation mutation2 =
   new YarnConfigurationStore.LogMutation(update2, TEST_USER);
 confStore.logMutation(mutation2);
 confStore.confirmMutation(false);
 assertNull("Configuration should not be updated",
   confStore.retrieve().get("keyUpdate2"));
 confStore.close();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

.getMutableConfProvider()).getConfStore().retrieve()
  .get("yarn.scheduler.capacity.root.queues"));
assertEquals("a", ((MutableConfScheduler) rm2.getResourceScheduler())

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testPersistUpdatedConfiguration() throws Exception {
 confStore.initialize(conf, schedConf, rmContext);
 assertNull(confStore.retrieve().get("key"));
 Map<String, String> update = new HashMap<>();
 update.put("key", "val");
 YarnConfigurationStore.LogMutation mutation =
   new YarnConfigurationStore.LogMutation(update, TEST_USER);
 confStore.logMutation(mutation);
 confStore.confirmMutation(true);
 assertEquals("val", confStore.retrieve().get("key"));
 // Create a new configuration store, and check for updated configuration
 confStore = createConfStore();
 schedConf.set("key", "badVal");
 // Should ignore passed-in scheduler configuration.
 confStore.initialize(conf, schedConf, rmContext);
 assertEquals("val", confStore.retrieve().get("key"));
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

confProvider.confirmPendingMutation(true);
assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
  .getConfStore().retrieve().get("key"));
  .getMutableConfProvider()).getConfStore().retrieve().get("key"));
assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
  .getConfiguration().get("key"));

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

@Test
public void testMaxLogs() throws Exception {
 conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
 confStore.initialize(conf, schedConf, rmContext);
 LinkedList<YarnConfigurationStore.LogMutation> logs =
   ((LeveldbConfigurationStore) confStore).getLogs();
 YarnConfigurationStore.LogMutation mutation =
   new YarnConfigurationStore.LogMutation(update1, TEST_USER);
 confStore.logMutation(mutation);
 logs = ((LeveldbConfigurationStore) confStore).getLogs();
 assertEquals(1, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 confStore.confirmMutation(true);
 assertEquals(1, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 update2.put("key2", "val2");
 mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
 confStore.logMutation(mutation);
 logs = ((LeveldbConfigurationStore) confStore).getLogs();
 assertEquals(2, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 assertEquals("val2", logs.get(1).getUpdates().get("key2"));
 confStore.confirmMutation(true);
 assertEquals(2, logs.size());
 assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 update3.put("key3", "val3");
 mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);

相关文章