cascading.flow.Flow.complete()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(15.3k)|赞(0)|评价(0)|浏览(134)

本文整理了Java中cascading.flow.Flow.complete()方法的一些代码示例,展示了Flow.complete()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.complete()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:complete

Flow.complete介绍

[英]Method complete starts the current Flow instance if it has not been previously started, then blocks until completion.
[中]方法complete启动当前流实例(如果以前未启动),然后阻塞直到完成。

代码示例

代码示例来源:origin: cwensel/cascading

/**
 * Builds a flow that splits each "line" into "num", "lower" and "upper" and then
 * feeds the result through an {@code UnGroup} constructed from the fields
 * ("num", "char"), ("num") and the selectors "lower" / "upper".
 * After the flow completes it is passed to {@code validateLength} with the value 10.
 */
@Test
public void testUnGroup() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );

 Tap input = getPlatform().getTextFile( inputFileJoined );
 Tap output = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );

 // Each stage gets its own variable instead of re-assigning a single pipe reference.
 Pipe head = new Pipe( "test" );
 Pipe parsed = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

 Fields declaredFields = new Fields( "num", "char" );
 Fields numField = new Fields( "num" );
 Fields[] valueSelectors = Fields.fields( new Fields( "lower" ), new Fields( "upper" ) );
 Pipe ungrouped = new Each( parsed, new UnGroup( declaredFields, numField, valueSelectors ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, ungrouped );
 flow.complete();

 validateLength( flow, 10 );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each "line" into "num" and "char" (RegexSplitter built with the string " "),
 * renames those two fields to "item" and "element", and runs the flow to completion.
 * The finished flow is passed to {@code validateLength} with 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testRenameNamed() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "line" ), new Fields( "item", "element" ), getOutputPath( "rename" ), SinkMode.REPLACE );

 // One variable per assembly stage rather than re-assigning a single pipe.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe renamed = new Rename( split, new Fields( "num", "char" ), new Fields( "item", "element" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, renamed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Connects a text source to a text sink through a splitter and an {@code UnGroup}
 * operation, completes the flow, and hands it to {@code validateLength} with 10.
 * The splitter declares "num", "lower", "upper"; the UnGroup is given
 * ("num", "char"), ("num") and the "lower" / "upper" selectors.
 */
@Test
public void testUnGroup() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );

 Tap input = getPlatform().getTextFile( inputFileJoined );
 Tap output = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );

 // Stage 1: parse the raw line into named fields.
 Pipe head = new Pipe( "test" );
 Pipe parsed = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

 // Stage 2: apply UnGroup to the parsed fields.
 Fields declaredFields = new Fields( "num", "char" );
 Fields numField = new Fields( "num" );
 Fields[] valueSelectors = Fields.fields( new Fields( "lower" ), new Fields( "upper" ) );
 Pipe ungrouped = new Each( parsed, new UnGroup( declaredFields, numField, valueSelectors ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, ungrouped );
 flow.complete();

 validateLength( flow, 10 );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each "line" into "num" and "char", renames only "num" to "item", and
 * writes through a text sink created with the fields ("item") and ("char", "item").
 * The completed flow is passed to {@code validateLength} with 5, 1 and a word-whitespace-digits pattern.
 */
@Test
public void testRenameNarrow() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "item" ), new Fields( "char", "item" ), getOutputPath( "renamenarrow" ), SinkMode.REPLACE );

 // One variable per assembly stage rather than re-assigning a single pipe.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe renamed = new Rename( split, new Fields( "num" ), new Fields( "item" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, renamed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s\\d+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a split-then-rename assembly: "line" is split into "num" and "char", which
 * are then renamed to "item" and "element". After {@code complete()} returns, the
 * flow is passed to {@code validateLength} with 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testRenameNamed() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "line" ), new Fields( "item", "element" ), getOutputPath( "rename" ), SinkMode.REPLACE );

 // Stage 1: split the line; stage 2: rename both resulting fields.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe renamed = new Rename( split, new Fields( "num", "char" ), new Fields( "item", "element" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, renamed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a split-then-rename assembly where only "num" is renamed (to "item"); the
 * sink is created with the fields ("item") and ("char", "item"). After
 * {@code complete()} returns, the flow is passed to {@code validateLength} with
 * 5, 1 and a word-whitespace-digits pattern.
 */
@Test
public void testRenameNarrow() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "item" ), new Fields( "char", "item" ), getOutputPath( "renamenarrow" ), SinkMode.REPLACE );

 // Stage 1: split the line; stage 2: rename the "num" field only.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe renamed = new Rename( split, new Fields( "num" ), new Fields( "item" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, renamed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s\\d+$" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each "line" into "num" and "char", applies {@code Discard} with the
 * "char" field, and writes through a sink created with ("num") for both field arguments.
 * The completed flow is passed to {@code validateLength} with 5, 1 and a digits-only pattern.
 */
