This article collects code examples for the Java method org.apache.flink.api.java.operators.MapOperator.filter(), showing how MapOperator.filter() is used in practice. The examples are drawn from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they make useful references. Details of MapOperator.filter() are as follows:

Package path: org.apache.flink.api.java.operators.MapOperator
Class name: MapOperator
Method name: filter
Method description: none available
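MapOperator.filter() keeps only the elements for which the supplied predicate returns true. As a minimal, self-contained sketch of the same keep-when-true semantics, here is an analogous example using plain Java streams (no Flink dependency; the class name FilterDemo and the predicate are illustrative, not taken from any of the projects below):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterDemo {

    // Same predicate shape as a Flink FilterFunction<Integer>:
    // an element is kept exactly when the predicate returns true.
    static List<Integer> keepPositive(List<Integer> values) {
        return values.stream()
                .filter(v -> v > 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [1, 2]
        System.out.println(keepPositive(Arrays.asList(-2, -1, 0, 1, 2)));
    }
}
```

In a Flink DataSet program, the predicate would instead be wrapped in a FilterFunction (or a lambda) and passed to .filter(), as the examples below show.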
Code example source: apache/flink

.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) {
        return value > 0; // example predicate; the original snippet was truncated here
    }
})
Code example source: apache/flink

/**
 * Tests a cross program with a replicated data source behind map and filter.
 */
@Test
public void checkCrossWithReplicatedSourceInputBehindMap() {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);

    TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
    ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
        new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(
            new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));

    DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
    DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);

    DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
        .map(new IdMap())
        .filter(new NoFilter())
        .cross(source2)
        .writeAsText("/some/newpath");

    Plan plan = env.createProgramPlan();

    // submit the plan to the compiler
    OptimizedPlan oPlan = compileNoStats(plan);

    // check the optimized plan: cross should have a forward strategy on both sides
    SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
    DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor();

    ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy();
    ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy();

    Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1);
    Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2);
}
Code example source: org.gradoop/gradoop-flink

@Override
public DataSet<GraphHead> getGraphHeads() {
    return transactions
        .map(new TransactionGraphHead<>())
        .filter(new ByDifferentId<>(GradoopConstants.DB_GRAPH_ID));
}
Code example source: dbs-leipzig/gradoop

@Override
public DataSet<GraphHead> getGraphHeads() {
    return transactions
        .map(new TransactionGraphHead<>())
        .filter(new ByDifferentId<>(GradoopConstants.DB_GRAPH_ID));
}
Code example source: seznam/euphoria

static DataSet<Tuple2<Long, String>> getHdfsSource(ExecutionEnvironment env, URI inputPath)
        throws IOException {
    SearchEventsParser parser = new SearchEventsParser();
    return env.readFile(new TextInputFormat(new Path(inputPath)), inputPath.toString())
        .map(parser::parse)
        .filter(q -> q != null && q.query != null && !q.query.isEmpty())
        .map(q -> Tuple2.of(q.timestamp, q.query))
        .returns(new TypeHint<Tuple2<Long, String>>() {});
}
Code example source: org.gradoop/gradoop-flink

.map(new CSVLineToGraphHead(graphHeadFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(graphHead -> graphHead.getLabel().equals(label))))
    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(new CSVLineToVertex(vertexFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(vertex -> vertex.getLabel().equals(label))))
    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(new CSVLineToEdge(edgeFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(edge -> edge.getLabel().equals(label))))
    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
Code example source: dbs-leipzig/gradoop

.map(new CSVLineToGraphHead(graphHeadFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(graphHead -> graphHead.getLabel().equals(label))))
    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(new CSVLineToVertex(vertexFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(vertex -> vertex.getLabel().equals(label))))
    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(new CSVLineToEdge(edgeFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(edge -> edge.getLabel().equals(label))))
    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
Code example source: dbs-leipzig/gradoop

graphs.map(new HasEmbeddings(
        new DepthSearchMatching(), queryString))
    .filter(new SecondFieldTrue<>());
Code example source: org.gradoop/gradoop-flink

.map(new GrowFrequentPatterns(gSpan, fsmConfig))
    .withBroadcastSet(frequentPatterns, DIMSpanConstants.FREQUENT_PATTERNS)
    .filter(new NotObsolete());
Code example source: dbs-leipzig/gradoop

.map(new GrowFrequentPatterns(gSpan, fsmConfig))
    .withBroadcastSet(frequentPatterns, DIMSpanConstants.FREQUENT_PATTERNS)
    .filter(new NotObsolete());
Code example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {

    // parse parameters
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.getRequired("input");

    // obtain an execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read messageId, sender, and reply-to fields from the input data set
    DataSet<Tuple3<String, String, String>> mails =
        env.readCsvFile(input)
            .lineDelimiter(MBoxParser.MAIL_RECORD_DELIM)
            .fieldDelimiter(MBoxParser.MAIL_FIELD_DELIM)
            // messageId at position 0, sender at 2, reply-to at 5
            .includeFields("101001")
            .types(String.class, String.class, String.class);

    // extract email addresses and filter out mails from bots
    DataSet<Tuple3<String, String, String>> addressMails = mails
        .map(new EmailExtractor())
        .filter(new ExcludeEmailFilter("git@git.apache.org"))
        .filter(new ExcludeEmailFilter("jira@apache.org"));

    // construct reply connections by joining on messageId and reply-to
    DataSet<Tuple2<String, String>> replyConnections = addressMails
        .join(addressMails).where(2).equalTo(0).projectFirst(1).projectSecond(1);

    // count reply connections for each pair of email addresses
    replyConnections
        .groupBy(0, 1).reduceGroup(new ConnectionCounter())
        .print();
}
This content was collected from the web. In case of infringement, please contact the author for removal.