org.apache.flink.runtime.execution.Environment.getJobID()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getJobID()方法的一些代码示例,展示了Environment.getJobID()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Environment.getJobID()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getJobID

Environment.getJobID介绍

[英]Returns the ID of the job that the task belongs to.
[中]返回任务所属作业的ID。

代码示例

代码示例来源:origin: apache/flink

// Store the job ID reported by the environment in this instance's jobId field.
this.jobId = env.getJobID();

代码示例来源:origin: apache/flink

/**
 * Initializes this task: reads the iteration ID and wait time from the configuration,
 * fetches the feedback queue from {@code BlockingQueueBroker} under the broker ID built from
 * the job ID, the iteration ID and this subtask's index, and sets up a {@code RecordPusher}
 * head operator with an {@code IterationTailOutput} over that queue.
 *
 * @throws Exception if the iteration ID in the task configuration is {@code null} or empty;
 *     the calls made here may also propagate exceptions
 */
@Override
public void init() throws Exception {
  final String iterId = getConfiguration().getIterationId();
  final boolean hasIterationId = iterId != null && iterId.length() != 0;
  if (!hasIterationId) {
    throw new Exception("Missing iteration ID in the task configuration");
  }

  final String queueKey = StreamIterationHead.createBrokerIdString(
      getEnvironment().getJobID(),
      iterId,
      getEnvironment().getTaskInfo().getIndexOfThisSubtask());
  final long waitTime = getConfiguration().getIterationWaitTime();

  LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), queueKey);
  @SuppressWarnings("unchecked")
  BlockingQueue<StreamRecord<IN>> feedbackQueue =
      (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(queueKey);
  LOG.info("Iteration tail {} acquired feedback queue {}", getName(), queueKey);

  this.headOperator = new RecordPusher<>();
  this.headOperator.setup(
      this,
      getConfiguration(),
      new IterationTailOutput<>(feedbackQueue, waitTime));

  // super.init() relies on this.headOperator being set up, so it has to run last.
  super.init();
}

代码示例来源:origin: apache/flink

// Build the broker ID from the job ID, the iteration ID and the index of this subtask.
final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
    getEnvironment().getTaskInfo().getIndexOfThisSubtask());

代码示例来源:origin: apache/flink

