本文整理了Java中cascading.flow.Flow.getConfig()
方法的一些代码示例,展示了Flow.getConfig()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.getConfig()
方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:getConfig
[英]Method getConfig returns the internal configuration object.
Any changes to this object will not be reflected in child steps. See cascading.flow.FlowConnector for setting default properties visible to children. Or see cascading.flow.FlowStepStrategy for setting properties on individual steps before they are executed.
[中]方法getConfig返回内部配置对象。
对此对象的任何更改都不会反映在子步骤中。参见级联。流FlowConnector,用于设置对子级可见的默认属性。或者看级联。流FlowStepStrategy,用于在执行单个步骤之前设置其属性。
代码示例来源:origin: elastic/elasticsearch-hadoop
@Override
public void flowConfInit(Flow<JobConf> flow) {
CascadingUtils.addSerializationToken(flow.getConfig());
}
代码示例来源:origin: cwensel/cascading
protected String getVertex( Flow flow, Tap tap )
{
return tap.getFullIdentifier( flow.getConfig() );
}
}
代码示例来源:origin: org.elasticsearch/elasticsearch-hadoop
@Override
public void flowConfInit(Flow<JobConf> flow) {
CascadingUtils.addSerializationToken(flow.getConfig());
}
代码示例来源:origin: cwensel/cascading
/**
* Method addFlow adds a new {@link cascading.flow.Flow} instance that is intended to participate in a {@link Cascade}.
*
* @param flow of Flow
* @return CascadeDef
*/
public CascadeDef addFlow( Flow flow )
{
if( flow == null )
return this;
if( flows.containsKey( flow.getName() ) )
throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() );
Collection<Tap> sinks = flow.getSinksCollection();
for( Tap sink : sinks )
{
String fullIdentifier = sink.getFullIdentifier( flow.getConfig() );
for( Flow existingFlow : flows.values() )
{
Collection<Tap> existingSinks = existingFlow.getSinksCollection();
for( Tap existingSink : existingSinks )
{
if( fullIdentifier.equals( existingSink.getFullIdentifier( existingFlow.getConfig() ) ) )
throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + fullIdentifier + ", in common with the flow: " + existingFlow.getName() );
}
}
}
flows.put( flow.getName(), flow );
return this;
}
代码示例来源:origin: LiveRamp/hank
DomainBuilderOutputCommitter.setupJob(properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.commitJob(properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(properties.getDomainName(), flow.getConfig());
return flow;
代码示例来源:origin: LiveRamp/hank
DomainBuilderOutputCommitter.setupJob(domainBuilder.properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.commitJob(domainBuilder.properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(domainBuilder.properties.getDomainName(), flow.getConfig());
DomainBuilderOutputCommitter.cleanupJob(domainBuilder.properties.getDomainName(), flow.getConfig());
代码示例来源:origin: cwensel/cascading
if( flow.getSink() != sink && sink.resourceExists( flow.getConfig() ) )
count++;
代码示例来源:origin: cascading/cascading-platform
if( flow.getSink() != sink && sink.resourceExists( flow.getConfig() ) )
count++;
代码示例来源:origin: cwensel/cascading
@Test
public void testSkippedCascade() throws IOException
{
getPlatform().copyFromLocal( inputFileIps );
String path = "skipped";
Flow first = firstFlow( path + "/first", false );
Flow second = secondFlow( first.getSink(), path + "/second" );
Flow third = thirdFlow( second.getSink(), path + "/third" );
Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
CountingFlowListener flowListener = new CountingFlowListener();
second.addListener( flowListener );
Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
cascade.setFlowSkipStrategy( new FlowSkipStrategy()
{
public boolean skipFlow( Flow flow ) throws IOException
{
return true;
}
} );
cascade.start();
cascade.complete();
assertEquals( 1, flowListener.skipped );
assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSkippedCascade() throws IOException
{
getPlatform().copyFromLocal( inputFileIps );
String path = "skipped";
Flow first = firstFlow( path + "/first", false );
Flow second = secondFlow( first.getSink(), path + "/second" );
Flow third = thirdFlow( second.getSink(), path + "/third" );
Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
CountingFlowListener flowListener = new CountingFlowListener();
second.addListener( flowListener );
Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
cascade.setFlowSkipStrategy( new FlowSkipStrategy()
{
public boolean skipFlow( Flow flow ) throws IOException
{
return true;
}
} );
cascade.start();
cascade.complete();
assertEquals( 1, flowListener.skipped );
assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSkipStrategiesKeep() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSkipStrategiesKeep() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSkipStrategiesReplace() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
FlowSkipStrategy old = flow.getFlowSkipStrategy();
FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
assertTrue( "not same instance", old == replaced );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSkipStrategiesReplace() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
FlowSkipStrategy old = flow.getFlowSkipStrategy();
FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
assertTrue( "not same instance", old == replaced );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
内容来源于网络,如有侵权,请联系作者删除!