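This article collects code examples for the Java method cascading.flow.Flow.getSourcesCollection() and shows how Flow.getSourcesCollection() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they can serve as a practical reference. Details of the Flow.getSourcesCollection() method are as follows: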
Package: cascading.flow
Class name: Flow
Method name: getSourcesCollection
Description: Method getSourcesCollection returns a Collection of source Taps for this Flow object.
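As a minimal sketch of what consuming such a collection looks like, the block below iterates over every source Tap instead of taking only the first one. To stay runnable without the Cascading library, `Tap` and `Flow` here are hypothetical stand-in interfaces that model only `getIdentifier()` and `getSourcesCollection()`; `SourceTapsSketch` and `sourceIdentifiers` are illustrative names, not part of Cascading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class SourceTapsSketch
{
    /** Hypothetical stand-in for cascading.tap.Tap; only the identifier is modeled. */
    public interface Tap
    {
        String getIdentifier();
    }

    /** Hypothetical stand-in for cascading.flow.Flow; only one accessor is modeled. */
    public interface Flow
    {
        Collection<Tap> getSourcesCollection();
    }

    /** Collects the identifier of every source Tap, in the collection's iteration order. */
    public static List<String> sourceIdentifiers( Flow flow )
    {
        List<String> identifiers = new ArrayList<String>();

        for( Tap source : flow.getSourcesCollection() )
            identifiers.add( source.getIdentifier() );

        return identifiers;
    }

    public static void main( String[] args )
    {
        // Two made-up source taps feeding a single flow.
        Tap lhs = () -> "input/lhs.txt";
        Tap rhs = () -> "input/rhs.txt";
        Flow flow = () -> Arrays.asList( lhs, rhs );

        for( String identifier : sourceIdentifiers( flow ) )
            System.out.println( identifier );
    }
}
```

With the real library, the same loop would apply to the Collection returned by Flow.getSourcesCollection(); iterating avoids the implicit assumption that a flow has exactly one source.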
Code example source: origin: cwensel/cascading
public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
{
    // Reads all fields from the first source Tap of the flow into a new list.
    return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
}
Code example source: origin: cwensel/cascading
private void makeGraph( Flow[] flows )
{
    for( Flow flow : flows )
    {
        // Copy the source and sink taps into mutable lists.
        LinkedList<Tap> sources = new LinkedList<Tap>( flow.getSourcesCollection() );
        LinkedList<Tap> sinks = new LinkedList<Tap>( flow.getSinksCollection() );

        // Checkpoint taps are handled together with the sinks.
        sinks.addAll( flow.getCheckpointsCollection() );

        unwrapCompositeTaps( sources );
        unwrapCompositeTaps( sinks );

        // Add one vertex per source and per sink.
        for( Tap source : sources )
            addVertex( getVertex( flow, source ) );

        for( Tap sink : sinks )
            addVertex( getVertex( flow, sink ) );

        // Connect every source to every sink of the same flow.
        for( Tap source : sources )
        {
            for( Tap sink : sinks )
                addEdgeFor( flow, source, sink );
        }
    }
}
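The pairing step in the makeGraph example above (one edge for every source/sink combination within a flow) can be sketched in isolation. This is a simplified illustration under stated assumptions, not the project's implementation: `SimpleFlow` is a hypothetical stand-in for cascading.flow.Flow whose taps are reduced to identifier strings, edges are rendered as plain strings rather than graph objects, and composite-tap unwrapping and checkpoints are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class FlowEdgesSketch
{
    /** Hypothetical stand-in for cascading.flow.Flow; taps are plain identifier strings. */
    public static class SimpleFlow
    {
        final String name;
        final List<String> sources;
        final List<String> sinks;

        public SimpleFlow( String name, List<String> sources, List<String> sinks )
        {
            this.name = name;
            this.sources = sources;
            this.sinks = sinks;
        }

        public Collection<String> getSourcesCollection()
        {
            return sources;
        }

        public Collection<String> getSinksCollection()
        {
            return sinks;
        }
    }

    /** Emits one edge description per (source, sink) pair of each flow. */
    public static List<String> edgesFor( SimpleFlow... flows )
    {
        List<String> edges = new ArrayList<String>();

        for( SimpleFlow flow : flows )
        {
            for( String source : flow.getSourcesCollection() )
            {
                for( String sink : flow.getSinksCollection() )
                    edges.add( source + " -[" + flow.name + "]-> " + sink );
            }
        }

        return edges;
    }

    public static void main( String[] args )
    {
        // Made-up flows: "join" reads two inputs, "copy" reads the joined output.
        SimpleFlow join = new SimpleFlow( "join", Arrays.asList( "lhs", "rhs" ), Arrays.asList( "joined" ) );
        SimpleFlow copy = new SimpleFlow( "copy", Arrays.asList( "joined" ), Arrays.asList( "final" ) );

        for( String edge : edgesFor( join, copy ) )
            System.out.println( edge );
    }
}
```

Because a sink of one flow can be a source of another (here the made-up "joined" tap), the resulting edges chain the flows together, which is what lets a dependency order be derived from the graph.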
Code example source: origin: cwensel/cascading (the same test also appears in origin: cascading/cascading-platform)
@Test
public void testCheckpointTapCascade() throws IOException
{
    // This test only applies to MapReduce platforms.
    if( !getPlatform().isMapReduce() )
        return;

    getPlatform().copyFromLocal( inputFileIps );

    String path = "checkpoint";

    // Each flow reads the sink (or checkpoint) written by the previous one.
    Flow first = firstFlow( path + "/first", false );
    Flow second = secondFlow( first.getSink(), path + "/second" );
    Flow third = thirdCheckpointFlow( second.getSink(), path + "/third" );
    Flow fourth = fourthFlow( (Tap) third.getCheckpoints().values().iterator().next(), path + "/fourth" );

    // The flows are passed out of order on purpose.
    Cascade cascade = new CascadeConnector( getProperties() ).connect( fourth, second, third, first );

    cascade.start();
    cascade.complete();

    validateLength( fourth, 20 );

    // The sources of the head flow must be among the cascade's source taps.
    assertTrue( cascade.getHeadFlows().contains( first ) );
    assertTrue( cascade.getSourceTaps().containsAll( first.getSourcesCollection() ) );
    assertTrue( cascade.getIntermediateTaps().containsAll( third.getCheckpointsCollection() ) );
    assertTrue( cascade.getCheckpointsTaps().containsAll( third.getCheckpointsCollection() ) );
    assertTrue( cascade.getTailFlows().contains( fourth ) );
    assertTrue( cascade.getSinkTaps().containsAll( fourth.getSinksCollection() ) );
}
Code example source: origin: cwensel/cascading (the same test also appears in origin: cascading/cascading-platform)
@Test
public void testSimpleCascade() throws IOException
{
    getPlatform().copyFromLocal( inputFileIps );

    String path = "simple";

    // Each flow reads the sink written by the previous one.
    Flow first = firstFlow( path + "/first", false );
    Flow second = secondFlow( first.getSink(), path + "/second" );
    Flow third = thirdFlow( second.getSink(), path + "/third" );
    Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );

    // The flows are passed out of order on purpose.
    Cascade cascade = new CascadeConnector( getProperties() ).connect( fourth, second, third, first );

    cascade.start();
    cascade.complete();

    validateLength( fourth, 20 );

    // The sources of the head flow must be among the cascade's source taps.
    assertTrue( cascade.getHeadFlows().contains( first ) );
    assertTrue( cascade.getSourceTaps().containsAll( first.getSourcesCollection() ) );
    assertTrue( cascade.getTailFlows().contains( fourth ) );
    assertTrue( cascade.getSinkTaps().containsAll( fourth.getSinksCollection() ) );
}