本文整理了Java中cascading.flow.Flow.openSource()
方法的一些代码示例,展示了Flow.openSource()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.openSource()
方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:openSource
[英]Method openSource opens the first source Tap.
[中]方法openSource打开第一个源代码点击。
代码示例来源:origin: cwensel/cascading
TupleEntryIterator iterator = flow.openSource();
代码示例来源:origin: cascading/cascading-platform
TupleEntryIterator iterator = flow.openSource();
代码示例来源:origin: cwensel/cascading
@Test
public void testNullsFromScheme() throws IOException
{
getPlatform().copyFromLocal( inputFileComments );
Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Identity() );
Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );
flow.complete();
validateLength( flow, 5, null );
TupleEntryIterator iterator = flow.openSink();
assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
iterator.close();
// confirm the tuple iterator can handle nulls from the source
validateLength( flow.openSource(), 5 );
}
代码示例来源:origin: cascading/cascading-hadoop2-common
@Test
public void testNullsFromScheme() throws IOException
{
getPlatform().copyFromLocal( inputFileComments );
Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Identity() );
Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );
flow.complete();
validateLength( flow, 5, null );
TupleEntryIterator iterator = flow.openSink();
assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
iterator.close();
// confirm the tuple iterator can handle nulls from the source
validateLength( flow.openSource(), 5 );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSimpleGroup() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
flow.complete();
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 8, null );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSimpleGroup() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
flow.complete();
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 8, null );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSkipStrategiesKeep() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSkipStrategiesKeep() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
代码示例来源:origin: cwensel/cascading
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 8, null );
代码示例来源:origin: cascading/cascading-hadoop2-common
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 8, null );
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSkipStrategiesReplace() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
FlowSkipStrategy old = flow.getFlowSkipStrategy();
FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
assertTrue( "not same instance", old == replaced );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSkipStrategiesReplace() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( inputFileApache );
// !!! enable replace
Tap sink = getPlatform().getTextFile( getOutputPath( "replace" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
sink.deleteResource( flow.getConfig() );
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
flow.complete();
assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
FlowSkipStrategy old = flow.getFlowSkipStrategy();
FlowSkipStrategy replaced = flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );
assertTrue( "not same instance", old == replaced );
validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
validateLength( flow, 10, null );
}
内容来源于网络,如有侵权,请联系作者删除!