cascading.flow.Flow.openTapForRead()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(15.6k)|赞(0)|评价(0)|浏览(107)

本文整理了Java中cascading.flow.Flow.openTapForRead()方法的一些代码示例,展示了Flow.openTapForRead()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.openTapForRead()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:openTapForRead

Flow.openTapForRead介绍

[英]Method openTapForRead returns a cascading.tuple.TupleEntryIterator for the given Tap instance.

Note the returned iterator will return the same instance of cascading.tuple.TupleEntry on every call, thus a copy must be made of either the TupleEntry or the underlying Tuple instance if they are to be stored in a Collection.
[中]方法openTapForRead返回给定Tap实例的cascading.tuple.TupleEntryIterator。
注意,返回的迭代器在每次调用时都会返回同一个cascading.tuple.TupleEntry实例,因此,如果要将TupleEntry或其底层的Tuple实例存储到集合(Collection)中,则必须先对其进行复制。

代码示例

代码示例来源:origin: cwensel/cascading

/**
 * Opens the given tap for reading through the flow and hands the resulting iterator,
 * together with the selector and target collection, to the iterator-based
 * {@code asCollection} overload.
 * <p>
 * The iterator is closed before this method returns, whether or not the delegate throws.
 *
 * @param flow       the flow used to open the tap
 * @param tap        the tap to read from
 * @param selector   passed through to the iterator-based overload
 * @param collection passed through to the iterator-based overload
 * @param <C>        the concrete collection type
 * @return whatever the iterator-based overload returns
 * @throws IOException if opening or closing the tap fails
 */