() -> stateBackend.createKeyedStateBackend(
  environment,
  environment.getJobID(),
  operatorIdentifierText,
  keySerializer,

代码示例来源:origin: apache/flink

// Ask the state backend for the checkpoint storage, passing the job ID taken from the environment.
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.11

// Store the job ID reported by the environment in this instance's jobId field.
this.jobId = env.getJobID();

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.10

// Store the job ID reported by the environment in this instance's jobId field.
this.jobId = env.getJobID();

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Requests a savepoint stream factory from the state backend for the given operator.
 *
 * <p>The factory is requested with the job ID of this task's environment, an operator
 * identifier derived from the operator and the configured vertex ID, and the target location.
 *
 * @param operator operator from which the operator identifier is derived
 * @param targetLocation target location, handed to the state backend unchanged
 * @return the factory returned by the state backend
 * @throws IOException declared by this method; presumably raised by the state backend call
 */
public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException {
  final String operatorId = createOperatorIdentifier(operator, configuration.getVertexID());
  return stateBackend.createSavepointStreamFactory(
    getEnvironment().getJobID(), operatorId, targetLocation);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Requests a checkpoint stream factory from the state backend for the given operator.
 *
 * <p>This method is visible only because
 * {@link org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink} writes its
 * write-ahead logs through the checkpoint stream factory. <b>It should not be used for
 * anything else.</b>
 *
 * @param operator operator from which the operator identifier is derived
 * @return the factory returned by the state backend
 * @throws IOException declared by this method; presumably raised by the state backend call
 */
public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> operator) throws IOException {
  final String operatorId = createOperatorIdentifier(operator, configuration.getVertexID());
  return stateBackend.createStreamFactory(getEnvironment().getJobID(), operatorId);
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

/**
 * Returns the broker key, building it on the first call and reusing the stored value after.
 *
 * <p>The key is the job ID, the configured iteration ID and the index of this subtask,
 * joined with {@code '#'}.
 *
 * @return the broker key held in the {@code brokerKey} field
 */
public String brokerKey() {
  if (brokerKey != null) {
    return brokerKey;
  }

  final int iteration = config.getIterationId();
  brokerKey = getEnvironment().getJobID().toString()
      + '#' + iteration
      + '#' + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
  return brokerKey;
}

代码示例来源:origin: org.apache.flink/flink-runtime

/**
 * Returns the broker key, building it on the first call and reusing the stored value after.
 *
 * <p>The key is the job ID, the configured iteration ID and the index of this subtask,
 * joined with {@code '#'}.
 *
 * @return the broker key held in the {@code brokerKey} field
 */
public String brokerKey() {
  if (brokerKey != null) {
    return brokerKey;
  }

  final int iteration = config.getIterationId();
  brokerKey = getEnvironment().getJobID().toString()
      + '#' + iteration
      + '#' + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
  return brokerKey;
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

/**
 * Returns the broker key, building it on the first call and reusing the stored value after.
 *
 * <p>The key is the job ID, the configured iteration ID and the index of this subtask,
 * joined with {@code '#'}.
 *
 * @return the broker key held in the {@code brokerKey} field
 */
public String brokerKey() {
  if (brokerKey != null) {
    return brokerKey;
  }

  final int iteration = config.getIterationId();
  brokerKey = getEnvironment().getJobID().toString()
      + '#' + iteration
      + '#' + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
  return brokerKey;
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

/**
 * Returns the broker key, building it on the first call and reusing the stored value after.
 *
 * <p>The key is the job ID, the configured iteration ID and the index of this subtask,
 * joined with {@code '#'}.
 *
 * @return the broker key held in the {@code brokerKey} field
 */
public String brokerKey() {
  if (brokerKey != null) {
    return brokerKey;
  }

  final int iteration = config.getIterationId();
  brokerKey = getEnvironment().getJobID().toString()
      + '#' + iteration
      + '#' + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
  return brokerKey;
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

/**
 * Initializes this task: reads the iteration ID and wait time from the configuration,
 * fetches the feedback queue from {@code BlockingQueueBroker} under the broker ID built from
 * the job ID, the iteration ID and this subtask's index, and sets up a {@code RecordPusher}
 * head operator with an {@code IterationTailOutput} over that queue.
 *
 * @throws Exception if the iteration ID in the task configuration is {@code null} or empty;
 *     the calls made here may also propagate exceptions
 */
@Override
public void init() throws Exception {
  final String iterId = getConfiguration().getIterationId();
  final boolean hasIterationId = iterId != null && iterId.length() != 0;
  if (!hasIterationId) {
    throw new Exception("Missing iteration ID in the task configuration");
  }

  final String queueKey = StreamIterationHead.createBrokerIdString(
      getEnvironment().getJobID(),
      iterId,
      getEnvironment().getTaskInfo().getIndexOfThisSubtask());
  final long waitTime = getConfiguration().getIterationWaitTime();

  LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), queueKey);
  @SuppressWarnings("unchecked")
  BlockingQueue<StreamRecord<IN>> feedbackQueue =
      (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(queueKey);
  LOG.info("Iteration tail {} acquired feedback queue {}", getName(), queueKey);

  this.headOperator = new RecordPusher<>();
  this.headOperator.setup(
      this,
      getConfiguration(),
      new IterationTailOutput<>(feedbackQueue, waitTime));

  // super.init() relies on this.headOperator being set up, so it has to run last.
  super.init();
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

/**
 * Initializes this task: reads the iteration ID and wait time from the configuration,
 * fetches the feedback queue from {@code BlockingQueueBroker} under the broker ID built from
 * the job ID, the iteration ID and this subtask's index, and sets up a {@code RecordPusher}
 * head operator with an {@code IterationTailOutput} over that queue.
 *
 * @throws Exception if the iteration ID in the task configuration is {@code null} or empty;
 *     the calls made here may also propagate exceptions
 */
@Override
public void init() throws Exception {
  final String iterId = getConfiguration().getIterationId();
  final boolean hasIterationId = iterId != null && iterId.length() != 0;
  if (!hasIterationId) {
    throw new Exception("Missing iteration ID in the task configuration");
  }

  final String queueKey = StreamIterationHead.createBrokerIdString(
      getEnvironment().getJobID(),
      iterId,
      getEnvironment().getTaskInfo().getIndexOfThisSubtask());
  final long waitTime = getConfiguration().getIterationWaitTime();

  LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), queueKey);
  @SuppressWarnings("unchecked")
  BlockingQueue<StreamRecord<IN>> feedbackQueue =
      (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(queueKey);
  LOG.info("Iteration tail {} acquired feedback queue {}", getName(), queueKey);

  this.headOperator = new RecordPusher<>();
  this.headOperator.setup(
      this,
      getConfiguration(),
      new IterationTailOutput<>(feedbackQueue, waitTime));

  // super.init() relies on this.headOperator being set up, so it has to run last.
  super.init();
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Initializes this task: reads the iteration ID and wait time from the configuration,
 * fetches the feedback queue from {@code BlockingQueueBroker} under the broker ID built from
 * the job ID, the iteration ID and this subtask's index, and sets up a {@code RecordPusher}
 * head operator with an {@code IterationTailOutput} over that queue.
 *
 * @throws Exception if the iteration ID in the task configuration is {@code null} or empty;
 *     the calls made here may also propagate exceptions
 */
@Override
public void init() throws Exception {
  final String iterId = getConfiguration().getIterationId();
  final boolean hasIterationId = iterId != null && iterId.length() != 0;
  if (!hasIterationId) {
    throw new Exception("Missing iteration ID in the task configuration");
  }

  final String queueKey = StreamIterationHead.createBrokerIdString(
      getEnvironment().getJobID(),
      iterId,
      getEnvironment().getTaskInfo().getIndexOfThisSubtask());
  final long waitTime = getConfiguration().getIterationWaitTime();

  LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), queueKey);
  @SuppressWarnings("unchecked")
  BlockingQueue<StreamRecord<IN>> feedbackQueue =
      (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(queueKey);
  LOG.info("Iteration tail {} acquired feedback queue {}", getName(), queueKey);

  this.headOperator = new RecordPusher<>();
  this.headOperator.setup(
      this,
      getConfiguration(),
      new IterationTailOutput<>(feedbackQueue, waitTime));

  // super.init() relies on this.headOperator being set up, so it has to run last.
  super.init();
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

// Build the broker ID from the job ID, the iteration ID and the index of this subtask.
final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
    getEnvironment().getTaskInfo().getIndexOfThisSubtask());

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Creates the keyed state backend for this task, registers it with the cancelables and
 * calls {@code restore} on it with the managed keyed state handles (or {@code null} when
 * there are no restore state handles).
 *
 * @param keySerializer serializer handed to the state backend
 * @param numberOfKeyGroups number of key groups handed to the state backend
 * @param keyGroupRange key group range handed to the state backend
 * @param <K> key type the returned backend is cast to
 * @return the newly created keyed state backend
 * @throws RuntimeException if a keyed state backend has already been created
 * @throws Exception declared by this method; may be propagated from the calls made here
 */
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange) throws Exception {

  final boolean alreadyCreated = keyedStateBackend != null;
  if (alreadyCreated) {
    throw new RuntimeException("The keyed state backend can only be created once.");
  }

  final String operatorId = createOperatorIdentifier(headOperator, configuration.getVertexID());

  keyedStateBackend = stateBackend.createKeyedStateBackend(
      getEnvironment(),
      getEnvironment().getJobID(),
      operatorId,
      keySerializer,
      numberOfKeyGroups,
      keyGroupRange,
      getEnvironment().getTaskKvStateRegistry());

  // Registering the backend as a closable lets it take part in the operator lifecycle,
  // i.e. makes it responsive to cancellation.
  cancelables.registerClosable(keyedStateBackend);

  // Restore old state if there is any; otherwise restore is invoked with null.
  Collection<KeyedStateHandle> handlesToRestore = null;
  if (restoreStateHandles != null) {
    handlesToRestore = restoreStateHandles.getManagedKeyedState();
  }
  keyedStateBackend.restore(handlesToRestore);

  @SuppressWarnings("unchecked")
  AbstractKeyedStateBackend<K> result = (AbstractKeyedStateBackend<K>) keyedStateBackend;
  return result;
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

() -> stateBackend.createKeyedStateBackend(
  environment,
  environment.getJobID(),
  operatorIdentifierText,
  keySerializer,

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

() -> stateBackend.createKeyedStateBackend(
  environment,
  environment.getJobID(),
  operatorIdentifierText,
  keySerializer,

相关文章