本文整理了 Java 中 cascading.flow.Flow.openSink() 方法的一些代码示例，展示了 Flow.openSink() 的具体用法。这些代码示例主要来源于 Github / Stackoverflow / Maven 等平台，是从一些精选项目中提取出来的代码，具有较强的参考意义，能在一定程度上帮助到你。Flow.openSink() 方法的具体详情如下：
包路径:cascading.flow.Flow
类名称:Flow
方法名:openSink
[英]Method openSink opens the first sink Tap.
[中]openSink 方法打开第一个 sink Tap（接收器 Tap）。
代码示例来源:origin: cwensel/cascading
/**
 * Opens a sink of the given flow and hands the resulting iterator to the
 * iterator-based {@code validateLength} overload.
 * <p>
 * When {@code name} is null the flow's first sink is opened via {@code openSink()};
 * otherwise the sink registered under {@code name} is opened.
 *
 * @param flow      the flow whose sink is read
 * @param numTuples forwarded unchanged to the overload (presumably the expected tuple count)
 * @param tupleSize forwarded unchanged to the overload (presumably the expected tuple width)
 * @param regex     forwarded unchanged to the overload
 * @param name      sink name, or null to use the first sink
 * @throws IOException if the sink cannot be opened or validated
 */
public static void validateLength( Flow flow, int numTuples, int tupleSize, Pattern regex, String name ) throws IOException
{
  TupleEntryIterator iterator;

  if( name != null )
  {
    iterator = flow.openSink( name );
  }
  else
  {
    iterator = flow.openSink();
  }

  validateLength( iterator, numTuples, tupleSize, regex );
}
代码示例来源:origin: cascading/cascading-jdbc-core
/**
 * Asserts that the first sink of {@code flow} yields exactly {@code expects} entries.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even if
 * reading from it throws.
 *
 * @param flow    the flow whose first sink is counted
 * @param expects the expected number of entries
 * @throws IOException if the sink cannot be opened or closed
 */
