org.apache.flink.runtime.execution.Environment.getWriter()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(12.5k)|赞(0)|评价(0)|浏览(107)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getWriter()方法的一些代码示例,展示了Environment.getWriter()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Environment.getWriter()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getWriter

Environment.getWriter介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Creates a mock record writer bound to the first output of the given task.
 *
 * <p>The writer at index 0 of the task's environment is handed to the superclass constructor.
 *
 * @param inputBase task whose environment supplies the writer at index 0
 * @param outputClass record type token; not read by this constructor
 */
public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
  super(inputBase.getEnvironment().getWriter(0));
}

代码示例来源:origin: apache/flink

/**
 * Builds the record writer for a single output of a task.
 *
 * <p>The partitioner is taken from the edge and the partition writer is looked up in the
 * environment by {@code outputIndex}. The resulting writer is registered with the environment's
 * IO metric group before it is returned.
 *
 * @param edge stream edge that provides the partitioner
 * @param outputIndex index of the writer to fetch from the environment
 * @param environment task environment providing the partition writer and the metric group
 * @param taskName task name, used for logging and passed on to the record writer
 * @param bufferTimeout buffer timeout passed on to the record writer
 * @return the configured record writer
 */
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
      StreamEdge edge,
      int outputIndex,
      Environment environment,
      String taskName,
      long bufferTimeout) {
    @SuppressWarnings("unchecked")
    final StreamPartitioner<OUT> partitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", partitioner, outputIndex, taskName);

    final ResultPartitionWriter partitionWriter = environment.getWriter(outputIndex);

    // A configurable partitioner is told the number of key groups (aka max. parallelism),
    // but only when the partition writer reports a positive value.
    if (partitioner instanceof ConfigurableStreamPartitioner) {
      final int keyGroupCount = partitionWriter.getNumTargetKeyGroups();
      if (keyGroupCount > 0) {
        final ConfigurableStreamPartitioner configurable = (ConfigurableStreamPartitioner) partitioner;
        configurable.configure(keyGroupCount);
      }
    }

    final RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter =
      RecordWriter.createRecordWriter(partitionWriter, partitioner, bufferTimeout, taskName);
    recordWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return recordWriter;
  }
}

代码示例来源:origin: apache/flink

/**
 * Emits the same {@code SpeedTestRecord} instance repeatedly through the writer at index 0.
 *
 * <p>The number of records is derived from the configured data volume and the current number
 * of subtasks. When the slow-sender flag is set, the task sleeps for {@code IS_SLOW_SLEEP_MS}
 * each time its internal counter is a multiple of {@code IS_SLOW_EVERY_NUM_RECORDS}.
 * The writer's buffers are cleared and flushed on every exit path.
 */
@Override
  public void invoke() throws Exception {
    final RecordWriter<SpeedTestRecord> output = new RecordWriter<>(getEnvironment().getWriter(0));
    try {
      // Work out how much this subtask has to send.
      final int volumeGb =
          getTaskConfiguration().getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
      final long mbPerSubtask = (volumeGb * 10) / getCurrentNumberOfSubtasks();
      final long recordsToEmit = (mbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;

      final String summary = String.format(
          "%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
          getIndexInSubtaskGroup() + 1,
          getCurrentNumberOfSubtasks(),
          recordsToEmit,
          SpeedTestRecord.RECORD_SIZE,
          mbPerSubtask / 1024.0);
      LOG.info(summary);

      final boolean slowSender = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
      final SpeedTestRecord payload = new SpeedTestRecord();

      // Only advanced when the sender is slow; decides when to pause.
      int slowCounter = 0;
      long emitted = 0;
      while (emitted < recordsToEmit) {
        if (slowSender) {
          final int position = slowCounter++;
          if (position % IS_SLOW_EVERY_NUM_RECORDS == 0) {
            Thread.sleep(IS_SLOW_SLEEP_MS);
          }
        }
        output.emit(payload);
        emitted++;
      }
    }
    finally {
      output.clearBuffers();
      output.flushAll();
    }
  }
}

