本文整理了 Java 中 org.apache.flink.api.java.operators.MapOperator.withBroadcastSet() 方法的一些代码示例,展示了 MapOperator.withBroadcastSet() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的,具有较强的参考意义,能在一定程度上帮助到你。MapOperator.withBroadcastSet() 方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:withBroadcastSet
方法描述:暂无
代码示例来源:origin: apache/flink
/**
 * Runs a single-input map that reads {@code SUFFIX} as the broadcast variable
 * {@code BC_VAR_NAME} on the collection-based execution environment.
 *
 * <p>Checks two things:
 * <ul>
 *   <li>one output record is produced per element of {@code TEST_DATA};</li>
 *   <li>every output record contains {@code SUFFIX} after at least one leading character.</li>
 * </ul>
 *
 * <p>Exceptions propagate to JUnit so that a failure is reported with its full stack trace.
 */
@Test
public void testUnaryOp() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
    DataSet<String> bcData = env.fromElements(SUFFIX);

    List<String> result = new ArrayList<>();
    env.fromElements(TEST_DATA)
        .map(new SuffixAppender()).withBroadcastSet(bcData, BC_VAR_NAME)
        .output(new LocalCollectionOutputFormat<String>(result));

    env.execute();

    assertEquals(TEST_DATA.length, result.size());
    for (String s : result) {
        // indexOf(...) > 0: the suffix must be present and must not start at position 0.
        assertTrue("broadcast suffix not found in: " + s, s.indexOf(SUFFIX) > 0);
    }
}
代码示例来源:origin: apache/flink
/**
 * Compiles a plan whose only branching happens through broadcast variables.
 *
 * <p>The same data set is broadcast to two consecutive identity mappers under the names
 * {@code "name1"} and {@code "name2"}. The test passes if the optimizer compiles the plan.
 * Any exception propagates to JUnit with its full stack trace.
 */
@Test
public void testBranchesOnlyInBCVariables1() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(100);

    DataSet<Long> input = env.generateSequence(1, 10);
    DataSet<Long> bcInput = env.generateSequence(1, 10);

    input
        .map(new IdentityMapper<Long>()).withBroadcastSet(bcInput, "name1")
        .map(new IdentityMapper<Long>()).withBroadcastSet(bcInput, "name2")
        .output(new DiscardingOutputFormat<Long>());

    Plan plan = env.createProgramPlan();
    compileNoStats(plan);
}
代码示例来源:origin: apache/flink
/**
 * Maps the sequence 1..10 with a broadcast variable and reduces the result to one value.
 *
 * <p>The broadcast variable {@code BC_NAME} is the union of the sequences 1..5 and 6..10.
 */
@Override
protected void testProgram() throws Exception {
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(4);

    final DataSet<Long> numbers = environment.generateSequence(1, 10);
    final DataSet<Long> broadcastValues =
        environment.generateSequence(1, 5).union(environment.generateSequence(6, 10));

    final List<Long> collected = numbers
        .map(new Mapper())
        .withBroadcastSet(broadcastValues, BC_NAME)
        .reduce(new Reducer())
        .collect();

    // 3025 == 55 * 55, where 55 is the sum of 1..10. The exact computation lives in
    // Mapper/Reducer, which are defined elsewhere; this is an observation, not a spec.
    Assert.assertEquals(Long.valueOf(3025), collected.get(0));
}
代码示例来源:origin: apache/flink
.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
代码示例来源:origin: apache/flink
.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
.withBroadcastSet(bc_input2, "bc2");
.withBroadcastSet(bc_input1, "bc1")
.withBroadcastSet(bc_input2, "bc2");
.withBroadcastSet(bc_input1, "bc1")
.union(joinResult)
.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
代码示例来源:origin: apache/flink
/**
 * Compiles a bulk iteration in which a broadcast variable is reused in two places.
 *
 * <p>The broadcast variable {@code reduced} is computed outside the loop from the same
 * source as the iteration input. It is read both by the mapper that builds the initial
 * solution and by the mapper inside the step function.
 *
 * <p>The test passes if the optimizer compiles the plan. Exceptions propagate to JUnit with
 * their full stack trace.
 */
@Test
public void testBCVariableClosure() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> input = env.readTextFile(IN_FILE).name("source1");

    DataSet<String> reduced = input
        .map(new IdentityMapper<String>())
        .reduceGroup(new Top1GroupReducer<String>());

    DataSet<String> initialSolution =
        input.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc");

    IterativeDataSet<String> iteration = initialSolution.iterate(100);
    iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
        .output(new DiscardingOutputFormat<String>());

    Plan plan = env.createProgramPlan();
    compileNoStats(plan);
}
代码示例来源:origin: apache/flink
/**
 * Compiles a plan in which {@code source1} branches before and after a bulk iteration.
 *
 * <p>Inside the loop, {@code source1} is mapped with the loop head as broadcast variable
 * {@code "BC"} ("In-Loop Mapper"). After the loop, {@code source1} is mapped again with the
 * iteration result as broadcast variable {@code "BC"} ("Post-Loop Mapper").
 *
 * <p>The test passes if the optimizer compiles the plan. Exceptions propagate to JUnit with
 * their full stack trace.
 */
