本文整理了Java中cascading.flow.Flow.openTrap()
方法的一些代码示例,展示了Flow.openTrap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.openTrap()
方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:openTrap
[英]Method openTrap opens the first trap Tap.
[中]方法openTrap打开第一个陷阱抽头。
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapNoOperation() throws Exception
{
getPlatform().copyFromLocal( testDelimitedProblematic );
Tap source = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", testDelimitedProblematic );
Tap sink = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", getOutputPath( getTestName() ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( getTestName() + "_trap" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "copy" );
FlowDef flowDef = FlowDef.flowDef()
.addSource( pipe, source )
.addTailSink( pipe, sink )
.addTrap( pipe, trap );
Flow flow = getPlatform().getFlowConnector().connect( flowDef );
flow.complete();
validateLength( flow.openTrap(), 1 );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testTrapNoOperation() throws Exception
{
getPlatform().copyFromLocal( testDelimitedProblematic );
Tap source = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", testDelimitedProblematic );
Tap sink = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", getOutputPath( getTestName() ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( getTestName() + "_trap" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "copy" );
FlowDef flowDef = FlowDef.flowDef()
.addSource( pipe, source )
.addTailSink( pipe, sink )
.addTrap( pipe, trap );
Flow flow = getPlatform().getFlowConnector().connect( flowDef );
flow.complete();
validateLength( flow.openTrap(), 1 );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapDiagnostics() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "diag/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diag/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Map<Object, Object> properties = getProperties();
properties = TrapProps.trapProps()
.recordAllDiagnostics()
.buildProperties( properties );
Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0 );
validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testTrapDiagnostics() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "diag/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diag/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Map<Object, Object> properties = getProperties();
properties = TrapProps.trapProps()
.recordAllDiagnostics()
.buildProperties( properties );
Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0 );
validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testTrapDiagnosticsLocalConfig() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "diagconfigdef/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diagconfigdef/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Map<Object, Object> properties = getProperties();
TrapProps.trapProps()
.recordAllDiagnostics()
.setProperties( trap.getConfigDef(), ConfigDef.Mode.DEFAULT );
Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0 );
validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testTrapTapSourceSink() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Scheme scheme = getPlatform().getTestFailScheme();
Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );
Map<Object, Object> properties = getProperties();
// compensate for running in cluster mode
getPlatform().setNumMapTasks( properties, 1 );
getPlatform().setNumReduceTasks( properties, 1 );
getPlatform().setNumGatherPartitionTasks( properties, 1 );
Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 7 );
validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
}
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapDiagnosticsLocalConfig() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "diagconfigdef/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diagconfigdef/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
Map<Object, Object> properties = getProperties();
TrapProps.trapProps()
.recordAllDiagnostics()
.setProperties( trap.getConfigDef(), ConfigDef.Mode.DEFAULT );
Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0 );
validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
}
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapTapSourceSink() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Scheme scheme = getPlatform().getTestFailScheme();
Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );
Map<Object, Object> properties = getProperties();
// compensate for running in cluster mode
getPlatform().setNumMapTasks( properties, 1 );
getPlatform().setNumReduceTasks( properties, 1 );
getPlatform().setNumGatherPartitionTasks( properties, 1 );
Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 7 );
validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
}
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapNone() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "none/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( "none/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 8, null );
try
{
flow.openTrap();
fail(); // should throw a file not found exception
}
catch( IOException exception )
{
// do nothing
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapEachAllSequence() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), 10 );
}
代码示例来源:origin: cwensel/cascading
private void runTrapEveryAll( int failAt, String path, int failSize ) throws IOException
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
pipe = new Every( pipe, new TestFailAggregator( new Fields( "fail" ), failAt ), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( path + "/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( path + "/trap" ), SinkMode.REPLACE );
Map<String, Tap> traps = Cascades.tapsMap( "reduce", trap );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, traps, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), failSize );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testTrapEachAll() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "all/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( "all/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), 10 );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testTrapEachAllSequence() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), 10 );
}
代码示例来源:origin: cascading/cascading-platform
private void runTrapEveryAll( int failAt, String path, int failSize ) throws IOException
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
pipe = new Every( pipe, new TestFailAggregator( new Fields( "fail" ), failAt ), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( path + "/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( path + "/trap" ), SinkMode.REPLACE );
Map<String, Tap> traps = Cascades.tapsMap( "reduce", trap );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, traps, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), failSize );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapEachAll() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "all/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( "all/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), 10 );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testTrapToSequenceFile() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "seq/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "seq/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), 10 );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testTrapToSequenceFile() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "seq/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "seq/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 0, null );
validateLength( flow.openTrap(), 10 );
}
代码示例来源:origin: cwensel/cascading
/**
* This test verifies traps can cross m/r and step boundaries.
*
* @throws Exception
*/
@Test
public void testTrapEachEveryAllChained() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "75.185.76.245" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "68.46.103.112" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "76.197.151.0" ) );
pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "12.215.138.88" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "eacheverychain/tap" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( "eacheverychain/trap" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 6, null );
validateLength( flow.openTrap(), 4 );
}
代码示例来源:origin: cwensel/cascading
/**
* verify we can fail in randome places into the same trap
*
* @throws Exception
*/
@Test
public void testTrapEachAllChained() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new TestFunction( new Fields( "test" ), new Tuple( 1 ), 1 ), Fields.ALL );
pipe = new Each( pipe, new TestFunction( new Fields( "test2" ), new Tuple( 2 ), 2 ), Fields.ALL );
pipe = new Each( pipe, new TestFunction( new Fields( "test3" ), new Tuple( 3 ), 3 ), Fields.ALL );
pipe = new Each( pipe, new TestFunction( new Fields( "test4" ), new Tuple( 4 ), 4 ), Fields.ALL );
Tap sink = getPlatform().getTextFile( getOutputPath( "allchain/tap-nondeterministic" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( "allchain/trap-nondeterministic" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 6, null );
validateLength( flow.openTrap(), 4 );
}
代码示例来源:origin: cascading/cascading-platform
/**
* verify we can fail in randome places into the same trap
*
* @throws Exception
*/
@Test
public void testTrapEachAllChained() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
Pipe pipe = new Pipe( "map" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
// always fail
pipe = new Each( pipe, new TestFunction( new Fields( "test" ), new Tuple( 1 ), 1 ), Fields.ALL );
pipe = new Each( pipe, new TestFunction( new Fields( "test2" ), new Tuple( 2 ), 2 ), Fields.ALL );
pipe = new Each( pipe, new TestFunction( new Fields( "test3" ), new Tuple( 3 ), 3 ), Fields.ALL );
pipe = new Each( pipe, new TestFunction( new Fields( "test4" ), new Tuple( 4 ), 4 ), Fields.ALL );
Tap sink = getPlatform().getTextFile( getOutputPath( "allchain/tap-nondeterministic" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( getOutputPath( "allchain/trap-nondeterministic" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
flow.complete();
validateLength( flow, 6, null );
validateLength( flow.openTrap(), 4 );
}
内容来源于网络,如有侵权,请联系作者删除!