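This article collects code examples for the Java method cascading.flow.Flow.openTapForRead() and shows how Flow.openTapForRead() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven; they were extracted from selected projects and should serve as a useful reference. Details of Flow.openTapForRead() are as follows: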
Package path: cascading.flow.Flow
Class name: Flow
Method name: openTapForRead
The openTapForRead method returns a cascading.tuple.TupleEntryIterator for the given Tap instance.
Note that the returned iterator returns the same instance of cascading.tuple.TupleEntry on every call, so a copy of either the TupleEntry or the underlying Tuple instance must be made if they are to be stored in a Collection.
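The instance-reuse note above is easy to overlook, so here is a minimal sketch of the pitfall in plain Java. It does not use Cascading: Entry and ReusingIterator are hypothetical stand-ins for TupleEntry and TupleEntryIterator, written only to show why storing the returned object without copying it loses data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReusedEntryDemo
{
  // Hypothetical stand-in for TupleEntry: a mutable holder for one record.
  static final class Entry
  {
    String line;

    Entry( String line )
    {
      this.line = line;
    }

    // Copy constructor: snapshots the current value.
    Entry( Entry other )
    {
      this( other.line );
    }
  }

  // Hypothetical stand-in for TupleEntryIterator: next() refills and returns ONE shared Entry.
  static final class ReusingIterator implements Iterator<Entry>
  {
    private final Iterator<String> lines;
    private final Entry shared = new Entry( null );

    ReusingIterator( List<String> lines )
    {
      this.lines = lines.iterator();
    }

    @Override
    public boolean hasNext()
    {
      return lines.hasNext();
    }

    @Override
    public Entry next()
    {
      shared.line = lines.next(); // overwrite the shared instance in place
      return shared;
    }
  }

  // Drains the iterator into a list, optionally copying each entry before storing it.
  static List<String> collect( List<String> input, boolean copy )
  {
    List<Entry> stored = new ArrayList<>();
    Iterator<Entry> iterator = new ReusingIterator( input );

    while( iterator.hasNext() )
    {
      Entry entry = iterator.next();
      stored.add( copy ? new Entry( entry ) : entry );
    }

    List<String> result = new ArrayList<>();

    for( Entry entry : stored )
      result.add( entry.line );

    return result;
  }

  public static void main( String[] args )
  {
    List<String> input = Arrays.asList( "a", "b", "c" );

    System.out.println( collect( input, false ) ); // prints [c, c, c]
    System.out.println( collect( input, true ) );  // prints [a, b, c]
  }
}
```

Without the copy, every stored reference points at the same object, so all elements show the last value read. In Cascading itself the copy would typically be made through the TupleEntry copy constructor or a copy of the underlying Tuple; check the Javadoc of the version in use for the exact calls.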
Code example source: cwensel/cascading
public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
{
  try( TupleEntryIterator iterator = flow.openTapForRead( tap ) )
  {
    return asCollection( iterator, selector, collection );
  }
}
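The snippet above opens the iterator in a try-with-resources statement, while some of the later examples call close() by hand after an assertion. The following plain-Java sketch illustrates the difference when the body throws. TrackedIterator and check() are hypothetical helpers invented for this illustration and are not part of Cascading.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CloseOnFailureDemo
{
  // Hypothetical closeable iterator that records whether close() was called.
  static final class TrackedIterator implements Iterator<String>, Closeable
  {
    private final Iterator<String> delegate;
    boolean closed = false;

    TrackedIterator( List<String> lines )
    {
      this.delegate = lines.iterator();
    }

    @Override
    public boolean hasNext()
    {
      return delegate.hasNext();
    }

    @Override
    public String next()
    {
      return delegate.next();
    }

    @Override
    public void close()
    {
      closed = true;
    }
  }

  // Hypothetical assertion helper: throws when the values differ.
  static void check( String actual, String expected )
  {
    if( !expected.equals( actual ) )
      throw new IllegalStateException( "expected " + expected + " but was " + actual );
  }

  // Manual close: the close() call is skipped when check() throws.
  static boolean closedAfterManualClose( List<String> lines, String expected )
  {
    TrackedIterator iterator = new TrackedIterator( lines );

    try
    {
      check( iterator.next(), expected );
      iterator.close();
    }
    catch( IllegalStateException exception )
    {
      // swallowed only so the demo can inspect the flag afterwards
    }

    return iterator.closed;
  }

  // try-with-resources: close() runs whether or not check() throws.
  static boolean closedAfterTryWithResources( List<String> lines, String expected )
  {
    TrackedIterator iterator = new TrackedIterator( lines );

    try( TrackedIterator resource = iterator )
    {
      check( resource.next(), expected );
    }
    catch( IllegalStateException exception )
    {
      // swallowed only so the demo can inspect the flag afterwards
    }

    return iterator.closed;
  }

  public static void main( String[] args )
  {
    List<String> lines = Arrays.asList( "x" );

    System.out.println( closedAfterManualClose( lines, "y" ) );      // prints false
    System.out.println( closedAfterTryWithResources( lines, "y" ) ); // prints true
  }
}
```

When the check passes, both variants close the iterator; they differ only on the failure path, which is where a leaked file or stream handle would otherwise go unnoticed.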
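Code example source: cwensel/cascading (also in cascading/cascading-platform)
private void assertHeaders( Tap output, Flow flow ) throws IOException
{
  // read the output back as plain text lines; try-with-resources closes the iterator even if the assertion fails
  try( TupleEntryIterator iterator = flow.openTapForRead( getPlatform().getTextFile( new Fields( "line" ), output.getIdentifier() ) ) )
  {
    // JUnit convention: expected value first, actual value second
    assertEquals( "first,second,third,fourth,fifth", iterator.next().getObject( 0 ) );
  }
}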
Code example source: cwensel/cascading (also in cascading/cascading-platform)
private void runPartitionTest( String postfix ) throws IOException
{
  getPlatform().copyFromLocal( inputFileCrossX2 );
  Tap source = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), " ", inputFileCrossX2 );
  Tap partitionTap = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", getOutputPath( "/partitioned" ), SinkMode.REPLACE );
  Partition partition = new DelimitedPartition( new Fields( "lower", "number" ), "/", postfix );
  partitionTap = getPlatform().getPartitionTap( partitionTap, partition, 1 );
  Flow firstFlow = getPlatform().getFlowConnector().connect( source, partitionTap, new Pipe( "partition" ) );
  firstFlow.complete();
  Tap sink = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), "+", getOutputPath( "/final" ), SinkMode.REPLACE );
  Flow secondFlow = getPlatform().getFlowConnector().connect( partitionTap, sink, new Pipe( "copy" ) );
  secondFlow.complete();
  Tap test = getPlatform().getTextFile( new Fields( "line" ), partitionTap.getIdentifier().toString() + "/a/1" + postfix );
  validateLength( firstFlow.openTapForRead( test ), 6, Pattern.compile( "[A-Z]" ) );
  test = getPlatform().getTextFile( new Fields( "line" ), partitionTap.getIdentifier().toString() + "/b/2" + postfix );
  validateLength( firstFlow.openTapForRead( test ), 6, Pattern.compile( "[A-Z]" ) );
  List<Tuple> tuples = asList( firstFlow, partitionTap );
  assertEquals( 2, Collections.frequency( tuples, new Tuple( "A", "a", "1" ) ) );
  assertEquals( 2, Collections.frequency( tuples, new Tuple( "B", "b", "2" ) ) );
  test = getPlatform().getTextFile( new Fields( "line" ), sink.getIdentifier() );
  validateLength( secondFlow.openTapForRead( test ), 74, Pattern.compile( "[0-9]\\+[a-z]\\+[A-Z]" ) );
}
Code example source: cwensel/cascading (also in cascading/cascading-platform)
@Test
public void testMultiSinkTap() throws IOException
{
  getPlatform().copyFromLocal( inputFileJoined );
  Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileJoined );
  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new RegexSplitter( new Fields( "number", "lower", "upper" ), "\t" ) );
  Tap lhsSink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "number", "lower" ), getOutputPath( "multisink/lhs" ), SinkMode.REPLACE );
  Tap rhsSink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "number", "upper" ), getOutputPath( "multisink/rhs" ), SinkMode.REPLACE );
  Tap sink = new MultiSinkTap( lhsSink, rhsSink );
  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();
  validateLength( flow.openTapForRead( lhsSink ), 5 );
  validateLength( flow.openTapForRead( rhsSink ), 5 );
}
Code example source: cascading/cascading-platform (also in cwensel/cascading)
@Test
public void testHeaderFieldsAll() throws IOException
{
  Tap input = getPlatform().getDelimitedFile( Fields.UNKNOWN, true, true, ",", "\"", null, testDelimitedHeader, SinkMode.KEEP );
  Tap output1 = getPlatform().getDelimitedFile( Fields.ALL, true, true, ",", "\"", null, getOutputPath( "headerfieldsall1" ), SinkMode.REPLACE );
  Tap output2 = getPlatform().getDelimitedFile( Fields.ALL, true, true, ",", "\"", null, getOutputPath( "headerfieldsall2" ), SinkMode.REPLACE );
  Tap output = new MultiSinkTap( output1, output2 );
  Pipe pipe = new Pipe( "pipe" );
  Flow flow = getPlatform().getFlowConnector().connect( input, output, pipe );
  flow.complete();
  Fields fields = new Fields( "first", "second", "third", "fourth", "fifth" );
  TupleEntryIterator iterator = flow.openTapForRead( getPlatform().getDelimitedFile( fields, true, true, ",", "\"", null, output1.getIdentifier(), SinkMode.REPLACE ) );
  validateLength( iterator, 13, 5 );
  assertHeaders( output1, flow );
  assertHeaders( output2, flow );
}
Code example source: cwensel/cascading (also in cascading/cascading-platform; excerpt)
validateLength( flow.openTapForRead( innerSink ), 74 );
validateLength( flow.openTapForRead( outerSink ), 84 );
validateLength( flow.openTapForRead( leftSink ), 74 );
validateLength( flow.openTapForRead( rightSink ), 84 );
Code example source: cascading/cascading-platform (also in cwensel/cascading)
@Test
public void testTrapTapSourceSink() throws Exception
{
  getPlatform().copyFromLocal( inputFileApache );
  Scheme scheme = getPlatform().getTestFailScheme();
  Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );
  Pipe pipe = new Pipe( "map" );
  pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  pipe = new GroupBy( pipe, new Fields( "ip" ) );
  pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
  Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
  Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );
  Map<Object, Object> properties = getProperties();
  // compensate for running in cluster mode
  getPlatform().setNumMapTasks( properties, 1 );
  getPlatform().setNumReduceTasks( properties, 1 );
  getPlatform().setNumGatherPartitionTasks( properties, 1 );
  Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
  flow.complete();
  validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 7 );
  validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
}
Code example source: cascading/cascading-platform (also in cwensel/cascading)
@Test
public void testSimpleCheckpointTextIntermediate() throws Exception
{
  getPlatform().copyFromLocal( inputFileApache );
  Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  pipe = new Checkpoint( "checkpoint", pipe );
  pipe = new GroupBy( pipe, new Fields( "ip" ) );
  pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
  Tap sink = getPlatform().getTextFile( getOutputPath( "checkpoint/sink" ), SinkMode.REPLACE );
  Tap checkpoint = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", "\"", getOutputPath( "checkpoint/tap" ), SinkMode.REPLACE );
  FlowDef flowDef = flowDef()
    .addSource( pipe, source )
    .addTailSink( pipe, sink )
    .addCheckpoint( "checkpoint", checkpoint );
  Flow flow = getPlatform().getFlowConnector().connect( flowDef );
  flow.complete();
  validateLength( flow, 8 );
  if( !( getPlatform().isMapReduce() ) )
    return;
  List<FlowStep> steps = flow.getFlowSteps();
  assertEquals( "wrong size", 2, steps.size() );
  validateLength( flow.openTapForRead( checkpoint ), 10 );
}
Code example source: cwensel/cascading (also in cascading/cascading-platform; excerpt)
validateLength( flow.openTapForRead( innerSinkLhs ), 3900 );
validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 );
validateLength( flow.openTapForRead( innerSinkRhs ), 3900 );
validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 );