代码示例来源:origin: apache/flink

/**
 * Forwards every record read from input gate 0 to the writer at index 0.
 *
 * <p>Reading stops when the reader returns {@code null}. On every exit path the reader's
 * buffers are cleared, then the writer's buffers are cleared and flushed.
 */
@Override
  public void invoke() throws Exception {
    final RecordReader<SpeedTestRecord> input = new RecordReader<>(
        getEnvironment().getInputGate(0),
        SpeedTestRecord.class,
        getEnvironment().getTaskManagerInfo().getTmpDirectories());
    final RecordWriter<SpeedTestRecord> output = new RecordWriter<>(getEnvironment().getWriter(0));
    try {
      // Pull records until the reader signals the end with null.
      for (SpeedTestRecord current = input.next(); current != null; current = input.next()) {
        output.emit(current);
      }
    }
    finally {
      input.clearBuffers();
      output.clearBuffers();
      output.flushAll();
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-runtime

/**
 * Initializes the outputs of this task: the regular outputs (via the superclass), the
 * collector for the final solution, and finally the writer towards the sync gate.
 *
 * @throws Exception if the number of writers created does not match the configured index of
 *     the sync output, or if the superclass initialization fails
 */
@Override
protected void initOutputs() throws Exception {
  // The superclass creates the regular outputs, i.e. the ones into the step function.
  super.initOutputs();

  // With those in place, add the outputs that carry the final solution.
  final List<RecordWriter<?>> finalResultWriters = new ArrayList<>();
  final TaskConfig finalOutputConfig = this.config.getIterationHeadFinalOutputConfig();
  final ClassLoader classLoader = getUserCodeClassLoader();
  this.finalOutputCollector = BatchTask.getOutputCollector(
      this,
      finalOutputConfig,
      classLoader,
      finalResultWriters,
      config.getNumOutputs(),
      finalOutputConfig.getNumOutputs());

  // Sanity check: the sync gate must sit directly after all writers created so far.
  final int stepFunctionWriterCount = this.eventualOutputs.size();
  final int finalResultWriterCount = finalResultWriters.size();
  final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
  final boolean gateMappingConsistent =
      (stepFunctionWriterCount + finalResultWriterCount) == syncGateIndex;
  if (!gateMappingConsistent) {
    throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
  }

  // The sync gate can be instantiated now.
  this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
  this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

/**
 * Initializes the outputs of this task: the regular outputs (via the superclass), the
 * collector for the final solution, and finally the writer used as the sync gate.
 *
 * @throws Exception if the number of writers created does not match the configured index of
 *     the sync output, or if the superclass initialization fails
 */
@Override
protected void initOutputs() throws Exception {
  // The superclass creates the regular outputs, i.e. the ones into the step function.
  super.initOutputs();

  // With those in place, add the outputs that carry the final solution.
  final List<RecordWriter<?>> finalResultWriters = new ArrayList<>();
  final TaskConfig finalOutputConfig = this.config.getIterationHeadFinalOutputConfig();
  final ClassLoader classLoader = getUserCodeClassLoader();
  this.finalOutputCollector = BatchTask.getOutputCollector(
      this,
      finalOutputConfig,
      classLoader,
      finalResultWriters,
      config.getNumOutputs(),
      finalOutputConfig.getNumOutputs());

  // Sanity check: the sync gate must sit directly after all writers created so far.
  final int stepFunctionWriterCount = this.eventualOutputs.size();
  final int finalResultWriterCount = finalResultWriters.size();
  final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
  final boolean gateMappingConsistent =
      (stepFunctionWriterCount + finalResultWriterCount) == syncGateIndex;
  if (!gateMappingConsistent) {
    throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
  }

  // The sync gate is the environment's writer at the verified index.
  this.toSync = getEnvironment().getWriter(syncGateIndex);
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

/**
 * Initializes the outputs of this task: the regular outputs (via the superclass), the
 * collector for the final solution, and finally the writer towards the sync gate.
 *
 * @throws Exception if the number of writers created does not match the configured index of
 *     the sync output, or if the superclass initialization fails
 */
@Override
protected void initOutputs() throws Exception {
  // Regular outputs (into the step function) come from the superclass.
  super.initOutputs();

  // Afterwards, set up the outputs for the final solution.
  final List<RecordWriter<?>> solutionWriters = new ArrayList<>();
  final TaskConfig solutionOutputConfig = this.config.getIterationHeadFinalOutputConfig();
  final ClassLoader userLoader = getUserCodeClassLoader();
  this.finalOutputCollector = BatchTask.getOutputCollector(
      this,
      solutionOutputConfig,
      userLoader,
      solutionWriters,
      config.getNumOutputs(),
      solutionOutputConfig.getNumOutputs());

  // Verify that the sync gate index equals the number of writers created so far.
  final int intoStepFunction = this.eventualOutputs.size();
  final int intoFinalResult = solutionWriters.size();
  final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
  final boolean mappingMatches = (intoStepFunction + intoFinalResult) == syncGateIndex;
  if (!mappingMatches) {
    throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
  }

  // Setup is consistent: create the sync gate writer and remember its partition id.
  this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
  this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

/**
 * Initializes the outputs of this task: the regular outputs (via the superclass), the
 * collector for the final solution, and finally the writer towards the sync gate.
 *
 * @throws Exception if the number of writers created does not match the configured index of
 *     the sync output, or if the superclass initialization fails
 */
@Override
protected void initOutputs() throws Exception {
  // Regular outputs (into the step function) come from the superclass.
  super.initOutputs();

  // Afterwards, set up the outputs for the final solution.
  final List<RecordWriter<?>> solutionWriters = new ArrayList<>();
  final TaskConfig solutionOutputConfig = this.config.getIterationHeadFinalOutputConfig();
  final ClassLoader userLoader = getUserCodeClassLoader();
  this.finalOutputCollector = BatchTask.getOutputCollector(
      this,
      solutionOutputConfig,
      userLoader,
      solutionWriters,
      config.getNumOutputs(),
      solutionOutputConfig.getNumOutputs());

  // Verify that the sync gate index equals the number of writers created so far.
  final int intoStepFunction = this.eventualOutputs.size();
  final int intoFinalResult = solutionWriters.size();
  final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
  final boolean mappingMatches = (intoStepFunction + intoFinalResult) == syncGateIndex;
  if (!mappingMatches) {
    throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
  }

  // Setup is consistent: create the sync gate writer and remember its partition id.
  this.toSync = new RecordWriter<>(getEnvironment().getWriter(syncGateIndex));
  this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

/**
 * Builds the stream record writer for a single output of a task.
 *
 * <p>The partitioner is taken from the edge and the partition writer is looked up in the
 * environment by {@code outputIndex}. The resulting writer is registered with the environment's
 * IO metric group before it is returned.
 *
 * @param edge stream edge that provides the partitioner
 * @param outputIndex index of the writer to fetch from the environment
 * @param environment task environment providing the partition writer and the metric group
 * @param taskName task name, used for logging and passed on to the record writer
 * @param bufferTimeout buffer timeout passed on to the record writer
 * @return the configured stream record writer
 */
private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
      StreamEdge edge,
      int outputIndex,
      Environment environment,
      String taskName,
      long bufferTimeout) {
    @SuppressWarnings("unchecked")
    final StreamPartitioner<OUT> partitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", partitioner, outputIndex, taskName);

    final ResultPartitionWriter partitionWriter = environment.getWriter(outputIndex);

    // A configurable partitioner is told the number of key groups (aka max. parallelism),
    // but only when the partition writer reports a positive value.
    if (partitioner instanceof ConfigurableStreamPartitioner) {
      final int keyGroupCount = partitionWriter.getNumTargetKeyGroups();
      if (keyGroupCount > 0) {
        final ConfigurableStreamPartitioner configurable = (ConfigurableStreamPartitioner) partitioner;
        configurable.configure(keyGroupCount);
      }
    }

    final StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter =
      new StreamRecordWriter<>(partitionWriter, partitioner, bufferTimeout, taskName);
    recordWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return recordWriter;
  }
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

/**
 * Creates and configures the stream record writer for the output at {@code outputIndex}.
 *
 * <p>Steps, in order: fetch the partitioner from the edge, fetch the partition writer from
 * the environment, configure the partitioner with the number of target key groups if it is
 * configurable and that number is positive, construct the writer, attach the IO metric group.
 *
 * @param edge stream edge that provides the partitioner
 * @param outputIndex index of the writer to fetch from the environment
 * @param environment task environment providing the partition writer and the metric group
 * @param taskName task name, used for logging and passed on to the record writer
 * @param bufferTimeout buffer timeout passed on to the record writer
 * @return the configured stream record writer
 */
private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
      StreamEdge edge,
      int outputIndex,
      Environment environment,
      String taskName,
      long bufferTimeout) {
    @SuppressWarnings("unchecked")
    final StreamPartitioner<OUT> edgePartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", edgePartitioner, outputIndex, taskName);

    final ResultPartitionWriter targetPartition = environment.getWriter(outputIndex);

    // Hand the number of key groups (aka max. parallelism) to partitioners that accept it;
    // non-positive values are not passed on.
    if (edgePartitioner instanceof ConfigurableStreamPartitioner) {
      final int targetKeyGroups = targetPartition.getNumTargetKeyGroups();
      if (targetKeyGroups > 0) {
        final ConfigurableStreamPartitioner configurable = (ConfigurableStreamPartitioner) edgePartitioner;
        configurable.configure(targetKeyGroups);
      }
    }

    final StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamWriter =
      new StreamRecordWriter<>(targetPartition, edgePartitioner, bufferTimeout, taskName);
    streamWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return streamWriter;
  }
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

task.getEnvironment().getWriter(outputOffset + i).setTypeSerializer(serializerFactory.getSerializer());
task.getEnvironment().getWriter(outputOffset + i).setParentTask(task);
    new RecordWriter<T>(task.getEnvironment().getWriter(outputOffset + i),
      oe, strategy == ShipStrategyType.BROADCAST);

代码示例来源:origin: org.apache.flink/flink-runtime

new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Creates the {@link RecordWriterOutput} for one outgoing stream edge.
 *
 * <p>The serializer is the side-output serializer when the edge carries an output tag and the
 * main output serializer otherwise. The partitioner comes from the edge; if it is a
 * {@link ConfigurableStreamPartitioner} and the partition writer reports a positive number of
 * target key groups, it is configured with that number. The underlying record writer is
 * registered with the task's IO metric group.
 *
 * @param edge stream edge providing the output tag and the partitioner
 * @param upStreamConfig configuration providing the serializers and the buffer timeout
 * @param outputIndex index of the writer to fetch from the task environment
 * @param taskEnvironment environment providing the class loader, writer and metric group
 * @param taskName task name, used for logging only
 * @return the output wrapping the newly created record writer
 */
private <T> RecordWriterOutput<T> createStreamOutput(
    StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
    Environment taskEnvironment,
    String taskName) {
  OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
  TypeSerializer outSerializer;
  if (sideOutputTag != null) {
    // side output
    outSerializer = upStreamConfig.getTypeSerializerSideOut(
        sideOutputTag, taskEnvironment.getUserClassLoader());
  } else {
    // main output
    outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
  }
  @SuppressWarnings("unchecked")
  StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
  // Three placeholders for three arguments; the task name used to be dropped from the message.
  LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
  ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
  // we initialize the partitioner here with the number of key groups (aka max. parallelism)
  if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
    int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
    if (0 < numKeyGroups) {
      ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
    }
  }
  StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
      new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
  output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
  return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
}

相关文章