cascading.flow.Flow.openSink()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(17.1k)|赞(0)|评价(0)|浏览(125)

本文整理了Java中cascading.flow.Flow.openSink()方法的一些代码示例,展示了Flow.openSink()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.openSink()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:openSink

Flow.openSink介绍

[英]Method openSink opens the first sink Tap.
[中]方法 openSink 用于打开第一个 sink(输出端)Tap。

代码示例

代码示例来源:origin: cwensel/cascading

/**
 * Opens a sink of the given flow and forwards it to the iterator-based
 * {@code validateLength} overload.
 * <p>
 * When {@code name} is {@code null} the sink is obtained through {@code flow.openSink()};
 * otherwise the sink registered under {@code name} is opened.
 *
 * @param flow      the flow whose sink is opened
 * @param numTuples forwarded unchanged to the iterator-based overload
 * @param tupleSize forwarded unchanged to the iterator-based overload
 * @param regex     forwarded unchanged to the iterator-based overload
 * @param name      the sink name, or {@code null} to use the unnamed {@code openSink()} call
 * @throws IOException declared by this method; propagated from the calls it makes
 */
public static void validateLength( Flow flow, int numTuples, int tupleSize, Pattern regex, String name ) throws IOException
 {
 TupleEntryIterator iterator;

 if( name != null )
  iterator = flow.openSink( name );
 else
  iterator = flow.openSink();

 validateLength( iterator, numTuples, tupleSize, regex );
 }

代码示例来源:origin: cascading/cascading-jdbc-core

/**
 * Counts the entries read from the sink returned by {@code flow.openSink()} and asserts
 * that the total equals {@code expects}.
 *
 * @param flow    the flow whose sink is read
 * @param expects the number of entries the sink must contain
 * @throws IOException if opening, reading or closing the sink fails
 */
private void verifySink( Flow<?> flow, int expects ) throws IOException
 {
 int count = 0;
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  while( iterator.hasNext() )
   {
   count++;
   iterator.next();
   }
  }
 finally
  {
  // release the sink even when iteration fails part-way through
  iterator.close();
  }

 assertEquals( "wrong number of values", expects, count );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Runs an {@code Identity} flow over the {@code inputFileUnexpectedEndOfFile} fixture with
 * {@code java.io.EOFException} registered as a permitted exception, then validates the
 * sink against the value 307.
 * <p>
 * Returns immediately on the "local" platform (no gzip support).
 */
