Usage and code examples of the org.apache.flink.api.common.functions.FlatMapFunction.flatMap() method

x33g5p2x, reposted on 2022-01-19 under Other

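This article collects code examples of the Java method org.apache.flink.api.common.functions.FlatMapFunction.flatMap() and shows how FlatMapFunction.flatMap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as a reference. Details of the FlatMapFunction.flatMap() method are as follows:
Fully qualified name: org.apache.flink.api.common.functions.FlatMapFunction
Class name: FlatMapFunction
Method name: flatMap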

Introduction to FlatMapFunction.flatMap

The core method of the FlatMapFunction. Takes an element from the input data set and transforms it into zero, one, or more elements.
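
To make the "zero, one, or more elements" contract concrete, here is a minimal sketch of a user-defined function. It is written against small stand-in interfaces so that it runs without Flink on the classpath: the nested `Collector` and `FlatMapFunction` types only imitate the Flink signatures, and `Tokenizer` and `tokenizeAll` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TokenizerSketch {
    // Stand-in for org.apache.flink.util.Collector, reduced to its collect method (assumption).
    interface Collector<T> {
        void collect(T record);
    }

    // Stand-in for FlatMapFunction with the same flatMap signature (assumption).
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    // Hypothetical user function: one input line yields zero, one, or many words.
    static class Tokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(word);
                }
            }
        }
    }

    // Applies the tokenizer to every line and gathers the emitted words in a list.
    static List<String> tokenizeAll(List<String> lines) {
        List<String> words = new ArrayList<>();
        Tokenizer tokenizer = new Tokenizer();
        for (String line : lines) {
            tokenizer.flatMap(line, words::add);
        }
        return words;
    }

    public static void main(String[] args) {
        // The empty line contributes nothing; the other lines contribute two words each.
        System.out.println(tokenizeAll(Arrays.asList("to be", "", "or not")));
    }
}
```

In a real Flink job the same `flatMap` body would implement `org.apache.flink.api.common.functions.FlatMapFunction` directly, and the framework would supply the collector.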

Code examples

Code example source: origin: apache/flink

@Override
public void flatMap(PyObject value, Collector<PyObject> out) throws Exception {
  // Point the wrapper collector at the current output, then delegate to the wrapped function.
  this.collector.setCollector(out);
  try {
    this.fun.flatMap(value, this.collector);
  } catch (PyException pe) {
    throw createAndLogException(pe);
  }
}

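Code example source: origin: apache/flink (the same snippet is also listed under org.apache.flink/flink-streaming-java, flink-streaming-java_2.11, and flink-streaming-java_2.10)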

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
  // Emitted records take the timestamp of the input record.
  collector.setTimestamp(element);
  userFunction.flatMap(element.getValue(), collector);
}

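Code example source: origin: apache/flink (the same snippet is also listed under com.alibaba.blink/flink-core and org.apache.flink/flink-core)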

@Override
protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
  FlatMapFunction<T, T> function = this.userFunction.getUserCodeObject();

  // Set up the user function before any element is processed.
  FunctionUtils.setFunctionRuntimeContext(function, ctx);
  FunctionUtils.openFunction(function, this.parameters);

  ArrayList<T> result = new ArrayList<T>(inputData.size());
  ListCollector<T> collector = new ListCollector<T>(result);

  // Each input element may add zero or more records to the result list.
  for (T element : inputData) {
    function.flatMap(element, collector);
  }

  FunctionUtils.closeFunction(function);

  return result;
}
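
The open, loop, close sequence in the snippet above can be tried out without Flink. The sketch below is an illustration under stated assumptions, not Flink's implementation: the nested interfaces are stand-ins, `Lifecycle` is a hypothetical substitute for the open/close handling done through `FunctionUtils`, and `Repeater` is an invented example function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CollectionExecutionSketch {
    // Stand-ins that imitate the Flink signatures (assumptions).
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out) throws Exception; }

    // Hypothetical lifecycle hooks, standing in for the open/close calls made via FunctionUtils.
    interface Lifecycle { void open() throws Exception; void close() throws Exception; }

    static <T, O> List<O> executeOnCollection(FlatMapFunction<T, O> function, List<T> input) throws Exception {
        if (function instanceof Lifecycle) {
            ((Lifecycle) function).open();
        }
        List<O> result = new ArrayList<>(input.size());
        Collector<O> collector = result::add; // list-backed collector
        for (T element : input) {
            function.flatMap(element, collector);
        }
        if (function instanceof Lifecycle) {
            ((Lifecycle) function).close();
        }
        return result;
    }

    // Hypothetical function: emits each integer n exactly n times, so 0 produces no output.
    static class Repeater implements FlatMapFunction<Integer, Integer>, Lifecycle {
        boolean opened;
        boolean closed;

        @Override public void open() { opened = true; }
        @Override public void close() { closed = true; }

        @Override
        public void flatMap(Integer n, Collector<Integer> out) {
            if (!opened) {
                throw new IllegalStateException("flatMap called before open");
            }
            for (int i = 0; i < n; i++) {
                out.collect(n);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(executeOnCollection(new Repeater(), Arrays.asList(0, 1, 3)));
    }
}
```

Because the result list is sized from the input, it only serves as an initial capacity: a flatMap function can emit fewer or more records than it receives.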

Code example source: origin: apache/flink

@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
  userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
}

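Code example source: origin: apache/flink (the same snippet is also listed under com.alibaba.blink/flink-core and org.apache.flink/flink-core)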

@Override
protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
  FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();

  FunctionUtils.setFunctionRuntimeContext(function, ctx);
  FunctionUtils.openFunction(function, parameters);

  ArrayList<OUT> result = new ArrayList<OUT>(input.size());

  TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
  TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);

  // The collector copies every emitted record with the output serializer.
  CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);

  for (IN element : input) {
    // Pass a copy to the user function so the caller's input list is not modified.
    IN inCopy = inSerializer.copy(element);
    function.flatMap(inCopy, resultCollector);
  }

  FunctionUtils.closeFunction(function);

  return result;
}
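
The snippet above copies each input before the call and each output as it is collected. The following sketch shows why both copies can matter. It makes several assumptions: the nested interfaces are stand-ins for the Flink types, plain `UnaryOperator` copiers replace `TypeSerializer.copy`, and `Doubler` is a hypothetical function that deliberately mutates its input and reuses its output object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class CopyingExecutionSketch {
    // Stand-ins that imitate the Flink signatures (assumptions).
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out) throws Exception; }

    static <T, O> List<O> execute(FlatMapFunction<T, O> function, List<T> input,
                                  UnaryOperator<T> inCopier, UnaryOperator<O> outCopier) throws Exception {
        List<O> result = new ArrayList<>(input.size());
        // Copy every emitted record before storing it.
        Collector<O> copying = record -> result.add(outCopier.apply(record));
        for (T element : input) {
            // Hand the function a copy so the caller's list is never mutated.
            function.flatMap(inCopier.apply(element), copying);
        }
        return result;
    }

    // Hypothetical function that mutates its input and reuses a single output array.
    static class Doubler implements FlatMapFunction<int[], int[]> {
        private final int[] reused = new int[1];

        @Override
        public void flatMap(int[] value, Collector<int[]> out) {
            value[0] *= 2;        // mutates the input it was given
            reused[0] = value[0]; // overwrites the shared output object
            out.collect(reused);
        }
    }

    public static void main(String[] args) throws Exception {
        List<int[]> input = Arrays.asList(new int[] {1}, new int[] {2});
        List<int[]> output = execute(new Doubler(), input, a -> a.clone(), a -> a.clone());
        System.out.println(input.get(0)[0] + " " + input.get(1)[0]);   // inputs are unchanged
        System.out.println(output.get(0)[0] + " " + output.get(1)[0]); // outputs are distinct
    }
}
```

Without the output copy, both list entries would point at the same `reused` array and show only the last value written to it.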

Code example source: origin: apache/flink

@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
  userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
  getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}

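Code example source: origin: com.alibaba.blink/flink-runtime (the same snippet is also listed under org.apache.flink/flink-runtime, flink-runtime_2.11, and flink-runtime_2.10)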

@Override
public void collect(IT record) {
  try {
    this.numRecordsIn.inc();
    this.mapper.flatMap(record, this.outputCollector);
  } catch (Exception ex) {
    throw new ExceptionInChainedStubException(this.taskName, ex);
  }
}
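
In the snippet above, `collect` declares no checked exceptions while `flatMap` may throw any `Exception`, so the failure is rethrown as an unchecked exception carrying the task name. The sketch below imitates that pattern under stated assumptions: the nested interfaces are stand-ins, `ChainedTaskException` is a hypothetical replacement for `ExceptionInChainedStubException`, and a plain `long` replaces the metrics counter.

```java
import java.util.ArrayList;
import java.util.List;

class ChainedCollectSketch {
    // Stand-ins that imitate the Flink signatures (assumptions).
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out) throws Exception; }

    // Hypothetical unchecked wrapper, playing the role of ExceptionInChainedStubException.
    static class ChainedTaskException extends RuntimeException {
        final String taskName;

        ChainedTaskException(String taskName, Exception cause) {
            super("Exception in chained task '" + taskName + "'", cause);
            this.taskName = taskName;
        }
    }

    // A collector that runs a flatMap function on every record it receives.
    static class ChainedFlatMap<IT, OT> implements Collector<IT> {
        private final String taskName;
        private final FlatMapFunction<IT, OT> mapper;
        private final Collector<OT> outputCollector;
        long numRecordsIn; // simple counter in place of a metrics Counter

        ChainedFlatMap(String taskName, FlatMapFunction<IT, OT> mapper, Collector<OT> outputCollector) {
            this.taskName = taskName;
            this.mapper = mapper;
            this.outputCollector = outputCollector;
        }

        @Override
        public void collect(IT record) {
            try {
                numRecordsIn++;
                mapper.flatMap(record, outputCollector);
            } catch (Exception ex) {
                // collect() cannot declare checked exceptions, so wrap and rethrow.
                throw new ChainedTaskException(taskName, ex);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        ChainedFlatMap<String, Integer> chained = new ChainedFlatMap<String, Integer>(
                "parse-ints", (s, out) -> out.collect(Integer.parseInt(s)), sink::add);
        chained.collect("7");
        try {
            chained.collect("not a number");
        } catch (ChainedTaskException e) {
            System.out.println(e.getMessage() + " caused by " + e.getCause().getClass().getSimpleName());
        }
        System.out.println(sink + " after " + chained.numRecordsIn + " inputs");
    }
}
```

The wrapper keeps the original exception as its cause, so the stack trace of the failing user function is still available to the caller.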

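Code example source: origin: org.apache.flink/flink-runtime_2.10 (the same snippet is also listed under org.apache.flink/flink-runtime_2.11 and org.apache.flink/flink-runtime)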

@Override
public void run() throws Exception {
  final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
  final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
  // cache references on the stack
  final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
  final FlatMapFunction<IT, OT> function = this.taskContext.getStub();
  final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
  if (objectReuseEnabled) {
    IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
    while (this.running && ((record = input.next(record)) != null)) {
      numRecordsIn.inc();
      function.flatMap(record, output);
    }
  } else {
    IT record;
    while (this.running && ((record = input.next()) != null)) {
      numRecordsIn.inc();
      function.flatMap(record, output);
    }
  }
}
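
The `objectReuseEnabled` branch above passes the same record instance to `flatMap` on every iteration. The sketch below illustrates the hazard this creates when a reference to that instance is kept. It is an assumption-laden illustration rather than Flink code: `Rec`, `next`, and `run` are hypothetical, the nested interfaces are stand-ins, and the list-backed collector simply stores references.

```java
import java.util.ArrayList;
import java.util.List;

class ObjectReuseSketch {
    // Stand-ins that imitate the Flink signatures (assumptions).
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out) throws Exception; }

    // Minimal mutable record type (hypothetical).
    static final class Rec { int value; }

    // Stand-in for MutableObjectIterator.next(reuse): fills and returns the passed-in instance.
    static Rec next(int[] source, int index, Rec reuse) {
        if (index >= source.length) {
            return null;
        }
        reuse.value = source[index];
        return reuse;
    }

    static List<Integer> run(int[] source, FlatMapFunction<Rec, Rec> function) throws Exception {
        List<Rec> collected = new ArrayList<>();
        Collector<Rec> output = collected::add; // stores references, does not copy
        Rec record = new Rec();
        int index = 0;
        while ((record = next(source, index++, record)) != null) {
            function.flatMap(record, output);
        }
        List<Integer> values = new ArrayList<>();
        for (Rec r : collected) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        int[] source = {1, 2, 3};
        // Forwarding the reused instance: every stored reference ends up showing the last value.
        System.out.println(run(source, (r, out) -> out.collect(r)));
        // Emitting a fresh copy preserves each value.
        System.out.println(run(source, (r, out) -> {
            Rec copy = new Rec();
            copy.value = r.value;
            out.collect(copy);
        }));
    }
}
```

The practical takeaway is that a function running with object reuse should not hold on to its input across calls unless it copies the data first.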
