Usage of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.keyBy() method, with code examples

x33g5p2x · reposted 2022-01-30 · category: Other

This article collects a number of Java code examples for the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.keyBy() method and shows how SingleOutputStreamOperator.keyBy() is used in practice. The examples are extracted from selected projects on GitHub/Stackoverflow/Maven and should make a useful reference. The details of SingleOutputStreamOperator.keyBy() are as follows:
Package: org.apache.flink.streaming.api.datastream
Class: SingleOutputStreamOperator
Method: keyBy

Introduction to SingleOutputStreamOperator.keyBy

keyBy logically partitions the stream so that all records with the same key are processed by the same parallel operator instance, and it returns a KeyedStream on which keyed state, rolling aggregations, and windows become available. The key can be specified with a KeySelector function, or, in the older API that many of the examples below use, by tuple field position (e.g. keyBy(0)) or POJO field name (e.g. keyBy("prefix")); the positional and field-name variants have since been deprecated in favor of KeySelector.
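
Before turning to the excerpts, here is a minimal, self-contained sketch of the most common form, keyBy with an explicit KeySelector. The class name, keys, and values are made up for illustration and do not come from the examples below.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBySketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // keyBy partitions the stream: records with equal keys always reach the same subtask
    KeyedStream<Tuple2<String, Integer>, String> keyed = env
        .fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3))
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
          @Override
          public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
          }
        });

    // keyed aggregations such as sum() only become available after keyBy()
    keyed.sum(1).print();

    env.execute("keyBy sketch");
  }
}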

Code examples

Code example source: apache/flink

private static KeyedStream<Event, Integer> applyTestStatefulOperator(
  String name,
  JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
  KeyedStream<Event, Integer> source,
  List<TypeSerializer<ComplexPayload>> stateSer,
  List<Class<ComplexPayload>> stateClass) {
  return source
    .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
    .name(name)
    .uid(name)
    .returns(Event.class)
    .keyBy(Event::getKey);
}
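
Note the .returns(Event.class) type hint: the mapper comes from a generic factory method, so Flink cannot infer its output type automatically, and the hint restores the concrete type information that the subsequent keyBy(Event::getKey) needs.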

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

Code example source: apache/flink

// excerpt: key by tuple field 0, then keep a rolling sum of field 1
.keyBy(0).sum(1);

Code example source: apache/flink

// excerpt: per-key sliding count windows of windowSize elements, advancing every slideSize elements
.keyBy(0)
.countWindow(windowSize, slideSize)
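
On its own this fragment is hard to read; completed, the pattern looks roughly like the following sketch. Variable names and values are illustrative, and an existing StreamExecutionEnvironment env is assumed.

// a sketch of the sliding count-window pattern; names and values are illustrative
int windowSize = 250;  // each window holds 250 elements per key
int slideSize = 150;   // a new window starts every 150 elements per key

env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
    .keyBy(0)                            // key by tuple field 0 (deprecated positional form)
    .countWindow(windowSize, slideSize)  // sliding window counted in elements, not time
    .sum(1)                              // aggregate tuple field 1 within each window
    .print();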

Code example source: apache/flink

// excerpt: identity KeySelector, each element serves as its own key
return element;
}).keyBy((KeySelector<Long, Long>) value -> value);

Code example source: apache/flink

@Test
public void testUserProvidedHashing() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
  env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
      .map(new NoOpMapFunction())
      .filter(new NoOpFilterFunction())
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
  StreamGraph streamGraph = env.getStreamGraph();
  int idx = 1;
  for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
    List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
    Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
    --idx;
  }
}

Code example source: apache/flink

// excerpt: key the collected strings by their length
out.collect("x " + value);
}).keyBy(String::length);

// separate excerpt: key a Long stream by its int value
.keyBy(Long::intValue);

Code example source: apache/flink

@Test
public void testProgram() throws Exception {
  String resultPath = getTempDirPath("result");
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<String> text = env.fromElements(WordCountData.TEXT);
  DataStream<Tuple2<String, Integer>> counts = text
      .flatMap(new Tokenizer())
      .keyBy(0).sum(1);
  counts.writeAsCsv(resultPath);
  env.execute("WriteAsCsvTest");
  // strip the parentheses from the expected tuple-style output
  compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
      .replaceAll("[\\(\\)]", ""), resultPath);
}

Code example source: apache/flink

// excerpt: stateful once-failing sum on a keyed stream, re-keyed before the counting sink
.keyBy(new IdentityKeySelector<Integer>())
.map(new OnceFailingPartitionedSum(failurePos))
.keyBy(0)
.addSink(new CounterSink());

Code example source: apache/flink

@Test
public void testProgram() throws Exception {
  String resultPath = getTempDirPath("result");
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<String> text = env.fromElements(WordCountData.TEXT);
  DataStream<Tuple2<String, Integer>> counts = text
      .flatMap(new Tokenizer())
      .keyBy(0).sum(1);
  counts.writeAsText(resultPath);
  env.execute("WriteAsTextTest");
  compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final String checkpointDir = pt.getRequired("checkpoint.dir");

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStateBackend(new FsStateBackend(checkpointDir));
  env.setRestartStrategy(RestartStrategies.noRestart());
  env.enableCheckpointing(1000L);
  env.getConfig().disableGenericTypes();

  env.addSource(new MySource()).uid("my-source")
      // keying every record to the constant 0 funnels all data through a single state key
      .keyBy(anInt -> 0)
      .map(new MyStatefulFunction()).uid("my-map")
      .addSink(new DiscardingSink<>()).uid("my-sink");
  env.execute();
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
  TtlTestConfig config = TtlTestConfig.fromArgs(pt);
  StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
    .cleanupIncrementally(5, true)
    .cleanupFullSnapshot()
    .build();
  env
    .addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
    .name("TtlStateUpdateSource")
    .keyBy(TtlStateUpdate::getKey)
    .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
    .name("TtlVerifyUpdateFunction")
    .addSink(new PrintSinkFunction<>())
    .name("PrintFailedVerifications");
  env.execute("State TTL test job");
}
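
Note that the StateTtlConfig built above only takes effect once a keyed function enables it on a state descriptor. A minimal sketch of that wiring follows; the function and state names are made up, and this is not the actual TtlVerifyUpdateFunction.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// hypothetical keyed function showing where a StateTtlConfig is applied
public class TtlCountingFunction extends RichFlatMapFunction<Long, Long> {
  private transient ValueState<Long> count;

  @Override
  public void open(Configuration parameters) {
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
        .cleanupIncrementally(5, true)  // check a few entries on every state access
        .cleanupFullSnapshot()          // also drop expired entries on full snapshots
        .build();
    ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("count", Long.class);
    desc.enableTimeToLive(ttlConfig);   // TTL applies only if enabled on the descriptor
    count = getRuntimeContext().getState(desc);
  }

  @Override
  public void flatMap(Long value, Collector<Long> out) throws Exception {
    Long current = count.value();       // returns null once the entry has expired
    long next = (current == null ? 0L : current) + 1;
    count.update(next);
    out.collect(next);
  }
}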

Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
  final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
  final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
  final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
  env.enableCheckpointing(200);
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // first vertex, chained to the source
      // this filter throttles the flow until at least one checkpoint
      // is complete, to make sure this program does not run without any checkpoints
      .filter(new StringRichFilterFunction())
          // -------------- second vertex - one-to-one connected ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
          // -------------- third vertex - reducer and the sink ----------------
      .keyBy("prefix")
      .flatMap(new OnceFailingAggregator(failurePos))
      .addSink(new ValidatingSink());
}

Code example source: apache/flink

private static void runPartitioningProgram(int parallelism) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(parallelism);
  env.getConfig().enableObjectReuse();
  env.setBufferTimeout(5L);
  env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
  env
    .addSource(new TimeStampingSource())
    .map(new IdMapper<Tuple2<Long, Long>>())
    .keyBy(0)
    .addSink(new TimestampingSink());
  env.execute("Partitioning Program");
}

Code example source: apache/flink

@Test
public void testNestedPojoFieldAccessor() throws Exception {
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.getConfig().disableObjectReuse();
  see.setParallelism(4);
  DataStream<Data> dataStream = see.fromCollection(elements);
  DataStream<Data> summedStream = dataStream
    .keyBy("aaa")
    .sum("stats.count")
    .keyBy("aaa")
    .flatMap(new FlatMapFunction<Data, Data>() {
      Data[] first = new Data[3];
      @Override
      public void flatMap(Data value, Collector<Data> out) throws Exception {
        if (first[value.aaa] == null) {
          first[value.aaa] = value;
          if (value.stats.count != 123) {
            throw new RuntimeException("Expected stats.count to be 123");
          }
        } else {
          if (value.stats.count != 2 * 123) {
            throw new RuntimeException("Expected stats.count to be 2 * 123");
          }
        }
      }
    });
  summedStream.print();
  see.execute();
}
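
The keyBy("aaa") and sum("stats.count") calls rely on POJO field expressions; the Data and Stats classes behind them presumably look roughly like this (a reconstruction from the field names in the test, not the actual classes):

// hypothetical POJOs inferred from keyBy("aaa") and sum("stats.count")
public static class Data {
  public int aaa;      // key field; also used to index the "first" array above
  public Stats stats;  // nested POJO reached by the "stats.count" field expression
  public Data() {}     // Flink POJOs need a public no-arg constructor
}

public static class Stats {
  public long count;
  public Stats() {}
}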

Code example source: apache/flink

@Test
public void testProcessdWindowFunctionSideOutput() throws Exception {
  TestListResultSink<Integer> resultSink = new TestListResultSink<>();
  TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.setParallelism(3);
  see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<Integer> dataStream = see.fromCollection(elements);
  OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
  SingleOutputStreamOperator<Integer> windowOperator = dataStream
      .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
      .keyBy(new TestKeySelector())
      .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
      .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
          out.collect(integer);
          context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
        }
      });
  windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
  windowOperator.addSink(resultSink);
  see.execute();
  assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
  assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}

Code example source: apache/flink

@Test
public void testUserProvidedHashingOnChainSupported() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
      .map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
      .filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
  env.getStreamGraph().getJobGraph();
}

Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // -------------- first vertex, chained to the source ----------------
      .filter(new StringRichFilterFunction())
      .shuffle()
      // -------------- second vertex - the stateful one that also fails ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
      // -------------- third vertex - counter and the sink ----------------
      .keyBy("prefix")
      .map(new OnceFailingPrefixCounter(NUM_STRINGS))
      .addSink(new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
          // Do nothing here
        }
      });
}

Code example source: apache/flink

// excerpt: keyed sliding time windows that still accept elements up to 2 ms late
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.allowedLateness(Time.milliseconds(2))
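
For context, allowedLateness is typically paired with a late-data side output. A minimal sketch, under the assumption that keyed is an existing KeyedStream<Integer, Integer> (the tag name is made up):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// hypothetical tag for records arriving after the allowed lateness has passed
final OutputTag<Integer> lateTag = new OutputTag<Integer>("late-records") {};

SingleOutputStreamOperator<Integer> windowed = keyed
    .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
    .allowedLateness(Time.milliseconds(2))  // late elements within 2 ms re-fire the window
    .sideOutputLateData(lateTag)            // anything later goes to the side output
    .reduce((a, b) -> a + b);

windowed.getSideOutput(lateTag).print();    // observe records dropped as too late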