@Test
public void testTupleEntrySchemeIteratorExceptionHandling() throws IOException
 {
 // no gzip support
 if( getPlatformName().equals( "local" ) )
  return;

 getPlatform().copyFromLocal( inputFileUnexpectedEndOfFile );

 Tap input = getPlatform().getTextFile( inputFileUnexpectedEndOfFile );
 Tap output = getPlatform().getTextFile( getOutputPath( getTestName() ), SinkMode.REPLACE );

 // register EOFException as permitted before the connector is created
 Map<Object, Object> config = getProperties();
 TupleEntrySchemeIteratorProps.setPermittedExceptions( config, java.io.EOFException.class );

 Pipe assembly = new Each( new Pipe( "data" ), new Identity() );

 FlowDef definition = FlowDef.flowDef()
  .addSource( assembly, input )
  .addTailSink( assembly, output );

 Flow flow = getPlatform().getFlowConnector( config ).connect( definition );
 flow.complete();

 validateLength( flow.openSink(), 307 );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs an {@code Identity} flow over the {@code inputFileUnexpectedEndOfFile} fixture with
 * {@code java.io.EOFException} registered as a permitted exception, then validates the
 * sink against the value 307.
 * <p>
 * Returns immediately on the "local" platform (no gzip support).
 */
@Test
public void testTupleEntrySchemeIteratorExceptionHandling() throws IOException
 {
 // no gzip support
 if( getPlatformName().equals( "local" ) )
  return;

 getPlatform().copyFromLocal( inputFileUnexpectedEndOfFile );

 Tap input = getPlatform().getTextFile( inputFileUnexpectedEndOfFile );
 Tap output = getPlatform().getTextFile( getOutputPath( getTestName() ), SinkMode.REPLACE );

 // register EOFException as permitted before the connector is created
 Map<Object, Object> config = getProperties();
 TupleEntrySchemeIteratorProps.setPermittedExceptions( config, java.io.EOFException.class );

 Pipe assembly = new Each( new Pipe( "data" ), new Identity() );

 FlowDef definition = FlowDef.flowDef()
  .addSource( assembly, input )
  .addTailSink( assembly, output );

 Flow flow = getPlatform().getFlowConnector( config ).connect( definition );
 flow.complete();

 validateLength( flow.openSink(), 307 );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each input line on whitespace into "first", "second" and "third", writes the
 * results to a tab delimited sink declared with {@code Fields.UNKNOWN}, and checks that
 * the first sink tuple renders as digit, lower-case letter, upper-case letter separated
 * by tabs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testSinkUnknown() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  String line = iterator.next().getTuple().toString();
  assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );
  }
 finally
  {
  // close the sink even if reading or the assertion above fails
  iterator.close();
  }
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Splits each input line on whitespace into "first", "second" and "third", writes the
 * results to a tab delimited sink declared with {@code Fields.UNKNOWN}, and checks that
 * the first sink tuple renders as digit, lower-case letter, upper-case letter separated
 * by tabs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testSinkUnknown() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  String line = iterator.next().getTuple().toString();
  assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );
  }
 finally
  {
  // close the sink even if reading or the assertion above fails
  iterator.close();
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each input line on whitespace into "first", "second" and "third" (keeping all
 * fields), writes to a text sink whose sink fields are declared as "second", "first",
 * "third", and checks that the first sink line is lower-case letter, digit, upper-case
 * letter separated by tabs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testSinkDeclaredFields() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.ALL );
 Tap sink = getPlatform().getTextFile( new Fields( "line" ), new Fields( "second", "first", "third" ), getOutputPath( "declaredsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  String line = iterator.next().getString( 0 );
  assertTrue( "not equal: wrong values", line.matches( "[a-z]\t[0-9]\t[A-Z]" ) );
  }
 finally
  {
  // close the sink even if reading or the assertion above fails
  iterator.close();
  }
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Splits each input line on whitespace into "first", "second" and "third" (keeping all
 * fields), writes to a text sink whose sink fields are declared as "second", "first",
 * "third", and checks that the first sink line is lower-case letter, digit, upper-case
 * letter separated by tabs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testSinkDeclaredFields() throws IOException
 {
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.ALL );
 Tap sink = getPlatform().getTextFile( new Fields( "line" ), new Fields( "second", "first", "third" ), getOutputPath( "declaredsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  String line = iterator.next().getString( 0 );
  assertTrue( "not equal: wrong values", line.matches( "[a-z]\t[0-9]\t[A-Z]" ) );
  }
 finally
  {
  // close the sink even if reading or the assertion above fails
  iterator.close();
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Applies a {@code CountBy} through an {@code AggregateBy} grouped on {@code Fields.NONE}
 * and checks that the sink holds the single tuple {@code (13)}.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testCountAll() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "count" ), "\t",
  new Class[]{Integer.TYPE}, getOutputPath( "countall" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "count" );
 CountBy countBy = new CountBy( new Fields( "char" ), new Fields( "count" ) );
 pipe = new AggregateBy( pipe, Fields.NONE, 2, countBy );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 1, 1, Pattern.compile( "^\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( 13 )
 };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Applies a {@code CountBy} through an {@code AggregateBy} grouped on {@code Fields.NONE}
 * and checks that the sink holds the single tuple {@code (13)}.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testCountAll() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "count" ), "\t",
  new Class[]{Integer.TYPE}, getOutputPath( "countall" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "count" );
 CountBy countBy = new CountBy( new Fields( "char" ), new Fields( "count" ) );
 pipe = new AggregateBy( pipe, Fields.NONE, 2, countBy );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 1, 1, Pattern.compile( "^\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( 13 )
 };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Sums "num" per "char" with {@code SumBy} and compares the sink tuples, in iteration
 * order, against the expected (char, sum) pairs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testSumBy() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "sum" ), "\t",
  new Class[]{String.class, Integer.TYPE}, getOutputPath( "sum" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "sum" );
 pipe = new SumBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "sum" ), long.class, 2 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", 6 ),
  new Tuple( "b", 12 ),
  new Tuple( "c", 10 ),
  new Tuple( "d", 6 ),
  new Tuple( "e", 5 ),
  };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Computes the minimum "num" per "char" with {@code MinBy} and compares the sink tuples,
 * in iteration order, against the expected (char, min) pairs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testMinBy() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "min" ), "\t",
  new Class[]{String.class, Integer.TYPE}, getOutputPath( "minby" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "min" );
 pipe = new MinBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "min" ), 2 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", 1 ),
  new Tuple( "b", 1 ),
  new Tuple( "c", 1 ),
  new Tuple( "d", 2 ),
  new Tuple( "e", 5 ),
  };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Computes the maximum "num" per "char" with {@code MaxBy} and compares the sink tuples,
 * in iteration order, against the expected (char, max) pairs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testMaxBy() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "max" ), "\t",
  new Class[]{String.class, Integer.TYPE}, getOutputPath( "maxby" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "max" );
 pipe = new MaxBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "max" ), 2 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", 5 ),
  new Tuple( "b", 5 ),
  new Tuple( "c", 4 ),
  new Tuple( "d", 4 ),
  new Tuple( "e", 5 ),
  };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Averages "num" per "char" with {@code AverageBy} and compares the sink tuples, in
 * iteration order, against the expected (char, average) pairs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testAverageBy() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "average" ), "\t",
  new Class[]{String.class, Double.TYPE}, getOutputPath( "average" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "average" );
 pipe = new AverageBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "average" ), 2 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s[\\d.]+$" ) );
 // expected averages are written as sum / count, cast so the division is floating point
 Tuple[] results = new Tuple[]{
  new Tuple( "a", (double) 6 / 2 ),
  new Tuple( "b", (double) 12 / 4 ),
  new Tuple( "c", (double) 10 / 4 ),
  new Tuple( "d", (double) 6 / 2 ),
  new Tuple( "e", (double) 5 / 1 ),
  };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Chains two {@code CountBy} operations (count per "char", then count per "count") and
 * compares the sink tuples, in iteration order, against the expected (count, count2) pairs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testCountCount() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "count", "count2" ), "\t",
  new Class[]{Integer.TYPE, Integer.TYPE}, getOutputPath( "countcount" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "count" );
 pipe = new CountBy( pipe, new Fields( "char" ), new Fields( "count" ), 2 );
 pipe = new CountBy( pipe, new Fields( "count" ), new Fields( "count2" ), 2 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 3, 2, Pattern.compile( "^\\d+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( 1, 1 ),
  new Tuple( 2, 2 ),
  new Tuple( 4, 2 ),
  };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Chains two {@code CountBy} operations (count per "char", then count per "count") and
 * compares the sink tuples, in iteration order, against the expected (count, count2) pairs.
 *
 * @throws IOException if the platform, flow or sink operations fail
 */
@Test
public void testCountCount() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "count", "count2" ), "\t",
  new Class[]{Integer.TYPE, Integer.TYPE}, getOutputPath( "countcount" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "count" );
 pipe = new CountBy( pipe, new Fields( "char" ), new Fields( "count" ), 2 );
 pipe = new CountBy( pipe, new Fields( "count" ), new Fields( "count2" ), 2 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 3, 2, Pattern.compile( "^\\d+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( 1, 1 ),
  new Tuple( 2, 2 ),
  new Tuple( 4, 2 ),
  };
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  int count = 0;
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when a comparison above fails
  iterator.close();
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Reads {@code inputFileComments} through a {@code CommentScheme} source, copies it with
 * {@code Identity} to a {@code TextLine} sink, and checks the first sink entry as well as
 * the source and sink via {@code validateLength}.
 *
 * @throws IOException if the platform, flow, source or sink operations fail
 */
@Test
public void testNullsFromScheme() throws IOException
 {
 getPlatform().copyFromLocal( inputFileComments );
 Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Identity() );
 Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 5, null );
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
  }
 finally
  {
  // close the sink even if reading or the assertion above fails
  iterator.close();
  }

 // confirm the tuple iterator can handle nulls from the source
 validateLength( flow.openSource(), 5 );
 }

