cascading.flow.Flow.openSource()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(120)

本文整理了Java中cascading.flow.Flow.openSource()方法的一些代码示例,展示了Flow.openSource()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.openSource()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:openSource

Flow.openSource介绍

[英]Method openSource opens the first source Tap.
[中]openSource方法打开第一个源Tap(数据源)。

代码示例

代码示例来源:origin: cwensel/cascading

// openSource() opens the flow's first source Tap; the result is held as a TupleEntryIterator.
TupleEntryIterator iterator = flow.openSource();

代码示例来源:origin: cascading/cascading-platform

// openSource() opens the flow's first source Tap; the result is held as a TupleEntryIterator.
TupleEntryIterator iterator = flow.openSource();

代码示例来源:origin: cwensel/cascading

/**
 * Runs an identity flow from a {@code CommentScheme}-backed Hfs source into a {@code TextLine} Hfs sink,
 * then reads back both the sink and the source.
 * <p>
 * The sink iterator is closed in a {@code finally} block so that it is released even when reading or
 * asserting on the first entry fails.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 *
 * @throws IOException propagated from any call in the test body
 */
@Test
public void testNullsFromScheme() throws IOException
  {
  getPlatform().copyFromLocal( inputFileComments );

  Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );

  Pipe pipe = new Pipe( "test" );

  pipe = new Each( pipe, new Identity() );

  Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );

  flow.complete();

  validateLength( flow, 5, null );

  TupleEntryIterator iterator = flow.openSink();

  try
    {
    assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
    }
  finally
    {
    // always release the sink iterator, including when the assertion above throws
    iterator.close();
    }

  // confirm the tuple iterator can handle nulls from the source
  validateLength( flow.openSource(), 5 );
  }

代码示例来源:origin: cascading/cascading-hadoop2-common

/**
 * Runs an identity flow from a {@code CommentScheme}-backed Hfs source into a {@code TextLine} Hfs sink,
 * then reads back both the sink and the source.
 * <p>
 * The sink iterator is closed in a {@code finally} block so that it is released even when reading or
 * asserting on the first entry fails.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 *
 * @throws IOException propagated from any call in the test body
 */
@Test
public void testNullsFromScheme() throws IOException
  {
  getPlatform().copyFromLocal( inputFileComments );

  Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );

  Pipe pipe = new Pipe( "test" );

  pipe = new Each( pipe, new Identity() );

  Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );

  flow.complete();

  validateLength( flow, 5, null );

  TupleEntryIterator iterator = flow.openSink();

  try
    {
    assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
    }
  finally
    {
    // always release the sink iterator, including when the assertion above throws
    iterator.close();
    }

  // confirm the tuple iterator can handle nulls from the source
  validateLength( flow.openSource(), 5 );
  }

代码示例来源:origin: cascading/cascading-platform

/**
 * Builds and runs a parse / group / count assembly over the Apache input file.
 * <p>
 * An {@code Each} applies a {@code RegexParser} (pattern {@code ^[^ ]*}, declaring "ip") to the "line" field,
 * a {@code GroupBy} groups on "ip", and an {@code Every} applies {@code Count}. Once the flow has completed,
 * the source is validated against 10 and the flow against 8.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 */
@Test
public void testSimpleGroup() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache );

  Tap input = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

  // each stage wraps the previous one: parse -> group -> count
  Pipe parsed = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe grouped = new GroupBy( parsed, new Fields( "ip" ) );
  Pipe counted = new Every( grouped, new Count(), new Fields( "ip", "count" ) );

  Tap output = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, counted );
  flow.complete();

  // the source is checked here only, as a one-off sanity check
  validateLength( flow.openSource(), 10 );
  validateLength( flow, 8, null );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Builds and runs a parse / group / count assembly over the Apache input file.
 * <p>
 * An {@code Each} applies a {@code RegexParser} (pattern {@code ^[^ ]*}, declaring "ip") to the "line" field,
 * a {@code GroupBy} groups on "ip", and an {@code Every} applies {@code Count}. Once the flow has completed,
 * the source is validated against 10 and the flow against 8.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 */
@Test
public void testSimpleGroup() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache );

  Tap input = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

  // each stage wraps the previous one: parse -> group -> count
  Pipe parsed = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe grouped = new GroupBy( parsed, new Fields( "ip" ) );
  Pipe counted = new Every( grouped, new Count(), new Fields( "ip", "count" ) );

  Tap output = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, counted );
  flow.complete();

  // the source is checked here only, as a one-off sanity check
  validateLength( flow.openSource(), 10 );
  validateLength( flow, 8, null );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Exercises the flow skip strategies against a sink opened with {@code SinkMode.KEEP}.
 * <p>
 * After the sink resource is deleted, neither the flow's own skip strategy nor a fresh
 * {@code FlowSkipIfSinkExists} may report that the flow should be skipped. After the flow has completed,
 * both must report that it should be skipped. Finally the source and the flow are each validated against 10.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 */
