org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(122)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()方法的一些代码示例,展示了Environment.getTaskManagerInfo()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Environment.getTaskManagerInfo()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getTaskManagerInfo

Environment.getTaskManagerInfo介绍

[英]Gets the task manager info, with configuration and hostname.
[中]获取任务管理器信息,包括配置和主机名。

代码示例

代码示例来源:origin: apache/flink

private void tryShutdownTimerService() {
  if (timerService != null && !timerService.isTerminated()) {
    try {
      final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
        getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
      if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
        LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
          "timers. Will continue with shutdown procedure.", timeoutMs);
      }
    } catch (Throwable t) {
      // catch and log the exception to not replace the original exception
      LOG.error("Could not shut down timer service", t);
    }
  }
}

代码示例来源:origin: apache/flink

private StateBackend createStateBackend() throws Exception {
  final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
  return StateBackendLoader.fromApplicationOrConfigOrDefault(
      fromApplication,
      getEnvironment().getTaskManagerInfo().getConfiguration(),
      getUserCodeClassLoader(),
      LOG);
}

代码示例来源:origin: apache/flink

String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
ensureRocksDBIsLoaded(tempDir);

代码示例来源:origin: apache/flink

getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator,

代码示例来源:origin: apache/flink

@Override
  public void invoke() throws Exception {
    RecordReader<SpeedTestRecord> reader = new RecordReader<>(
        getEnvironment().getInputGate(0),
        SpeedTestRecord.class,
        getEnvironment().getTaskManagerInfo().getTmpDirectories());
    try {
      boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
      int numRecords = 0;
      while (reader.next() != null) {
        if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
          Thread.sleep(IS_SLOW_SLEEP_MS);
        }
      }
    }
    finally {
      reader.clearBuffers();
    }
  }
}

代码示例来源:origin: apache/flink

Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
if (historySize <= 0) {

代码示例来源:origin: apache/flink

@Override
public void init() throws Exception {
  StreamConfig configuration = getConfiguration();
  TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  int numberOfInputs = configuration.getNumberOfInputs();
  if (numberOfInputs > 0) {
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
        inputGates,
        inSerializer,
        this,
        configuration.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator,
        getEnvironment().getMetricGroup().getIOMetricGroup(),
        inputWatermarkGauge);
  }
  headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  // wrap watermark gauge since registered metrics must be unique
  getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

代码示例来源:origin: apache/flink

@Override
  public void invoke() throws Exception {
    RecordReader<SpeedTestRecord> reader = new RecordReader<>(
        getEnvironment().getInputGate(0),
        SpeedTestRecord.class,
        getEnvironment().getTaskManagerInfo().getTmpDirectories());
    RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));
    try {
      SpeedTestRecord record;
      while ((record = reader.next()) != null) {
        writer.emit(record);
      }
    }
    finally {
      reader.clearBuffers();
      writer.clearBuffers();
      writer.flushAll();
    }
  }
}

代码示例来源:origin: apache/flink

final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
  ? getExecutionConfig().getLatencyTrackingInterval()

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
  return getEnvironment().getTaskManagerInfo();
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
  return getEnvironment().getTaskManagerInfo();
}

代码示例来源:origin: org.apache.flink/flink-runtime

@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
  return getEnvironment().getTaskManagerInfo();
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
  return getEnvironment().getTaskManagerInfo();
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

private void tryShutdownTimerService() {
  if (timerService != null && !timerService.isTerminated()) {
    try {
      final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
        getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
      if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
        LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
          "timers. Will continue with shutdown procedure.", timeoutMs);
      }
    } catch (Throwable t) {
      // catch and log the exception to not replace the original exception
      LOG.error("Could not shut down timer service", t);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

private void tryShutdownTimerService() {
  if (timerService != null && !timerService.isTerminated()) {
    try {
      final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
        getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
      if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
        LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
          "timers. Will continue with shutdown procedure.", timeoutMs);
      }
    } catch (Throwable t) {
      // catch and log the exception to not replace the original exception
      LOG.error("Could not shut down timer service", t);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

private StateBackend createStateBackend() throws Exception {
  final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader());
  if (fromJob != null) {
    // backend has been configured on the environment
    LOG.info("Using user-defined state backend: {}.", fromJob);
    return fromJob;
  }
  else {
    return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getUserCodeClassLoader(),
        LOG);
  }
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

private StateBackend createStateBackend() throws Exception {
  final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
  return StateBackendLoader.fromApplicationOrConfigOrDefault(
      fromApplication,
      getEnvironment().getTaskManagerInfo().getConfiguration(),
      getUserCodeClassLoader(),
      LOG);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

private StateBackend createStateBackend() throws Exception {
  final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
  return StateBackendLoader.fromApplicationOrConfigOrDefault(
      fromApplication,
      getEnvironment().getTaskManagerInfo().getConfiguration(),
      getUserCodeClassLoader(),
      LOG);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
  this.container = containingTask;
  this.config = config;
  this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
  this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
  if (config.isChainStart()) {
    ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseInputMetricsForTask();
  }
  if (config.isChainEnd()) {
    ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
  }
  Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
  int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
  if (historySize <= 0) {
    LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
    historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
  }
  latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
  this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
  stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
  stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

@Override
public void init() throws Exception {
  StreamConfig configuration = getConfiguration();
  TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  int numberOfInputs = configuration.getNumberOfInputs();
  if (numberOfInputs > 0) {
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
        inputGates,
        inSerializer,
        this,
        configuration.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator);
    // make sure that stream tasks report their I/O statistics
    inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
  }
}

相关文章