本文整理了Java中org.apache.flink.api.java.tuple.Tuple2
类的一些代码示例,展示了Tuple2
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Tuple2
类的具体详情如下:
包路径:org.apache.flink.api.java.tuple.Tuple2
类名称:Tuple2
[英]A tuple with 2 fields. Tuples are strongly typed; each field may be of a separate type. The fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position through the #getField(int) method. The tuple field positions start at zero.
Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions that work with Tuples to reuse objects in order to reduce pressure on the garbage collector.
Warning: If you subclass Tuple2, then be sure to either
代码示例来源:origin: apache/flink
/**
* Shallow tuple copy.
* @return A new Tuple with the same fields as this.
*/
@Override
@SuppressWarnings("unchecked")
public Tuple2<T0, T1> copy() {
return new Tuple2<>(this.f0,
this.f1);
}
代码示例来源:origin: apache/flink
private void preserveOrDiscardIfSeenSlideFactorTimes(
List<Tuple2<Event, Integer>> newEvenstSeenSoFar,
Tuple2<Event, Integer> windowValue) {
int timesSeen = windowValue.f1 + 1;
if (timesSeen != slideFactor) {
newEvenstSeenSoFar.add(Tuple2.of(windowValue.f0, timesSeen));
}
}
代码示例来源:origin: apache/flink
@Override
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
Tuple2<Integer, Integer> t = new Tuple2<Integer, Integer>();
t.setField(value.f1, 0);
t.setField(value.getField(0), 1);
return t;
}
}
代码示例来源:origin: apache/flink
protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
for (int x = 0; x < should.getArity(); x++) {
assertEquals(message, (Object)should.getField(x), is.getField(x));
}
}
}
代码示例来源:origin: apache/flink
@Test(expected = NullPointerException.class)
public void testFailsWithoutUpperBound() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
streamOne
.keyBy(new Tuple2KeyExtractor())
.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
.between(Time.milliseconds(0), null);
}
代码示例来源:origin: apache/flink
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = -6448847205314995812L;
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer[], String>> input = env.fromElements(
new Tuple2<>(new Integer[] {1, 2}, "barfoo")
);
Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
// adjust the rule
expectedException.expect(InvalidProgramException.class);
expectedException.expectMessage(new StringStartsWith("Type " + expectedKeyType + " cannot be used as key."));
input.keyBy(keySelector);
}
代码示例来源:origin: apache/flink
@Test
public void testRowOf() {
Row row1 = Row.of(1, "hello", null, Tuple2.of(2L, "hi"), true);
Row row2 = new Row(5);
row2.setField(0, 1);
row2.setField(1, "hello");
row2.setField(2, null);
row2.setField(3, new Tuple2<>(2L, "hi"));
row2.setField(4, true);
assertEquals(row1, row2);
}
代码示例来源:origin: dataArtisans/yahoo-streaming-benchmark
@Override
public void flatMap(Tuple2<String, String> input, Collector<Tuple2<String, String>> out) throws Exception {
String ad_id = input.getField(0);
String campaign_id = this.redisAdCampaignCache.execute(ad_id);
if (campaign_id == null) {
return;
}
Tuple2<String, String> tuple = new Tuple2<>(campaign_id, (String) input.getField(1)); // event_time
out.collect(tuple);
}
}
代码示例来源:origin: vasia/gelly-streaming
@Test
public void testCounts() throws Exception {
FlatMapFunction f = new ExactTriangleCount.SumAndEmitCounters();
Tuple2<Integer, Integer> expected = new Tuple2<>();
f.flatMap(new Tuple2<>(-1, 1), out3);
expected.setField(-1, 0);
expected.setField(1, 1);
Assert.assertEquals(expected, resultTuple3);
f.flatMap(new Tuple2<>(-1, 5), out3);
expected.setField(-1, 0);
expected.setField(6, 1);
Assert.assertEquals(expected, resultTuple3);
f.flatMap(new Tuple2<>(2, 2), out3);
expected.setField(2, 0);
expected.setField(2, 1);
Assert.assertEquals(expected, resultTuple3);
f.flatMap(new Tuple2<>(-1, 4), out3);
expected.setField(-1, 0);
expected.setField(10, 1);
Assert.assertEquals(expected, resultTuple3);
f.flatMap(new Tuple2<>(2, 4), out3);
expected.setField(2, 0);
expected.setField(6, 1);
Assert.assertEquals(expected, resultTuple3);
代码示例来源:origin: apache/flink
@SuppressWarnings("unchecked")
private <V> V unwrap(Tuple2<K, V> t) {
return t == null ? null : (V) (t.getField(1));
}
代码示例来源:origin: apache/flink
@Override
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
Integer i = value.f0;
value.setField(value.f1, 0);
value.setField(i, 1);
return value;
}
}
代码示例来源:origin: apache/flink
@Test(expected = NullPointerException.class)
public void testFailsWithoutLowerBound() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
streamOne
.keyBy(new Tuple2KeyExtractor())
.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
.between(null, Time.milliseconds(1));
}
代码示例来源:origin: apache/flink
/**
* .aggregate() does not support RichAggregateFunction, since the AggregateFunction is used internally
* in an {@code AggregatingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testAggregateWithRichFunctionFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>());
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
@Test
public void testTupleNestedArrayKeyRejection() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer[], String>> input = env.fromElements(
new Tuple2<>(new Integer[] {1, 2}, "test-test"));
TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple2<Integer[], String>>(
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
// adjust the rule
expectedException.expect(InvalidProgramException.class);
expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key."));
input.keyBy(new KeySelector<Tuple2<Integer[], String>, Tuple2<Integer[], String>>() {
@Override
public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception {
return value;
}
});
}
代码示例来源:origin: dataArtisans/yahoo-streaming-benchmark
@Override
public void flatMap(Tuple2<String, String> input, Collector<Tuple2<String, Long>> out) throws Exception {
String ad_id = input.getField(0);
String campaign_id = this.redisAdCampaignCache.execute(ad_id);
if (campaign_id == null) {
return;
}
Tuple2<String, Long> tuple = new Tuple2<>(campaign_id, Long.parseLong(input.f1));
out.collect(tuple);
}
}
代码示例来源:origin: apache/flink
@SuppressWarnings("unchecked")
@Override
public void join(Tuple2<K, I1> value1, I2 value2, Collector<OUT> collector) throws Exception {
wrappedFunction.join(value1 == null ? null : (I1) value1.getField(1), value2, collector);
}
}
代码示例来源:origin: apache/flink
protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
for (int x = 0; x < should.getArity(); x++) {
assertEquals(message, (Object)should.getField(x), is.getField(x));
}
}
}
代码示例来源:origin: apache/flink
@Override
public Tuple2<Long, Long> join(Tuple2<Long, Long> edge,
Tuple2<Long, Long> vertexWithCompId) throws Exception {
vertexWithCompId.setField(edge.f1, 0);
return vertexWithCompId;
}
}
代码示例来源:origin: apache/flink
@Test(expected = UnsupportedTimeCharacteristicException.class)
public void testExecutionFailsInProcessingTime() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
streamOne.keyBy(new Tuple2KeyExtractor())
.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
.between(Time.milliseconds(0), Time.milliseconds(0))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> left,
Tuple2<String, Integer> right, Context ctx,
Collector<String> out) throws Exception {
out.collect(left + ":" + right);
}
});
}
内容来源于网络,如有侵权,请联系作者删除!