
x33g5p2x  于2022-01-19 转载在 其他  



[英]Method getConfig returns the internal configuration object.

Any changes to this object will not be reflected in child steps. See cascading.flow.FlowConnector for setting default properties visible to children. Or see cascading.flow.FlowStepStrategy for setting properties on individual steps before they are executed.


代码示例来源:origin: elastic/elasticsearch-hadoop

public void flowConfInit(Flow<JobConf> flow) {

代码示例来源:origin: cwensel/cascading

protected String getVertex( Flow flow, Tap tap )
 return tap.getFullIdentifier( flow.getConfig() );

代码示例来源:origin: org.elasticsearch/elasticsearch-hadoop

public void flowConfInit(Flow<JobConf> flow) {

代码示例来源:origin: cwensel/cascading

 * Method addFlow adds a new {@link cascading.flow.Flow} instance that is intended to participate in a {@link Cascade}.
 * @param flow of Flow
 * @return CascadeDef
public CascadeDef addFlow( Flow flow )
 if( flow == null )
  return this;
 if( flows.containsKey( flow.getName() ) )
  throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() );
 Collection<Tap> sinks = flow.getSinksCollection();
 for( Tap sink : sinks )
  String fullIdentifier = sink.getFullIdentifier( flow.getConfig() );
  for( Flow existingFlow : flows.values() )
   Collection<Tap> existingSinks = existingFlow.getSinksCollection();
   for( Tap existingSink : existingSinks )
    if( fullIdentifier.equals( existingSink.getFullIdentifier( existingFlow.getConfig() ) ) )
     throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + fullIdentifier + ", in common with the flow: " + existingFlow.getName() );
 flows.put( flow.getName(), flow );
 return this;

代码示例来源:origin: LiveRamp/hank

DomainBuilderOutputCommitter.setupJob(properties.getDomainName(), flow.getConfig());
 DomainBuilderOutputCommitter.commitJob(properties.getDomainName(), flow.getConfig());
  DomainBuilderOutputCommitter.cleanupJob(properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(properties.getDomainName(), flow.getConfig());
return flow;

代码示例来源:origin: LiveRamp/hank

DomainBuilderOutputCommitter.setupJob(, flow.getConfig());
 DomainBuilderOutputCommitter.commitJob(, flow.getConfig());
  DomainBuilderOutputCommitter.cleanupJob(, flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(, flow.getConfig());

代码示例来源:origin: cwensel/cascading

if( flow.getSink() != sink && sink.resourceExists( flow.getConfig() ) )

代码示例来源:origin: cascading/cascading-platform

if( flow.getSink() != sink && sink.resourceExists( flow.getConfig() ) )

代码示例来源:origin: cwensel/cascading

public void testSkippedCascade() throws IOException
 getPlatform().copyFromLocal( inputFileIps );
 String path = "skipped";
 Flow first = firstFlow( path + "/first", false );
 Flow second = secondFlow( first.getSink(), path + "/second" );
 Flow third = thirdFlow( second.getSink(), path + "/third" );
 Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
 CountingFlowListener flowListener = new CountingFlowListener();
 second.addListener( flowListener );
 Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
 cascade.setFlowSkipStrategy( new FlowSkipStrategy()
  public boolean skipFlow( Flow flow ) throws IOException
   return true;
  } );
 assertEquals( 1, flowListener.skipped );
 assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );

代码示例来源:origin: cascading/cascading-platform

public void testSkippedCascade() throws IOException
 getPlatform().copyFromLocal( inputFileIps );
 String path = "skipped";
 Flow first = firstFlow( path + "/first", false );
 Flow second = secondFlow( first.getSink(), path + "/second" );
 Flow third = thirdFlow( second.getSink(), path + "/third" );
 Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
 CountingFlowListener flowListener = new CountingFlowListener();
 second.addListener( flowListener );
 Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
 cascade.setFlowSkipStrategy( new FlowSkipStrategy()
  public boolean skipFlow( Flow flow ) throws IOException
   return true;
  } );
 assertEquals( 1, flowListener.skipped );
 assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );

代码示例来源:origin: cwensel/cascading

public void testSkipStrategiesKeep() throws Exception
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );

代码示例来源:origin: cascading/cascading-platform

public void testSkipStrategiesKeep() throws Exception
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );

代码示例来源:origin: cascading/cascading-platform

public void testSkipStrategiesReplace() throws Exception
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 FlowSkipStrategy old = flow.getFlowSkipStrategy();
 FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
 assertTrue( "not same instance", old == replaced );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );

代码示例来源:origin: cwensel/cascading

public void testSkipStrategiesReplace() throws Exception
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 FlowSkipStrategy old = flow.getFlowSkipStrategy();
 FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
 assertTrue( "not same instance", old == replaced );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );
