Usage and Code Examples of the cascading.pipe.GroupBy Class


This article collects a number of Java code examples for the cascading.pipe.GroupBy class and shows how the class is used in practice. The examples are drawn mainly from platforms such as GitHub, Stack Overflow, and Maven, extracted from selected projects, and should serve as a useful reference. Details of the GroupBy class:
Package: cascading.pipe
Class name: GroupBy

GroupBy Overview

The GroupBy pipe groups the Tuple stream by the given groupFields.

If more than one Pipe instance is provided on the constructor, all branches will be merged. It is required that all Pipe instances output the same field names, otherwise the cascading.flow.FlowConnector will fail to create a cascading.flow.Flow instance. Again, the Pipe instances are merged together as if one Tuple stream and not joined. See CoGroup for joining by common fields.
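
As a sketch of that merge behavior (the pipe and field names below are illustrative, not taken from any of the projects cited later):

// Two branches that declare the same field names ("first", "second").
Pipe lhs = new Pipe( "lhs" );
Pipe rhs = new Pipe( "rhs" );

// GroupBy concatenates the Tuple streams of both branches and then groups
// the merged stream by "first"; it does not join the branches on that field.
Pipe merged = new GroupBy( "merged", Pipe.pipes( lhs, rhs ), new Fields( "first" ) );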

Typically an Every follows GroupBy to apply an Aggregator function to every grouping. The Each operator may also follow GroupBy to apply a Function or Filter to the resulting stream. But an Each cannot come immediately before an Every.
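
A minimal grouping-plus-aggregation snippet in the same style as the examples further below (the "ip" field name and the use of cascading.operation.aggregator.Count are illustrative choices):

// Group the stream by "ip", then let an Every apply the Count aggregator
// to each grouping, emitting ("ip", "count") tuples.
Pipe pipe = new Pipe( "logs" );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );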

Optionally a stream can be further sorted by providing sortFields. This allows an Aggregator to receive values in the order of the sortedFields.

Note that local sorting always happens on the groupFields, sortFields are a secondary sorting on the grouped values within the current grouping. sortFields is particularly useful if the Aggregators following the GroupBy would like to see their arguments in order.
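
A short sketch of secondary sorting, assuming a stream that carries "ip" and "time" fields (both names are illustrative): the grouping key is "ip", and within each grouping the tuples reach downstream Aggregators ordered by "time".

// groupFields = "ip", sortFields = "time": values inside each "ip" grouping
// are locally sorted by "time" before any following Aggregator sees them.
Pipe pipe = new Pipe( "logs" );
pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "time" ) );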

For more control over sorting at the group or secondary sort level, use cascading.tuple.Fields containing java.util.Comparator instances for the appropriate fields when setting the groupFields or sortFields values. Fields allows you to set a custom java.util.Comparator instance for each field name or position. It is required that each Comparator class also be java.io.Serializable.
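
For example, a Comparator can be attached directly to the grouping Fields; String.CASE_INSENSITIVE_ORDER is used in this sketch because it is both a java.util.Comparator and java.io.Serializable (the "name" field is illustrative):

// Attach a serializable Comparator to the grouping field so grouping and
// local sorting compare "name" values case-insensitively.
Fields groupFields = new Fields( "name" );
groupFields.setComparator( "name", String.CASE_INSENSITIVE_ORDER );

Pipe pipe = new Pipe( "people" );
pipe = new GroupBy( pipe, groupFields );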

It should be noted that, for MapReduce systems, distributed group sorting is not 'total'. That is, groups are sorted as seen by each Reducer, but they are not sorted across Reducers. See the MapReduce algorithm for details.

See the cascading.tuple.Hasher interface when a custom java.util.Comparator on the grouping keys is being provided that makes two values with differing hashCode values equal. For example, new BigDecimal( 100.0D ) and new Double( 100.0D ) are equal using a custom Comparator, but Object#hashCode() will be different, thus forcing each value into differing partitions.
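
A hedged sketch of such a Comparator that also implements cascading.tuple.Hasher, so that values it considers equal (for example BigDecimal and Double representations of 100.0) also hash to the same partition. The class name and the choice of comparing by doubleValue() are illustrative, and the snippet assumes Hasher's single hashCode( value ) method:

import java.io.Serializable;
import java.util.Comparator;

import cascading.tuple.Hasher;

// Treats any two Numbers with the same double value as equal and, via
// Hasher, makes their hash codes agree so equal values land in the same
// partition.
public class NumberComparator implements Comparator<Number>, Hasher<Number>, Serializable
  {
  @Override
  public int compare( Number lhs, Number rhs )
    {
    return Double.compare( lhs.doubleValue(), rhs.doubleValue() );
    }

  @Override
  public int hashCode( Number value )
    {
    return Double.valueOf( value.doubleValue() ).hashCode();
    }
  }

The Comparator would then be set on the grouping Fields via setComparator, as in the previous sketch.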

Note that grouping one String key with a lowercase value with another String key with an uppercase value using a "case insensitive" Comparator will not have consistent results. The grouping will execute and be correct, but the actual values in the key columns may be replaced with "equivalent" values from other streams.

That is, if two streams are merged and then grouped on a key where the key values are uppercase in one stream and lowercase in the other, the resulting key value for a given grouping may arbitrarily be either upper or lower case.

If the original key values must be retained, consider normalizing the keys with a Function and then grouping on the resulting field.
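
As a sketch of that normalization step, assuming the merged stream carries a String "key" field (the field names and the use of cascading.operation.expression.ExpressionFunction are illustrative):

// Derive a lowercase "key_norm" field with an ExpressionFunction, keep all
// incoming fields (Fields.ALL), and group on the normalized key so the
// original mixed-case "key" values are preserved in the tuples.
Pipe pipe = new Pipe( "merged" );
pipe = new Each( pipe, new Fields( "key" ),
  new ExpressionFunction( new Fields( "key_norm" ), "key.toLowerCase()", String.class ), Fields.ALL );
pipe = new GroupBy( pipe, new Fields( "key_norm" ) );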

Code Examples

Code example source: cascading/lingual-core

public Branch visitChild( Stack stack )
 {
 Branch branch = ( (CascadingRelNode) getChild() ).visitChild( stack );
 Fields fields = createFields();
 String name = stack.getNameFor( GroupBy.class, branch.current );
 Pipe current = new GroupBy( name, branch.current, fields );
 current = stack.addDebug( this, current );
 return new Branch( current, branch );
 }

Code example source: dataArtisans/cascading-flink

public GroupByInGate(FlowProcess flowProcess, GroupBy splice, IORole ioRole) {
  super(flowProcess, splice, ioRole);
  this.isBufferJoin = splice.getJoiner() instanceof BufferJoin;
}

Code example source: dataArtisans/cascading-flink

// Excerpt: resolve the grouping keys, optional secondary sort keys, and
// sort order declared on a GroupBy for the given incoming scope.
Fields groupKeyFields = groupBy.getKeySelectors().get(inScope.getName());
Fields sortKeyFields = groupBy.getSortingSelectors().get(inScope.getName());
sortKeys = registerKeyFields(input, sortKeyFields);
Order sortOrder = groupBy.isSortReversed() ? Order.DESCENDING : Order.ASCENDING;

Code example source: cwensel/cascading

@Override
public List<Pipe> resolveTails( Context context )
 {
 Pipe pipe = new GroupBy( context.getTails().get( 0 ), new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 return Arrays.asList( pipe );
 }

Code example source: cascading/cascading-platform

@Override
public List<Pipe> resolveTails( Context context )
 {
 Pipe pipe = new GroupBy( context.getTails().get( 0 ), new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 return Arrays.asList( pipe );
 }

Code example source: cwensel/cascading

/**
 * This is an alternative to having two pipes with the same name, but uses one pipe that is split
 * across two branches.
 *
 * @throws IOException
 */
@Test
public void testSameSourceForBranch() throws IOException
 {
 Map sources = new HashMap();
 Map sinks = new HashMap();
 sources.put( "a", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" ) );
 Pipe pipeA = new Pipe( "a" );
 Pipe group1 = new GroupBy( "a1", pipeA, Fields.FIRST );
 Pipe group2 = new GroupBy( "a2", pipeA, Fields.FIRST );
 Pipe merge = new GroupBy( "tail", Pipe.pipes( group1, group2 ), new Fields( "first", "second" ) );
 sinks.put( merge.getName(), new Hfs( new TextLine(), "output/path" ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
 assertEquals( "not equal: steps.size()", 3, flow.getFlowSteps().size() );
 }

Code example source: cwensel/cascading

@Test
public void testPipeAssemblySplit()
 {
 Pipe pipe = new TestAssembly( "test" );
 Pipe pipe1 = new GroupBy( "left", pipe, new Fields( "ip" ) );
 Pipe pipe2 = new GroupBy( "right", pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink1 = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 Tap sink2 = getPlatform().getTextFile( "foo/split2", SinkMode.REPLACE );
 Map sources = new HashMap();
 sources.put( "test", source );
 Map sinks = new HashMap();
 sinks.put( "left", sink1 );
 sinks.put( "right", sink2 );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe1, pipe2 ).getFlowSteps();
 if( getPlatform().isMapReduce() )
  assertEquals( "not equal: steps.size()", 2, steps.size() );
 }

Code example source: cascading/cascading-platform

@Test
public void testPipeAssemblySplit()
 {
 Pipe pipe = new TestAssembly( "test" );
 Pipe pipe1 = new GroupBy( "left", pipe, new Fields( "ip" ) );
 Pipe pipe2 = new GroupBy( "right", pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink1 = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 Tap sink2 = getPlatform().getTextFile( "foo/split2", SinkMode.REPLACE );
 Map sources = new HashMap();
 sources.put( "test", source );
 Map sinks = new HashMap();
 sinks.put( "left", sink1 );
 sinks.put( "right", sink2 );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe1, pipe2 ).getFlowSteps();
 if( getPlatform().isMapReduce() )
  assertEquals( "not equal: steps.size()", 2, steps.size() );
 }

Code example source: cwensel/cascading

public Pipe createAssembly( Pipe pipe, Fields argFields, Fields declFields, String fieldValue, Fields selectFields )
 {
 pipe = new GroupBy( pipe, Fields.ALL );
 return new Every( pipe, argFields, new TestAggregator( declFields, new Tuple( fieldValue ) ), selectFields );
 }
}

Code example source: cascading/cascading-platform

public Pipe createAssembly( Pipe pipe, Fields argFields, Fields declFields, String fieldValue, Fields selectFields )
 {
 pipe = new GroupBy( pipe, Fields.ALL );
 return new Every( pipe, argFields, new TestAggregator( declFields, new Tuple( fieldValue ) ), selectFields );
 }
}

Code example source: cascading/cascading-platform

public FirstAssembly( Pipe previous )
 {
 Pipe pipe = new Pipe( "first", previous );
 pipe = new Each( pipe, new Identity() );
 pipe = new GroupBy( pipe, Fields.ALL );
 pipe = new Every( pipe, new First(), Fields.RESULTS );
 setTails( pipe );
 }
}

Code example source: cwensel/cascading

public FirstAssembly( Pipe previous )
 {
 Pipe pipe = new Pipe( "first", previous );
 pipe = new Each( pipe, new Identity() );
 pipe = new GroupBy( pipe, Fields.ALL );
 pipe = new Every( pipe, new First(), Fields.RESULTS );
 setTails( pipe );
 }
}

Code example source: cwensel/cascading

/** Tests that proper pipe graph is assembled without throwing an internal error */
@Test
public void testPipeAssembly()
 {
 Pipe pipe = new TestAssembly( "test" );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( source, sink, pipe ).getFlowSteps();
 assertEquals( "not equal: steps.size()", 1, steps.size() );
 }

Code example source: cascading/cascading-platform

/** Tests that proper pipe graph is assembled without throwing an internal error */
@Test
public void testPipeAssembly()
 {
 Pipe pipe = new TestAssembly( "test" );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( source, sink, pipe ).getFlowSteps();
 assertEquals( "not equal: steps.size()", 1, steps.size() );
 }

Code example source: cwensel/cascading

@Override
public List<Pipe> resolveTails( Context context )
 {
 Pipe pipe = new Pipe( (String) context.getFlow().getSourceNames().get( 0 ) );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 return Arrays.asList( pipe );
 }
};

Code example source: cascading/cascading-platform

@Override
public List<Pipe> resolveTails( Context context )
 {
 Pipe pipe = new Pipe( (String) context.getFlow().getSourceNames().get( 0 ) );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 return Arrays.asList( pipe );
 }
};

Code example source: cwensel/cascading

@Test
public void testGroupByResolver() throws Exception
 {
 Fields sourceFields = new Fields( "first", "second" );
 Tap source = getPlatform().getTabDelimitedFile( sourceFields, "input/path", SinkMode.KEEP );
 Fields sinkFields = new Fields( "third", "fourth" );
 Tap sink = getPlatform().getTabDelimitedFile( sinkFields, "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new GroupBy( pipe, new Fields( "third" ) );
 verify( source, sink, pipe );
 }

Code example source: cwensel/cascading

@Test
public void testPipeGroupBy()
 {
 Pipe pipe = new Pipe( "foo" );
 pipe = new Each( pipe, new Fields( "a" ), new Identity() );
 pipe = new GroupBy( pipe, new Fields( "b" ) );
 assertEqualsTrace( "cascading.TraceTest.testPipeGroupBy(TraceTest.java", pipe.getTrace() );
 }

Code example source: cascading/cascading-platform

@Test
public void testGroupByResolver() throws Exception
 {
 Fields sourceFields = new Fields( "first", "second" );
 Tap source = getPlatform().getTabDelimitedFile( sourceFields, "input/path", SinkMode.KEEP );
 Fields sinkFields = new Fields( "third", "fourth" );
 Tap sink = getPlatform().getTabDelimitedFile( sinkFields, "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new GroupBy( pipe, new Fields( "third" ) );
 verify( source, sink, pipe );
 }

Code example source: LiveRamp/cascading_ext

public CreateBloomFilter(Pipe keys, String bloomFilterID, String approxCountPartsDir, String bloomPartsDir, String keyBytesField, HashFunctionFactory hashFactory) throws IOException {
 super(keys);
 Pipe smallPipe = new Each(keys, new Fields(keyBytesField), new GetIndices(hashFactory), new Fields("split", "index", "hash_num"));
 smallPipe = new Each(smallPipe, new Fields("split", "index", "hash_num"), new Unique.FilterPartialDuplicates());
 smallPipe = new GroupBy(smallPipe, new Fields("split"));
 smallPipe = new Every(smallPipe, new Fields("index", "hash_num"), new CreateBloomFilterFromIndices(), Fields.ALL);
 ConfigDef bloomDef = smallPipe.getStepConfigDef();
 bloomDef.setProperty(BloomProps.BLOOM_FILTER_PARTS_DIR, bloomPartsDir);
 bloomDef.setProperty(BloomProps.BLOOM_KEYS_COUNTS_DIR, approxCountPartsDir);
 bloomDef.setProperty(BloomProps.TARGET_BLOOM_FILTER_ID, bloomFilterID);
 setTails(smallPipe);
}
