cascading.flow.Flow.getConfig()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(161)

本文整理了Java中cascading.flow.Flow.getConfig()方法的一些代码示例,展示了Flow.getConfig()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.getConfig()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:getConfig

Flow.getConfig介绍

[英]Method getConfig returns the internal configuration object.

Any changes to this object will not be reflected in child steps. See cascading.flow.FlowConnector for setting default properties visible to children. Or see cascading.flow.FlowStepStrategy for setting properties on individual steps before they are executed.
[中]方法getConfig返回内部配置对象。
对此对象的任何更改都不会反映在子步骤中。参见级联。流FlowConnector,用于设置对子级可见的默认属性。或者看级联。流FlowStepStrategy,用于在执行单个步骤之前设置其属性。

代码示例

代码示例来源:origin: elastic/elasticsearch-hadoop

@Override
public void flowConfInit(Flow<JobConf> flow) {
  CascadingUtils.addSerializationToken(flow.getConfig());
}

代码示例来源:origin: cwensel/cascading

protected String getVertex( Flow flow, Tap tap )
 {
 return tap.getFullIdentifier( flow.getConfig() );
 }
}

代码示例来源:origin: org.elasticsearch/elasticsearch-hadoop

@Override
public void flowConfInit(Flow<JobConf> flow) {
  CascadingUtils.addSerializationToken(flow.getConfig());
}

代码示例来源:origin: cwensel/cascading

/**
 * Method addFlow adds a new {@link cascading.flow.Flow} instance that is intended to participate in a {@link Cascade}.
 *
 * @param flow of Flow
 * @return CascadeDef
 */
public CascadeDef addFlow( Flow flow )
 {
 if( flow == null )
  return this;
 if( flows.containsKey( flow.getName() ) )
  throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() );
 Collection<Tap> sinks = flow.getSinksCollection();
 for( Tap sink : sinks )
  {
  String fullIdentifier = sink.getFullIdentifier( flow.getConfig() );
  for( Flow existingFlow : flows.values() )
   {
   Collection<Tap> existingSinks = existingFlow.getSinksCollection();
   for( Tap existingSink : existingSinks )
    {
    if( fullIdentifier.equals( existingSink.getFullIdentifier( existingFlow.getConfig() ) ) )
     throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + fullIdentifier + ", in common with the flow: " + existingFlow.getName() );
    }
   }
  }
 flows.put( flow.getName(), flow );
 return this;
 }

代码示例来源:origin: LiveRamp/hank

DomainBuilderOutputCommitter.setupJob(properties.getDomainName(), flow.getConfig());
 DomainBuilderOutputCommitter.commitJob(properties.getDomainName(), flow.getConfig());
  DomainBuilderOutputCommitter.cleanupJob(properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(properties.getDomainName(), flow.getConfig());
return flow;

代码示例来源:origin: LiveRamp/hank

DomainBuilderOutputCommitter.setupJob(domainBuilder.properties.getDomainName(), flow.getConfig());
 DomainBuilderOutputCommitter.commitJob(domainBuilder.properties.getDomainName(), flow.getConfig());
  DomainBuilderOutputCommitter.cleanupJob(domainBuilder.properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(domainBuilder.properties.getDomainName(), flow.getConfig());

代码示例来源:origin: cwensel/cascading

if( flow.getSink() != sink && sink.resourceExists( flow.getConfig() ) )
 count++;

代码示例来源:origin: cascading/cascading-platform

if( flow.getSink() != sink && sink.resourceExists( flow.getConfig() ) )
 count++;

代码示例来源:origin: cwensel/cascading

@Test
public void testSkippedCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );
 String path = "skipped";
 Flow first = firstFlow( path + "/first", false );
 Flow second = secondFlow( first.getSink(), path + "/second" );
 Flow third = thirdFlow( second.getSink(), path + "/third" );
 Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
 CountingFlowListener flowListener = new CountingFlowListener();
 second.addListener( flowListener );
 Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
 cascade.setFlowSkipStrategy( new FlowSkipStrategy()
  {
  public boolean skipFlow( Flow flow ) throws IOException
   {
   return true;
   }
  } );
 cascade.start();
 cascade.complete();
 assertEquals( 1, flowListener.skipped );
 assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testSkippedCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );
 String path = "skipped";
 Flow first = firstFlow( path + "/first", false );
 Flow second = secondFlow( first.getSink(), path + "/second" );
 Flow third = thirdFlow( second.getSink(), path + "/third" );
 Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
 CountingFlowListener flowListener = new CountingFlowListener();
 second.addListener( flowListener );
 Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
 cascade.setFlowSkipStrategy( new FlowSkipStrategy()
  {
  public boolean skipFlow( Flow flow ) throws IOException
   {
   return true;
   }
  } );
 cascade.start();
 cascade.complete();
 assertEquals( 1, flowListener.skipped );
 assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testSkipStrategiesKeep() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 flow.complete();
 assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testSkipStrategiesKeep() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 flow.complete();
 assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testSkipStrategiesReplace() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 flow.complete();
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 FlowSkipStrategy old = flow.getFlowSkipStrategy();
 FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
 assertTrue( "not same instance", old == replaced );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testSkipStrategiesReplace() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 flow.complete();
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 FlowSkipStrategy old = flow.getFlowSkipStrategy();
 FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
 assertTrue( "not same instance", old == replaced );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );
 }

相关文章