本文整理了Java中org.apache.flink.api.java.operators.MapOperator.setParallelism()
方法的一些代码示例,展示了MapOperator.setParallelism()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MapOperator.setParallelism()
方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:setParallelism
暂无
代码示例来源:origin: apache/flink
/**
 * Registers a hash-partition operation: partitions the parent set by the
 * given key fields, then discards the keys so only raw payloads remain.
 */
private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
    DataSet<Tuple2<K, byte[]>> parent = sets.getDataSet(info.parentID);
    String[] keyFields = info.keys.toArray(new String[info.keys.size()]);
    // Repartition on the keys, then strip them from each record.
    DataSet<byte[]> partitioned = parent
        .partitionByHash(keyFields).setParallelism(info.parallelism)
        .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
    sets.add(info.setID, partitioned);
}
代码示例来源:origin: apache/flink
/**
 * Registers a distinct operation: deduplicates the parent set on the given
 * key fields, then discards the keys so only raw payloads remain.
 */
private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
    DataSet<Tuple2<K, byte[]>> parent = sets.getDataSet(info.parentID);
    String[] keyFields = info.keys.toArray(new String[info.keys.size()]);
    // Deduplicate on the keys, then strip them from each record.
    DataSet<byte[]> deduplicated = parent
        .distinct(keyFields).setParallelism(info.parallelism).name("Distinct")
        .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
    sets.add(info.setID, deduplicated);
}
代码示例来源:origin: apache/flink
@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
    // Edges without a usable edge value are widened to Tuple2 before emission;
    // otherwise the edge set is returned unchanged.
    DataSet<Edge<K, EV>> edgeSet = graph.getEdges();
    if (!hasNullValueEdges(edgeSet)) {
        return edgeSet;
    }
    return edgeSet
        .map(new EdgeToTuple2Map<>())
        .name("Edge to Tuple2")
        .setParallelism(parallelism.getValue().intValue());
}
代码示例来源:origin: apache/flink
/**
 * Registers a text sink: deserializes byte records to strings and writes
 * them to the configured path.
 */
private void createTextSink(PythonOperationInfo info) {
    DataSet<byte[]> input = sets.getDataSet(info.parentID);
    input
        .map(new StringDeserializerMap()).setParallelism(info.parallelism)
        .writeAsText(info.path, info.writeMode).setParallelism(info.parallelism).name("TextSink");
}
代码示例来源:origin: apache/flink
/**
 * Registers a CSV source: reads tuples from the configured path with the
 * given delimiters and serializes each record for the Python plan.
 *
 * @throws RuntimeException if the declared output type is not a tuple type
 */
@SuppressWarnings("unchecked")
private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) {
    if (!(info.types instanceof TupleTypeInfo)) {
        // Report the offending type itself, not the whole operation info object.
        throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info.types);
    }
    Path path = new Path(info.path);
    String lineD = info.lineDelimiter;
    String fieldD = info.fieldDelimiter;
    TupleTypeInfo<T> types = (TupleTypeInfo<T>) info.types;
    sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(info.parallelism).name("CsvSource")
        .map(new SerializerMap<T>()).setParallelism(info.parallelism).name("CsvSourcePostStep"));
}
代码示例来源:origin: apache/flink
/**
 * Registers a print sink: deserializes byte records to strings and prints
 * them to stdout or stderr depending on {@code info.toError}.
 */
private void createPrintSink(PythonOperationInfo info) {
    DataSet<byte[]> input = sets.getDataSet(info.parentID);
    input
        .map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
        .output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
}
代码示例来源:origin: apache/flink
/**
 * Registers a CSV sink: deserializes byte records to string tuples and
 * writes them to the configured path with the given delimiters.
 */
private void createCsvSink(PythonOperationInfo info) {
    DataSet<byte[]> input = sets.getDataSet(info.parentID);
    input
        .map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
        .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
}
代码示例来源:origin: apache/flink
/**
 * Registers a text-file source: reads lines from the configured path and
 * serializes each one for the Python plan.
 */