private void verifySink( Flow<?> flow, int expects ) throws IOException
{
  int count = 0;
  TupleEntryIterator iterator = flow.openSink();

  try
  {
    while( iterator.hasNext() )
    {
      count++;
      iterator.next();
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of values", expects, count );
}
代码示例来源:origin: cwensel/cascading
/**
 * Runs an identity flow over {@code inputFileUnexpectedEndOfFile}, with
 * {@link java.io.EOFException} registered as a permitted exception. It then checks that
 * the flow completes and the sink holds 307 entries.
 * <p>
 * Skipped on the "local" platform (no gzip support there).
 */
@Test
public void testTupleEntrySchemeIteratorExceptionHandling() throws IOException
{
  if( getPlatformName().equals( "local" ) )
  {
    return; // no gzip support
  }

  getPlatform().copyFromLocal( inputFileUnexpectedEndOfFile );

  Tap source = getPlatform().getTextFile( inputFileUnexpectedEndOfFile );
  Tap sink = getPlatform().getTextFile( getOutputPath( getTestName() ), SinkMode.REPLACE );

  // tolerate an EOFException while iterating the (presumably truncated) input
  Map<Object, Object> properties = getProperties();
  TupleEntrySchemeIteratorProps.setPermittedExceptions( properties, java.io.EOFException.class );

  Pipe pipe = new Each( new Pipe( "data" ), new Identity() );

  FlowDef flowDef = FlowDef.flowDef()
    .addSource( pipe, source )
    .addTailSink( pipe, sink );

  Flow flow = getPlatform().getFlowConnector( properties ).connect( flowDef );
  flow.complete();

  validateLength( flow.openSink(), 307 );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Runs an identity flow over {@code inputFileUnexpectedEndOfFile}, with
 * {@link java.io.EOFException} registered as a permitted exception. It then checks that
 * the flow completes and the sink holds 307 entries.
 * <p>
 * Skipped on the "local" platform (no gzip support there).
 */
@Test
public void testTupleEntrySchemeIteratorExceptionHandling() throws IOException
{
  if( getPlatformName().equals( "local" ) )
  {
    return; // no gzip support
  }

  getPlatform().copyFromLocal( inputFileUnexpectedEndOfFile );

  Tap source = getPlatform().getTextFile( inputFileUnexpectedEndOfFile );
  Tap sink = getPlatform().getTextFile( getOutputPath( getTestName() ), SinkMode.REPLACE );

  // tolerate an EOFException while iterating the (presumably truncated) input
  Map<Object, Object> properties = getProperties();
  TupleEntrySchemeIteratorProps.setPermittedExceptions( properties, java.io.EOFException.class );

  Pipe pipe = new Each( new Pipe( "data" ), new Identity() );

  FlowDef flowDef = FlowDef.flowDef()
    .addSource( pipe, source )
    .addTailSink( pipe, sink );

  Flow flow = getPlatform().getFlowConnector( properties ).connect( flowDef );
  flow.complete();

  validateLength( flow.openSink(), 307 );
}
代码示例来源:origin: cwensel/cascading
/**
 * Splits each input line into three fields and writes them to a tab-delimited sink
 * declared with {@link Fields#UNKNOWN}. It then checks that the first written tuple
 * renders as digit, lowercase letter, uppercase letter separated by tabs.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * the assertion fails.
 */
@Test
public void testSinkUnknown() throws IOException
{
  getPlatform().copyFromLocal( inputFileCross );

  Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );

  Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 37, null );

  TupleEntryIterator iterator = flow.openSink();

  try
  {
    String line = iterator.next().getTuple().toString();
    assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );
  }
  finally
  {
    iterator.close();
  }
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Splits each input line into three fields and writes them to a tab-delimited sink
 * declared with {@link Fields#UNKNOWN}. It then checks that the first written tuple
 * renders as digit, lowercase letter, uppercase letter separated by tabs.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * the assertion fails.
 */
@Test
public void testSinkUnknown() throws IOException
{
  getPlatform().copyFromLocal( inputFileCross );

  Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );

  Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 37, null );

  TupleEntryIterator iterator = flow.openSink();

  try
  {
    String line = iterator.next().getTuple().toString();
    assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );
  }
  finally
  {
    iterator.close();
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Splits each input line into first/second/third and writes to a text sink whose sink
 * fields are declared as (second, first, third). It then checks that the first written
 * line follows that declared order: lowercase letter, digit, uppercase letter, tab
 * separated.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * the assertion fails.
 */
@Test
public void testSinkDeclaredFields() throws IOException
{
  getPlatform().copyFromLocal( inputFileCross );

  Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.ALL );

  Tap sink = getPlatform().getTextFile( new Fields( "line" ), new Fields( "second", "first", "third" ), getOutputPath( "declaredsinks" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 37, null );

  TupleEntryIterator iterator = flow.openSink();

  try
  {
    String line = iterator.next().getString( 0 );
    assertTrue( "not equal: wrong values", line.matches( "[a-z]\t[0-9]\t[A-Z]" ) );
  }
  finally
  {
    iterator.close();
  }
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Splits each input line into first/second/third and writes to a text sink whose sink
 * fields are declared as (second, first, third). It then checks that the first written
 * line follows that declared order: lowercase letter, digit, uppercase letter, tab
 * separated.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * the assertion fails.
 */
@Test
public void testSinkDeclaredFields() throws IOException
{
  getPlatform().copyFromLocal( inputFileCross );

  Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.ALL );

  Tap sink = getPlatform().getTextFile( new Fields( "line" ), new Fields( "second", "first", "third" ), getOutputPath( "declaredsinks" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 37, null );

  TupleEntryIterator iterator = flow.openSink();

  try
  {
    String line = iterator.next().getString( 0 );
    assertTrue( "not equal: wrong values", line.matches( "[a-z]\t[0-9]\t[A-Z]" ) );
  }
  finally
  {
    iterator.close();
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Counts all {@code char} values with {@code CountBy} grouped on {@link Fields#NONE}.
 * It then checks that the sink holds the single tuple (13).
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testCountAll() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "count" ), "\t",
    new Class[]{Integer.TYPE}, getOutputPath( "countall" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "count" );
  CountBy countBy = new CountBy( new Fields( "char" ), new Fields( "count" ) );
  pipe = new AggregateBy( pipe, Fields.NONE, 2, countBy );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 1, 1, Pattern.compile( "^\\d+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( 13 )
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Counts all {@code char} values with {@code CountBy} grouped on {@link Fields#NONE}.
 * It then checks that the sink holds the single tuple (13).
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testCountAll() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "count" ), "\t",
    new Class[]{Integer.TYPE}, getOutputPath( "countall" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "count" );
  CountBy countBy = new CountBy( new Fields( "char" ), new Fields( "count" ) );
  pipe = new AggregateBy( pipe, Fields.NONE, 2, countBy );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 1, 1, Pattern.compile( "^\\d+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( 13 )
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cwensel/cascading
/**
 * Sums {@code num} per {@code char} with {@code SumBy} and compares the sink, in order,
 * against the expected per-character totals.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testSumBy() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "sum" ), "\t",
    new Class[]{String.class, Integer.TYPE}, getOutputPath( "sum" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "sum" );
  pipe = new SumBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "sum" ), long.class, 2 );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( "a", 6 ),
    new Tuple( "b", 12 ),
    new Tuple( "c", 10 ),
    new Tuple( "d", 6 ),
    new Tuple( "e", 5 ),
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cwensel/cascading
/**
 * Computes the minimum {@code num} per {@code char} with {@code MinBy} and compares the
 * sink, in order, against the expected minima.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testMinBy() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "min" ), "\t",
    new Class[]{String.class, Integer.TYPE}, getOutputPath( "minby" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "min" );
  pipe = new MinBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "min" ), 2 );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( "a", 1 ),
    new Tuple( "b", 1 ),
    new Tuple( "c", 1 ),
    new Tuple( "d", 2 ),
    new Tuple( "e", 5 ),
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cwensel/cascading
/**
 * Computes the maximum {@code num} per {@code char} with {@code MaxBy} and compares the
 * sink, in order, against the expected maxima.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testMaxBy() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "max" ), "\t",
    new Class[]{String.class, Integer.TYPE}, getOutputPath( "maxby" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "max" );
  pipe = new MaxBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "max" ), 2 );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( "a", 5 ),
    new Tuple( "b", 5 ),
    new Tuple( "c", 4 ),
    new Tuple( "d", 4 ),
    new Tuple( "e", 5 ),
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Averages {@code num} per {@code char} with {@code AverageBy} into a double column and
 * compares the sink, in order, against the expected averages.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testAverageBy() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "average" ), "\t",
    new Class[]{String.class, Double.TYPE}, getOutputPath( "average" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "average" );
  pipe = new AverageBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "average" ), 2 );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s[\\d.]+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( "a", (double) 6 / 2 ),
    new Tuple( "b", (double) 12 / 4 ),
    new Tuple( "c", (double) 10 / 4 ),
    new Tuple( "d", (double) 6 / 2 ),
    new Tuple( "e", (double) 5 / 1 ),
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cwensel/cascading
/**
 * Chains two {@code CountBy}s. The first counts rows per {@code char}; the second counts
 * how many chars share each count. The sink is then compared, in order, against the
 * expected (count, count2) pairs.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testCountCount() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "count", "count2" ), "\t",
    new Class[]{Integer.TYPE, Integer.TYPE}, getOutputPath( "countcount" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "count" );
  pipe = new CountBy( pipe, new Fields( "char" ), new Fields( "count" ), 2 );
  pipe = new CountBy( pipe, new Fields( "count" ), new Fields( "count2" ), 2 );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 3, 2, Pattern.compile( "^\\d+\\s\\d+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( 1, 1 ),
    new Tuple( 2, 2 ),
    new Tuple( 4, 2 ),
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Chains two {@code CountBy}s. The first counts rows per {@code char}; the second counts
 * how many chars share each count. The sink is then compared, in order, against the
 * expected (count, count2) pairs.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * an assertion fails. The final check ensures the sink did not yield fewer tuples than
 * {@code results} lists.
 */
@Test
public void testCountCount() throws IOException
{
  getPlatform().copyFromLocal( inputFileLhs );

  Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
  Tap sink = getPlatform().getDelimitedFile( new Fields( "count", "count2" ), "\t",
    new Class[]{Integer.TYPE, Integer.TYPE}, getOutputPath( "countcount" ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "count" );
  pipe = new CountBy( pipe, new Fields( "char" ), new Fields( "count" ), 2 );
  pipe = new CountBy( pipe, new Fields( "count" ), new Fields( "count2" ), 2 );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 3, 2, Pattern.compile( "^\\d+\\s\\d+$" ) );

  Tuple[] results = new Tuple[]{
    new Tuple( 1, 1 ),
    new Tuple( 2, 2 ),
    new Tuple( 4, 2 ),
  };

  TupleEntryIterator iterator = flow.openSink();
  int count = 0;

  try
  {
    while( iterator.hasNext() )
    {
      assertEquals( results[ count++ ], iterator.next().getTuple() );
    }
  }
  finally
  {
    iterator.close();
  }

  assertEquals( "wrong number of tuples", results.length, count );
}
代码示例来源:origin: cwensel/cascading
/**
 * Reads {@code inputFileComments} through a {@code CommentScheme} source, copies it with
 * an identity pipe, and checks the first sink entry's field 1. It then verifies that
 * iterating the source directly yields 5 entries.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * the assertion fails.
 */
@Test
public void testNullsFromScheme() throws IOException
{
  getPlatform().copyFromLocal( inputFileComments );

  Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new Identity() );

  Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 5, null );

  TupleEntryIterator iterator = flow.openSink();

  try
  {
    assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
  }
  finally
  {
    iterator.close();
  }

  // confirm the tuple iterator can handle nulls from the source
  validateLength( flow.openSource(), 5 );
}
代码示例来源:origin: cascading/cascading-hadoop2-common
/**
 * Reads {@code inputFileComments} through a {@code CommentScheme} source, copies it with
 * an identity pipe, and checks the first sink entry's field 1. It then verifies that
 * iterating the source directly yields 5 entries.
 * <p>
 * The sink iterator is closed in a {@code finally} block so it is released even when
 * the assertion fails.
 */
@Test
public void testNullsFromScheme() throws IOException
{
  getPlatform().copyFromLocal( inputFileComments );

  Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new Identity() );

  Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 5, null );

  TupleEntryIterator iterator = flow.openSink();

  try
  {
    assertEquals( "not equal: tuple.get(1)", "1 a", iterator.next().getObject( 1 ) );
  }
  finally
  {
    iterator.close();
  }

  // confirm the tuple iterator can handle nulls from the source
  validateLength( flow.openSource(), 5 );
}
代码示例来源:origin: cwensel/cascading
/**
 * Parses the leading IP from each log line and wraps a {@code Limit(100)} filter in
 * {@code Stop}. It then checks that the sink holds 100 entries and that the flow stats
 * report 100 tuples written and 101 read. The extra read is presumably the tuple that
 * trips the stop; confirm against the {@code Stop} documentation.
 */
@Test
public void testSimple() throws Exception
{
  getPlatform().copyFromLocal( inputFileApache200 );

  Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

  Pipe parsed = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe pipe = new Each( parsed, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

  Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow.openSink(), 100 );

  assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
  assertEquals( 101, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
}
代码示例来源:origin: cwensel/cascading
/**
 * Parses the leading IP from each log line, groups by IP, counts each group, and then
 * applies {@code Stop(Limit(100))}. It checks that the sink holds 100 entries and that
 * the flow stats report 100 tuples written and 200 read.
 */
@Test
public void testSimpleGroup() throws Exception
{
  getPlatform().copyFromLocal( inputFileApache200 );

  Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

  Pipe parsed = new Each( new Pipe( "test" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe grouped = new GroupBy( parsed, new Fields( "ip" ) );
  Pipe counted = new Every( grouped, new Count(), new Fields( "ip", "count" ) );
  Pipe pipe = new Each( counted, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

  Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow.openSink(), 100 );

  assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
  assertEquals( 200, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
}
内容来源于网络,如有侵权,请联系作者删除!