cascading.flow.Flow.openTrap()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(22.2k)|赞(0)|评价(0)|浏览(74)

本文整理了Java中cascading.flow.Flow.openTrap()方法的一些代码示例,展示了Flow.openTrap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.openTrap()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:openTrap

Flow.openTrap介绍

[英]Method openTrap opens the first trap Tap.
[中]方法openTrap打开第一个陷阱抽头。

代码示例

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapNoOperation() throws Exception
 {
 getPlatform().copyFromLocal( testDelimitedProblematic );
 Tap source = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", testDelimitedProblematic );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", getOutputPath( getTestName() ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( getTestName() + "_trap" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "copy" );
 FlowDef flowDef = FlowDef.flowDef()
  .addSource( pipe, source )
  .addTailSink( pipe, sink )
  .addTrap( pipe, trap );
 Flow flow = getPlatform().getFlowConnector().connect( flowDef );
 flow.complete();
 validateLength( flow.openTrap(), 1 );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTrapNoOperation() throws Exception
 {
 getPlatform().copyFromLocal( testDelimitedProblematic );
 Tap source = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", testDelimitedProblematic );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "id", "name" ).applyTypes( int.class, String.class ), ",", getOutputPath( getTestName() ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( getTestName() + "_trap" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "copy" );
 FlowDef flowDef = FlowDef.flowDef()
  .addSource( pipe, source )
  .addTailSink( pipe, sink )
  .addTrap( pipe, trap );
 Flow flow = getPlatform().getFlowConnector().connect( flowDef );
 flow.complete();
 validateLength( flow.openTrap(), 1 );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapDiagnostics() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "diag/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diag/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Map<Object, Object> properties = getProperties();
 properties = TrapProps.trapProps()
  .recordAllDiagnostics()
  .buildProperties( properties );
 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0 );
 validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTrapDiagnostics() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "diag/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diag/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Map<Object, Object> properties = getProperties();
 properties = TrapProps.trapProps()
  .recordAllDiagnostics()
  .buildProperties( properties );
 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0 );
 validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTrapDiagnosticsLocalConfig() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "diagconfigdef/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diagconfigdef/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Map<Object, Object> properties = getProperties();
 TrapProps.trapProps()
  .recordAllDiagnostics()
  .setProperties( trap.getConfigDef(), ConfigDef.Mode.DEFAULT );
 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0 );
 validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTrapTapSourceSink() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Scheme scheme = getPlatform().getTestFailScheme();
 Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );
 Map<Object, Object> properties = getProperties();
 // compensate for running in cluster mode
 getPlatform().setNumMapTasks( properties, 1 );
 getPlatform().setNumReduceTasks( properties, 1 );
 getPlatform().setNumGatherPartitionTasks( properties, 1 );
 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 7 );
 validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapDiagnosticsLocalConfig() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "diagconfigdef/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diagconfigdef/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Map<Object, Object> properties = getProperties();
 TrapProps.trapProps()
  .recordAllDiagnostics()
  .setProperties( trap.getConfigDef(), ConfigDef.Mode.DEFAULT );
 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0 );
 validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapTapSourceSink() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Scheme scheme = getPlatform().getTestFailScheme();
 Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );
 Map<Object, Object> properties = getProperties();
 // compensate for running in cluster mode
 getPlatform().setNumMapTasks( properties, 1 );
 getPlatform().setNumReduceTasks( properties, 1 );
 getPlatform().setNumGatherPartitionTasks( properties, 1 );
 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 7 );
 validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapNone() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "none/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( "none/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 8, null );
 try
  {
  flow.openTrap();
  fail(); // should throw a file not found exception
  }
 catch( IOException exception )
  {
  // do nothing
  }
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapEachAllSequence() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), 10 );
 }

代码示例来源:origin: cwensel/cascading

private void runTrapEveryAll( int failAt, String path, int failSize ) throws IOException
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 pipe = new Every( pipe, new TestFailAggregator( new Fields( "fail" ), failAt ), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( path + "/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( path + "/trap" ), SinkMode.REPLACE );
 Map<String, Tap> traps = Cascades.tapsMap( "reduce", trap );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, traps, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), failSize );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTrapEachAll() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "all/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( "all/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), 10 );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTrapEachAllSequence() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "allseq/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), 10 );
 }

代码示例来源:origin: cascading/cascading-platform

private void runTrapEveryAll( int failAt, String path, int failSize ) throws IOException
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 pipe = new Every( pipe, new TestFailAggregator( new Fields( "fail" ), failAt ), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( path + "/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( path + "/trap" ), SinkMode.REPLACE );
 Map<String, Tap> traps = Cascades.tapsMap( "reduce", trap );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, traps, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), failSize );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapEachAll() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "all/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( "all/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), 10 );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTrapToSequenceFile() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "seq/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "seq/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), 10 );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTrapToSequenceFile() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
 pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "seq/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "seq/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 0, null );
 validateLength( flow.openTrap(), 10 );
 }

代码示例来源:origin: cwensel/cascading

/**
 * This test verifies traps can cross m/r and step boundaries.
 *
 * @throws Exception
 */
@Test
public void testTrapEachEveryAllChained() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "75.185.76.245" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "68.46.103.112" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "76.197.151.0" ) );
 pipe = new Each( pipe, AssertionLevel.VALID, new AssertNotEquals( "12.215.138.88" ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "eacheverychain/tap" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( "eacheverychain/trap" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 6, null );
 validateLength( flow.openTrap(), 4 );
 }

代码示例来源:origin: cwensel/cascading

/**
 * verify we can fail in randome places into the same trap
 *
 * @throws Exception
 */
@Test
public void testTrapEachAllChained() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new TestFunction( new Fields( "test" ), new Tuple( 1 ), 1 ), Fields.ALL );
 pipe = new Each( pipe, new TestFunction( new Fields( "test2" ), new Tuple( 2 ), 2 ), Fields.ALL );
 pipe = new Each( pipe, new TestFunction( new Fields( "test3" ), new Tuple( 3 ), 3 ), Fields.ALL );
 pipe = new Each( pipe, new TestFunction( new Fields( "test4" ), new Tuple( 4 ), 4 ), Fields.ALL );
 Tap sink = getPlatform().getTextFile( getOutputPath( "allchain/tap-nondeterministic" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( "allchain/trap-nondeterministic" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 6, null );
 validateLength( flow.openTrap(), 4 );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * verify we can fail in randome places into the same trap
 *
 * @throws Exception
 */
@Test
public void testTrapEachAllChained() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 // always fail
 pipe = new Each( pipe, new TestFunction( new Fields( "test" ), new Tuple( 1 ), 1 ), Fields.ALL );
 pipe = new Each( pipe, new TestFunction( new Fields( "test2" ), new Tuple( 2 ), 2 ), Fields.ALL );
 pipe = new Each( pipe, new TestFunction( new Fields( "test3" ), new Tuple( 3 ), 3 ), Fields.ALL );
 pipe = new Each( pipe, new TestFunction( new Fields( "test4" ), new Tuple( 4 ), 4 ), Fields.ALL );
 Tap sink = getPlatform().getTextFile( getOutputPath( "allchain/tap-nondeterministic" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( getOutputPath( "allchain/trap-nondeterministic" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow, 6, null );
 validateLength( flow.openTrap(), 4 );
 }

相关文章