cascading.flow.Flow.getSinksCollection()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.8k)|赞(0)|评价(0)|浏览(78)

本文整理了Java中cascading.flow.Flow.getSinksCollection()方法的一些代码示例,展示了Flow.getSinksCollection()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.getSinksCollection()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:getSinksCollection

Flow.getSinksCollection介绍

[英]Method getSinksCollection returns a Collection of sink Taps for this Flow object.
[中]方法getSinksCollection返回此Flow对象的sink Tap(输出端Tap)集合。

代码示例

代码示例来源:origin: com.backtype/dfs-datastores-cascading

/**
 * Reports whether the given tap is one of the sinks of the given flow.
 * <p>
 * Membership is decided by reference identity ({@code ==}), not by {@code equals}.
 *
 * @param tap  the tap to look for
 * @param flow the flow whose sink collection is scanned
 * @return {@code true} if the very same tap instance appears among the flow's sinks
 */
public static boolean isSinkOf(Tap tap, Flow flow) {
    boolean found = false;
    for (Object candidate : flow.getSinksCollection()) {
      if (candidate == tap) {
        found = true;
        break;
      }
    }
    return found;
  }
}

代码示例来源:origin: cwensel/cascading

/**
 * Method addFlow registers the given {@link cascading.flow.Flow} instance as a participant in a {@link Cascade}.
 * <p>
 * A {@code null} flow is ignored. Otherwise the flow's name must not already be registered, and none of
 * its sinks may share a full identifier with a sink of any previously registered flow.
 *
 * @param flow of Flow
 * @return CascadeDef, this instance
 * @throws CascadeException if the flow name is already registered, or if one of its sink identifiers
 *                          is already used by a registered flow
 */
public CascadeDef addFlow( Flow flow )
 {
 if( flow == null )
  {
  return this;
  }

 if( flows.containsKey( flow.getName() ) )
  {
  throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() );
  }

 // compare every sink of the incoming flow against every sink of every registered flow
 Collection<Tap> incomingSinks = flow.getSinksCollection();

 for( Tap incomingSink : incomingSinks )
  {
  String incomingIdentifier = incomingSink.getFullIdentifier( flow.getConfig() );

  for( Flow registeredFlow : flows.values() )
   {
   Collection<Tap> registeredSinks = registeredFlow.getSinksCollection();

   for( Tap registeredSink : registeredSinks )
    {
    String registeredIdentifier = registeredSink.getFullIdentifier( registeredFlow.getConfig() );

    if( !incomingIdentifier.equals( registeredIdentifier ) )
     {
     continue;
     }

    throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + incomingIdentifier + ", in common with the flow: " + registeredFlow.getName() );
    }
   }
  }

 flows.put( flow.getName(), flow );

 return this;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Populates the graph from the given flows: for each flow, one vertex per source tap and per
 * sink tap, and one edge for every (source, sink) pair of that flow.
 * <p>
 * Checkpoint taps of a flow are handled together with its sinks.
 *
 * @param flows the flows to add to the graph
 */