代码示例来源:origin: cascading/cascading-hadoop2-common

/**
 * Reads {@code inputFileComments} through a {@code CommentScheme} source, copies it with
 * {@code Identity} to a {@code TextLine} sink, and checks the first sink entry as well as
 * the source and sink via {@code validateLength}.
 *
 * @throws IOException if the platform, flow, source or sink operations fail
 */
@Test
public void testNullsFromScheme() throws IOException
 {
 getPlatform().copyFromLocal( inputFileComments );
 Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Identity() );
 Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );
 flow.complete();
 validateLength( flow, 5, null );
 TupleEntryIterator iterator = flow.openSink();

 try
  {
  assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
  }
 finally
  {
  // close the sink even if reading or the assertion above fails
  iterator.close();
  }

 // confirm the tuple iterator can handle nulls from the source
 validateLength( flow.openSource(), 5 );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Parses the leading run of non-space characters of each line into "ip", applies
 * {@code Stop( Limit( 100 ) )} to the stream, and then checks the sink via
 * {@code validateLength} plus the {@code Tuples_Written} (100) and {@code Tuples_Read}
 * (101) counters.
 */
@Test
public void testSimple() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache200 );

 Tap input = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

 // parse the "ip" field out of each line
 Pipe assembly = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

 // apply the 100-entry limit through Stop
 assembly = new Each( assembly, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

 Tap output = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );
 flow.complete();

 validateLength( flow.openSink(), 100 );

 assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
 assertEquals( 101, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Parses "ip" from each line, groups on it, counts each group, applies
 * {@code Stop( Limit( 100 ) )} to the aggregated stream, and then checks the sink via
 * {@code validateLength} plus the {@code Tuples_Written} (100) and {@code Tuples_Read}
 * (200) counters.
 */
@Test
public void testSimpleGroup() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache200 );

 Tap input = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

 // parse the "ip" field out of each line
 Pipe assembly = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

 // group by ip and count the members of each group
 assembly = new GroupBy( assembly, new Fields( "ip" ) );
 assembly = new Every( assembly, new Count(), new Fields( "ip", "count" ) );

 // apply the 100-entry limit through Stop
 assembly = new Each( assembly, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

 Tap output = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );
 flow.complete();

 validateLength( flow.openSink(), 100 );

 assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
 assertEquals( 200, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
 }

相关文章