Usage and code examples of the org.apache.hadoop.yarn.api.records.Container class

Reposted by x33g5p2x on 2022-01-18 under Other

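This article collects code examples for the Java class org.apache.hadoop.yarn.api.records.Container and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Container class are as follows:
Package path: org.apache.hadoop.yarn.api.records.Container
Class name: Container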

Container overview

Container represents an allocated resource in the cluster.

The ResourceManager is the sole authority to allocate any Container to applications. The allocated Container is always on a single node and has a unique ContainerId. It has a specific amount of Resource allocated.

It includes details such as:

  • ContainerId for the container, which is globally unique.
  • NodeId of the node on which it is allocated.
  • HTTP URI of the node.
  • Resource allocated to the container.
  • Priority at which the container was allocated.
  • Container Token of the container, used to securely verify authenticity of the allocation.

Typically, an ApplicationMaster receives the Container from the ResourceManager during resource negotiation and then talks to the NodeManager to start/stop containers.
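
The fields listed above can be pictured with a small stand-in model. The sketch below deliberately does not use the Hadoop classes, so it runs with only the JDK: `SketchContainer`, `describe`, and `accept` are hypothetical names introduced for illustration, loosely mirroring the accessors used in the examples that follow (`getId()`, `getNodeId()`, `getResource()`, `getPriority()`). It shows one way an ApplicationMaster-like component might filter the containers it was handed against minimum resource requirements before launching anything on them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class ContainerSketch {

    /** Hypothetical stand-in for a YARN Container; NOT the Hadoop class. */
    static final class SketchContainer {
        final String id;     // plays the role of ContainerId
        final String host;   // host part of NodeId
        final int port;      // port part of NodeId
        final int memoryMb;  // memory part of Resource
        final int vcores;    // virtual-core part of Resource
        final int priority;  // priority at which the container was allocated

        SketchContainer(String id, String host, int port,
                        int memoryMb, int vcores, int priority) {
            this.id = id;
            this.host = host;
            this.port = port;
            this.memoryMb = memoryMb;
            this.vcores = vcores;
            this.priority = priority;
        }
    }

    /** Formats one container as a single bracketed string for log messages. */
    static String describe(SketchContainer c) {
        return String.format(Locale.ROOT,
            "[id: %s, host: %s:%d, priority: %d, memory: %d MB, vcores: %d]",
            c.id, c.host, c.port, c.priority, c.memoryMb, c.vcores);
    }

    /** Returns the containers that meet both minimum requirements, in input order. */
    static List<SketchContainer> accept(List<SketchContainer> allocated,
                                        int minMemoryMb, int minVcores) {
        List<SketchContainer> accepted = new ArrayList<>();
        for (SketchContainer c : allocated) {
            if (c.memoryMb >= minMemoryMb && c.vcores >= minVcores) {
                accepted.add(c);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Made-up sample data; ids, hosts, and sizes are illustrative only.
        List<SketchContainer> allocated = Arrays.asList(
            new SketchContainer("c1", "node-a", 45454, 2048, 2, 0),
            new SketchContainer("c2", "node-b", 45454, 512, 1, 1));
        for (SketchContainer c : accept(allocated, 1024, 1)) {
            System.out.println("would launch on " + describe(c));
        }
    }
}
```

In a real ApplicationMaster the rejected containers would typically be handed back to the ResourceManager, as the `releaseAssignedContainer` calls in the examples below do.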

Code examples

Code example source: alibaba/jstorm

for (Container allocatedContainer : allocatedContainers) {
  LOG.info("Launching shell command on a new container."
      + ", containerId=" + allocatedContainer.getId()
      + ", containerNode=" + allocatedContainer.getNodeId().getHost()
      + ":" + allocatedContainer.getNodeId().getPort()
      + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
      + ", containerResourceMemory="
      + allocatedContainer.getResource().getMemory()
      + ", containerResourceVirtualCores="
      + allocatedContainer.getResource().getVirtualCores());
  if (allocatedContainer.getPriority().getPriority() == 0) {
    String supervisorHost = allocatedContainer.getNodeId().getHost();
    startType = STARTType.SUPERVISOR;
    String containerPath = RegistryUtils.componentPath(
        JOYConstants.APP_TYPE, jstormMasterContext.instanceName,
        allocatedContainer.getId().getApplicationAttemptId().getApplicationId().toString(), allocatedContainer.getId().toString());
        sr = new ServiceRecord();
        sr.set(JOYConstants.HOST, supervisorHost);
        sr.set(YarnRegistryAttributes.YARN_ID, allocatedContainer.getId().toString());
        sr.description = JOYConstants.CONTAINER;
        sr.set(YarnRegistryAttributes.YARN_PERSISTENCE,
    jstormMasterContext.nimbusHost = allocatedContainer.getNodeId().getHost();
    String path = RegistryUtils.serviceclassPath(
        JOYConstants.APP_TYPE, jstormMasterContext.instanceName);

Code example source: apache/hive

public Container createContainer(Resource capability, Priority priority, String hostname,
   int port, String nodeHttpAddress) {
  ContainerId containerId =
    ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
  NodeId nodeId = NodeId.newInstance(hostname, port);

  Container container =
    Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);

  return container;
}

Code example source: apache/flink

private void releaseYarnContainer(Container container) {
  LOG.info("Releasing YARN container {}", container.getId());
  containersBeingReturned.put(container.getId(), container);
  // release the container on the node manager
  try {
    nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
  } catch (Throwable t) {
    // we only log this error. since the ResourceManager also gets the release
    // notification, the container should be eventually cleaned up
    LOG.error("Error while calling YARN Node Manager to release container", t);
  }
  // tell the master that the container is no longer needed
  resourceManagerClient.releaseAssignedContainer(container.getId());
}

Code example source: apache/drill

public static String labelContainer(Container container) {
 StringBuilder buf = new StringBuilder()
   .append("[id: ")
   .append(container.getId())
   .append(", host: ")
   .append(container.getNodeId().getHost())
   .append(", priority: ")
   .append(container.getPriority())
   .append("]");
 return buf.toString();
}

Code example source: apache/drill

@Override
public void killContainer(Container container) {
 nodeMgr.stopContainerAsync(container.getId(), container.getNodeId());
}

Code example source: alibaba/jstorm

amRMClient = AMRMClientAsync.createAMRMClientAsync(JOYConstants.AM_RM_CLIENT_INTERVAL, allocListener);
jstormMasterContext.amRMClient = amRMClient;
amRMClient.init(conf);
amRMClient.start();
jstormMasterContext.maxMemory = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + jstormMasterContext.maxMemory);
jstormMasterContext.maxVcores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + jstormMasterContext.maxVcores);
      container.getId().getApplicationAttemptId().getApplicationId().toString(), container.getId().toString());
  ServiceRecord sr = null;
  try {
    if (!registryOperations.exists(containerPath)) {
      String contianerHost = container.getNodeId().getHost();
      registryOperations.mknode(containerPath, true);
      sr = new ServiceRecord();
      sr.set(JOYConstants.HOST, contianerHost);
      sr.set(YarnRegistryAttributes.YARN_ID, container.getId().toString());
      sr.description = JOYConstants.CONTAINER;
      sr.set(YarnRegistryAttributes.YARN_PERSISTENCE,
  if (container.getPriority().getPriority() == 0)
    jstormMasterContext.supervisorContainers.add(container);
  else if (container.getPriority().getPriority() == 1) {
    jstormMasterContext.nimbusContainers.add(container);

Code example source: alibaba/jstorm

@Override
public String info() throws TException {
  StringBuffer sbRet = new StringBuffer();
  sbRet.append("JstormOnYarn\n");
  sbRet.append("Instance Name:" + jstormMasterContext.instanceName + "\n");
  sbRet.append("Jstorm's location on hdfs:" + jstormMasterContext.deployPath + "\n");
  if (jstormMasterContext.user != null) {
    sbRet.append("Jstorm's data path:" + jstormMasterContext.nimbusDataDirPrefix + jstormMasterContext.instanceName + "\n");
    sbRet.append("Cluster userName:" + jstormMasterContext.user + "\n");
  }
  sbRet.append("Nimbus Count:" + jstormMasterContext.nimbusContainers.size() + "\n");
  sbRet.append("Supervisor Count:" + jstormMasterContext.supervisorContainers.size() + "\n");
  sbRet.append("detail :\n");
  sbRet.append("Type      \tContainerId                              \tHost      \tContainerMemory\tContainerVCores\n");
  for (Container container : jstormMasterContext.nimbusContainers) {
    sbRet.append("Nimbus    \t" + container.getId().toString() + "\t" + container.getNodeId().getHost() + "\t" + container.getResource().getMemory()
        + "\t        " + container.getResource().getVirtualCores() + "\n");
  }
  for (Container container : jstormMasterContext.supervisorContainers) {
    sbRet.append("Supervisor\t" + container.getId().toString() + "\t" + container.getNodeId().getHost() + "\t" + container.getResource().getMemory()
        + "\t        " + container.getResource().getVirtualCores() + "\n");
  }
  LOG.info("info is: " + sbRet.toString());
  return sbRet.toString();
}

Code example source: alibaba/jstorm

nimbusMap.put(container.getId().getContainerId(), container);
  supervisorMap.put(container.getId().getContainerId(), container);
Long containerId = containerStatus.getContainerId().getContainerId();
  if (nimbusMap.containsKey(containerId)) {
    Container nimbusContainer = nimbusMap.get(containerId);
    containerAsk = setupContainerAskForRM(nimbusContainer.getResource().getMemory(), nimbusContainer.getResource().getVirtualCores(), nimbusContainer.getPriority().getPriority(), nimbusContainer.getNodeId().getHost());
    LOG.info("restart nimbus container" + ", containerId="
        + containerStatus.getContainerId());
  } else if (supervisorMap.containsKey(containerId)) {
    Container supervisorContainer = supervisorMap.get(containerId);
    containerAsk = setupContainerAskForRM(supervisorContainer.getResource().getMemory(), supervisorContainer.getResource().getVirtualCores(), supervisorContainer.getPriority().getPriority(), supervisorContainer.getNodeId().getHost());
    LOG.info("restart supervisor container" + ", containerId="
        + containerStatus.getContainerId());
    amRMClient.addContainerRequest(containerAsk);
    try {
      jstormMasterContext.requestBlockingQueue.put(containerAsk);

Code example source: apache/drill

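/**
 * Utility method to display YARN container information in a useful way for
 * log messages.
 *
 * @param container the YARN container to describe
 * @return a bracketed summary of the container's id, host, priority, memory and vcores
 */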
public static String describeContainer(Container container) {
 StringBuilder buf = new StringBuilder()
   .append("[id: ")
   .append(container.getId())
   .append(", host: ")
   .append(container.getNodeId().getHost())
   .append(", priority: ")
   .append(container.getPriority())
   .append(", memory: ")
   .append(container.getResource().getMemory())
   .append(" MB, vcores: ")
   .append(container.getResource().getVirtualCores())
   .append("]");
 return buf.toString();
}

Code example source: apache/metron

@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
 LOG.info("Got response from RM for container ask, allocatedCnt="
     + allocatedContainers.size());
 for (Container allocatedContainer : allocatedContainers) {
  containers.put(allocatedContainer.getId(), allocatedContainer);
  state.registerContainer(allocatedContainer.getResource(), allocatedContainer);
  LOG.info("Launching shell command on a new container."
      + ", containerId=" + allocatedContainer.getId()
      + ", containerNode=" + allocatedContainer.getNodeId().getHost()
      + ":" + allocatedContainer.getNodeId().getPort()
      + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
      + ", containerResourceMemory="
      + allocatedContainer.getResource().getMemory()
      + ", containerResourceVirtualCores="
      + allocatedContainer.getResource().getVirtualCores());
 }
}

Code example source: alibaba/jstorm

private static void publishContainerStartEvent(
    final TimelineClient timelineClient, Container container, String domainId,
    UserGroupInformation ugi) {
  final TimelineEntity entity = new TimelineEntity();
  entity.setEntityId(container.getId().toString());
  entity.setEntityType(DSEntity.DS_CONTAINER.toString());
  entity.setDomainId(domainId);
  entity.addPrimaryFilter(JOYConstants.USER, ugi.getShortUserName());
  TimelineEvent event = new TimelineEvent();
  event.setTimestamp(System.currentTimeMillis());
  event.setEventType(DSEvent.DS_CONTAINER_START.toString());
  event.addEventInfo(JOYConstants.NODE, container.getNodeId().toString());
  event.addEventInfo(JOYConstants.RESOURCES, container.getResource().toString());
  entity.addEvent(event);
  try {
    ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
      @Override
      public TimelinePutResponse run() throws Exception {
        return timelineClient.putEntities(entity);
      }
    });
  } catch (Exception e) {
    LOG.error("Container start event could not be published for "
            + container.getId().toString(),
        e instanceof UndeclaredThrowableException ? e.getCause() : e);
  }
}

Code example source: apache/ignite

env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
    ));
  log.log(Level.INFO, "Launching container: {0}.", c.getId());
  containers.put(c.getId(),
    new IgniteContainer(
      c.getId(),
      c.getNodeId(),
      c.getResource().getVirtualCores(),
      c.getResource().getMemory()));
  log.log(Level.WARNING, "Error launching container " + c.getId(), ex);
rmClient.releaseAssignedContainer(c.getId());

Code example source: org.apache.slider/slider-core

public static String containerToString(Container container) {
 if (container == null) {
  return "null container";
 }
 return String.format(Locale.ENGLISH,
   "ContainerID=%s nodeID=%s http=%s priority=%s resource=%s",
   container.getId(),
   container.getNodeId(),
   container.getNodeHttpAddress(),
   container.getPriority(),
   container.getResource());
}

Code example source: apache/flink

log.info(
  "Received new container: {} - Remaining pending container requests: {}",
  container.getId(),
  numPendingContainerRequests);
  removeContainerRequest(pendingRequestsIterator.next());
  final String containerIdStr = container.getId().toString();
  final ResourceID resourceId = new ResourceID(containerIdStr);
      container.getResource(),
      containerIdStr,
      container.getNodeId().getHost());
    log.error("Could not start TaskManager in container {}.", container.getId(), t);
    resourceManagerClient.releaseAssignedContainer(container.getId());
  log.info("Returning excess container {}.", container.getId());
  resourceManagerClient.releaseAssignedContainer(container.getId());
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);

Code example source: org.apache.hadoop/hadoop-mapreduce-client-app

public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
  ContainerLaunchContext containerLaunchContext,
  Container allocatedContainer, Task remoteTask) {
 super(taskAttemptID, allocatedContainer.getId(), StringInterner
  .weakIntern(allocatedContainer.getNodeId().toString()),
  allocatedContainer.getContainerToken(),
  ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
 this.allocatedContainer = allocatedContainer;
 this.containerLaunchContext = containerLaunchContext;
 this.task = remoteTask;
}

Code example source: Qihoo360/XLearning

LOG.info("Canceling container: " + container.getId().toString());
   amrmAsync.releaseAssignedContainer(container.getId());
   LOG.info("Canceling container: " + container.getId().toString());
   amrmAsync.releaseAssignedContainer(container.getId());
 while (acquiredWorkerContainers.size() > workerNum) {
  Container releaseContainer = acquiredWorkerContainers.remove(0);
  amrmAsync.releaseAssignedContainer(releaseContainer.getId());
  LOG.info("Release container " + releaseContainer.getId().toString());
 if (acquiredPsContainers.size() > 0) {
  for (Container container : acquiredPsContainers) {
   containerHostnames.add(container.getNodeId().getHost());
   containerHostnames.add(container.getNodeId().getHost());
int index = 0;
for (Container container : acquiredPsContainers) {
 LOG.info("Launching ps container " + container.getId()
   + " on " + container.getNodeId().getHost() + ":" + container.getNodeId().getPort());
 containerListener.registerContainer(new XLearningContainerId(container.getId()), XLearningConstants.PS);
 LOG.info("Launching worker container " + container.getId()
   + " on " + container.getNodeId().getHost() + ":" + container.getNodeId().getPort());
 containerListener.registerContainer(new XLearningContainerId(container.getId()), XLearningConstants.WORKER);
 if (conf.getBoolean(XLearningConfiguration.XLEARNING_TF_EVALUATOR, XLearningConfiguration.DEFAULT_XLEARNING_TF_EVALUATOR) && index == workerNum) {

Code example source: apache/drill

startTime = task.launchTime;
if (task.container != null) {
 containerId = task.container.getId().toString();
 Resource resource = task.container.getResource();
 memoryMb = resource.getMemory();
 vcores = resource.getVirtualCores();
 disks = task.getContainerSpec().disks;
 nmLink = "http://" + task.container.getNodeHttpAddress();
} else {
 memoryMb = task.scheduler.getResource().memoryMb;

Code example source: apache/ignite

/**
 * @param cont Container.
 * @return {@code True} if container satisfies requirements.
 */
private boolean checkContainer(Container cont) {
  // Check limit on running nodes.
  if (props.instances() <= containers.size())
    return false;
  // Check host name
  if (props.hostnameConstraint() != null
      && props.hostnameConstraint().matcher(cont.getNodeId().getHost()).matches())
    return false;
  // Check that slave satisfies min requirements.
  if (cont.getResource().getVirtualCores() < props.cpusPerNode()
    || cont.getResource().getMemory() < props.totalMemoryPerNode()) {
    log.log(Level.FINE, "Container resources do not meet requirements. Host: {0}, cpu: {1}, mem: {2}",
      new Object[]{cont.getNodeId().getHost(), cont.getResource().getVirtualCores(),
        cont.getResource().getMemory()});
    return false;
  }
  return true;
}

Code example source: alibaba/jstorm

@Override
public void removeNimbus(int number) throws TException {
  if (jstormMasterContext.nimbusContainers.isEmpty())
    return;
  for (int i = 0; i < number; i++) {
    Container container = jstormMasterContext.nimbusContainers.poll();
    if (container != null) {
      amRMClient.releaseAssignedContainer(container.getId());
      LOG.info("release nimbus container, id: " + container.getId().toString());
    }
  }
}

Code example source: org.apache.hadoop/hadoop-yarn-client

public StartContainerEvent(Container container,
  ContainerLaunchContext containerLaunchContext) {
 super(container.getId(), container.getNodeId(),
   container.getContainerToken(), ContainerEventType.START_CONTAINER);
 this.container = container;
 this.containerLaunchContext = containerLaunchContext;
}
