本文整理了Java中org.apache.flink.api.java.operators.MapOperator.returns()
方法的一些代码示例,展示了MapOperator.returns()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。MapOperator.returns()
方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:returns
暂无
代码示例来源:origin: apache/flink
/**
* Apply a function to the attribute of each vertex in the graph.
*
* @param mapper the map function to apply.
* @param returnType the explicit return type.
* @return a new graph
*/
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
private Vertex<K, NV> output = new Vertex<>();
public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
output.f0 = value.f0;
output.f1 = mapper.map(value);
return output;
}
})
.returns(returnType)
.withForwardedFields("f0")
.name("Map vertices");
return new Graph<>(mappedVertices, this.edges, this.context);
}
代码示例来源:origin: apache/flink
/**
* Apply a function to the attribute of each edge in the graph.
*
* @param mapper the map function to apply.
* @param returnType the explicit return type.
* @return a new graph
*/
public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K, NV>> returnType) {
DataSet<Edge<K, NV>> mappedEdges = edges.map(
new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
private Edge<K, NV> output = new Edge<>();
public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
output.f0 = value.f0;
output.f1 = value.f1;
output.f2 = mapper.map(value);
return output;
}
})
.returns(returnType)
.withForwardedFields("f0; f1")
.name("Map edges");
return new Graph<>(this.vertices, mappedEdges, this.context);
}
代码示例来源:origin: apache/flink
/**
* Count the number of elements in a DataSet.
*
* @param input DataSet of elements to be counted
* @param <T> element type
* @return count
*/
public static <T> DataSet<LongValue> count(DataSet<T> input) {
return input
.map(new MapTo<>(new LongValue(1)))
.returns(LONG_VALUE_TYPE_INFO)
.name("Emit 1")
.reduce(new AddLongValue())
.name("Sum");
}
代码示例来源:origin: apache/flink
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testFunctionWithMissingGenericsAndReturns() {
RichMapFunction function = new RichMapFunction() {
private static final long serialVersionUID = 1L;
@Override
public Object map(Object value) throws Exception {
return null;
}
};
TypeInformation<?> info = ExecutionEnvironment.getExecutionEnvironment()
.fromElements("arbitrary", "data")
.map(function).returns(Types.STRING).getResultType();
assertEquals(Types.STRING, info);
}
代码示例来源:origin: apache/flink
return output;
}).returns(returnType).withForwardedFields("f0").name("Initialize vertex values");
代码示例来源:origin: apache/flink
private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
for (int i = 0; i < branches; i++) {
final int ii = i;
if (stats != null) {
input = input.map(
new RichMapFunction<String, String>() {
@Override
public String map(String value) {
return value;
}
}).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
}
DataSet<String> branch = input
.map(s -> new Tuple2<>(0, s + ii)).returns(Types.TUPLE(Types.STRING, Types.INT))
.groupBy(0)
.minBy(1)
.map(kv -> kv.f1).returns(Types.STRING);
if (stats == null) {
stats = branch;
} else {
stats = stats.union(branch);
}
}
return stats.map(s -> "(" + s + ").stats");
}
}
代码示例来源:origin: apache/flink
.returns(Types.TUPLE(Types.INT, Types.DOUBLE)); // for lambda with generics
代码示例来源:origin: apache/flink
.returns(returnType)
.setParallelism(parallelism)
.name("Translate edge IDs");
代码示例来源:origin: apache/flink
.returns(returnType)
.setParallelism(parallelism)
.name("Translate edge values");
代码示例来源:origin: apache/flink
.returns(returnType)
.setParallelism(parallelism)
.name("Translate vertex IDs");
代码示例来源:origin: apache/flink
.returns(returnType)
.setParallelism(parallelism)
.name("Translate vertex values");
代码示例来源:origin: apache/flink
@Test
public void testIdentityMapWithMissingTypesAndStringTypeHint() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
.returns(new TypeHint<Tuple3<Integer, Long, String>>(){});
List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
String expectedResult = "(2,2,Hello)\n" +
"(3,2,Hello world)\n" +
"(1,1,Hi)\n";
compareResultAsText(result, expectedResult);
}
代码示例来源:origin: apache/flink
@Test
public void testIdentityMapWithMissingTypesAndTypeInformationTypeHint() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
// all following generics get erased during compilation
.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
.returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
List<Tuple3<Integer, Long, String>> result = identityMapDs
.collect();
String expectedResult = "(2,2,Hello)\n" +
"(3,2,Hello world)\n" +
"(1,1,Hi)\n";
compareResultAsText(result, expectedResult);
}
代码示例来源:origin: apache/flink
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT, Types.STRING))
DataSet<Tuple2<String, Integer>> iteration = initial
.map(x -> Tuple2.of(x.f0, x.f1 * 2))
.returns(Types.TUPLE(Types.STRING, Types.INT));
DataSet<Boolean> termination = iteration
.returns(Types.TUPLE(Types.INT, Types.INT));
代码示例来源:origin: apache/flink
new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo);
代码示例来源:origin: org.gradoop/gradoop-flink
/**
* Prepares the statistic a source and edge label distribution.
* @param graph the logical graph for the calculation.
* @return tuples with the containing statistics.
*/
@Override
public MapOperator<WithCount<Tuple2<String, String>>, Tuple3<String, String, Long>>
execute(final LogicalGraph graph) {
return new SourceLabelAndEdgeLabelDistribution()
.execute(graph)
.map(value -> Tuple3.of(value.f0.f0, value.f0.f1, value.f1))
.returns(new TypeHint<Tuple3<String, String, Long>>() { });
}
代码示例来源:origin: org.gradoop/gradoop-flink
/**
* Prepares the statistic for distinct edge properties by label.
* @param graph the logical graph for the calculation.
* @return tuples with the containing statistics.
*/
@Override
public MapOperator<WithCount<Tuple2<String, String>>, Tuple3<String, String, Long>>
execute(LogicalGraph graph) {
return new DistinctEdgePropertiesByLabel()
.execute(graph)
.map(value -> Tuple3.of(value.f0.f0, value.f0.f1, value.f1))
.returns(new TypeHint<Tuple3<String, String, Long>>() { });
}
代码示例来源:origin: seznam/euphoria
static DataSet<Tuple2<Long, String>> getHdfsSource(ExecutionEnvironment env, URI inputPath)
throws IOException {
SearchEventsParser parser = new SearchEventsParser();
return env.readFile(new TextInputFormat(new Path(inputPath)), inputPath.toString())
.map(parser::parse)
.filter(q -> q != null && q.query != null && !q.query.isEmpty())
.map(q -> Tuple2.of(q.timestamp, q.query))
.returns(new TypeHint<Tuple2<Long, String>>() {});
}
代码示例来源:origin: dbs-leipzig/gradoop
@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path, GradoopFlinkConfig
config) {
return config.getExecutionEnvironment()
.readTextFile(path)
.map(line -> StringEscaper.split(line, CSVConstants.TOKEN_DELIMITER, 3))
.map(tokens -> Tuple3.of(tokens[0], tokens[1], tokens[2]))
.returns(new TypeHint<Tuple3<String, String, String>>() {
});
}
代码示例来源:origin: org.gradoop/gradoop-flink
@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path, GradoopFlinkConfig
config) {
return config.getExecutionEnvironment()
.readTextFile(path)
.map(line -> StringEscaper.split(line, CSVConstants.TOKEN_DELIMITER, 3))
.map(tokens -> Tuple3.of(tokens[0], tokens[1], tokens[2]))
.returns(new TypeHint<Tuple3<String, String, String>>() {
});
}
内容来源于网络,如有侵权,请联系作者删除!