Usage of the org.apache.flink.api.java.operators.MapOperator.filter() method, with code examples

This article collects Java code examples of the org.apache.flink.api.java.operators.MapOperator.filter() method and shows how it is used in practice. The examples come from curated open-source projects found on GitHub, Stack Overflow, and Maven, so they make useful references. The details of MapOperator.filter() are as follows:

Package: org.apache.flink.api.java.operators
Class: MapOperator
Method: filter

About MapOperator.filter

MapOperator does not define filter() itself; the method is inherited from DataSet. filter(FilterFunction<T>) evaluates the given predicate for every element and retains only those for which it returns true, so chaining it after map() trims the mapped data set, as all of the examples below do.
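
The upstream description is missing, so as a first orientation here is a minimal, self-contained sketch of the map(...).filter(...) chain that all of the examples below share (the class name MapFilterExample and the doubling/threshold logic are invented for illustration):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;

public class MapFilterExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // map() returns a MapOperator<Integer, Integer>
    MapOperator<Integer, Integer> doubled = env.fromElements(1, 2, 3, 4, 5)
        .map(new MapFunction<Integer, Integer>() {
          @Override
          public Integer map(Integer value) {
            return value * 2;
          }
        });

    // filter() is inherited from DataSet: it keeps only the elements
    // for which the predicate returns true (here: 6, 8, 10)
    DataSet<Integer> large = doubled.filter(new FilterFunction<Integer>() {
      @Override
      public boolean filter(Integer value) {
        return value > 4;
      }
    });

    large.print(); // print() triggers execution of the plan
  }
}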

Code examples

Code example source: apache/flink

.filter(new FilterFunction<Integer>() {
  @Override
  public boolean filter(Integer value) {
    return value > 0; // predicate body truncated in the source; placeholder shown
  }
})

Code example source: apache/flink

/**
 * Tests cross program with replicated data source behind map and filter.
 */
@Test
public void checkCrossWithReplicatedSourceInputBehindMap() {
  ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
  ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
      new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
  DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
  DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
  DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
      .map(new IdMap())
      .filter(new NoFilter())
      .cross(source2)
      .writeAsText("/some/newpath");
  Plan plan = env.createProgramPlan();
  // submit the plan to the compiler
  OptimizedPlan oPlan = compileNoStats(plan);
  // check the optimized plan:
  // the cross should use the forward ship strategy on both sides
  SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
  DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor();
  ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy();
  ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy();
  Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1);
  Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2);
}

Code example source: org.gradoop/gradoop-flink (identical in dbs-leipzig/gradoop)

@Override
public DataSet<GraphHead> getGraphHeads() {
 return transactions
  .map(new TransactionGraphHead<>())
  .filter(new ByDifferentId<>(GradoopConstants.DB_GRAPH_ID));
}

Code example source: seznam/euphoria

static DataSet<Tuple2<Long, String>> getHdfsSource(ExecutionEnvironment env, URI inputPath)
    throws IOException {
  SearchEventsParser parser = new SearchEventsParser();
  return env.readFile(new TextInputFormat(new Path(inputPath)), inputPath.toString())
      .map(parser::parse)
      .filter(q -> q != null && q.query != null && !q.query.isEmpty())
      .map(q -> Tuple2.of(q.timestamp, q.query))
      // the lambda's Tuple2 type parameters are erased at compile time, so the
      // result type must be declared explicitly for Flink's type extraction
      .returns(new TypeHint<Tuple2<Long, String>>() {});
}

Code example source: org.gradoop/gradoop-flink (identical in dbs-leipzig/gradoop)

// fragments of a larger method: each chain parses CSV lines into EPGM
// elements, broadcasts the metadata, and keeps only the elements carrying
// the requested label; in the full source each chain's result is further
// collected into a Map via Collectors.toMap(t -> t.f0, t -> t.f1)

.map(new CSVLineToGraphHead(graphHeadFactory))
  .withBroadcastSet(metaDataBroadcast, BC_METADATA)
  .filter(graphHead -> graphHead.getLabel().equals(label))

.map(new CSVLineToVertex(vertexFactory))
  .withBroadcastSet(metaDataBroadcast, BC_METADATA)
  .filter(vertex -> vertex.getLabel().equals(label))

.map(new CSVLineToEdge(edgeFactory))
  .withBroadcastSet(metaDataBroadcast, BC_METADATA)
  .filter(edge -> edge.getLabel().equals(label))

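The fragment above relies on Flink's map(...).withBroadcastSet(...).filter(...) pattern. As a rough, self-contained sketch of that pattern, under invented names and data (this is not Gradoop's API):

import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastFilterSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // main input: "label,payload" lines; side input: the labels we care about
    DataSet<String> lines = env.fromElements("a,1", "b,2", "a,3");
    DataSet<String> wantedLabels = env.fromElements("a");

    DataSet<String> matches = lines
        .map(new RichMapFunction<String, String>() {
          private List<String> wanted;

          @Override
          public void open(Configuration parameters) {
            // withBroadcastSet() makes the side input available to the UDF
            // on every parallel instance under the registered name
            wanted = getRuntimeContext().getBroadcastVariable("labels");
          }

          @Override
          public String map(String line) {
            String label = line.split(",")[0];
            // emit the label if it is in the broadcast set, otherwise a marker
            return wanted.contains(label) ? label : "";
          }
        })
        .withBroadcastSet(wantedLabels, "labels")
        .filter(value -> !value.isEmpty()); // drop the non-matching markers

    matches.print(); // prints "a" twice
  }
}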

Code example source: dbs-leipzig/gradoop

// map each graph to a (graph, hasEmbedding) pair using depth-first pattern
// matching, then keep only the graphs whose second field is true
graphs.map(new HasEmbeddings(
  new DepthSearchMatching(), queryString))
  .filter(new SecondFieldTrue<>());

Code example source: org.gradoop/gradoop-flink (identical in dbs-leipzig/gradoop)

// fragment: grow candidate patterns, broadcast the frequent patterns to the
// UDF instances, and drop candidates that have become obsolete
.map(new GrowFrequentPatterns(gSpan, fsmConfig))
.withBroadcastSet(frequentPatterns, DIMSpanConstants.FREQUENT_PATTERNS)
.filter(new NotObsolete());

Code example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // parse parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // obtain an execution environment
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  // read messageId, sender, and reply-to fields from the input data set
  DataSet<Tuple3<String, String, String>> mails =
      env.readCsvFile(input)
          .lineDelimiter(MBoxParser.MAIL_RECORD_DELIM)
          .fieldDelimiter(MBoxParser.MAIL_FIELD_DELIM)
          // messageId at position 0, sender at 2, reply-to at 5
          .includeFields("101001")
          .types(String.class, String.class, String.class);
  // extract email addresses and filter out mails from bots
  DataSet<Tuple3<String, String, String>> addressMails = mails
      .map(new EmailExtractor())
      .filter(new ExcludeEmailFilter("git@git.apache.org"))
      .filter(new ExcludeEmailFilter("jira@apache.org"));
    // construct reply connections by joining on messageId and reply-to
  DataSet<Tuple2<String, String>> replyConnections = addressMails
      .join(addressMails).where(2).equalTo(0).projectFirst(1).projectSecond(1);
  // count reply connections for each pair of email addresses
  replyConnections
      .groupBy(0, 1).reduceGroup(new ConnectionCounter())
      .print();
}
