Usage of the org.apache.flink.api.java.tuple.Tuple2 Class, with Code Examples


This article collects code examples for the Java class org.apache.flink.api.java.tuple.Tuple2 and shows how the class is used in practice. The examples are extracted from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as a useful reference. Details of the Tuple2 class:
Package path: org.apache.flink.api.java.tuple.Tuple2
Class name: Tuple2

Introduction to Tuple2

A tuple with 2 fields. Tuples are strongly typed; each field may be of a separate type. The fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position through the #getField(int) method. The tuple field positions start at zero.

Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions that work with Tuples to reuse objects in order to reduce pressure on the garbage collector.
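
To make the field-access and mutability rules above concrete, here is a minimal sketch (the names and values are illustrative only and are not taken from the examples below):

import org.apache.flink.api.java.tuple.Tuple2;

public class Tuple2Sketch {
  public static void main(String[] args) {
    // Construct via the constructor or the static factory Tuple2.of(...)
    Tuple2<String, Integer> t = Tuple2.of("hello", 1);

    // Direct access through the public fields f0 / f1
    String word = t.f0;
    int count = t.f1;

    // Positional access through getField(int); positions start at zero
    String sameWord = t.getField(0);

    // Tuples are mutable: fields can be re-assigned in place,
    // which is what allows functions to reuse Tuple objects
    t.f1 = count + 1;
    t.setField("world", 0);

    // copy() returns a shallow copy with the same field values
    Tuple2<String, Integer> copy = t.copy();
    System.out.println(word + " / " + sameWord + " / " + copy);
  }
}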

Warning: If you subclass Tuple2, then be sure to either

  • not add any new fields, or
  • make it a POJO, and always declare the element type of your DataStreams/DataSets to your descendant type. (That is, if you have a "class Foo extends Tuple2", then don't use instances of Foo in a DataStream<Tuple2> / DataSet<Tuple2>, but declare it as DataStream<Foo> / DataSet<Foo>.)
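
As a rough illustration of the warning above (the class name Foo and its extra field are hypothetical), the POJO option looks roughly like this, and the stream must then be declared with the descendant type:

// Hypothetical subclass that adds a field. Per the warning it should follow POJO rules
// (public no-arg constructor, public fields), and the element type must be declared as
// the subclass, e.g. DataStream<Foo>, not DataStream<Tuple2<String, Integer>>.
public class Foo extends Tuple2<String, Integer> {
  public long timestamp; // the extra field added by the subclass

  public Foo() {}

  public Foo(String f0, Integer f1, long timestamp) {
    super(f0, f1);
    this.timestamp = timestamp;
  }
}

// Declared with the descendant type, not as Tuple2:
// DataStream<Foo> stream = env.fromElements(new Foo("a", 1, 0L));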

Code Examples

Code example source: apache/flink

/**
* Shallow tuple copy.
* @return A new Tuple with the same fields as this.
*/
@Override
@SuppressWarnings("unchecked")
public Tuple2<T0, T1> copy() {
  return new Tuple2<>(this.f0,
    this.f1);
}

Code example source: apache/flink

private void preserveOrDiscardIfSeenSlideFactorTimes(
    List<Tuple2<Event, Integer>> newEvenstSeenSoFar,
    Tuple2<Event, Integer> windowValue) {
  int timesSeen = windowValue.f1 + 1;
  if (timesSeen != slideFactor) {
    newEvenstSeenSoFar.add(Tuple2.of(windowValue.f0, timesSeen));
  }
}

Code example source: apache/flink

@Override
  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
    Tuple2<Integer, Integer> t = new Tuple2<Integer, Integer>();
    t.setField(value.f1, 0);
    t.setField(value.getField(0), 1);
    return t;
  }
}

Code example source: apache/flink

protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
    for (int x = 0; x < should.getArity(); x++) {
      assertEquals(message, (Object)should.getField(x), is.getField(x));
    }
  }
}

Code example source: apache/flink

@Test(expected = NullPointerException.class)
public void testFailsWithoutUpperBound() {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
  streamOne
    .keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), null);
}

Code example source: apache/flink

/**
 * .fold() does not support RichFoldFunction, since the fold function is used internally
 * in a {@code FoldingState}.
 */
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        private static final long serialVersionUID = -6448847205314995812L;
        @Override
        public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
            Tuple2<String, Integer> value2) throws Exception {
          return null;
        }
      });
  fail("exception was not thrown");
}

Code example source: apache/flink

private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<Integer[], String>> input = env.fromElements(
      new Tuple2<>(new Integer[] {1, 2}, "barfoo")
  );
  Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
  // adjust the rule
  expectedException.expect(InvalidProgramException.class);
  expectedException.expectMessage(new StringStartsWith("Type " + expectedKeyType + " cannot be used as key."));
  input.keyBy(keySelector);
}

Code example source: apache/flink

@Test
public void testRowOf() {
  Row row1 = Row.of(1, "hello", null, Tuple2.of(2L, "hi"), true);
  Row row2 = new Row(5);
  row2.setField(0, 1);
  row2.setField(1, "hello");
  row2.setField(2, null);
  row2.setField(3, new Tuple2<>(2L, "hi"));
  row2.setField(4, true);
  assertEquals(row1, row2);
}

Code example source: dataArtisans/yahoo-streaming-benchmark

@Override
 public void flatMap(Tuple2<String, String> input, Collector<Tuple2<String, String>> out) throws Exception {
  String ad_id = input.getField(0);
  String campaign_id = this.redisAdCampaignCache.execute(ad_id);
  if (campaign_id == null) {
   return;
  }
  Tuple2<String, String> tuple = new Tuple2<>(campaign_id, (String) input.getField(1)); // event_time
  out.collect(tuple);
 }
}

Code example source: vasia/gelly-streaming

@Test
public void testCounts() throws Exception {
  FlatMapFunction f = new ExactTriangleCount.SumAndEmitCounters();
  Tuple2<Integer, Integer> expected = new Tuple2<>();
  f.flatMap(new Tuple2<>(-1, 1), out3);
  expected.setField(-1, 0);
  expected.setField(1, 1);
  Assert.assertEquals(expected, resultTuple3);
  f.flatMap(new Tuple2<>(-1, 5), out3);
  expected.setField(-1, 0);
  expected.setField(6, 1);
  Assert.assertEquals(expected, resultTuple3);
  f.flatMap(new Tuple2<>(2, 2), out3);
  expected.setField(2, 0);
  expected.setField(2, 1);
  Assert.assertEquals(expected, resultTuple3);
  f.flatMap(new Tuple2<>(-1, 4), out3);
  expected.setField(-1, 0);
  expected.setField(10, 1);
  Assert.assertEquals(expected, resultTuple3);
  f.flatMap(new Tuple2<>(2, 4), out3);
  expected.setField(2, 0);
  expected.setField(6, 1);
  Assert.assertEquals(expected, resultTuple3);
}

Code example source: apache/flink

@SuppressWarnings("unchecked")
private <V> V unwrap(Tuple2<K, V> t) {
  return t == null ? null : (V) (t.getField(1));
}

Code example source: apache/flink

@Override
  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
    Integer i = value.f0;
    value.setField(value.f1, 0);
    value.setField(i, 1);
    return value;
  }
}

Code example source: apache/flink

@Test(expected = NullPointerException.class)
public void testFailsWithoutLowerBound() {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
  streamOne
    .keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(null, Time.milliseconds(1));
}

Code example source: apache/flink

/**
 * .aggregate() does not support RichAggregateFunction, since the AggregateFunction is used internally
 * in an {@code AggregatingState}.
 */
@Test(expected = UnsupportedOperationException.class)
public void testAggregateWithRichFunctionFails() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>());
  fail("exception was not thrown");
}

Code example source: apache/flink

@Test
public void testTupleNestedArrayKeyRejection() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<Integer[], String>> input = env.fromElements(
      new Tuple2<>(new Integer[] {1, 2}, "test-test"));
  TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple2<Integer[], String>>(
      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
  // adjust the rule
  expectedException.expect(InvalidProgramException.class);
  expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key."));
  input.keyBy(new KeySelector<Tuple2<Integer[], String>, Tuple2<Integer[], String>>() {
    @Override
    public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception {
      return value;
    }
  });
}

Code example source: dataArtisans/yahoo-streaming-benchmark

@Override
 public void flatMap(Tuple2<String, String> input, Collector<Tuple2<String, Long>> out) throws Exception {
  String ad_id = input.getField(0);
  String campaign_id = this.redisAdCampaignCache.execute(ad_id);
  if (campaign_id == null) {
   return;
  }
  Tuple2<String, Long> tuple = new Tuple2<>(campaign_id, Long.parseLong(input.f1));
  out.collect(tuple);
 }
}

Code example source: apache/flink

@SuppressWarnings("unchecked")
  @Override
  public void join(Tuple2<K, I1> value1, I2 value2, Collector<OUT> collector) throws Exception {
    wrappedFunction.join(value1 == null ? null : (I1) value1.getField(1), value2, collector);
  }
}

Code example source: apache/flink

@Override
  public Tuple2<Long, Long> join(Tuple2<Long, Long> edge,
      Tuple2<Long, Long> vertexWithCompId) throws Exception {
    vertexWithCompId.setField(edge.f1, 0);
    return vertexWithCompId;
  }
}

Code example source: apache/flink

@Test(expected = UnsupportedTimeCharacteristicException.class)
public void testExecutionFailsInProcessingTime() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
  streamOne.keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), Time.milliseconds(0))
    .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
      @Override
      public void processElement(Tuple2<String, Integer> left,
        Tuple2<String, Integer> right, Context ctx,
        Collector<String> out) throws Exception {
        out.collect(left + ":" + right);
      }
    });
}