@Test
public void testBranchBeforeIteration() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);

    DataSet<Long> source1 = env.generateSequence(0, 1);
    DataSet<Long> source2 = env.generateSequence(0, 1);

    IterativeDataSet<Long> loopHead = source2.iterate(10).name("Loop");
    DataSet<Long> loopTail = source1.map(new IdentityMapper<Long>())
        .withBroadcastSet(loopHead, "BC").name("In-Loop Mapper");
    DataSet<Long> loopRes = loopHead.closeWith(loopTail);

    DataSet<Long> map = source1.map(new IdentityMapper<Long>())
        .withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper");
    map.output(new DiscardingOutputFormat<Long>());

    Plan plan = env.createProgramPlan();
    compileNoStats(plan);
}
代码示例来源:origin: apache/flink
.map(new SubUpdate()).withBroadcastSet(loop, "parameters")
代码示例来源:origin: apache/flink
/**
 * Verifies the optimized plan when the broadcast data set is independent of the main input.
 *
 * <p>The broadcast data set comes from a separate source. In the optimized plan, both the
 * mapper's regular input and its broadcast input are expected to have {@code TempMode.NONE}
 * and the {@code PIPELINED} data exchange mode.
 *
 * <p>Exceptions propagate to JUnit with their full stack trace.
 */
@Test
public void testNoBreakerForIndependentVariable() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> source1 = env.fromElements("test");
    DataSet<String> source2 = env.fromElements("test");

    source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name")
        .output(new DiscardingOutputFormat<String>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // The plan has a single sink whose direct predecessor is the mapper under test.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();

    assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
    assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());

    assertEquals(DataExchangeMode.PIPELINED, mapper.getInput().getDataExchangeMode());
    assertEquals(DataExchangeMode.PIPELINED, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
}
代码示例来源:origin: apache/flink
.withBroadcastSet(bcInput1, "bc1")
.withBroadcastSet(bcInput2, "bc2");
代码示例来源:origin: apache/flink
map(new RichMapper1()).withBroadcastSet(ints, "ints");
List<Tuple3<Integer, Long, String>> result = bcMapDs.collect();
代码示例来源:origin: apache/flink
source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
.output(new DiscardingOutputFormat<String>());
代码示例来源:origin: apache/flink
.withBroadcastSet(source, "bc");
代码示例来源:origin: apache/flink
.withBroadcastSet(iteration, "bc2")
.withBroadcastSet(bcInput1, "bc1");
代码示例来源:origin: apache/flink
/**
 * Appends {@code branches} branching sub-plans derived from {@code input} and returns a data
 * set that depends on all of them.
 *
 * <p>Each round does the following:
 * <ol>
 *   <li>If statistics from earlier rounds exist, {@code input} is passed through an identity
 *       mapper that receives those statistics (mapped once) as the broadcast variable
 *       {@code "stats"}.</li>
 *   <li>A new branch is derived from {@code input} by grouping on a constant key and taking
 *       the minimum string per group.</li>
 *   <li>The branch becomes {@code stats}, or is unioned into it.</li>
 * </ol>
 *
 * @param input    data set every branch is derived from
 * @param stats    statistics from earlier rounds, or {@code null} if there are none yet
 * @param branches number of branches to append
 * @return {@code stats} after all rounds, mapped once more. If {@code stats} is {@code null}
 *     and {@code branches <= 0}, no branch is ever assigned and this throws a
 *     {@link NullPointerException}.
 */
private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
    for (int i = 0; i < branches; i++) {
        // Lambdas can only capture effectively-final locals, so copy the loop index.
        final int ii = i;

        if (stats != null) {
            input = input.map(
                new RichMapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value;
                    }
                }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
        }

        // Field 0 is the Integer constant 0 and field 1 is the String s + ii, so the declared
        // tuple type must be (INT, STRING). This also matches the later kv -> kv.f1 mapping,
        // which is declared to return STRING.
        DataSet<String> branch = input
            .map(s -> new Tuple2<>(0, s + ii)).returns(Types.TUPLE(Types.INT, Types.STRING))
            .groupBy(0)
            .minBy(1)
            .map(kv -> kv.f1).returns(Types.STRING);

        stats = (stats == null) ? branch : stats.union(branch);
    }
    return stats.map(s -> "(" + s + ").stats");
}
}
代码示例来源:origin: apache/flink
.map(new KMeans.SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.rebalance()
.map(new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
代码示例来源:origin: apache/flink
.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
代码示例来源:origin: apache/flink
IterativeDataSet<String> iteration3 = input.iterate(17);
iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1"))
.output(new DiscardingOutputFormat<String>());
iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2"))
代码示例来源:origin: apache/flink
}).withBroadcastSet(term, "some-name");
代码示例来源:origin: apache/flink
.map(new SelectNearestCenter()).name(MAPPER_NAME).withBroadcastSet(centroids, "centroids");
内容来源于网络,如有侵权,请联系作者删除!