public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
 {
 final C result;

 try( TupleEntryIterator entries = flow.openTapForRead( tap ) )
  {
  result = asCollection( entries, selector, collection );
  }

 return result;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Verifies that the first line written to the given output is the expected header row.
 * <p>
 * The output is re-read as a text file with a single "line" field, located via
 * {@code output.getIdentifier()}, and the first value of the first entry is compared
 * against the literal header string.
 *
 * @param output the tap whose identifier locates the written data
 * @param flow   the flow used to open the text tap for reading
 * @throws IOException if the tap cannot be opened or the iterator cannot be closed
 */
private void assertHeaders( Tap output, Flow flow ) throws IOException
 {
 Tap headerTap = getPlatform().getTextFile( new Fields( "line" ), output.getIdentifier() );

 // try-with-resources: the iterator is closed even when next() or the assertion throws
 try( TupleEntryIterator iterator = flow.openTapForRead( headerTap ) )
  {
  // expected value goes first so a failure message reports expected/actual correctly
  assertEquals( "first,second,third,fourth,fifth", iterator.next().getObject( 0 ) );
  }
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Verifies that the first line written to the given output is the expected header row.
 * <p>
 * The output is re-read as a text file with a single "line" field, located via
 * {@code output.getIdentifier()}, and the first value of the first entry is compared
 * against the literal header string.
 *
 * @param output the tap whose identifier locates the written data
 * @param flow   the flow used to open the text tap for reading
 * @throws IOException if the tap cannot be opened or the iterator cannot be closed
 */
private void assertHeaders( Tap output, Flow flow ) throws IOException
 {
 Tap headerTap = getPlatform().getTextFile( new Fields( "line" ), output.getIdentifier() );

 // try-with-resources: the iterator is closed even when next() or the assertion throws
 try( TupleEntryIterator iterator = flow.openTapForRead( headerTap ) )
  {
  // expected value goes first so a failure message reports expected/actual correctly
  assertEquals( "first,second,third,fourth,fifth", iterator.next().getObject( 0 ) );
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Writes the cross-product input through a partition tap, copies the partitioned data to a
 * final sink with a second flow, then reads back two individual partitions, the whole
 * partition tap and the final sink to check their contents.
 *
 * @param postfix suffix handed to the {@code DelimitedPartition} and appended to the
 *                partition paths that are read back
 * @throws IOException if any tap cannot be read
 */
private void runPartitionTest( String postfix ) throws IOException
 {
 getPlatform().copyFromLocal( inputFileCrossX2 );

 Tap input = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), " ", inputFileCrossX2 );

 // parent tap holds the "upper" value; "lower" and "number" become the partition path
 Tap parentTap = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", getOutputPath( "/partitioned" ), SinkMode.REPLACE );
 Partition byLowerAndNumber = new DelimitedPartition( new Fields( "lower", "number" ), "/", postfix );
 Tap partitioned = getPlatform().getPartitionTap( parentTap, byLowerAndNumber, 1 );

 // first flow: input -> partitions
 Flow writePartitions = getPlatform().getFlowConnector().connect( input, partitioned, new Pipe( "partition" ) );
 writePartitions.complete();

 // second flow: partitions -> flat sink
 Tap finalSink = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), "+", getOutputPath( "/final" ), SinkMode.REPLACE );
 Flow copyPartitions = getPlatform().getFlowConnector().connect( partitioned, finalSink, new Pipe( "copy" ) );
 copyPartitions.complete();

 // read two partitions back as raw text
 Tap partitionA1 = getPlatform().getTextFile( new Fields( "line" ), partitioned.getIdentifier().toString() + "/a/1" + postfix );
 validateLength( writePartitions.openTapForRead( partitionA1 ), 6, Pattern.compile( "[A-Z]" ) );

 Tap partitionB2 = getPlatform().getTextFile( new Fields( "line" ), partitioned.getIdentifier().toString() + "/b/2" + postfix );
 validateLength( writePartitions.openTapForRead( partitionB2 ), 6, Pattern.compile( "[A-Z]" ) );

 // read the whole partition tap and count two known tuples
 List<Tuple> partitionedTuples = asList( writePartitions, partitioned );
 assertEquals( 2, Collections.frequency( partitionedTuples, new Tuple( "A", "a", "1" ) ) );
 assertEquals( 2, Collections.frequency( partitionedTuples, new Tuple( "B", "b", "2" ) ) );

 // the final sink is read back as text lines of the form number+lower+upper
 Tap finalText = getPlatform().getTextFile( new Fields( "line" ), finalSink.getIdentifier() );
 validateLength( copyPartitions.openTapForRead( finalText ), 74, Pattern.compile( "[0-9]\\+[a-z]\\+[A-Z]" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Writes the cross-product input through a partition tap, copies the partitioned data to a
 * final sink with a second flow, then reads back two individual partitions, the whole
 * partition tap and the final sink to check their contents.
 *
 * @param postfix suffix handed to the {@code DelimitedPartition} and appended to the
 *                partition paths that are read back
 * @throws IOException if any tap cannot be read
 */
private void runPartitionTest( String postfix ) throws IOException
 {
 getPlatform().copyFromLocal( inputFileCrossX2 );

 Tap input = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), " ", inputFileCrossX2 );

 // parent tap holds the "upper" value; "lower" and "number" become the partition path
 Tap parentTap = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", getOutputPath( "/partitioned" ), SinkMode.REPLACE );
 Partition byLowerAndNumber = new DelimitedPartition( new Fields( "lower", "number" ), "/", postfix );
 Tap partitioned = getPlatform().getPartitionTap( parentTap, byLowerAndNumber, 1 );

 // first flow: input -> partitions
 Flow writePartitions = getPlatform().getFlowConnector().connect( input, partitioned, new Pipe( "partition" ) );
 writePartitions.complete();

 // second flow: partitions -> flat sink
 Tap finalSink = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), "+", getOutputPath( "/final" ), SinkMode.REPLACE );
 Flow copyPartitions = getPlatform().getFlowConnector().connect( partitioned, finalSink, new Pipe( "copy" ) );
 copyPartitions.complete();

 // read two partitions back as raw text
 Tap partitionA1 = getPlatform().getTextFile( new Fields( "line" ), partitioned.getIdentifier().toString() + "/a/1" + postfix );
 validateLength( writePartitions.openTapForRead( partitionA1 ), 6, Pattern.compile( "[A-Z]" ) );

 Tap partitionB2 = getPlatform().getTextFile( new Fields( "line" ), partitioned.getIdentifier().toString() + "/b/2" + postfix );
 validateLength( writePartitions.openTapForRead( partitionB2 ), 6, Pattern.compile( "[A-Z]" ) );

 // read the whole partition tap and count two known tuples
 List<Tuple> partitionedTuples = asList( writePartitions, partitioned );
 assertEquals( 2, Collections.frequency( partitionedTuples, new Tuple( "A", "a", "1" ) ) );
 assertEquals( 2, Collections.frequency( partitionedTuples, new Tuple( "B", "b", "2" ) ) );

 // the final sink is read back as text lines of the form number+lower+upper
 Tap finalText = getPlatform().getTextFile( new Fields( "line" ), finalSink.getIdentifier() );
 validateLength( copyPartitions.openTapForRead( finalText ), 74, Pattern.compile( "[0-9]\\+[a-z]\\+[A-Z]" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits each input line into number/lower/upper fields and writes the result through a
 * {@code MultiSinkTap} wrapping two text sinks, each selecting a different pair of fields.
 * Both child sinks are then read back through the flow and validated.
 */
@Test
public void testMultiSinkTap() throws IOException
 {
 getPlatform().copyFromLocal( inputFileJoined );

 Tap input = getPlatform().getTextFile( new Fields( "line" ), inputFileJoined );

 // tab-split every line into three named fields
 Pipe splitLines = new Each( new Pipe( "test" ), new RegexSplitter( new Fields( "number", "lower", "upper" ), "\t" ) );

 Tap lowerSink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "number", "lower" ), getOutputPath( "multisink/lhs" ), SinkMode.REPLACE );
 Tap upperSink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "number", "upper" ), getOutputPath( "multisink/rhs" ), SinkMode.REPLACE );
 Tap bothSinks = new MultiSinkTap( lowerSink, upperSink );

 Flow multiSinkFlow = getPlatform().getFlowConnector().connect( input, bothSinks, splitLines );
 multiSinkFlow.complete();

 // each child sink is checked individually
 validateLength( multiSinkFlow.openTapForRead( lowerSink ), 5 );
 validateLength( multiSinkFlow.openTapForRead( upperSink ), 5 );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Splits each input line into number/lower/upper fields and writes the result through a
 * {@code MultiSinkTap} wrapping two text sinks, each selecting a different pair of fields.
 * Both child sinks are then read back through the flow and validated.
 */
