cz.seznam.euphoria.core.client.operator.FlatMap.of()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(12.0k)|赞(0)|评价(0)|浏览(137)

本文整理了Java中cz.seznam.euphoria.core.client.operator.FlatMap.of()方法的一些代码示例,展示了FlatMap.of()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。FlatMap.of()方法的具体详情如下:
包路径:cz.seznam.euphoria.core.client.operator.FlatMap
类名称:FlatMap
方法名:of

FlatMap.of介绍

[英]Starts building a nameless FlatMap operator to transform the given input dataset.
[中]开始构建一个无名的FlatMap操作符来转换给定的输入数据集。

代码示例

代码示例来源:origin: seznam/euphoria

/**
 * Runs the given functor over the wrapped dataset through a {@code FlatMap}
 * operator and wraps the operator's output in a new {@code Dataset}.
 *
 * @param f   functor to apply; it is passed through {@code requireNonNull}
 *            before being handed to the operator
 * @param <S> element type of the resulting dataset
 * @return a new {@code Dataset} around the operator's output
 */
public <S> Dataset<S> flatMap(UnaryFunctor<T, S> f) {
  return new Dataset<>(
      FlatMap.of(wrap)
          .using(requireNonNull(f))
          .output());
}

代码示例来源:origin: seznam/euphoria

/**
 * Expands every input element {@code e} into the sequence
 * {@code 1, 2, ..., e}; elements smaller than 1 emit nothing.
 *
 * @param input dataset of integers to expand
 * @return the output of the {@code FlatMap} operator
 */
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  return FlatMap.of(input)
      .using((Integer upperBound, Collector<Integer> out) -> {
        int next = 1;
        while (next <= upperBound) {
          out.collect(next);
          next++;
        }
      })
      .output();
}

代码示例来源:origin: seznam/euphoria

/**
 * For each input element {@code e}, emits the integers from 1 up to and
 * including {@code e}, in ascending order.
 *
 * @param input dataset of integers to expand
 * @return the output of the {@code FlatMap} operator
 */
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  return FlatMap.of(input)
      .using((Integer limit, Collector<Integer> collector) -> {
        int current = 1;
        while (current <= limit) {
          collector.collect(current);
          current++;
        }
      })
      .output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Builds a flow that reads from the source returned by
 * {@code Utils.getHBaseSource}, hands the second component of every input
 * pair to {@code writeCellsAsBytes}, persists the emitted byte arrays to the
 * sink returned by {@code Utils.getSink}, and then submits the flow and
 * waits for it to finish.
 */
private void run() {
  Flow flow = Flow.create();
  Dataset<Pair<ImmutableBytesWritable, Result>> rows =
      flow.createInput(Utils.getHBaseSource(input, conf.get()));

  Dataset<byte[]> cells = FlatMap.of(rows)
      .using((Pair<ImmutableBytesWritable, Result> row, Collector<byte[]> sink) ->
          writeCellsAsBytes(row.getSecond(), sink))
      .output();
  cells.persist(Utils.getSink(output, conf.get()));

  LOG.info("Starting flow reading from {} and persisting to {}", input, output);
  executor.submit(flow).join();
}

代码示例来源:origin: seznam/euphoria

/**
 * Pairs every element of {@code input} with the window reported by the
 * operator context.
 *
 * @param input              dataset whose elements are to be tagged
 * @param expectedWindowType type every non-null window must be an instance of
 * @param <W>                window type
 * @param <T>                element type
 * @return dataset of {@code (window, element)} pairs
 * @throws IllegalStateException raised from the functor when a non-null
 *         window is not an instance of {@code expectedWindowType}
 */