private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) {
    DataSet<String> lines = env.readTextFile(info.path).setParallelism(info.parallelism).name("TextSource");
    sets.add(info.setID, lines
        .map(new SerializerMap<String>()).setParallelism(info.parallelism).name("TextSourcePostStep"));
}
代码示例来源:origin: apache/flink
/**
 * Registers a number-sequence source over [info.frm, info.to] and
 * serializes each value for the Python plan.
 */
private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) {
    DataSet<Long> sequence = env.generateSequence(info.frm, info.to).setParallelism(info.parallelism).name("SequenceSource");
    sets.add(info.setID, sequence
        .map(new SerializerMap<Long>()).setParallelism(info.parallelism).name("SequenceSourcePostStep"));
}
代码示例来源:origin: apache/flink
/**
 * Registers an in-memory collection source over {@code info.values} and
 * serializes each element for the Python plan.
 */
private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
    sets.add(info.setID, env.fromCollection(info.values).setParallelism(info.parallelism).name("ValueSource")
        .map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
}
代码示例来源:origin: apache/flink
@Override
public Graph<LongValue, NullValue, NullValue> generate() {
    Preconditions.checkState(vertexPairCount > 0);

    // Two vertices per requested pair.
    long vertexCount = 2 * vertexPairCount;
    DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);

    // One edge source per vertex id in [0, vertexCount - 1]; each id is
    // mapped to the edge linking it with its singleton neighbor.
    LongValueSequenceIterator vertexIds = new LongValueSequenceIterator(0, vertexCount - 1);
    DataSet<Edge<LongValue, NullValue>> edges = env
        .fromParallelCollection(vertexIds, LongValue.class)
        .setParallelism(parallelism)
        .name("Edge iterators")
        .map(new LinkVertexToSingletonNeighbor())
        .setParallelism(parallelism)
        .name("Complete graph edges");

    return Graph.fromDataSet(vertices, edges, env);
}
代码示例来源:origin: apache/flink
/**
 * Registers a first-n operation for the parent, which may be a plain data
 * set or a (sorted/unsorted) grouping; grouped variants additionally
 * discard the grouping keys afterwards.
 */