@Test
public void testSkipStrategiesKeep() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache );

  Tap input = getPlatform().getTextFile( inputFileApache );

  // !!! enable replace
  Tap output = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );

  Pipe assembly = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );

  // start from a state where the sink resource has been deleted
  output.deleteResource( flow.getConfig() );

  boolean defaultSkipsBeforeRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", !defaultSkipsBeforeRun );

  boolean existsSkipsBeforeRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", !existsSkipsBeforeRun );

  flow.complete();

  boolean defaultSkipsAfterRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", defaultSkipsAfterRun );

  boolean existsSkipsAfterRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", existsSkipsAfterRun );

  // the source is checked here only, as a one-off sanity check
  validateLength( flow.openSource(), 10 );
  validateLength( flow, 10, null );
  }

代码示例来源:origin: cascading/cascading-platform

/**
 * Exercises the flow skip strategies against a sink opened with {@code SinkMode.KEEP}.
 * <p>
 * After the sink resource is deleted, neither the flow's own skip strategy nor a fresh
 * {@code FlowSkipIfSinkExists} may report that the flow should be skipped. After the flow has completed,
 * both must report that it should be skipped. Finally the source and the flow are each validated against 10.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 */
@Test
public void testSkipStrategiesKeep() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache );

  Tap input = getPlatform().getTextFile( inputFileApache );

  // !!! enable replace
  Tap output = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );

  Pipe assembly = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );

  // start from a state where the sink resource has been deleted
  output.deleteResource( flow.getConfig() );

  boolean defaultSkipsBeforeRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", !defaultSkipsBeforeRun );

  boolean existsSkipsBeforeRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", !existsSkipsBeforeRun );

  flow.complete();

  boolean defaultSkipsAfterRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", defaultSkipsAfterRun );

  boolean existsSkipsAfterRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", existsSkipsAfterRun );

  // the source is checked here only, as a one-off sanity check
  validateLength( flow.openSource(), 10 );
  validateLength( flow, 10, null );
  }

代码示例来源:origin: cwensel/cascading

// NOTE(review): validateLength is defined outside this snippet; presumably it asserts an expected entry count — confirm
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 8, null );

代码示例来源:origin: cascading/cascading-hadoop2-common

// NOTE(review): validateLength is defined outside this snippet; presumably it asserts an expected entry count — confirm
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 8, null );

代码示例来源:origin: cascading/cascading-platform

/**
 * Exercises the flow skip strategies against a sink opened with {@code SinkMode.REPLACE}.
 * <p>
 * Neither the flow's own skip strategy nor a fresh {@code FlowSkipIfSinkExists} may report that the flow
 * should be skipped, both before the flow runs (sink resource deleted) and after it has completed. The test
 * also asserts that {@code setFlowSkipStrategy} hands back the same instance that {@code getFlowSkipStrategy}
 * returned just before. Finally the source and the flow are each validated against 10.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 */
@Test
public void testSkipStrategiesReplace() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache );

  Tap input = getPlatform().getTextFile( inputFileApache );

  // !!! enable replace
  Tap output = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );

  Pipe assembly = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );

  // start from a state where the sink resource has been deleted
  output.deleteResource( flow.getConfig() );

  boolean defaultSkipsBeforeRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", !defaultSkipsBeforeRun );

  boolean existsSkipsBeforeRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", !existsSkipsBeforeRun );

  flow.complete();

  boolean defaultSkipsAfterRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", !defaultSkipsAfterRun );

  boolean existsSkipsAfterRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", !existsSkipsAfterRun );

  // swapping the strategy must return the one that was installed before
  FlowSkipStrategy previous = flow.getFlowSkipStrategy();
  FlowSkipStrategy returned = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
  assertTrue( "not same instance", previous == returned );

  // the source is checked here only, as a one-off sanity check
  validateLength( flow.openSource(), 10 );
  validateLength( flow, 10, null );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Exercises the flow skip strategies against a sink opened with {@code SinkMode.REPLACE}.
 * <p>
 * Neither the flow's own skip strategy nor a fresh {@code FlowSkipIfSinkExists} may report that the flow
 * should be skipped, both before the flow runs (sink resource deleted) and after it has completed. The test
 * also asserts that {@code setFlowSkipStrategy} hands back the same instance that {@code getFlowSkipStrategy}
 * returned just before. Finally the source and the flow are each validated against 10.
 * <p>
 * NOTE(review): validateLength is defined elsewhere; presumably it asserts the number of entries — confirm.
 */
@Test
public void testSkipStrategiesReplace() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache );

  Tap input = getPlatform().getTextFile( inputFileApache );

  // !!! enable replace
  Tap output = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );

  Pipe assembly = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );

  // start from a state where the sink resource has been deleted
  output.deleteResource( flow.getConfig() );

  boolean defaultSkipsBeforeRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", !defaultSkipsBeforeRun );

  boolean existsSkipsBeforeRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", !existsSkipsBeforeRun );

  flow.complete();

  boolean defaultSkipsAfterRun = flow.getFlowSkipStrategy().skipFlow( flow );
  assertTrue( "default skip", !defaultSkipsAfterRun );

  boolean existsSkipsAfterRun = new FlowSkipIfSinkExists().skipFlow( flow );
  assertTrue( "exist skip", !existsSkipsAfterRun );

  // swapping the strategy must return the one that was installed before
  FlowSkipStrategy previous = flow.getFlowSkipStrategy();
  FlowSkipStrategy returned = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
  assertTrue( "not same instance", previous == returned );

  // the source is checked here only, as a one-off sanity check
  validateLength( flow.openSource(), 10 );
  validateLength( flow, 10, null );
  }

相关文章