org.apache.spark.network.yarn.YarnShuffleService.initRecoveryDb()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(3.9k)|赞(0)|评价(0)|浏览(98)

本文整理了Java中org.apache.spark.network.yarn.YarnShuffleService.initRecoveryDb()方法的一些代码示例,展示了YarnShuffleService.initRecoveryDb()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。YarnShuffleService.initRecoveryDb()方法的具体详情如下:
包路径:org.apache.spark.network.yarn.YarnShuffleService
类名称:YarnShuffleService
方法名:initRecoveryDb

YarnShuffleService.initRecoveryDb介绍

[英]Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled and DB exists in the local dir of NM by old version of shuffle service.
[中]如果旧版本的shuffle service启用了Thread NM recovery,并且NM的本地目录中存在DB,请找出恢复路径并处理移动DB的操作。

代码示例

代码示例来源:origin: org.apache.spark/spark-network-yarn_2.11

private void loadSecretsFromDb() throws IOException {
 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 // Make sure this is protected in case its not in the NM recovery dir
 FileSystem fs = FileSystem.getLocal(_conf);
 fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
 logger.info("Recovery location is: " + secretsFile.getPath());
 if (db != null) {
  logger.info("Going to reload spark shuffle data");
  DBIterator itr = db.iterator();
  itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
  while (itr.hasNext()) {
   Map.Entry<byte[], byte[]> e = itr.next();
   String key = new String(e.getKey(), StandardCharsets.UTF_8);
   if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    break;
   }
   String id = parseDbAppKey(key);
   ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
   logger.info("Reloading tokens for app: " + id);
   secretManager.registerApp(id, secret);
  }
 }
}

代码示例来源:origin: org.apache.spark/spark-network-yarn_2.10

private void createSecretManager() throws IOException {
 secretManager = new ShuffleSecretManager();
 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 // Make sure this is protected in case its not in the NM recovery dir
 FileSystem fs = FileSystem.getLocal(_conf);
 fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
 logger.info("Recovery location is: " + secretsFile.getPath());
 if (db != null) {
  logger.info("Going to reload spark shuffle data");
  DBIterator itr = db.iterator();
  itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
  while (itr.hasNext()) {
   Map.Entry<byte[], byte[]> e = itr.next();
   String key = new String(e.getKey(), StandardCharsets.UTF_8);
   if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    break;
   }
   String id = parseDbAppKey(key);
   ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
   logger.info("Reloading tokens for app: " + id);
   secretManager.registerApp(id, secret);
  }
 }
}

代码示例来源:origin: io.snappydata/snappy-spark-network-yarn

private void createSecretManager() throws IOException {
 secretManager = new ShuffleSecretManager();
 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 // Make sure this is protected in case its not in the NM recovery dir
 FileSystem fs = FileSystem.getLocal(_conf);
 fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
 logger.info("Recovery location is: " + secretsFile.getPath());
 if (db != null) {
  logger.info("Going to reload spark shuffle data");
  DBIterator itr = db.iterator();
  itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
  while (itr.hasNext()) {
   Map.Entry<byte[], byte[]> e = itr.next();
   String key = new String(e.getKey(), StandardCharsets.UTF_8);
   if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    break;
   }
   String id = parseDbAppKey(key);
   ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
   logger.info("Reloading tokens for app: " + id);
   secretManager.registerApp(id, secret);
  }
 }
}

代码示例来源:origin: io.snappydata/snappy-spark-network-yarn

registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);

代码示例来源:origin: org.apache.spark/spark-network-yarn_2.10

registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);

代码示例来源:origin: org.apache.spark/spark-network-yarn_2.11

registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);

相关文章