@Test
public void testMultiSinkTap() throws IOException
 {
 getPlatform().copyFromLocal( inputFileJoined );

 Tap input = getPlatform().getTextFile( new Fields( "line" ), inputFileJoined );

 // tab-split every line into three named fields
 Pipe splitLines = new Each( new Pipe( "test" ), new RegexSplitter( new Fields( "number", "lower", "upper" ), "\t" ) );

 Tap lowerSink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "number", "lower" ), getOutputPath( "multisink/lhs" ), SinkMode.REPLACE );
 Tap upperSink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "number", "upper" ), getOutputPath( "multisink/rhs" ), SinkMode.REPLACE );
 Tap bothSinks = new MultiSinkTap( lowerSink, upperSink );

 Flow multiSinkFlow = getPlatform().getFlowConnector().connect( input, bothSinks, splitLines );
 multiSinkFlow.complete();

 // each child sink is checked individually
 validateLength( multiSinkFlow.openTapForRead( lowerSink ), 5 );
 validateLength( multiSinkFlow.openTapForRead( upperSink ), 5 );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Copies a delimited file declared with {@code Fields.UNKNOWN} into two delimited outputs
 * declared with {@code Fields.ALL} via a {@code MultiSinkTap}, then re-reads the first
 * output with explicit field names and checks the header line of both outputs.
 */
