org.apache.flink.runtime.execution.Environment.getMetricGroup()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(15.5k)|赞(0)|评价(0)|浏览(183)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getMetricGroup()方法的一些代码示例,展示了Environment.getMetricGroup()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Environment.getMetricGroup()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getMetricGroup

Environment.getMetricGroup介绍

[英]Returns the task specific metric group.
[中]返回该任务专属的指标组(metric group)。

代码示例

代码示例来源:origin: apache/flink

/**
 * Builds the record writer for one output of a task and attaches the task's I/O metric group.
 *
 * <p>If the edge's partitioner is a {@code ConfigurableStreamPartitioner} and the partition
 * writer reports a positive number of target key groups, the partitioner is configured with
 * that number before the record writer is created.
 *
 * @param edge stream edge whose partitioner is used for this output
 * @param outputIndex index used to look up the partition writer via {@code environment.getWriter}
 * @param environment environment that supplies the partition writer and the metric group
 * @param taskName task name, used in the debug log and passed to the record writer
 * @param bufferTimeout buffer timeout passed through to the record writer
 * @param <OUT> element type of the emitted stream records
 * @return the created record writer
 */
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
        StreamEdge edge,
        int outputIndex,
        Environment environment,
        String taskName,
        long bufferTimeout) {

    @SuppressWarnings("unchecked")
    final StreamPartitioner<OUT> partitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", partitioner, outputIndex, taskName);

    final ResultPartitionWriter partitionWriter = environment.getWriter(outputIndex);

    // The partitioner is set up here with the number of key groups (i.e. the max. parallelism).
    if (partitioner instanceof ConfigurableStreamPartitioner) {
        final int keyGroupCount = partitionWriter.getNumTargetKeyGroups();
        if (keyGroupCount > 0) {
            ((ConfigurableStreamPartitioner) partitioner).configure(keyGroupCount);
        }
    }

    final RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter =
            RecordWriter.createRecordWriter(partitionWriter, partitioner, bufferTimeout, taskName);
    recordWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return recordWriter;
}
}

代码示例来源:origin: apache/flink

getStreamStatusMaintainer(),
    this.headOperator,
    getEnvironment().getMetricGroup().getIOMetricGroup(),
    input1WatermarkGauge,
    input2WatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);

代码示例来源:origin: apache/flink

