Usage and code examples of org.apache.flink.api.java.operators.MapOperator.join()


This article collects Java code examples of the org.apache.flink.api.java.operators.MapOperator.join() method and shows how MapOperator.join() is used in practice. The examples come from selected projects indexed on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as a useful reference. The method's details are as follows:

Package: org.apache.flink.api.java.operators
Class: MapOperator
Method: join

About MapOperator.join

The upstream index provides no description for this method. In short, MapOperator is the DataSet subclass returned by map(), so join() here is the join(DataSet) transformation inherited from DataSet: it joins the mapped data set with another data set on keys selected via where(...).equalTo(...).
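
Since the index gives no example of the basic call pattern, here is a minimal, self-contained sketch (the class name MapJoinSketch and the sample data are ours, not taken from any of the projects below). It shows that join() can be chained directly onto the result of map():

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MapJoinSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, String>> left = env.fromElements(
        Tuple2.of(1, "a"), Tuple2.of(2, "b"));
    DataSet<Tuple2<Integer, String>> right = env.fromElements(
        Tuple2.of(1, "x"), Tuple2.of(2, "y"));

    // map() returns a MapOperator; join() is inherited from DataSet.
    left.map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
          @Override
          public Tuple2<Integer, String> map(Tuple2<Integer, String> value) {
            return value; // identity map, kept simple for the sketch
          }
        })
        .join(right)
        .where(0).equalTo(0) // join on the first tuple field of each side
        .print();
  }
}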

Code examples

Code example source: apache/flink

@Test
public void reuseSinglePartitioningJoin1() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0,1)
      .map(new MockMapper()).withForwardedFields("0;1")
      .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,1).equalTo(0,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}
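
MockMapper and MockJoin are test helpers that these excerpts do not show. As a rough sketch (the class name IdentityTuple3Mapper is ours), such a mapper could simply be an identity function; the withForwardedFields("0;1") declaration at the call site is only safe because the function really emits fields 0 and 1 unchanged, and that is what lets the optimizer reuse the partitioning established by partitionByHash(0,1) instead of repartitioning for the join.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// Identity mapper: every field passes through unchanged, so a
// withForwardedFields("0;1") hint on the map() call is truthful.
public class IdentityTuple3Mapper
    implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
  @Override
  public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) {
    return value;
  }
}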

Code example source: apache/flink

@Test
public void reuseSinglePartitioningJoin2() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0,1)
      .map(new MockMapper()).withForwardedFields("0;1")
      .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,1).equalTo(2,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseSinglePartitioningJoin4() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0)
      .map(new MockMapper()).withForwardedFields("0")
      .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,1).equalTo(2,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseBothPartitioningJoin3() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0)
      .map(new MockMapper()).withForwardedFields("0")
      .join(set2.partitionByHash(2,1)
              .map(new MockMapper())
              .withForwardedFields("2;1"),
          JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,1).equalTo(2,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseBothPartitioningJoin4() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0,2)
      .map(new MockMapper()).withForwardedFields("0;2")
      .join(set2.partitionByHash(1)
              .map(new MockMapper())
              .withForwardedFields("1"),
          JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,2).equalTo(2,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseBothPartitioningJoin5() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(2)
      .map(new MockMapper()).withForwardedFields("2")
      .join(set2.partitionByHash(1)
              .map(new MockMapper())
              .withForwardedFields("1"),
          JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,2).equalTo(2,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseBothPartitioningJoin6() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0)
      .map(new MockMapper()).withForwardedFields("0")
      .join(set2.partitionByHash(1)
              .map(new MockMapper())
              .withForwardedFields("1"),
          JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,2).equalTo(1,2).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseBothPartitioningJoin2() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0,1)
      .map(new MockMapper()).withForwardedFields("0;1")
      .join(set2.partitionByHash(1,2)
              .map(new MockMapper())
              .withForwardedFields("1;2"),
          JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,1).equalTo(2,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseBothPartitioningJoin1() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(0,1)
      .map(new MockMapper()).withForwardedFields("0;1")
      .join(set2.partitionByHash(0,1)
            .map(new MockMapper())
            .withForwardedFields("0;1"),
          JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,1).equalTo(0,1).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

@Test
public void reuseBothPartitioningJoin7() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
  DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
      .partitionByHash(2)
      .map(new MockMapper()).withForwardedFields("2")
      .join(set2.partitionByHash(1)
              .map(new MockMapper())
              .withForwardedFields("1"),
          JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
      .where(0,2).equalTo(1,2).with(new MockJoin());
  joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileWithStats(plan);
  SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
  checkValidJoinInputProperties(join);
}

Code example source: apache/flink

// Fragment: the mapped invariant input (enriched with a broadcast set) is
// joined with the delta iteration's solution set on field 0 == field 1.
invariantInput
  .map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
  .join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);

Code example source: apache/flink

/**
 * Tests join program with replicated data source behind map.
 */
@Test
public void checkJoinWithReplicatedSourceInputBehindMap() {
  ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
  ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
      new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
  DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
  DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
  DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
      .map(new IdMap())
      .join(source2).where("*").equalTo("*")
      .writeAsText("/some/newpath");
  Plan plan = env.createProgramPlan();
  // submit the plan to the compiler
  OptimizedPlan oPlan = compileNoStats(plan);
  // check the optimized Plan
  // the join should have forward strategy on both sides
  SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
  ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
  ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
  Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
  Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
}

Code example source: apache/flink

public static DataSet<Tuple2<Long, Double>> constructPlan(DataSet<Tuple2<Long, Double>> initialData, int numIterations) {
  DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialData.iterateDelta(initialData, numIterations, 0);
  DataSet<Tuple2<Long, Double>> delta = iteration.getSolutionSet()
      .join(iteration.getWorkset().flatMap(new Duplicator())).where(0).equalTo(0).with(new SummingJoin()).name(JOIN_1)
      .groupBy(0).aggregate(Aggregations.MIN, 1).map(new Expander())
      .join(iteration.getSolutionSet()).where(0).equalTo(0).with(new SummingJoinProject()).name(JOIN_2);
  DataSet<Tuple2<Long, Double>> changes = delta.groupBy(0).aggregate(Aggregations.SUM, 1);
  DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, changes);
  return result;
}
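
A hypothetical way to invoke constructPlan (the sample data here is made up; Duplicator, SummingJoin, and SummingJoinProject are the helper classes assumed by the method above):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, Double>> initial = env.fromElements(
    Tuple2.of(1L, 10.0), Tuple2.of(2L, 20.0));
// runs a delta iteration keyed on field 0 for up to 10 supersteps
constructPlan(initial, 10).print();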

Code example source: apache/flink

// Fragment: the iteration's workset, passed through an identity map,
// is self-joined with the workset on field 1.
iteration.getWorkset()
  .map(new IdentityMapper<Tuple2<Double, String>>())
  .join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1);

Code example source: apache/flink

// Fragment: a join behind a flatMap/map chain, matching on all fields ("*")
// and writing the result as text.
.flatMap(new IdFlatMap())
.map(new IdMap())
.join(source2).where("*").equalTo("*")
.writeAsText("/some/newpath");

Code example source: apache/flink

// Fragment: a lambda join; returns(...) supplies the result type that
// Java's type erasure hides from Flink's type extraction.
.join(x8Keys).where(3).equalTo(0).with((l, r) -> Tuple2.of(l.f3, 1))
  .returns(Types.TUPLE(Types.STRING, Types.INT))

Code example source: org.gradoop/gradoop-flink

@Override
public DataSet<WithCount<Tuple2<String, String>>> execute(LogicalGraph graph) {
 return Count.groupBy(graph.getVertices()
  .map(new ToIdWithLabel<>())
  .join(graph.getEdges().map(new ToTargetIdWithLabel<>()))
  .where(0).equalTo(0)
  .with(new BothLabels()))
  .map(new Tuple2ToWithCount<>());
}

Code example source: org.gradoop/gradoop-flink

@Override
public DataSet<WithCount<Tuple2<String, String>>> execute(LogicalGraph graph) {
 return Count.groupBy(graph.getVertices()
  .map(new ToIdWithLabel<>())
  .join(graph.getEdges().map(new ToSourceIdWithLabel<>()))
  .where(0).equalTo(0)
  .with(new BothLabels()))
  .map(new Tuple2ToWithCount<>());
}
