Usage of org.apache.beam.sdk.transforms.Filter.by(), with code examples

Reposted by x33g5p2x on 2022-01-19 under Other

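This article collects Java code examples of the org.apache.beam.sdk.transforms.Filter.by() method to show how Filter.by() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, are extracted from selected projects, and should serve as a useful reference. Details of Filter.by():
Package: org.apache.beam.sdk.transforms
Class: Filter
Method: by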

Introduction to Filter.by

Returns a PTransform that takes an input PCollection and returns a PCollection with elements that satisfy the given predicate. The predicate must be a SerializableFunction.

Example of use:

PCollection<String> wordList = ...;

See also #lessThan, #lessThanEq, #greaterThan, #greaterThanEq, which return elements satisfying various inequalities with the specified value based on the elements' natural ordering.
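Filter.by insists on a SerializableFunction because the predicate is a piece of user code that the runner must be able to serialize and ship to wherever the elements are processed. The JDK-only sketch below (no Beam dependency; every class and method name is hypothetical) models that requirement: a lambda made Serializable with an intersection cast is written to bytes, read back, and still works as a keep-if-true filter, here keeping words longer than six characters.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// JDK-only sketch (no Beam dependency); all names here are hypothetical.
class SerializablePredicateDemo {

  // The intersection cast makes the lambda Serializable, the property that
  // Beam's SerializableFunction<T, Boolean> guarantees for Filter.by.
  static Function<String, Boolean> longerThan(int n) {
    return (Function<String, Boolean> & Serializable) w -> w.length() > n;
  }

  // Writes the object to bytes and reads it back, roughly what happens when a
  // predicate is shipped to a remote worker.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("predicate is not serializable", e);
    }
  }

  // Keeps the elements for which the predicate returns true.
  static List<String> filter(List<String> words, Function<String, Boolean> pred) {
    return words.stream().filter(pred::apply).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Function<String, Boolean> shipped = roundTrip(longerThan(6));
    List<String> words = List.of("beam", "pipeline", "collection", "filter");
    System.out.println(filter(words, shipped)); // prints [pipeline, collection]
  }
}
```

Without the `& Serializable` part of the cast, `roundTrip` would fail with a NotSerializableException, which is the same class of problem a non-serializable predicate causes in a pipeline.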

Code examples

Example source: jbonofre/beam-samples

public static PCollection<String> filterByCountry(PCollection<String> data, final String country) {
  return data.apply("FilterByCountry", Filter.by(new SerializableFunction<String, Boolean>() {
    public Boolean apply(String row) {
      return getCountry(row).equals(country);
    }
  }));
}
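The helper `getCountry` is not shown in the source project. The JDK-only sketch below assumes, purely for illustration, comma-separated rows whose third field is the country, and replaces the anonymous SerializableFunction class with an equivalent serializable lambda. Calling `equals` on the `country` parameter rather than on the parsed field keeps the predicate from throwing a NullPointerException on rows without a country. In a Beam pipeline the same logic could be written as `Filter.by((String row) -> country.equals(getCountry(row)))`.

```java
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// JDK-only sketch; the CSV layout and all names here are hypothetical.
class CountryFilterDemo {

  // Assumed layout: id,name,country (country is the third field).
  static String getCountry(String row) {
    String[] fields = row.split(",", -1);
    return fields.length > 2 ? fields[2].trim() : null;
  }

  // Lambda equivalent of the anonymous SerializableFunction in the example;
  // country.equals(...) tolerates rows whose country field is missing.
  static Function<String, Boolean> byCountry(String country) {
    return (Function<String, Boolean> & Serializable) row -> country.equals(getCountry(row));
  }

  static List<String> filterByCountry(List<String> rows, String country) {
    Function<String, Boolean> pred = byCountry(country);
    return rows.stream().filter(pred::apply).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> rows = List.of("1,Alice,FR", "2,Bob,DE", "3,Carol", "4,Dan,FR");
    System.out.println(filterByCountry(rows, "FR")); // prints [1,Alice,FR, 4,Dan,FR]
  }
}
```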

Example source: org.apache.beam/beam-sdks-java-core

/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that equals to a given value. Elements must be {@code
 * Comparable}.
 *
 * <p>Example of use:
 *
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> equalNumbers = listOfNumbers.apply(Filter.equal(1000));
 * }</pre>
 *
 * <p>See also {@link #greaterThan}, {@link #lessThan}, {@link #lessThanEq} and {@link
 * #greaterThanEq}, which return elements satisfying various inequalities with the specified value
 * based on the elements' natural ordering.
 *
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> equal(final T value) {
 return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) == 0)
   .described(String.format("x == %s", value));
}
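Because `Filter.equal` tests `input.compareTo(value) == 0` rather than `equals`, "equal" here means equal under the type's natural ordering. For most types the two agree, but `BigDecimal` is a well-known exception: `compareTo` ignores scale while `equals` does not, so `1.0` and `1.00` compare as equal without being `equals`. The JDK-only sketch below (hypothetical names, no Beam) applies the same predicate that `equal` builds, next to an `equals`-based one for contrast.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// JDK-only sketch of the predicate inside Filter.equal; names are hypothetical.
class CompareToEqualDemo {

  // Same test as Filter.equal: input.compareTo(value) == 0.
  static <T extends Comparable<T>> Predicate<T> compareEqual(T value) {
    return input -> input.compareTo(value) == 0;
  }

  static <T> List<T> keep(List<T> items, Predicate<T> pred) {
    return items.stream().filter(pred).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<BigDecimal> amounts =
        List.of(new BigDecimal("1.0"), new BigDecimal("1.00"), new BigDecimal("2"));
    BigDecimal target = new BigDecimal("1.0");
    // compareTo ignores scale, so both 1.0 and 1.00 match.
    System.out.println(keep(amounts, compareEqual(target))); // prints [1.0, 1.00]
    // equals also compares scale, so only 1.0 matches.
    System.out.println(keep(amounts, target::equals)); // prints [1.0]
  }
}
```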

Example source: org.apache.beam/beam-sdks-java-core

/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that are less than or equal to a given value, based on the
 * elements' natural ordering. Elements must be {@code Comparable}.
 *
 * <p>Example of use:
 *
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> smallOrEqualNumbers =
 *     listOfNumbers.apply(Filter.lessThanEq(10));
 * }</pre>
 *
 * <p>See also {@link #lessThan}, {@link #greaterThanEq}, {@link #equal} and {@link #greaterThan},
 * which return elements satisfying various inequalities with the specified value based on the
 * elements' natural ordering.
 *
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
 return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) <= 0)
   .described(String.format("x ≤ %s", value));
}
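The "natural ordering" these helpers rely on is simply whatever `compareTo` defines for the element type. For `String` that is a lexicographic comparison of UTF-16 code units, so it is case-sensitive and every uppercase ASCII letter sorts before every lowercase one. The JDK-only sketch below (hypothetical names, no Beam) applies the predicate that `lessThanEq` builds to a few strings.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// JDK-only sketch of the predicate inside Filter.lessThanEq; names are hypothetical.
class LessThanEqStringDemo {

  // Same test as Filter.lessThanEq: input.compareTo(value) <= 0.
  static <T extends Comparable<T>> Predicate<T> lessThanEq(T value) {
    return input -> input.compareTo(value) <= 0;
  }

  static List<String> keep(List<String> words, Predicate<String> pred) {
    return words.stream().filter(pred).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> words = List.of("apple", "mango", "zebra", "Zebra", "m");
    // "mango" is dropped because "m" is a proper prefix of it (longer sorts later);
    // "Zebra" is kept because 'Z' (U+005A) sorts before 'm' (U+006D).
    System.out.println(keep(words, lessThanEq("m"))); // prints [apple, Zebra, m]
  }
}
```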

Example source: org.apache.beam/beam-sdks-java-core

/**
 * Returns a {@code PTransform} that takes an input {@link PCollection} and returns a {@link
 * PCollection} with elements that are less than a given value, based on the elements' natural
 * ordering. Elements must be {@code Comparable}.
 *
 * <p>Example of use:
 *
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> smallNumbers =
 *     listOfNumbers.apply(Filter.lessThan(10));
 * }</pre>
 *
 * <p>See also {@link #lessThanEq}, {@link #greaterThanEq}, {@link #equal} and {@link
 * #greaterThan}, which return elements satisfying various inequalities with the specified value
 * based on the elements' natural ordering.
 *
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
 return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) < 0)
   .described(String.format("x < %s", value));
}

Example source: org.apache.beam/beam-sdks-java-core

/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that are greater than a given value, based on the elements'
 * natural ordering. Elements must be {@code Comparable}.
 *
 * <p>Example of use:
 *
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> largeNumbers =
 *     listOfNumbers.apply(Filter.greaterThan(1000));
 * }</pre>
 *
 * <p>See also {@link #greaterThanEq}, {@link #lessThan}, {@link #equal} and {@link #lessThanEq},
 * which return elements satisfying various inequalities with the specified value based on the
 * elements' natural ordering.
 *
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
 return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) > 0)
   .described(String.format("x > %s", value));
}

Example source: org.apache.beam/beam-sdks-java-core

/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that are greater than or equal to a given value, based on the
 * elements' natural ordering. Elements must be {@code Comparable}.
 *
 * <p>Example of use:
 *
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> largeOrEqualNumbers =
 *     listOfNumbers.apply(Filter.greaterThanEq(1000));
 * }</pre>
 *
 * <p>See also {@link #greaterThan}, {@link #lessThan}, {@link #equal} and {@link #lessThanEq},
 * which return elements satisfying various inequalities with the specified value based on the
 * elements' natural ordering.
 *
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
 return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) >= 0)
   .described(String.format("x ≥ %s", value));
}
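The four inequality helpers differ only in how they test the result of `compareTo`, so they disagree exactly at the boundary value. The JDK-only sketch below (hypothetical names, no Beam) builds the same four predicates and applies them to inputs just below, at, and just above the threshold; the labels are ASCII versions of the `described(...)` strings used above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// JDK-only sketch mirroring the predicates built by Filter.lessThan, lessThanEq,
// greaterThan and greaterThanEq; the class and method names are hypothetical.
class InequalityDemo {

  static <T extends Comparable<T>> Map<String, Predicate<T>> predicates(T value) {
    Map<String, Predicate<T>> preds = new LinkedHashMap<>();
    preds.put("x < " + value, x -> x.compareTo(value) < 0);
    preds.put("x <= " + value, x -> x.compareTo(value) <= 0);
    preds.put("x > " + value, x -> x.compareTo(value) > 0);
    preds.put("x >= " + value, x -> x.compareTo(value) >= 0);
    return preds;
  }

  static <T> List<T> keep(List<T> items, Predicate<T> pred) {
    return items.stream().filter(pred).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Integer> numbers = List.of(9, 10, 11);
    // Prints one line per predicate, e.g. "x <= 10: [9, 10]".
    predicates(10).forEach((label, pred) -> System.out.println(label + ": " + keep(numbers, pred)));
  }
}
```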

Example source: com.google.cloud.genomics/google-genomics-dataflow

/**
 * Compute a PCollection of reference allele frequencies for SNPs of interest.
 * The SNPs all have only a single alternate allele, and neither the
 * reference nor the alternate allele have a population frequency < minFreq.
 * The results are returned in a PCollection indexed by Position.
 *
 * @param variants a set of variant calls for a reference population
 * @param minFreq the minimum allele frequency for the set
 * @return a PCollection mapping Position to AlleleCounts
 */
static PCollection<KV<Position, AlleleFreq>> getFreq(
  PCollection<Variant> variants, double minFreq) {
 return variants.apply("PassingFilter", Filter.by(VariantFunctions.IS_PASSING))
   .apply("OnChromosomeFilter", Filter.by(VariantFunctions.IS_ON_CHROMOSOME))
   .apply("NotLowQualityFilter", Filter.by(VariantFunctions.IS_NOT_LOW_QUALITY))
   .apply("SNPFilter", Filter.by(VariantFunctions.IS_SINGLE_ALTERNATE_SNP))
   .apply(ParDo.of(new GetAlleleFreq()))
   .apply(Filter.by(new FilterFreq(minFreq)));
}
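`getFreq` chains four `Filter.by` stages before its `ParDo`. An element survives only if every predicate accepts it, so the chain keeps exactly the elements that a single filter on the conjunction (AND) of the predicates would keep; splitting them into named steps mainly makes each condition visible in the pipeline graph. The JDK-only sketch below checks that equivalence. The `Variant` record and its predicates are hypothetical, heavily simplified stand-ins, not the `VariantFunctions` definitions from the genomics project.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// JDK-only sketch; Variant and the predicates below are hypothetical stand-ins.
class ChainedFilterDemo {

  record Variant(String chromosome, boolean passing, int quality, int altAlleles) {}

  static final Predicate<Variant> IS_PASSING = Variant::passing;
  static final Predicate<Variant> IS_ON_CHROMOSOME = v -> v.chromosome().startsWith("chr");
  static final Predicate<Variant> IS_NOT_LOW_QUALITY = v -> v.quality() >= 30;
  static final Predicate<Variant> IS_SINGLE_ALT = v -> v.altAlleles() == 1;

  // One filter stage per predicate, like consecutive Filter.by applications.
  static List<Variant> chained(List<Variant> in) {
    return in.stream()
        .filter(IS_PASSING)
        .filter(IS_ON_CHROMOSOME)
        .filter(IS_NOT_LOW_QUALITY)
        .filter(IS_SINGLE_ALT)
        .collect(Collectors.toList());
  }

  // A single filter on the AND of all four predicates.
  static List<Variant> combined(List<Variant> in) {
    Predicate<Variant> all =
        IS_PASSING.and(IS_ON_CHROMOSOME).and(IS_NOT_LOW_QUALITY).and(IS_SINGLE_ALT);
    return in.stream().filter(all).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Variant> variants = List.of(
        new Variant("chr1", true, 50, 1),   // kept
        new Variant("chr2", false, 50, 1),  // not passing
        new Variant("chrX", true, 10, 1),   // low quality
        new Variant("chr3", true, 40, 2),   // two alternate alleles
        new Variant("GL000", true, 60, 1)); // not on a named chromosome
    System.out.println(chained(variants).equals(combined(variants))); // prints true
    System.out.println(chained(variants).size()); // prints 1
  }
}
```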

Example source: googlegenomics/dataflow-java (its getFreq method is identical to the google-genomics-dataflow example above)

Example source: org.apache.beam/beam-sdks-java-io-redis

@Override
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
 // Reparallelize: mimics the same behavior as in JdbcIO,
 // breaking fusion.
 PCollectionView<Iterable<KV<String, String>>> empty =
   input
     .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
     .apply(View.asIterable());
 PCollection<KV<String, String>> materialized =
   input.apply(
     "Identity",
     ParDo.of(
         new DoFn<KV<String, String>, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
           context.output(context.element());
          }
         })
       .withSideInputs(empty));
 return materialized.apply(Reshuffle.viaRandomKey());
}

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testNoFilterByPredicateWithLambda() {
 PCollection<Integer> output = p.apply(Create.of(1, 2, 4, 5)).apply(Filter.by(i -> false));
 PAssert.that(output).empty();
 p.run();
}

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testIdentityFilterByPredicateWithLambda() {
 PCollection<Integer> output =
   p.apply(Create.of(591, 11789, 1257, 24578, 24799, 307)).apply(Filter.by(i -> true));
 PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
 p.run();
}

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testFilterByPredicateWithLambda() {
 PCollection<Integer> output =
   p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(i -> i % 2 == 0));
 PAssert.that(output).containsInAnyOrder(2, 4, 6);
 p.run();
}

Example source: org.apache.beam/beam-sdks-java-core

/**
 * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is
 * not useful. If this test ever fails there may be simplifications available to us.
 */
@Test
public void testFilterParDoOutputTypeDescriptorRawWithLambda() throws Exception {
 @SuppressWarnings({"unchecked", "rawtypes"})
 PCollection<String> output = p.apply(Create.of("hello")).apply(Filter.by(s -> true));
 thrown.expect(CannotProvideCoderException.class);
 p.getCoderRegistry().getCoder(output.getTypeDescriptor());
}
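The test's Javadoc says that a lambda "results in a rawtype", which is why no coder can be inferred from the output type descriptor. The underlying Java behaviour is visible with plain reflection: an anonymous class records its generic type arguments in its class metadata, while the class generated for a lambda implements only the raw interface. The JDK-only sketch below (hypothetical names, no Beam) shows the difference; it illustrates the Java mechanism, not Beam's internal type-inference code.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// JDK-only sketch: compares the generic metadata of an anonymous class and a lambda.
class LambdaRawTypeDemo {

  // True if the object's class declares its first interface with type arguments.
  static boolean hasTypeArguments(Object fn) {
    Type iface = fn.getClass().getGenericInterfaces()[0];
    return iface instanceof ParameterizedType;
  }

  static final Function<String, Boolean> ANONYMOUS =
      new Function<String, Boolean>() {
        @Override
        public Boolean apply(String s) {
          return true;
        }
      };

  static final Function<String, Boolean> LAMBDA = s -> true;

  public static void main(String[] args) {
    // The anonymous class carries Function<String, Boolean> in its signature.
    System.out.println(ANONYMOUS.getClass().getGenericInterfaces()[0]);
    // The lambda's generated class only implements the raw Function interface.
    System.out.println(LAMBDA.getClass().getGenericInterfaces()[0]);
    System.out.println(hasTypeArguments(ANONYMOUS) + " " + hasTypeArguments(LAMBDA)); // prints true false
  }
}
```

This is also why the word-count example further down gives its lambda-based `MapElements` and `FlatMapElements` steps an explicit type via `into(TypeDescriptors...)`.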

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testIdentityFilterByPredicate() {
 PCollection<Integer> output =
   p.apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
     .apply(Filter.by(new TrivialFn(true)));
 PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
 p.run();
}

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testNoFilterByPredicate() {
 PCollection<Integer> output =
   p.apply(Create.of(1, 2, 4, 5)).apply(Filter.by(new TrivialFn(false)));
 PAssert.that(output).empty();
 p.run();
}

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testFilterByPredicate() {
 PCollection<Integer> output =
   p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenFn()));
 PAssert.that(output).containsInAnyOrder(2, 4, 6);
 p.run();
}
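`TrivialFn` and `EvenFn` are helper classes defined elsewhere in the Beam test file and are not shown in these excerpts. Judging only from how the tests use them, they look like `SerializableFunction<Integer, Boolean>` implementations that return a fixed answer and that test for evenness. The JDK-only sketch below reconstructs them under that assumption; the nested `SerializableFunction` interface is a hypothetical stand-in with the same shape as Beam's (a `Serializable` interface with one `apply` method), so the block compiles without Beam on the classpath.

```java
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;

// JDK-only sketch; TrivialFn and EvenFn are hypothetical reconstructions.
class PredicateFnDemo {

  // Stand-in with the same shape as Beam's SerializableFunction, so this
  // compiles without Beam on the classpath.
  interface SerializableFunction<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input);
  }

  // Returns the same fixed answer for every element.
  static class TrivialFn implements SerializableFunction<Integer, Boolean> {
    private final boolean returnVal;

    TrivialFn(boolean returnVal) {
      this.returnVal = returnVal;
    }

    @Override
    public Boolean apply(Integer elem) {
      return returnVal;
    }
  }

  // Accepts even numbers only.
  static class EvenFn implements SerializableFunction<Integer, Boolean> {
    @Override
    public Boolean apply(Integer elem) {
      return elem % 2 == 0;
    }
  }

  // Element-wise keep-if-true, the contract Filter.by implements.
  static List<Integer> keep(List<Integer> in, SerializableFunction<Integer, Boolean> fn) {
    return in.stream().filter(fn::apply).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(keep(List.of(1, 2, 4, 5), new TrivialFn(false)));      // prints []
    System.out.println(keep(List.of(591, 11789, 1257), new TrivialFn(true))); // prints [591, 11789, 1257]
    System.out.println(keep(List.of(1, 2, 3, 4, 5, 6, 7), new EvenFn()));     // prints [2, 4, 6]
  }
}
```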

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(NeedsRunner.class)
public void testFilterByMethodReferenceWithLambda() {
 PCollection<Integer> output =
   p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenFilter()::isEven));
 PAssert.that(output).containsInAnyOrder(2, 4, 6);
 p.run();
}
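`new EvenFilter()::isEven` is a bound method reference: the `EvenFilter` instance is captured together with the method, so serializing the predicate also means serializing that instance. `EvenFilter` itself is not shown in the source. The JDK-only sketch below defines two hypothetical versions, one `Serializable` and one not, and shows that only the first survives Java serialization when the method reference is made serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

// JDK-only sketch; EvenFilter and PlainEvenFilter are hypothetical.
class BoundMethodRefDemo {

  static class EvenFilter implements Serializable {
    boolean isEven(int x) {
      return x % 2 == 0;
    }
  }

  static class PlainEvenFilter { // does NOT implement Serializable
    boolean isEven(int x) {
      return x % 2 == 0;
    }
  }

  // Returns true if Java serialization of the object succeeds.
  static boolean serializes(Object fn) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(fn);
      return true;
    } catch (NotSerializableException e) {
      return false; // a captured object could not be serialized
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  static Predicate<Integer> serializableReceiverRef() {
    return (Predicate<Integer> & Serializable) new EvenFilter()::isEven;
  }

  static Predicate<Integer> nonSerializableReceiverRef() {
    return (Predicate<Integer> & Serializable) new PlainEvenFilter()::isEven;
  }

  public static void main(String[] args) {
    System.out.println(serializes(serializableReceiverRef()));    // prints true
    System.out.println(serializes(nonSerializableReceiverRef())); // prints false
  }
}
```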

Example source: org.apache.beam/beam-examples-java

/** A basic smoke test that ensures there is no crash at pipeline construction time. */
@Test
public void testMinimalWordCount() throws Exception {
 p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
 p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
   .apply(
     FlatMapElements.into(TypeDescriptors.strings())
       .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
   .apply(Filter.by((String word) -> !word.isEmpty()))
   .apply(Count.perElement())
   .apply(
     MapElements.into(TypeDescriptors.strings())
       .via(
         (KV<String, Long> wordCount) ->
           wordCount.getKey() + ": " + wordCount.getValue()))
   .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
}
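The `Filter.by((String word) -> !word.isEmpty())` step is needed because `String.split` drops trailing empty strings but keeps a leading one when a line starts with a delimiter. The JDK-only sketch below reproduces the split, filter, and per-element count steps on a single in-memory line; the sample text is made up, and the `TreeMap` is used only so the counts print in a stable order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// JDK-only sketch of the split -> filter -> count steps; the input is made up.
class WordSplitDemo {

  // Same regex as the pipeline: runs of anything but letters and apostrophes.
  static List<String> split(String line) {
    return Arrays.asList(line.split("[^a-zA-Z']+"));
  }

  // Equivalent of Filter.by((String word) -> !word.isEmpty()).
  static List<String> nonEmpty(List<String> words) {
    return words.stream().filter(word -> !word.isEmpty()).collect(Collectors.toList());
  }

  // Equivalent of Count.perElement(), sorted by key for stable printing.
  static Map<String, Long> count(List<String> words) {
    return words.stream()
        .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    String line = "  To be, or not to be!";
    System.out.println(split(line));                  // prints [, To, be, or, not, to, be]
    System.out.println(count(nonEmpty(split(line)))); // prints {To=1, be=2, not=1, or=1, to=1}
  }
}
```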

Example source: org.apache.beam/beam-sdks-java-core

@Test
@Category({ValidatesRunner.class, UsesSchema.class})
public void testSchemasPassedThrough() {
 List<InferredPojo> pojoList =
   Lists.newArrayList(
     new InferredPojo("a", 1), new InferredPojo("b", 2), new InferredPojo("c", 3));

 PCollection<InferredPojo> out = pipeline.apply(Create.of(pojoList)).apply(Filter.by(e -> true));
 assertTrue(out.hasSchema());

 pipeline.run();
}

Example source: org.apache.beam/beam-examples-java

/** Test the filtering. */
@Test
@Category(ValidatesRunner.class)
public void testUserScoresFilter() throws Exception {
 final Instant startMinTimestamp = new Instant(1447965680000L);
 PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
 PCollection<KV<String, Integer>> output =
   input
     .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
     .apply(
       "FilterStartTime",
       Filter.by(
         (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
     // run a map to access the fields in the result.
     .apply(
       MapElements.into(
           TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
         .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
 PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);
 p.run().waitUntilFinish();
}
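The `FilterStartTime` step keeps only events strictly after `startMinTimestamp`, so an event stamped exactly at that instant is dropped. The JDK-only sketch below mirrors the filter-then-map logic with a hypothetical `GameActionInfo` record (user, score, timestamp in epoch milliseconds) and uses `java.time.Instant` in place of the Joda-Time `Instant` from the example; the sample events are made up.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// JDK-only sketch; GameActionInfo and the sample events are hypothetical.
class ScoreFilterDemo {

  record GameActionInfo(String user, int score, long timestamp) {}

  // Keep events strictly after the cutoff, then project each to (user, score).
  static List<Map.Entry<String, Integer>> filterAfter(List<GameActionInfo> events, Instant cutoff) {
    return events.stream()
        .filter(g -> g.timestamp() > cutoff.toEpochMilli())
        .map(g -> Map.entry(g.user(), g.score()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Instant cutoff = Instant.ofEpochMilli(1447965680000L);
    List<GameActionInfo> events = List.of(
        new GameActionInfo("user0", 12, 1447965679999L), // before cutoff: dropped
        new GameActionInfo("user1", 7, 1447965680000L),  // exactly at cutoff: dropped
        new GameActionInfo("user2", 5, 1447965680001L)); // after cutoff: kept
    System.out.println(filterAfter(events, cutoff)); // prints [user2=5]
  }
}
```

Using `>=` instead of `>` in the predicate would also keep the event at the cutoff; which boundary is right depends on how the start time is meant to be interpreted.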