/**
 * Sets up input processing for this task and registers the input watermark gauge.
 *
 * <p>A {@code StreamInputProcessor} is created only when the stream configuration reports at
 * least one input. The watermark gauge is registered on the head operator's metric group and
 * on the task's metric group in either case.
 *
 * @throws Exception if initialization fails
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());

    if (streamConfig.getNumberOfInputs() > 0) {
        final InputGate[] gates = getEnvironment().getAllInputGates();
        inputProcessor = new StreamInputProcessor<>(
                gates,
                inputSerializer,
                this,
                streamConfig.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                this.headOperator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                inputWatermarkGauge);
    }

    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);

    // Registered metrics must be unique, so the task-level registration wraps the gauge in a
    // method reference instead of registering the same gauge instance a second time.
    getEnvironment().getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

代码示例来源:origin: apache/flink

this.config = config;
try {
  OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
  this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
  if (config.isChainStart()) {

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is registered under a name derived from the task name: the text
 * before the first {@code "->"}, trimmed, with a leading {@code "CHAIN"} marker and the one
 * character following it removed.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    // A limit of 2 guarantees at least one array element, so a task name consisting only of
    // "->" yields an empty name instead of an ArrayIndexOutOfBoundsException.
    String sourceName = env.getTaskInfo().getTaskName().split("->", 2)[0].trim();

    // Drop the first six characters (presumably the "CHAIN " prefix). The length check avoids a
    // StringIndexOutOfBoundsException when the trimmed name is exactly "CHAIN".
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(
            env.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            env.getDistributedCacheEntries(),
            env.getAccumulatorRegistry().getUserMap(),
            env.getMetricGroup().addOperator(sourceName));
}
}

代码示例来源:origin: org.apache.flink/flink-runtime

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is looked up or registered under a name derived from the task
 * name: the text before the first {@code "->"}, trimmed, with a leading {@code "CHAIN"} marker
 * and the one character following it removed.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    // A limit of 2 guarantees at least one array element, so a task name consisting only of
    // "->" yields an empty name instead of an ArrayIndexOutOfBoundsException.
    String sourceName = env.getTaskInfo().getTaskName().split("->", 2)[0].trim();

    // Drop the first six characters (presumably the "CHAIN " prefix). The length check avoids a
    // StringIndexOutOfBoundsException when the trimmed name is exactly "CHAIN".
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(
            env.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            env.getDistributedCacheEntries(),
            env.getAccumulatorRegistry().getUserMap(),
            env.getMetricGroup().getOrAddOperator(sourceName));
}
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is registered under a name derived from the task name: the text
 * before the first {@code "->"}, trimmed, with a leading {@code "CHAIN"} marker and the one
 * character following it removed. The accumulator registry itself (not a map view of it) is
 * handed to the context.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    // A limit of 2 guarantees at least one array element, so a task name consisting only of
    // "->" yields an empty name instead of an ArrayIndexOutOfBoundsException.
    String sourceName = env.getTaskInfo().getTaskName().split("->", 2)[0].trim();

    // Drop the first six characters (presumably the "CHAIN " prefix). The length check avoids a
    // StringIndexOutOfBoundsException when the trimmed name is exactly "CHAIN".
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(
            env.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            env.getDistributedCacheEntries(),
            env.getAccumulatorRegistry(),
            env.getMetricGroup().addOperator(sourceName));
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is looked up or registered under a name derived from the task
 * name: the text before the first {@code "->"}, trimmed, with a leading {@code "CHAIN"} marker
 * and the one character following it removed.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    // A limit of 2 guarantees at least one array element, so a task name consisting only of
    // "->" yields an empty name instead of an ArrayIndexOutOfBoundsException.
    String sourceName = env.getTaskInfo().getTaskName().split("->", 2)[0].trim();

    // Drop the first six characters (presumably the "CHAIN " prefix). The length check avoids a
    // StringIndexOutOfBoundsException when the trimmed name is exactly "CHAIN".
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(
            env.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            env.getDistributedCacheEntries(),
            env.getAccumulatorRegistry().getUserMap(),
            env.getMetricGroup().getOrAddOperator(sourceName));
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

/**
 * Builds the stream record writer for one output of a task and attaches the task's I/O metric
 * group.
 *
 * <p>If the edge's partitioner is a {@code ConfigurableStreamPartitioner} and the partition
 * writer reports a positive number of target key groups, the partitioner is configured with
 * that number before the record writer is created.
 *
 * @param edge stream edge whose partitioner is used for this output
 * @param outputIndex index used to look up the partition writer via {@code environment.getWriter}
 * @param environment environment that supplies the partition writer and the metric group
 * @param taskName task name, used in the debug log and passed to the record writer
 * @param bufferTimeout buffer timeout passed through to the record writer
 * @param <OUT> element type of the emitted stream records
 * @return the created stream record writer
 */
private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
        StreamEdge edge,
        int outputIndex,
        Environment environment,
        String taskName,
        long bufferTimeout) {

    @SuppressWarnings("unchecked")
    final StreamPartitioner<OUT> partitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", partitioner, outputIndex, taskName);

    final ResultPartitionWriter partitionWriter = environment.getWriter(outputIndex);

    // The partitioner is set up here with the number of key groups (i.e. the max. parallelism).
    if (partitioner instanceof ConfigurableStreamPartitioner) {
        final int keyGroupCount = partitionWriter.getNumTargetKeyGroups();
        if (keyGroupCount > 0) {
            ((ConfigurableStreamPartitioner) partitioner).configure(keyGroupCount);
        }
    }

    final StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter =
            new StreamRecordWriter<>(partitionWriter, partitioner, bufferTimeout, taskName);
    recordWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return recordWriter;
}
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is registered under the full task name, and the accumulator
 * registry itself is handed to the context.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();

    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry(),
            getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is registered under the full task name, and the user map of the
 * accumulator registry is handed to the context.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();

    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is looked up or registered under the full task name, and the
 * user map of the accumulator registry is handed to the context.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();

    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
}
}