static <W, T> Dataset<Pair<W, T>>
extractWindowsToPair(Dataset<T> input, Class<W> expectedWindowType) {
  UnaryFunctor<T, Pair<W, T>> tagWithWindow = (elem, context) -> {
    Object window = context.getWindow();
    // A null window is let through unchecked; only a typed mismatch is an error.
    boolean mismatched = window != null && !expectedWindowType.isInstance(window);
    if (mismatched) {
      throw new IllegalStateException(
          "Encountered window of type " + window.getClass()
          + " but expected only " + expectedWindowType);
    }
    @SuppressWarnings("unchecked")
    Pair<W, T> tagged = Pair.of((W) window, elem);
    context.collect(tagged);
  };
  return FlatMap.of(input).using(tagWithWindow).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Emits the square of every input element. Before emitting, it calls
 * {@code increment()} on the {@code "input"} counter and
 * {@code increment(elem)} on the {@code "sum"} counter.
 *
 * @param input dataset of integers to square
 * @return the output of the {@code FlatMap} operator
 */
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  UnaryFunctor<Integer, Integer> squareAndCount = (value, ctx) -> {
    ctx.getCounter("input").increment();
    ctx.getCounter("sum").increment(value);
    ctx.collect(value * value);
  };
  return FlatMap.of(input).using(squareAndCount).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Squares each input element; the {@code "input"} and {@code "sum"}
 * counters are updated first, the latter being passed the element itself.
 *
 * @param input dataset of integers to square
 * @return the output of the {@code FlatMap} operator
 */
@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
  UnaryFunctor<Integer, Integer> countingSquare = (number, out) -> {
    out.getCounter("input").increment();
    out.getCounter("sum").increment(number);
    out.collect(number * number);
  };
  return FlatMap.of(input).using(countingSquare).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Assigns event time from the second pair component, reduces the first
 * components into sets keyed by their leading digit under {@code Session}
 * windowing built from a 5 ms duration, and finally attaches the window of
 * each reduced pair to form a triple.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, set of values)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, Set<String>>> getOutput(
    Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> timestamped =
      AssignEventTime.of(input).using(e -> e.getSecond()).output();

  Dataset<Pair<Integer, Set<String>>> reduced = ReduceByKey.of(timestamped)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .reduceBy(s -> s.collect(Collectors.toSet()))
      .windowBy(Session.of(Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, Set<String>>, Triple<TimeInterval, Integer, Set<String>>>
      attachWindow = (elem, context) -> {
        TimeInterval window = (TimeInterval) context.getWindow();
        context.collect(Triple.of(window, elem.getFirst(), elem.getSecond()));
      };
  return FlatMap.of(reduced).using(attachWindow).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Turns every pair of {@code input} into a triple whose first component is
 * the window reported by the operator context.
 *
 * @param input              dataset of pairs to be tagged
 * @param expectedWindowType type every non-null window must be an instance of
 * @param <W>                window type
 * @param <F>                type of the pair's first component
 * @param <S>                type of the pair's second component
 * @return dataset of {@code (window, first, second)} triples
 * @throws IllegalStateException raised from the functor when a non-null
 *         window is not an instance of {@code expectedWindowType}
 */
static <W, F, S> Dataset<Triple<W, F, S>>
extractWindows(Dataset<Pair<F, S>> input, Class<W> expectedWindowType) {
  UnaryFunctor<Pair<F, S>, Triple<W, F, S>> tagWithWindow = (elem, context) -> {
    Object window = context.getWindow();
    // A null window is let through unchecked; only a typed mismatch is an error.
    boolean mismatched = window != null && !expectedWindowType.isInstance(window);
    if (mismatched) {
      throw new IllegalStateException(
          "Encountered window of type " + window.getClass()
          + " but expected only " + expectedWindowType);
    }
    @SuppressWarnings("unchecked")
    Triple<W, F, S> tagged = Triple.of((W) window, elem.getFirst(), elem.getSecond());
    context.collect(tagged);
  };
  return FlatMap.of(input).using(tagWithWindow).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Uses the second pair component as event time, groups the first components
 * into sets by their leading digit with {@code Session} windowing built from
 * a 5 ms duration, and emits each reduced pair together with its window.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, set of values)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, Set<String>>> getOutput(
    Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> withEventTime =
      AssignEventTime.of(input).using(e -> e.getSecond()).output();

  Dataset<Pair<Integer, Set<String>>> grouped = ReduceByKey.of(withEventTime)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .reduceBy(s -> s.collect(Collectors.toSet()))
      .windowBy(Session.of(Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, Set<String>>, Triple<TimeInterval, Integer, Set<String>>>
      toWindowedTriple = (pair, ctx) -> {
        TimeInterval interval = (TimeInterval) ctx.getWindow();
        ctx.collect(Triple.of(interval, pair.getFirst(), pair.getSecond()));
      };
  return FlatMap.of(grouped).using(toWindowedTriple).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Assigns event time from the second pair component, reduces the input by
 * state keyed on the leading digit of the first component (the value being
 * that component from index 2 onwards) under {@code TimeSliding} windowing
 * built from 10 ms and 5 ms durations, then attaches each result's window.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, value)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, String>> getOutput(
    Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> timestamped =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  Dataset<Pair<Integer, String>> reduced = ReduceStateByKey.of(timestamped)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(e -> e.getFirst().substring(2))
      .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(TimeSliding.of(Duration.ofMillis(10), Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> attachWindow =
      (elem, context) -> {
        TimeInterval window = (TimeInterval) context.getWindow();
        context.collect(Triple.of(window, elem.getFirst(), elem.getSecond()));
      };
  return FlatMap.of(reduced).using(attachWindow).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Stamps each pair with its second component as event time, runs a
 * state-based reduction keyed by the first component's leading digit
 * (values are the first component from index 2 onwards) using
 * {@code TimeSliding} windowing built from 10 ms and 5 ms durations, and
 * emits every result together with its window.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, value)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, String>> getOutput(
    Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> withEventTime =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  Dataset<Pair<Integer, String>> accumulated = ReduceStateByKey.of(withEventTime)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(e -> e.getFirst().substring(2))
      .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(TimeSliding.of(Duration.ofMillis(10), Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> toWindowedTriple =
      (pair, ctx) -> {
        TimeInterval interval = (TimeInterval) ctx.getWindow();
        ctx.collect(Triple.of(interval, pair.getFirst(), pair.getSecond()));
      };
  return FlatMap.of(accumulated).using(toWindowedTriple).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Verifies that a {@code FlatMap} built through {@code FlatMap.of(...)}
 * without an explicit name reports {@code "FlatMap"} as its name.
 */
@Test
public void testBuild_ImplicitName() {
  Flow flow = Flow.create("TEST");
  Dataset<String> dataset = Util.createMockDataset(flow, 1);

  // The resulting dataset itself is not needed; the operator is looked up
  // through the flow below.
  FlatMap.of(dataset)
      .using((String s, Collector<String> c) -> c.collect(s))
      .output();

  FlatMap map = (FlatMap) flow.operators().iterator().next();
  assertEquals("FlatMap", map.getName());
}

代码示例来源:origin: seznam/euphoria

/**
 * Assigns event time from the second pair component, reduces the first
 * components by state keyed on their leading digit under {@code Time}
 * windowing built from a 5 ms duration, and attaches each result's window.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, value)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> timestamped =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  Dataset<Pair<Integer, String>> reduced = ReduceStateByKey.of(timestamped)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .stateFactory(AccState<String>::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(Time.of(Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> attachWindow =
      (elem, context) -> {
        TimeInterval window = (TimeInterval) context.getWindow();
        context.collect(Triple.of(window, elem.getFirst(), elem.getSecond()));
      };
  return FlatMap.of(reduced).using(attachWindow).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Assigns event time from the second pair component, reduces the first
 * components by state keyed on their leading digit under {@code Session}
 * windowing built from a 5 ms duration, and attaches each result's window.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, value)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> timestamped =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  Dataset<Pair<Integer, String>> reduced = ReduceStateByKey.of(timestamped)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(Session.of(Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> attachWindow =
      (elem, context) -> {
        TimeInterval window = (TimeInterval) context.getWindow();
        context.collect(Triple.of(window, elem.getFirst(), elem.getSecond()));
      };
  return FlatMap.of(reduced).using(attachWindow).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Stamps each pair with its second component as event time, runs a
 * state-based reduction of the first components keyed by their leading digit
 * using {@code Session} windowing built from a 5 ms duration, and emits
 * every result together with its window.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, value)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> withEventTime =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  Dataset<Pair<Integer, String>> accumulated = ReduceStateByKey.of(withEventTime)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .stateFactory((StateFactory<String, String, AccState<String>>) AccState::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(Session.of(Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> toWindowedTriple =
      (pair, ctx) -> {
        TimeInterval interval = (TimeInterval) ctx.getWindow();
        ctx.collect(Triple.of(interval, pair.getFirst(), pair.getSecond()));
      };
  return FlatMap.of(accumulated).using(toWindowedTriple).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Stamps each pair with its second component as event time, runs a
 * state-based reduction of the first components keyed by their leading digit
 * using {@code Time} windowing built from a 5 ms duration, and emits every
 * result together with its window.
 *
 * @param input pairs whose second component is used as the event time
 * @return triples of {@code (window, key, value)}
 */
@Override
protected Dataset<Triple<TimeInterval, Integer, String>>
getOutput(Dataset<Pair<String, Integer>> input) {
  Dataset<Pair<String, Integer>> withEventTime =
      AssignEventTime.of(input).using(Pair::getSecond).output();

  Dataset<Pair<Integer, String>> accumulated = ReduceStateByKey.of(withEventTime)
      .keyBy(e -> e.getFirst().charAt(0) - '0')
      .valueBy(Pair::getFirst)
      .stateFactory(AccState<String>::new)
      .mergeStatesBy(AccState::combine)
      .windowBy(Time.of(Duration.ofMillis(5)))
      .output();

  UnaryFunctor<Pair<Integer, String>, Triple<TimeInterval, Integer, String>> toWindowedTriple =
      (pair, ctx) -> {
        TimeInterval interval = (TimeInterval) ctx.getWindow();
        ctx.collect(Triple.of(interval, pair.getFirst(), pair.getSecond()));
      };
  return FlatMap.of(accumulated).using(toWindowedTriple).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Reduces the input by state keyed on {@code e % 3} using {@code SortState}
 * and {@code ReduceByKeyTest.TestWindowing}, then prefixes every resulting
 * pair with the value of its {@code IntWindow}.
 *
 * @param input integers to reduce
 * @return triples of {@code (window value, key, value)}
 */
@Override
protected Dataset<Triple<Integer, Integer, Integer>> getOutput(Dataset<Integer> input) {
  Dataset<Pair<Integer, Integer>> reduced = ReduceStateByKey.of(input)
      .keyBy(e -> e % 3)
      .valueBy(e -> e)
      .stateFactory(SortState::new)
      .mergeStatesBy(SortState::combine)
      .windowBy(new ReduceByKeyTest.TestWindowing())
      .output();

  UnaryFunctor<Pair<Integer, Integer>, Triple<Integer, Integer, Integer>> attachWindow =
      (elem, c) -> {
        IntWindow window = (IntWindow) c.getWindow();
        c.collect(Triple.of(window.getValue(), elem.getFirst(), elem.getSecond()));
      };
  return FlatMap.of(reduced).using(attachWindow).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Runs a {@code SortState}-based reduction keyed by {@code e % 3} under
 * {@code ReduceByKeyTest.TestWindowing} and emits each resulting pair
 * together with the value of the {@code IntWindow} it belongs to.
 *
 * @param input integers to reduce
 * @return triples of {@code (window value, key, value)}
 */
@Override
protected Dataset<Triple<Integer, Integer, Integer>> getOutput(Dataset<Integer> input) {
  Dataset<Pair<Integer, Integer>> sortedByKey = ReduceStateByKey.of(input)
      .keyBy(e -> e % 3)
      .valueBy(e -> e)
      .stateFactory(SortState::new)
      .mergeStatesBy(SortState::combine)
      .windowBy(new ReduceByKeyTest.TestWindowing())
      .output();

  UnaryFunctor<Pair<Integer, Integer>, Triple<Integer, Integer, Integer>> toWindowedTriple =
      (pair, ctx) -> {
        IntWindow intWindow = (IntWindow) ctx.getWindow();
        ctx.collect(Triple.of(intWindow.getValue(), pair.getFirst(), pair.getSecond()));
      };
  return FlatMap.of(sortedByKey).using(toWindowedTriple).output();
}

代码示例来源:origin: seznam/euphoria

/**
 * Feeds four lines of text through {@code FlatMap} (with {@code toWords})
 * and {@code Distinct}, and expects the sink to contain exactly
 * "four", "one", "three" and "two" in any order.
 */
@Test
public void testDistinctOnBatchWithoutWindowingLabels() throws Exception {
  Flow flow = Flow.create("Test");
  Dataset<String> sentences = flow.createInput(
      ListDataSource.bounded(
          asList("one two three four", "one two three", "one two", "one")));

  // expand each line into its words
  Dataset<String> tokens = FlatMap.of(sentences).using(toWords(w -> w)).output();

  Dataset<String> unique = Distinct.of(tokens).output();
  ListDataSink<String> sink = ListDataSink.get();
  unique.persist(sink);

  executor.submit(flow).get();

  DatasetAssert.unorderedEquals(sink.getOutputs(), "four", "one", "three", "two");
}

相关文章