private void makeGraph( Flow[] flows )
 {
 for( Flow current : flows )
  {
  LinkedList<Tap> sourceTaps = new LinkedList<Tap>( current.getSourcesCollection() );
  LinkedList<Tap> sinkTaps = new LinkedList<Tap>( current.getSinksCollection() );

  // checkpoints are appended to the sink list and treated like sinks from here on
  sinkTaps.addAll( current.getCheckpointsCollection() );

  unwrapCompositeTaps( sourceTaps );
  unwrapCompositeTaps( sinkTaps );

  // all vertices are added before any edge is created
  for( Tap sourceTap : sourceTaps )
   {
   addVertex( getVertex( current, sourceTap ) );
   }

  for( Tap sinkTap : sinkTaps )
   {
   addVertex( getVertex( current, sinkTap ) );
   }

  for( Tap sourceTap : sourceTaps )
   {
   for( Tap sinkTap : sinkTaps )
    {
    addEdgeFor( current, sourceTap, sinkTap );
    }
   }
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Runs four chained flows, the third of which exposes a checkpoint tap, as a single cascade and
 * checks the head/tail flows and the source, intermediate, checkpoint and sink taps the cascade reports.
 * <p>
 * Returns immediately, without asserting anything, when the platform is not MapReduce.
 */
@Test
public void testCheckpointTapCascade() throws IOException
 {
 if( !getPlatform().isMapReduce() )
  {
  return;
  }

 getPlatform().copyFromLocal( inputFileIps );

 String root = "checkpoint";

 // each flow reads what the previous one wrote; the fourth reads the third's first checkpoint
 Flow first = firstFlow( root + "/first", false );
 Flow second = secondFlow( first.getSink(), root + "/second" );
 Flow third = thirdCheckpointFlow( second.getSink(), root + "/third" );
 Tap checkpointTap = (Tap) third.getCheckpoints().values().iterator().next();
 Flow fourth = fourthFlow( checkpointTap, root + "/fourth" );

 // the flows are handed over in a different order than they were chained above
 CascadeConnector connector = new CascadeConnector( getProperties() );
 Cascade cascade = connector.connect( fourth, second, third, first );

 cascade.start();
 cascade.complete();

 validateLength( fourth, 20 );

 assertTrue( cascade.getHeadFlows().contains( first ) );
 assertTrue( cascade.getSourceTaps().containsAll( first.getSourcesCollection() ) );

 assertTrue( cascade.getIntermediateTaps().containsAll( third.getCheckpointsCollection() ) );
 assertTrue( cascade.getCheckpointsTaps().containsAll( third.getCheckpointsCollection() ) );

 assertTrue( cascade.getTailFlows().contains( fourth ) );
 assertTrue( cascade.getSinkTaps().containsAll( fourth.getSinksCollection() ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs four chained flows, the third of which exposes a checkpoint tap, as a single cascade and
 * checks the head/tail flows and the source, intermediate, checkpoint and sink taps the cascade reports.
 * <p>
 * Returns immediately, without asserting anything, when the platform is not MapReduce.
 */
@Test
public void testCheckpointTapCascade() throws IOException
 {
 if( !getPlatform().isMapReduce() )
  {
  return;
  }

 getPlatform().copyFromLocal( inputFileIps );

 String root = "checkpoint";

 // each flow reads what the previous one wrote; the fourth reads the third's first checkpoint
 Flow first = firstFlow( root + "/first", false );
 Flow second = secondFlow( first.getSink(), root + "/second" );
 Flow third = thirdCheckpointFlow( second.getSink(), root + "/third" );
 Tap checkpointTap = (Tap) third.getCheckpoints().values().iterator().next();
 Flow fourth = fourthFlow( checkpointTap, root + "/fourth" );

 // the flows are handed over in a different order than they were chained above
 CascadeConnector connector = new CascadeConnector( getProperties() );
 Cascade cascade = connector.connect( fourth, second, third, first );

 cascade.start();
 cascade.complete();

 validateLength( fourth, 20 );

 assertTrue( cascade.getHeadFlows().contains( first ) );
 assertTrue( cascade.getSourceTaps().containsAll( first.getSourcesCollection() ) );

 assertTrue( cascade.getIntermediateTaps().containsAll( third.getCheckpointsCollection() ) );
 assertTrue( cascade.getCheckpointsTaps().containsAll( third.getCheckpointsCollection() ) );

 assertTrue( cascade.getTailFlows().contains( fourth ) );
 assertTrue( cascade.getSinkTaps().containsAll( fourth.getSinksCollection() ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Runs four flows, each reading the sink of the previous one, as a single cascade and checks the
 * head/tail flows and the source and sink taps the cascade reports.
 */
@Test
public void testSimpleCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );

 String root = "simple";

 // each flow consumes the sink written by the one before it
 Flow first = firstFlow( root + "/first", false );
 Flow second = secondFlow( first.getSink(), root + "/second" );
 Flow third = thirdFlow( second.getSink(), root + "/third" );
 Flow fourth = fourthFlow( third.getSink(), root + "/fourth" );

 // the flows are handed over in a different order than they were chained above
 CascadeConnector connector = new CascadeConnector( getProperties() );
 Cascade cascade = connector.connect( fourth, second, third, first );

 cascade.start();
 cascade.complete();

 validateLength( fourth, 20 );

 assertTrue( cascade.getHeadFlows().contains( first ) );
 assertTrue( cascade.getSourceTaps().containsAll( first.getSourcesCollection() ) );

 assertTrue( cascade.getTailFlows().contains( fourth ) );
 assertTrue( cascade.getSinkTaps().containsAll( fourth.getSinksCollection() ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs four flows, each reading the sink of the previous one, as a single cascade and checks the
 * head/tail flows and the source and sink taps the cascade reports.
 */
@Test
public void testSimpleCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );

 String root = "simple";

 // each flow consumes the sink written by the one before it
 Flow first = firstFlow( root + "/first", false );
 Flow second = secondFlow( first.getSink(), root + "/second" );
 Flow third = thirdFlow( second.getSink(), root + "/third" );
 Flow fourth = fourthFlow( third.getSink(), root + "/fourth" );

 // the flows are handed over in a different order than they were chained above
 CascadeConnector connector = new CascadeConnector( getProperties() );
 Cascade cascade = connector.connect( fourth, second, third, first );

 cascade.start();
 cascade.complete();

 validateLength( fourth, 20 );

 assertTrue( cascade.getHeadFlows().contains( first ) );
 assertTrue( cascade.getSourceTaps().containsAll( first.getSourcesCollection() ) );

 assertTrue( cascade.getTailFlows().contains( fourth ) );
 assertTrue( cascade.getSinkTaps().containsAll( fourth.getSinksCollection() ) );
 }

相关文章