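This article collects code examples for the Java method org.apache.flink.api.common.functions.FlatMapFunction.flatMap() and shows how FlatMapFunction.flatMap() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are meant as a practical reference. Details of the FlatMapFunction.flatMap() method are as follows: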
Package path: org.apache.flink.api.common.functions.FlatMapFunction
Class name: FlatMapFunction
Method name: flatMap
Description: The core method of the FlatMapFunction. Takes an element from the input data set and transforms it into zero, one, or more elements.
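The "zero, one, or more elements" contract can be illustrated with a small self-contained sketch. So that it runs without Flink on the classpath, it declares local stand-in interfaces named FlatMapFunction and Collector that only mirror the shape of the Flink types (the stand-in Collector omits everything except collect). FlatMapSketch, Tokenizer, and apply are hypothetical names introduced for illustration, not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {

    /** Local stand-in (assumption): mirrors only the collect method of Flink's Collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Local stand-in (assumption): mirrors the shape of Flink's FlatMapFunction. */
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    /** Hypothetical user function: emits one output element per word in the line. */
    static class Tokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(word);
                }
            }
        }
    }

    /** Calls flatMap once per input and gathers whatever the function emits. */
    static <T, O> List<O> apply(FlatMapFunction<T, O> fn, List<T> inputs) throws Exception {
        List<O> result = new ArrayList<>();
        for (T input : inputs) {
            fn.flatMap(input, result::add);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // An empty line yields zero elements, "flink" yields one, the last line yields three.
        List<String> words = apply(new Tokenizer(), Arrays.asList("", "flink", "flat map function"));
        System.out.println(words); // prints [flink, flat, map, function]
    }
}
```

With the real Flink interfaces, a class like Tokenizer would implement org.apache.flink.api.common.functions.FlatMapFunction instead of the local stand-in.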
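Code example source: origin: apache/flink
@Override
public void flatMap(PyObject value, Collector<PyObject> out) throws Exception {
    // Point the wrapping collector at the collector passed in, then delegate to this.fun.
    this.collector.setCollector(out);
    try {
        this.fun.flatMap(value, this.collector);
    } catch (PyException pe) {
        // Rethrow the PyException as whatever createAndLogException returns.
        throw createAndLogException(pe);
    }
}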
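Code example source: origin: apache/flink (the same snippet is also listed for org.apache.flink/flink-streaming-java, flink-streaming-java_2.11, and flink-streaming-java_2.10)
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // Hand the incoming record to the collector's setTimestamp before calling the user function.
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);
}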
Code example source: origin: apache/flink (the same snippet is also listed for com.alibaba.blink/flink-core and org.apache.flink/flink-core)
@Override
protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
    FlatMapFunction<T, T> function = this.userFunction.getUserCodeObject();
    // Set the runtime context and open the function before processing.
    FunctionUtils.setFunctionRuntimeContext(function, ctx);
    FunctionUtils.openFunction(function, this.parameters);
    ArrayList<T> result = new ArrayList<T>(inputData.size());
    ListCollector<T> collector = new ListCollector<T>(result);
    // Call flatMap once per input element, with a collector built on the result list.
    for (T element : inputData) {
        function.flatMap(element, collector);
    }
    FunctionUtils.closeFunction(function);
    return result;
}
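The snippet above drives a FlatMapFunction<T, T> over an in-memory list, with the same type for input and output. One use that fits this shape is a filter written as a flatMap that emits each element either zero times or once. The following is a minimal sketch of that idea, not Flink's implementation: the interfaces are local stand-ins, and FilterAsFlatMapSketch, ListBackedCollector, filter, and executeOnCollection are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FilterAsFlatMapSketch {

    /** Local stand-in (assumption): mirrors only the collect method of Flink's Collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Local stand-in (assumption): mirrors the shape of Flink's FlatMapFunction. */
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    /** Hypothetical collector that appends every collected record to a list. */
    static class ListBackedCollector<T> implements Collector<T> {
        private final List<T> target;

        ListBackedCollector(List<T> target) {
            this.target = target;
        }

        @Override
        public void collect(T record) {
            target.add(record);
        }
    }

    /** Wraps a predicate as a flatMap that emits the element only when the predicate holds. */
    static <T> FlatMapFunction<T, T> filter(Predicate<T> keep) {
        return (value, out) -> {
            if (keep.test(value)) {
                out.collect(value);
            }
        };
    }

    /** Calls flatMap once per element and returns everything that was collected. */
    static <T> List<T> executeOnCollection(FlatMapFunction<T, T> fn, List<T> input) throws Exception {
        List<T> result = new ArrayList<>(input.size());
        Collector<T> collector = new ListBackedCollector<>(result);
        for (T element : input) {
            fn.flatMap(element, collector);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        FlatMapFunction<Integer, Integer> isEven = filter(n -> n % 2 == 0);
        System.out.println(executeOnCollection(isEven, Arrays.asList(1, 2, 3, 4, 5, 6))); // prints [2, 4, 6]
    }
}
```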
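Code example source: origin: apache/flink
@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
    // A new TimestampedCollector wrapping the operator output is created for every element.
    userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
}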
Code example source: origin: apache/flink (the same snippet is also listed for com.alibaba.blink/flink-core and org.apache.flink/flink-core)
@Override
protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
    FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();
    FunctionUtils.setFunctionRuntimeContext(function, ctx);
    FunctionUtils.openFunction(function, parameters);
    ArrayList<OUT> result = new ArrayList<OUT>(input.size());
    TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
    TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
    CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
    for (IN element : input) {
        // Pass a serializer-made copy to the user function rather than the caller's instance.
        IN inCopy = inSerializer.copy(element);
        function.flatMap(inCopy, resultCollector);
    }
    FunctionUtils.closeFunction(function);
    return result;
}
Code example source: origin: apache/flink
@Override
public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
    userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
    // Add 1 to the SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR accumulator for every processed element.
    getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}
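Code example source: origin: com.alibaba.blink/flink-runtime (the same snippet is also listed for org.apache.flink/flink-runtime, flink-runtime_2.11, and flink-runtime_2.10)
@Override
public void collect(IT record) {
    try {
        // Count the incoming record, then pass it to the flatMap function.
        this.numRecordsIn.inc();
        this.mapper.flatMap(record, this.outputCollector);
    } catch (Exception ex) {
        // collect() declares no checked exceptions, so the failure is wrapped together with the task name.
        throw new ExceptionInChainedStubException(this.taskName, ex);
    }
}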
Code example source: origin: org.apache.flink/flink-runtime_2.10 (the same snippet is also listed for org.apache.flink/flink-runtime_2.11 and org.apache.flink/flink-runtime)
@Override
public void run() throws Exception {
    final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
    final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
    // cache references on the stack
    final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
    final FlatMapFunction<IT, OT> function = this.taskContext.getStub();
    final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
    if (objectReuseEnabled) {
        // Object reuse: the previous record is passed back to input.next(record) as the reuse target.
        IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
        while (this.running && ((record = input.next(record)) != null)) {
            numRecordsIn.inc();
            function.flatMap(record, output);
        }
    } else {
        // No object reuse: input.next() supplies the record for each iteration.
        IT record;
        while (this.running && ((record = input.next()) != null)) {
            numRecordsIn.inc();
            function.flatMap(record, output);
        }
    }
}
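The run() loop above has two branches. With object reuse enabled, the record passed to flatMap may be the same instance on every call. A practical consequence is that a function which keeps references to its inputs across calls can see those inputs overwritten. The sketch below reproduces that effect with local stand-ins only. Record, BufferingFunction, runWithReuse, and bufferedValues are hypothetical names, and the driver loop is a simplification, not Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {

    /** Hypothetical mutable record type. */
    static class Record {
        long value;

        Record copy() {
            Record r = new Record();
            r.value = this.value;
            return r;
        }
    }

    /** Local stand-in (assumption): mirrors only the collect method of Flink's Collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Local stand-in (assumption): mirrors the shape of Flink's FlatMapFunction. */
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    /** Hypothetical function that buffers its inputs, optionally copying them first. */
    static class BufferingFunction implements FlatMapFunction<Record, Long> {
        final List<Record> seen = new ArrayList<>();
        private final boolean copyBeforeBuffering;

        BufferingFunction(boolean copyBeforeBuffering) {
            this.copyBeforeBuffering = copyBeforeBuffering;
        }

        @Override
        public void flatMap(Record value, Collector<Long> out) {
            seen.add(copyBeforeBuffering ? value.copy() : value);
            out.collect(value.value);
        }
    }

    /** Simplified reuse-style driver: one Record instance is refilled for every input value. */
    static void runWithReuse(long[] values, FlatMapFunction<Record, Long> fn, Collector<Long> out) throws Exception {
        Record reused = new Record();
        for (long v : values) {
            reused.value = v;
            fn.flatMap(reused, out);
        }
    }

    /** Runs the buffering function over 1, 2, 3 and returns the values held in its buffer afterwards. */
    static List<Long> bufferedValues(boolean copyBeforeBuffering) throws Exception {
        BufferingFunction fn = new BufferingFunction(copyBeforeBuffering);
        runWithReuse(new long[] {1, 2, 3}, fn, emitted -> { });
        List<Long> values = new ArrayList<>();
        for (Record r : fn.seen) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(bufferedValues(false)); // prints [3, 3, 3]: every buffered reference is the reused instance
        System.out.println(bufferedValues(true));  // prints [1, 2, 3]: copies keep their own values
    }
}
```

Copying before buffering costs an allocation per record, so this trade-off only matters for functions that actually hold on to their inputs.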