@Test
public void testHeaderFieldsAll() throws IOException
 {
 Tap source = getPlatform().getDelimitedFile( Fields.UNKNOWN, true, true, ",", "\"", null, testDelimitedHeader, SinkMode.KEEP );

 Tap firstOutput = getPlatform().getDelimitedFile( Fields.ALL, true, true, ",", "\"", null, getOutputPath( "headerfieldsall1" ), SinkMode.REPLACE );
 Tap secondOutput = getPlatform().getDelimitedFile( Fields.ALL, true, true, ",", "\"", null, getOutputPath( "headerfieldsall2" ), SinkMode.REPLACE );
 Tap combinedOutput = new MultiSinkTap( firstOutput, secondOutput );

 Pipe passThrough = new Pipe( "pipe" );

 Flow flow = getPlatform().getFlowConnector().connect( source, combinedOutput, passThrough );
 flow.complete();

 // re-read the first output with the five field names declared explicitly
 Fields declared = new Fields( "first", "second", "third", "fourth", "fifth" );
 Tap readBack = getPlatform().getDelimitedFile( declared, true, true, ",", "\"", null, firstOutput.getIdentifier(), SinkMode.REPLACE );
 TupleEntryIterator written = flow.openTapForRead( readBack );

 validateLength( written, 13, 5 );

 assertHeaders( firstOutput, flow );
 assertHeaders( secondOutput, flow );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Copies a delimited file declared with {@code Fields.UNKNOWN} into two delimited outputs
 * declared with {@code Fields.ALL} via a {@code MultiSinkTap}, then re-reads the first
 * output with explicit field names and checks the header line of both outputs.
 */
@Test
public void testHeaderFieldsAll() throws IOException
 {
 Tap source = getPlatform().getDelimitedFile( Fields.UNKNOWN, true, true, ",", "\"", null, testDelimitedHeader, SinkMode.KEEP );

 Tap firstOutput = getPlatform().getDelimitedFile( Fields.ALL, true, true, ",", "\"", null, getOutputPath( "headerfieldsall1" ), SinkMode.REPLACE );
 Tap secondOutput = getPlatform().getDelimitedFile( Fields.ALL, true, true, ",", "\"", null, getOutputPath( "headerfieldsall2" ), SinkMode.REPLACE );
 Tap combinedOutput = new MultiSinkTap( firstOutput, secondOutput );

 Pipe passThrough = new Pipe( "pipe" );

 Flow flow = getPlatform().getFlowConnector().connect( source, combinedOutput, passThrough );
 flow.complete();

 // re-read the first output with the five field names declared explicitly
 Fields declared = new Fields( "first", "second", "third", "fourth", "fifth" );
 Tap readBack = getPlatform().getDelimitedFile( declared, true, true, ",", "\"", null, firstOutput.getIdentifier(), SinkMode.REPLACE );
 TupleEntryIterator written = flow.openTapForRead( readBack );

 validateLength( written, 13, 5 );

 assertHeaders( firstOutput, flow );
 assertHeaders( secondOutput, flow );
 }

代码示例来源:origin: cwensel/cascading

// Excerpt from a larger test method: each of the four sinks is opened for reading through
// the flow and the resulting iterator is passed to validateLength with a count.
// NOTE(review): validateLength and the sink declarations are outside this excerpt —
// presumably the second argument is the expected number of tuples; confirm.
validateLength( flow.openTapForRead( innerSink ), 74 );
validateLength( flow.openTapForRead( outerSink ), 84 );
validateLength( flow.openTapForRead( leftSink ), 74 );
validateLength( flow.openTapForRead( rightSink ), 84 );

代码示例来源:origin: cascading/cascading-platform

// Excerpt from a larger test method: each of the four sinks is opened for reading through
// the flow and the resulting iterator is passed to validateLength with a count.
// NOTE(review): validateLength and the sink declarations are outside this excerpt —
// presumably the second argument is the expected number of tuples; confirm.
validateLength( flow.openTapForRead( innerSink ), 74 );
validateLength( flow.openTapForRead( outerSink ), 84 );
validateLength( flow.openTapForRead( leftSink ), 74 );
validateLength( flow.openTapForRead( rightSink ), 84 );

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs an ip-count assembly whose source and sink both use the platform's test-fail scheme,
 * with a text trap attached, then validates the sink contents and the trap contents.
 */
@Test
public void testTrapTapSourceSink() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );

 Scheme failingScheme = getPlatform().getTestFailScheme();
 Tap input = getPlatform().getTap( failingScheme, inputFileApache, SinkMode.KEEP );

 // parse the leading ip from each line, group on it and count occurrences
 Pipe assembly = new Pipe( "map" );
 assembly = new Each( assembly, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 assembly = new GroupBy( assembly, new Fields( "ip" ) );
 assembly = new Every( assembly, new Count(), new Fields( "ip", "count" ) );

 Tap output = getPlatform().getTap( failingScheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
 Tap trapTap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );

 Map<Object, Object> config = getProperties();

 // compensate for running in cluster mode
 getPlatform().setNumMapTasks( config, 1 );
 getPlatform().setNumReduceTasks( config, 1 );
 getPlatform().setNumGatherPartitionTasks( config, 1 );

 Flow trapFlow = getPlatform().getFlowConnector( config ).connect( "trap test", input, output, trapTap, assembly );
 trapFlow.complete();

 // the sink is re-read as a plain text file located by the sink's identifier
 Tap outputAsText = getPlatform().getTextFile( output.getIdentifier() );
 validateLength( trapFlow.openTapForRead( outputAsText ), 7 );

 validateLength( trapFlow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs an ip-count assembly with a {@code Checkpoint} between the parse and the group-by,
 * binding the checkpoint to a tab-delimited tap. After validating the flow, and only on
 * MapReduce platforms, it also checks the number of flow steps and reads the checkpoint tap.
 */
@Test
public void testSimpleCheckpointTextIntermediate() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );

 Tap input = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

 // parse ip -> checkpoint -> group by ip -> count
 Pipe assembly = new Pipe( "test" );
 assembly = new Each( assembly, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 assembly = new Checkpoint( "checkpoint", assembly );
 assembly = new GroupBy( assembly, new Fields( "ip" ) );
 assembly = new Every( assembly, new Count(), new Fields( "ip", "count" ) );

 Tap output = getPlatform().getTextFile( getOutputPath( "checkpoint/sink" ), SinkMode.REPLACE );
 Tap checkpointTap = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", "\"", getOutputPath( "checkpoint/tap" ), SinkMode.REPLACE );

 FlowDef definition = flowDef()
  .addSource( assembly, input )
  .addTailSink( assembly, output )
  .addCheckpoint( "checkpoint", checkpointTap );

 Flow flow = getPlatform().getFlowConnector().connect( definition );
 flow.complete();

 validateLength( flow, 8 );

 // the remaining checks only run on MapReduce platforms
 if( getPlatform().isMapReduce() )
  {
  List<FlowStep> flowSteps = flow.getFlowSteps();
  assertEquals( "wrong size", 2, flowSteps.size() );

  validateLength( flow.openTapForRead( checkpointTap ), 10 );
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Runs an ip-count assembly with a {@code Checkpoint} between the parse and the group-by,
 * binding the checkpoint to a tab-delimited tap. After validating the flow, and only on
 * MapReduce platforms, it also checks the number of flow steps and reads the checkpoint tap.
 */
@Test
public void testSimpleCheckpointTextIntermediate() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );

 Tap input = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

 // parse ip -> checkpoint -> group by ip -> count
 Pipe assembly = new Pipe( "test" );
 assembly = new Each( assembly, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 assembly = new Checkpoint( "checkpoint", assembly );
 assembly = new GroupBy( assembly, new Fields( "ip" ) );
 assembly = new Every( assembly, new Count(), new Fields( "ip", "count" ) );

 Tap output = getPlatform().getTextFile( getOutputPath( "checkpoint/sink" ), SinkMode.REPLACE );
 Tap checkpointTap = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", "\"", getOutputPath( "checkpoint/tap" ), SinkMode.REPLACE );

 FlowDef definition = flowDef()
  .addSource( assembly, input )
  .addTailSink( assembly, output )
  .addCheckpoint( "checkpoint", checkpointTap );

 Flow flow = getPlatform().getFlowConnector().connect( definition );
 flow.complete();

 validateLength( flow, 8 );

 // the remaining checks only run on MapReduce platforms
 if( getPlatform().isMapReduce() )
  {
  List<FlowStep> flowSteps = flow.getFlowSteps();
  assertEquals( "wrong size", 2, flowSteps.size() );

  validateLength( flow.openTapForRead( checkpointTap ), 10 );
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Runs an ip-count assembly whose source and sink both use the platform's test-fail scheme,
 * with a text trap attached, then validates the sink contents and the trap contents.
 */
@Test
public void testTrapTapSourceSink() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );

 Scheme failingScheme = getPlatform().getTestFailScheme();
 Tap input = getPlatform().getTap( failingScheme, inputFileApache, SinkMode.KEEP );

 // parse the leading ip from each line, group on it and count occurrences
 Pipe assembly = new Pipe( "map" );
 assembly = new Each( assembly, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 assembly = new GroupBy( assembly, new Fields( "ip" ) );
 assembly = new Every( assembly, new Count(), new Fields( "ip", "count" ) );

 Tap output = getPlatform().getTap( failingScheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
 Tap trapTap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );

 Map<Object, Object> config = getProperties();

 // compensate for running in cluster mode
 getPlatform().setNumMapTasks( config, 1 );
 getPlatform().setNumReduceTasks( config, 1 );
 getPlatform().setNumGatherPartitionTasks( config, 1 );

 Flow trapFlow = getPlatform().getFlowConnector( config ).connect( "trap test", input, output, trapTap, assembly );
 trapFlow.complete();

 // the sink is re-read as a plain text file located by the sink's identifier
 Tap outputAsText = getPlatform().getTextFile( output.getIdentifier() );
 validateLength( trapFlow.openTapForRead( outputAsText ), 7 );

 validateLength( trapFlow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
 }

代码示例来源:origin: cwensel/cascading

// Excerpt from a larger test method: the inner and unique sinks of the lhs and rhs branches
// are each opened for reading through the flow and passed to validateLength with a count.
// NOTE(review): validateLength and the sink declarations are outside this excerpt —
// presumably the second argument is the expected number of tuples; confirm.
validateLength( flow.openTapForRead( innerSinkLhs ), 3900 );
validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 );
validateLength( flow.openTapForRead( innerSinkRhs ), 3900 );
validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 );

代码示例来源:origin: cascading/cascading-platform

// Excerpt from a larger test method: the inner and unique sinks of the lhs and rhs branches
// are each opened for reading through the flow and passed to validateLength with a count.
// NOTE(review): validateLength and the sink declarations are outside this excerpt —
// presumably the second argument is the expected number of tuples; confirm.
validateLength( flow.openTapForRead( innerSinkLhs ), 3900 );
validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 );
validateLength( flow.openTapForRead( innerSinkRhs ), 3900 );
validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 );

相关文章