@Test
public void testDiscardNarrow() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "num" ), new Fields( "num" ), getOutputPath( "discardnarrow" ), SinkMode.REPLACE );

 // One variable per assembly stage rather than re-assigning a single pipe.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe narrowed = new Discard( split, new Fields( "char" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, narrowed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+$" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Writes to an output path once using {@code SinkMode.REPLACE}, then connects and
 * completes a second flow whose sink targets the same path with {@code SinkMode.KEEP}.
 * The {@code @Test} annotation declares that a {@code FlowTapException} is expected
 * to be thrown from this method.
 */
@Test(expected = FlowTapException.class)
public void testTapKeep() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCrossX2 );

 Tap input = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), " ", inputFileCrossX2 );
 String sharedPath = getOutputPath( "/sink" );

 // First run: the sink is created in REPLACE mode and the flow is completed.
 Tap replacingSink = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", sharedPath, SinkMode.REPLACE );
 Pipe firstHead = new Pipe( "head" );
 Flow replacingFlow = getPlatform().getFlowConnector().connect( "first", input, replacingSink, firstHead );
 replacingFlow.complete();

 // Second run: same path, but the sink is created in KEEP mode.
 Tap keepingSink = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", sharedPath, SinkMode.KEEP );
 Pipe debugged = new Each( new Pipe( "head" ), new Debug() );
 Flow keepingFlow = getPlatform().getFlowConnector().connect( "second", input, keepingSink, debugged );
 keepingFlow.complete();
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each "line" into "num" and "char", applies {@code Coerce} to the "num"
 * field with {@code Integer.class}, and runs the flow to completion.
 * The finished flow is passed to {@code validateLength} with 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testCoerce() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "line" ), new Fields( "num", "char" ), getOutputPath( "coerce" ), SinkMode.REPLACE );

 // One variable per assembly stage rather than re-assigning a single pipe.
 Pipe head = new Pipe( "coerce" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe coerced = new Coerce( split, new Fields( "num" ), Integer.class );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, coerced );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a split-then-discard assembly: "line" is split into "num" and "char" and
 * {@code Discard} is applied with "char". After {@code complete()} returns, the
 * flow is passed to {@code validateLength} with 5, 1 and a digits-only pattern.
 */
@Test
public void testDiscardNarrow() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "num" ), new Fields( "num" ), getOutputPath( "discardnarrow" ), SinkMode.REPLACE );

 // Stage 1: split the line; stage 2: discard the "char" field.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe narrowed = new Discard( split, new Fields( "char" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, narrowed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Splits each "line" into "num" and "char", then applies {@code Rename} with
 * {@code Fields.ALL} as the source selector and ("item", "element") as the new names.
 * The completed flow is passed to {@code validateLength} with 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testRenameAll() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "line" ), new Fields( "item", "element" ), getOutputPath( "renameall" ), SinkMode.REPLACE );

 // One variable per assembly stage rather than re-assigning a single pipe.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe renamed = new Rename( split, Fields.ALL, new Fields( "item", "element" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, renamed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Same shape as an UnGroup test but using the two-argument {@code UnGroup}
 * constructor: it receives the field ("num") and the selectors "lower" / "upper",
 * with no separate field declaration. After the flow completes it is passed to
 * {@code validateLength} with the value 10.
 */
@Test
public void testUnGroupAnon() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );

 Tap input = getPlatform().getTextFile( inputFileJoined );
 Tap output = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );

 // Each stage gets its own variable instead of re-assigning a single pipe reference.
 Pipe head = new Pipe( "test" );
 Pipe parsed = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

 Fields numField = new Fields( "num" );
 Fields[] valueSelectors = Fields.fields( new Fields( "lower" ), new Fields( "upper" ) );
 Pipe ungrouped = new Each( parsed, new UnGroup( numField, valueSelectors ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, ungrouped );
 flow.complete();

 validateLength( flow, 10 );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a split-then-coerce assembly: "line" is split into "num" and "char" and
 * {@code Coerce} is applied to "num" with {@code Integer.class}. After
 * {@code complete()} returns, the flow is passed to {@code validateLength} with
 * 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testCoerce() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "line" ), new Fields( "num", "char" ), getOutputPath( "coerce" ), SinkMode.REPLACE );

 // Stage 1: split the line; stage 2: coerce the "num" field.
 Pipe head = new Pipe( "coerce" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe coerced = new Coerce( split, new Fields( "num" ), Integer.class );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, coerced );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a split-then-unique assembly: "line" is split into "num" and "char" and
 * {@code Unique} is applied with the "num" field. The sink is created with the
 * fields ("item") and ("num", "char"). After {@code complete()} returns, the flow
 * is passed to {@code validateLength} with 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testUnique() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );

 Tap input = getPlatform().getTextFile( inputFileLhs );
 Tap output = getPlatform().getTextFile( new Fields( "item" ), new Fields( "num", "char" ), getOutputPath( "unique" ), SinkMode.REPLACE );

 // Stage 1: split the line; stage 2: apply Unique on "num".
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe distinct = new Unique( split, new Fields( "num" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, distinct );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each "line" into "num" and "char", applies {@code Retain} with the
 * "num" field, and writes through a sink created with ("num") for both field arguments.
 * The completed flow is passed to {@code validateLength} with 5, 1 and a digits-only pattern.
 */
@Test
public void testRetainNarrow() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "num" ), new Fields( "num" ), getOutputPath( "retainnarrow" ), SinkMode.REPLACE );

 // One variable per assembly stage rather than re-assigning a single pipe.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe retained = new Retain( split, new Fields( "num" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, retained );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+$" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Runs a split-then-rename assembly in which {@code Rename} is given
 * {@code Fields.ALL} as its source selector and ("item", "element") as the new
 * names. After {@code complete()} returns, the flow is passed to
 * {@code validateLength} with 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testRenameAll() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "line" ), new Fields( "item", "element" ), getOutputPath( "renameall" ), SinkMode.REPLACE );

 // Stage 1: split the line; stage 2: rename using Fields.ALL.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe renamed = new Rename( split, Fields.ALL, new Fields( "item", "element" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, renamed );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each "line" into "num" and "char", applies {@code Unique} with the "num"
 * field, and writes through a sink created with the fields ("item") and ("num", "char").
 * The completed flow is passed to {@code validateLength} with 5, 1 and a digits-whitespace-word pattern.
 */