private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
    if (sets.isDataSet(info.parentID)) {
        // Ungrouped: take the first n records directly.
        DataSet<byte[]> parent = sets.getDataSet(info.parentID);
        sets.add(info.setID, parent
            .first(info.count).setParallelism(info.parallelism).name("First"));
    } else if (sets.isUnsortedGrouping(info.parentID)) {
        // Grouped: take the first n per group, then strip the keys.
        UnsortedGrouping<Tuple2<K, byte[]>> grouping = sets.getUnsortedGrouping(info.parentID);
        sets.add(info.setID, grouping
            .first(info.count).setParallelism(info.parallelism).name("First")
            .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
    } else if (sets.isSortedGrouping(info.parentID)) {
        // Sorted grouping: same as above, but group order is preserved.
        SortedGrouping<Tuple2<K, byte[]>> grouping = sets.getSortedGrouping(info.parentID);
        sets.add(info.setID, grouping
            .first(info.count).setParallelism(info.parallelism).name("First")
            .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
    }
}
代码示例来源:origin: apache/flink
@Test
public void testCorrectnessOfAllGroupReduceForTuplesWithCombine() throws Exception {
    /*
     * check correctness of all-groupreduce for tuples with combine
     */
    // Combiners are not exercised in collection execution mode.
    org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION);

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env)
        .map(new IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4);

    // Force a repartition so the combine phase actually runs.
    Configuration cfg = new Configuration();
    cfg.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);

    List<Tuple2<Integer, String>> result = input
        .reduceGroup(new Tuple3AllGroupReduceWithCombine())
        .withParameters(cfg)
        .collect();

    String expected = "322," +
        "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";

    compareResultAsTuples(result, expected);
}
代码示例来源:origin: apache/flink
@Test
public void testSortPartitionByKeyField() throws Exception {
    /*
     * Test sort partition on key field
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

    // Each partition is checked for descending order on field 1; distinct()
    // collapses the per-partition verdicts into a single boolean.
    List<Tuple1<Boolean>> verdicts = tuples
        .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
        .sortPartition(1, Order.DESCENDING)
        .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
        .distinct().collect();

    compareResultAsText(verdicts, "(true)\n");
}
代码示例来源:origin: apache/flink
@Test
public void testSortPartitionPojoByNestedFieldExpression() throws Exception {
    /*
     * Test sort partition on field expression
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    DataSet<POJO> pojos = CollectionDataSets.getMixedPojoDataSet(env);

    // Sort each partition by a nested string field, then by number
    // descending; verify order per partition and collapse the verdicts.
    List<Tuple1<Boolean>> verdicts = pojos
        .map(new IdMapper<POJO>()).setParallelism(1) // parallelize input
        .sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
        .sortPartition("number", Order.DESCENDING)
        .mapPartition(new OrderCheckMapper<>(new PojoChecker()))
        .distinct().collect();

    compareResultAsText(verdicts, "(true)\n");
}
代码示例来源:origin: apache/flink
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testSortPartitionByFieldExpression() throws Exception {
    /*
     * Test sort partition on field expression
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

    // Sort each partition by "f1" descending; verify order per partition
    // and collapse the verdicts with distinct().
    List<Tuple1<Boolean>> verdicts = tuples
        .map(new IdMapper()).setParallelism(4) // parallelize input
        .sortPartition("f1", Order.DESCENDING)
        .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
        .distinct().collect();

    compareResultAsText(verdicts, "(true)\n");
}
代码示例来源:origin: apache/flink
@Test
public void testSortPartitionWithKeySelector2() throws Exception {
    /*
     * Test sort partition on an extracted key
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

    // Sort each partition descending on a composite (f0, f1) key extracted
    // by a KeySelector, then verify per-partition order.
    List<Tuple1<Boolean>> verdicts = tuples
        .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
        .sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> value) throws Exception {
                return new Tuple2<>(value.f0, value.f1);
            }
        }, Order.DESCENDING)
        .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
        .distinct().collect();

    compareResultAsText(verdicts, "(true)\n");
}
代码示例来源:origin: apache/flink
@Test
public void testSortPartitionByNestedFieldExpression() throws Exception {
    /*
     * Test sort partition on nested field expressions
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    DataSet<Tuple2<Tuple2<Integer, Integer>, String>> tuples = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);

    // Sort each partition by the nested field "f0.f1" ascending, then "f1"
    // descending; verify per-partition order and collapse the verdicts.
    List<Tuple1<Boolean>> verdicts = tuples
        .map(new IdMapper<Tuple2<Tuple2<Integer, Integer>, String>>()).setParallelism(3) // parallelize input
        .sortPartition("f0.f1", Order.ASCENDING)
        .sortPartition("f1", Order.DESCENDING)
        .mapPartition(new OrderCheckMapper<>(new NestedTupleChecker()))
        .distinct().collect();

    compareResultAsText(verdicts, "(true)\n");
}
代码示例来源:origin: apache/flink
@Test
public void testSortPartitionByTwoFieldExpressions() throws Exception {
    /*
     * Test sort partition on two field expressions
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples = CollectionDataSets.get5TupleDataSet(env);

    // Sort each partition by "f4" ascending, then "f2" descending; verify
    // per-partition order and collapse the verdicts with distinct().
    List<Tuple1<Boolean>> verdicts = tuples
        .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
        .sortPartition("f4", Order.ASCENDING)
        .sortPartition("f2", Order.DESCENDING)
        .mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
        .distinct().collect();

    compareResultAsText(verdicts, "(true)\n");
}
代码示例来源:origin: apache/flink
@Test
public void testSortPartitionByTwoKeyFields() throws Exception {
    /*
     * Test sort partition on two key fields
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples = CollectionDataSets.get5TupleDataSet(env);

    // Sort each partition by field 4 ascending, then field 2 descending;
    // verify per-partition order and collapse the verdicts with distinct().
    List<Tuple1<Boolean>> verdicts = tuples
        .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
        .sortPartition(4, Order.ASCENDING)
        .sortPartition(2, Order.DESCENDING)
        .mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
        .distinct().collect();

    compareResultAsText(verdicts, "(true)\n");
}
内容来源于网络,如有侵权,请联系作者删除!