Usage and code examples of the org.apache.spark.network.yarn.YarnShuffleService class


This article collects Java code examples for the org.apache.spark.network.yarn.YarnShuffleService class and shows how the class is used in practice. The examples are drawn from selected projects hosted on GitHub, Stack Overflow, Maven, and similar platforms, and are intended as practical references. Details of the YarnShuffleService class are as follows:
Package: org.apache.spark.network.yarn
Class name: YarnShuffleService

Introduction to YarnShuffleService

An external shuffle service used by Spark on Yarn. This is intended to be a long-running auxiliary service that runs in the NodeManager process. A Spark application may connect to this service by setting spark.shuffle.service.enabled. The application also automatically derives the service port through spark.shuffle.service.port specified in the Yarn configuration. This is so that both the clients and the server agree on the same port to communicate on. The service also optionally supports authentication. This ensures that executors from one application cannot read the shuffle files written by those from another. This feature can be enabled by setting spark.authenticate in the Yarn configuration before starting the NM. Note that the Spark application must also set spark.authenticate manually and, unlike in the case of the service port, will not inherit this setting from the Yarn configuration. This is because an application running on the same Yarn cluster may choose to not use the external shuffle service, in which case its setting of spark.authenticate should be independent of the service's.
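
To make the description concrete, here is a minimal sketch of both sides of that contract: the NodeManager-side keys that register YarnShuffleService as a YARN auxiliary service (shown as comments), and the settings an application puts in its SparkConf. This sketch is not taken from the projects below; the class and application names are illustrative, and 7337 is the default value of spark.shuffle.service.port.

// NodeManager side (yarn-site.xml, configured before starting the NM):
//   yarn.nodemanager.aux-services = spark_shuffle
//   yarn.nodemanager.aux-services.spark_shuffle.class = org.apache.spark.network.yarn.YarnShuffleService
//   spark.shuffle.service.port = 7337   (clients derive this port automatically; 7337 is the default)
//   spark.authenticate = true           (optional: turns authentication on in the service)
// Application side (class and app names are illustrative):
import org.apache.spark.SparkConf;

public class ShuffleServiceClientConf {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf()
   .setAppName("uses-external-shuffle-service")
   // Connect this application's executors to the NodeManager's external shuffle service.
   .set("spark.shuffle.service.enabled", "true")
   // Must be set by the application itself; it is not inherited from the Yarn configuration.
   .set("spark.authenticate", "true");
  // ... build the SparkContext / SparkSession from this conf ...
 }
}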

Code examples

Code example source: io.snappydata/snappy-spark-network-yarn

@Override
public void initializeApplication(ApplicationInitializationContext context) {
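 // Invoked by the NodeManager when the application is initialized on this node (its first container starts here).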
 String appId = context.getApplicationId().toString();
 try {
  ByteBuffer shuffleSecret = context.getApplicationDataForService();
  logger.info("Initializing application {}", appId);
  if (isAuthenticationEnabled()) {
   AppId fullId = new AppId(appId);
   if (db != null) {
    byte[] key = dbAppKey(fullId);
    byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
    db.put(key, value);
   }
   secretManager.registerApp(appId, shuffleSecret);
  }
 } catch (Exception e) {
  logger.error("Exception when initializing application {}", appId, e);
 }
}

Code example source: org.apache.spark/spark-network-yarn_2.11

private void loadSecretsFromDb() throws IOException {
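 // Reload application shuffle secrets persisted in the recovery LevelDB (e.g. after a NodeManager restart).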
 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 // Make sure this is protected in case it's not in the NM recovery dir
 FileSystem fs = FileSystem.getLocal(_conf);
 fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
 logger.info("Recovery location is: " + secretsFile.getPath());
 if (db != null) {
  logger.info("Going to reload spark shuffle data");
  DBIterator itr = db.iterator();
  itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
  while (itr.hasNext()) {
   Map.Entry<byte[], byte[]> e = itr.next();
   String key = new String(e.getKey(), StandardCharsets.UTF_8);
   if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    break;
   }
   String id = parseDbAppKey(key);
   ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
   logger.info("Reloading tokens for app: " + id);
   secretManager.registerApp(id, secret);
  }
 }
}

Code example source: io.snappydata/snappy-spark-network-yarn

// Excerpt from serviceInit(Configuration conf): set up SASL authentication for the shuffle server.
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
try {
 boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
 if (authEnabled) {
  createSecretManager();
  bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
 }
 // ... create and start the shuffle TransportServer with these bootstraps ...
} catch (Exception e) {
 if (stopOnFailure) {  // spark.yarn.shuffle.stopOnFailure, read earlier in serviceInit
  throw e;
 } else {
  noteFailure(e);
 }
}

Code example source: org.apache.spark/spark-network-yarn_2.11

// Excerpt from serviceInit(Configuration conf): restore persisted shuffle secrets when authentication is enabled.
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
try {
 boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
 if (authEnabled) {
  secretManager = new ShuffleSecretManager();
  if (_recoveryPath != null) {
   loadSecretsFromDb();
  }
 }
} catch (Exception e) {
 if (stopOnFailure) throw e;  // spark.yarn.shuffle.stopOnFailure, read earlier in serviceInit
 noteFailure(e);
}

Code example source: org.apache.spark/spark-network-yarn_2.10

// Excerpt from serviceInit(Configuration conf): register the authentication bootstrap when spark.authenticate is set.
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
try {
 boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
 if (authEnabled) {
  createSecretManager();
  bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
 }
 // ... create and start the shuffle TransportServer with these bootstraps ...
} catch (Exception e) {
 if (stopOnFailure) {  // spark.yarn.shuffle.stopOnFailure, read earlier in serviceInit
  throw e;
 } else {
  noteFailure(e);
 }
}

Code example source: org.apache.spark/spark-network-yarn_2.10

@Override
public void initializeApplication(ApplicationInitializationContext context) {
 String appId = context.getApplicationId().toString();
 try {
  ByteBuffer shuffleSecret = context.getApplicationDataForService();
  if (isAuthenticationEnabled()) {
   AppId fullId = new AppId(appId);
   if (db != null) {
    byte[] key = dbAppKey(fullId);
    byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
    db.put(key, value);
   }
   secretManager.registerApp(appId, shuffleSecret);
  }
 } catch (Exception e) {
  logger.error("Exception when initializing application {}", appId, e);
 }
}

Code example source: org.apache.spark/spark-network-yarn_2.10

private void createSecretManager() throws IOException {
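 // Create the shuffle secret manager and reload any secrets persisted by a previous NodeManager instance.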
 secretManager = new ShuffleSecretManager();
 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 // Make sure this is protected in case it's not in the NM recovery dir
 FileSystem fs = FileSystem.getLocal(_conf);
 fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
 logger.info("Recovery location is: " + secretsFile.getPath());
 if (db != null) {
  logger.info("Going to reload spark shuffle data");
  DBIterator itr = db.iterator();
  itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
  while (itr.hasNext()) {
   Map.Entry<byte[], byte[]> e = itr.next();
   String key = new String(e.getKey(), StandardCharsets.UTF_8);
   if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    break;
   }
   String id = parseDbAppKey(key);
   ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
   logger.info("Reloading tokens for app: " + id);
   secretManager.registerApp(id, secret);
  }
 }
}

Code example source: org.apache.spark/spark-network-yarn_2.11

@Override
public void initializeApplication(ApplicationInitializationContext context) {
 String appId = context.getApplicationId().toString();
 try {
  ByteBuffer shuffleSecret = context.getApplicationDataForService();
  if (isAuthenticationEnabled()) {
   AppId fullId = new AppId(appId);
   if (db != null) {
    byte[] key = dbAppKey(fullId);
    byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
    db.put(key, value);
   }
   secretManager.registerApp(appId, shuffleSecret);
  }
 } catch (Exception e) {
  logger.error("Exception when initializing application {}", appId, e);
 }
}

Code example source: io.snappydata/snappy-spark-network-yarn

private void createSecretManager() throws IOException {
 secretManager = new ShuffleSecretManager();
 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 // Make sure this is protected in case it's not in the NM recovery dir
 FileSystem fs = FileSystem.getLocal(_conf);
 fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
 logger.info("Recovery location is: " + secretsFile.getPath());
 if (db != null) {
  logger.info("Going to reload spark shuffle data");
  DBIterator itr = db.iterator();
  itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
  while (itr.hasNext()) {
   Map.Entry<byte[], byte[]> e = itr.next();
   String key = new String(e.getKey(), StandardCharsets.UTF_8);
   if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    break;
   }
   String id = parseDbAppKey(key);
   ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
   logger.info("Reloading tokens for app: " + id);
   secretManager.registerApp(id, secret);
  }
 }
}

Code example source: io.snappydata/snappy-spark-network-yarn

@Override
public void stopApplication(ApplicationTerminationContext context) {
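 // Invoked by the NodeManager when the application finishes on this node; remove its persisted secret and its shuffle state.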
 String appId = context.getApplicationId().toString();
 try {
  logger.info("Stopping application {}", appId);
  if (isAuthenticationEnabled()) {
   AppId fullId = new AppId(appId);
   if (db != null) {
    try {
     db.delete(dbAppKey(fullId));
    } catch (IOException e) {
     logger.error("Error deleting {} from executor state db", appId, e);
    }
   }
   secretManager.unregisterApp(appId);
  }
  blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
 } catch (Exception e) {
  logger.error("Exception when stopping application {}", appId, e);
 }
}

Code example source: org.apache.spark/spark-network-yarn_2.11

@Override
public void stopApplication(ApplicationTerminationContext context) {
 String appId = context.getApplicationId().toString();
 try {
  if (isAuthenticationEnabled()) {
   AppId fullId = new AppId(appId);
   if (db != null) {
    try {
     db.delete(dbAppKey(fullId));
    } catch (IOException e) {
     logger.error("Error deleting {} from executor state db", appId, e);
    }
   }
   secretManager.unregisterApp(appId);
  }
  blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
 } catch (Exception e) {
  logger.error("Exception when stopping application {}", appId, e);
 }
}

Code example source: org.apache.spark/spark-network-yarn_2.10

@Override
public void stopApplication(ApplicationTerminationContext context) {
 String appId = context.getApplicationId().toString();
 try {
  if (isAuthenticationEnabled()) {
   AppId fullId = new AppId(appId);
   if (db != null) {
    try {
     db.delete(dbAppKey(fullId));
    } catch (IOException e) {
     logger.error("Error deleting {} from executor state db", appId, e);
    }
   }
   secretManager.unregisterApp(appId);
  }
  blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
 } catch (Exception e) {
  logger.error("Exception when stopping application {}", appId, e);
 }
}