@Test
public void testUnique() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );

 Tap input = getPlatform().getTextFile( inputFileLhs );
 Tap output = getPlatform().getTextFile( new Fields( "item" ), new Fields( "num", "char" ), getOutputPath( "unique" ), SinkMode.REPLACE );

 // One variable per assembly stage rather than re-assigning a single pipe.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe distinct = new Unique( split, new Fields( "num" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, distinct );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Completes one flow into a sink opened with {@code SinkMode.REPLACE}, then builds
 * and completes a second flow into the same output path with {@code SinkMode.KEEP}.
 * Per the {@code @Test} annotation, the method is expected to end with a
 * {@code FlowTapException}.
 */
@Test(expected = FlowTapException.class)
public void testTapKeep() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCrossX2 );

 Tap input = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), " ", inputFileCrossX2 );
 String sharedPath = getOutputPath( "/sink" );

 // Pass 1: REPLACE-mode sink, plain head pipe.
 Tap replacingSink = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", sharedPath, SinkMode.REPLACE );
 Pipe firstHead = new Pipe( "head" );
 Flow replacingFlow = getPlatform().getFlowConnector().connect( "first", input, replacingSink, firstHead );
 replacingFlow.complete();

 // Pass 2: KEEP-mode sink on the same path, with a Debug operation in the pipe.
 Tap keepingSink = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", sharedPath, SinkMode.KEEP );
 Pipe debugged = new Each( new Pipe( "head" ), new Debug() );
 Flow keepingFlow = getPlatform().getFlowConnector().connect( "second", input, keepingSink, debugged );
 keepingFlow.complete();
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Connects a text source to a text sink through a splitter and a two-argument
 * {@code UnGroup} (field "num" plus the "lower" / "upper" selectors), completes
 * the flow, and hands it to {@code validateLength} with 10.
 */
@Test
public void testUnGroupAnon() throws Exception
 {
 getPlatform().copyFromLocal( inputFileJoined );

 Tap input = getPlatform().getTextFile( inputFileJoined );
 Tap output = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE );

 // Stage 1: parse the raw line into named fields.
 Pipe head = new Pipe( "test" );
 Pipe parsed = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) );

 // Stage 2: apply UnGroup without an explicit field declaration.
 Fields numField = new Fields( "num" );
 Fields[] valueSelectors = Fields.fields( new Fields( "lower" ), new Fields( "upper" ) );
 Pipe ungrouped = new Each( parsed, new UnGroup( numField, valueSelectors ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, ungrouped );
 flow.complete();

 validateLength( flow, 10 );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a split-then-retain assembly: "line" is split into "num" and "char" and
 * {@code Retain} is applied with "num". After {@code complete()} returns, the
 * flow is passed to {@code validateLength} with 5, 1 and a digits-only pattern.
 */
@Test
public void testRetainNarrow() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLower );

 Tap input = getPlatform().getTextFile( inputFileLower );
 Tap output = getPlatform().getTextFile( new Fields( "num" ), new Fields( "num" ), getOutputPath( "retainnarrow" ), SinkMode.REPLACE );

 // Stage 1: split the line; stage 2: retain the "num" field.
 Pipe head = new Pipe( "shape" );
 Pipe split = new Each( head, new Fields( "line" ), new RegexSplitter( new Fields( "num", "char" ), " " ) );
 Pipe retained = new Retain( split, new Fields( "num" ) );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, retained );
 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+$" ) );
 }

相关文章