代码示例来源:origin: org.apache.flink/flink-runtime

/**
 * Creates the runtime context for the user function of this task.
 *
 * <p>The operator metric group is looked up or registered under the full task name, and the
 * user map of the accumulator registry is handed to the context.
 *
 * @return a new {@code DistributedRuntimeUDFContext} built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();

    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

/**
 * Sets up input processing for this task and registers the input watermark gauge.
 *
 * <p>A {@code StreamInputProcessor} is created only when the stream configuration reports at
 * least one input. The watermark gauge is registered on the head operator's metric group and
 * on the task's metric group in either case.
 *
 * @throws Exception if initialization fails
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());

    if (streamConfig.getNumberOfInputs() > 0) {
        final InputGate[] gates = getEnvironment().getAllInputGates();
        inputProcessor = new StreamInputProcessor<>(
                gates,
                inputSerializer,
                this,
                streamConfig.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                this.headOperator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                inputWatermarkGauge);
    }

    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);

    // Registered metrics must be unique, so the task-level registration wraps the gauge in a
    // method reference instead of registering the same gauge instance a second time.
    getEnvironment().getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

/**
 * Sets up input processing for this task and registers the input watermark gauge.
 *
 * <p>A {@code StreamInputProcessor} is created only when the stream configuration reports at
 * least one input. The watermark gauge is registered on the head operator's metric group and
 * on the task's metric group in either case.
 *
 * @throws Exception if initialization fails
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());

    if (streamConfig.getNumberOfInputs() > 0) {
        final InputGate[] gates = getEnvironment().getAllInputGates();
        inputProcessor = new StreamInputProcessor<>(
                gates,
                inputSerializer,
                this,
                streamConfig.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                this.headOperator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                inputWatermarkGauge);
    }

    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);

    // Registered metrics must be unique, so the task-level registration wraps the gauge in a
    // method reference instead of registering the same gauge instance a second time.
    getEnvironment().getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

/**
 * Stores the task settings, registers operator metrics, wraps the output collector in a
 * counting collector, and creates the UDF runtime context before delegating to
 * {@code setup(parent)}.
 *
 * @param config task configuration to store
 * @param taskName task name; also used as the operator name for the metric group
 * @param outputCollector collector to wrap so that emitted records are counted
 * @param parent invokable that owns this instance and supplies the environment
 * @param userCodeClassLoader class loader to store and to pass to the runtime context
 * @param executionConfig execution configuration to store; its object-reuse flag is read here
 * @param accumulatorRegistry accumulator registry passed to the runtime context when
 *     {@code parent} is not a {@code BatchTask}
 */
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
        AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
        AbstractAccumulatorRegistry accumulatorRegistry) {
    this.config = config;
    this.taskName = taskName;
    this.userCodeClassLoader = userCodeClassLoader;

    // Operator-level metrics and the record counters derived from them.
    this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
    this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
    this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
    this.outputCollector = new CountingCollector<>(outputCollector, this.numRecordsOut);

    final Environment environment = parent.getEnvironment();
    if (!(parent instanceof BatchTask)) {
        // Note: the context is built with the parent's execution config, not the argument.
        this.udfContext = new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                userCodeClassLoader,
                parent.getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                accumulatorRegistry,
                metrics);
    } else {
        this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
    }

    this.executionConfig = executionConfig;
    this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
    setup(parent);
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

/**
 * Stores the task settings, looks up or registers operator metrics, wraps the output collector
 * in a counting collector, and creates the UDF runtime context before delegating to
 * {@code setup(parent)}.
 *
 * @param config task configuration to store
 * @param taskName task name; also used as the operator name for the metric group
 * @param outputCollector collector to wrap so that emitted records are counted
 * @param parent invokable that owns this instance and supplies the environment
 * @param userCodeClassLoader class loader to store and to pass to the runtime context
 * @param executionConfig execution configuration to store; its object-reuse flag is read here
 * @param accumulatorMap accumulator map passed to the runtime context when {@code parent} is
 *     not a {@code BatchTask}
 */
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
        AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
        Map<String, Accumulator<?,?>> accumulatorMap) {
    this.config = config;
    this.taskName = taskName;
    this.userCodeClassLoader = userCodeClassLoader;

    // Operator-level metrics and the record counters derived from them.
    this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
    this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
    this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
    this.outputCollector = new CountingCollector<>(outputCollector, this.numRecordsOut);

    final Environment environment = parent.getEnvironment();
    if (!(parent instanceof BatchTask)) {
        // Note: the context is built with the parent's execution config, not the argument.
        this.udfContext = new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                userCodeClassLoader,
                parent.getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                accumulatorMap,
                metrics);
    } else {
        this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
    }

    this.executionConfig = executionConfig;
    this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
    setup(parent);
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

/**
 * Stores the task settings, registers operator metrics, wraps the output collector in a
 * counting collector, and creates the UDF runtime context before delegating to
 * {@code setup(parent)}.
 *
 * @param config task configuration to store
 * @param taskName task name; also used as the operator name for the metric group
 * @param outputCollector collector to wrap so that emitted records are counted
 * @param parent invokable that owns this instance and supplies the environment
 * @param userCodeClassLoader class loader to store and to pass to the runtime context
 * @param executionConfig execution configuration to store; its object-reuse flag is read here
 * @param accumulatorMap accumulator map passed to the runtime context when {@code parent} is
 *     not a {@code BatchTask}
 */
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
        AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
        Map<String, Accumulator<?,?>> accumulatorMap) {
    this.config = config;
    this.taskName = taskName;
    this.userCodeClassLoader = userCodeClassLoader;

    // Operator-level metrics and the record counters derived from them.
    this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
    this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
    this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
    this.outputCollector = new CountingCollector<>(outputCollector, this.numRecordsOut);

    final Environment environment = parent.getEnvironment();
    if (!(parent instanceof BatchTask)) {
        // Note: the context is built with the parent's execution config, not the argument.
        this.udfContext = new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                userCodeClassLoader,
                parent.getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                accumulatorMap,
                metrics);
    } else {
        this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
    }

    this.executionConfig = executionConfig;
    this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
    setup(parent);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Sets up input processing for this task.
 *
 * <p>When the stream configuration reports at least one input, a {@code StreamInputProcessor}
 * is created and given the task's I/O metric group. With no inputs, nothing is created.
 *
 * @throws Exception if initialization fails
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());

    if (streamConfig.getNumberOfInputs() <= 0) {
        return;
    }

    final InputGate[] gates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
            gates,
            inputSerializer,
            this,
            streamConfig.getCheckpointMode(),
            getCheckpointLock(),
            getEnvironment().getIOManager(),
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getStreamStatusMaintainer(),
            this.headOperator);

    // Ensure that stream tasks report their I/O statistics.
    inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Binds this operator to its containing task: stores the task and config, registers operator
 * metrics and a latency gauge, wraps the output in a counting output, and creates the runtime
 * context and the two state key selectors.
 *
 * @param containingTask task that hosts this operator
 * @param config stream configuration of this operator
 * @param output downstream output, wrapped here so that emitted records are counted
 */
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    this.container = containingTask;
    this.config = config;

    this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
    final OperatorMetricGroup operatorMetrics = (OperatorMetricGroup) this.metrics;
    this.output = new CountingOutput(output, operatorMetrics.getIOMetricGroup().getNumRecordsOutCounter());

    // Operators at the chain boundaries reuse the task-level input / output metrics.
    if (config.isChainStart()) {
        operatorMetrics.getIOMetricGroup().reuseInputMetricsForTask();
    }
    if (config.isChainEnd()) {
        operatorMetrics.getIOMetricGroup().reuseOutputMetricsForTask();
    }

    final Configuration tmConfiguration = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    int latencyHistorySize = tmConfiguration.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
    if (latencyHistorySize <= 0) {
        // Non-positive sizes are rejected in favor of the option's default value.
        LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, latencyHistorySize);
        latencyHistorySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
    }
    latencyGauge = this.metrics.gauge("latency", new LatencyGauge(latencyHistorySize));

    this.runtimeContext = new StreamingRuntimeContext(
            this, container.getEnvironment(), container.getAccumulatorMap());

    stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
    stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
}

相关文章