
x33g5p2x  于2022-01-29 转载在 其他  



[英]Method selectTuple selects the fields specified in the selector from this instance. If Fields#ALL or the same fields as declared are given, this.getTuple() will be returned.

The returned Tuple will be either modifiable or unmodifiable, depending on the state of this TupleEntry instance.

See #selectTupleCopy(Fields) to guarantee a copy suitable for modifying or caching/storing in a collection.

Note this is a bug fix and change from 2.0 and 2.1. In previous versions the modifiable state was dependent on the given selector.


代码示例来源:origin: cwensel/cascading

 * Method tupleStream returns a {@link Stream} of {@link Tuple} instances from the given
 * Tap instance.
 * <p>
 * Also see {@link cascading.tuple.TupleStream#tupleStream(Tap, FlowProcess, Fields)}.
 * @param flowProcess represents the current platform configuration
 * @param selector    the fields to select from the underlying Tuple
 * @return a Stream of Tuple instances
public Stream<Tuple> tupleStream( FlowProcess<? extends Config> flowProcess, Fields selector )
 // Map every TupleEntry produced by entryStream() to the Tuple obtained by applying the selector to it.
 return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectTuple( selector ) );

代码示例来源:origin: cwensel/cascading

// Evaluates exactly two child filters and returns true only when their results differ
// (one says remove, the other does not).
public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall )
 Context context = (Logic.Context) filterCall.getContext();
 TupleEntry lhsEntry = context.argumentEntries[ 0 ];
 TupleEntry rhsEntry = context.argumentEntries[ 1 ];
 // Load each side's entry with the subset of the incoming arguments chosen by its own selector.
 lhsEntry.setTuple( filterCall.getArguments().selectTuple( argumentSelectors[ 0 ] ) );
 rhsEntry.setTuple( filterCall.getArguments().selectTuple( argumentSelectors[ 1 ] ) );
 // Both filters are always invoked; there is no short-circuit.
 // NOTE(review): presumably context.calls[ i ] exposes argumentEntries[ i ] to the filter -- confirm in Context setup.
 boolean lhsResult = filters[ 0 ].isRemove( flowProcess, context.calls[ 0 ] );
 boolean rhsResult = filters[ 1 ].isRemove( flowProcess, context.calls[ 1 ] );
 // Inequality of the two booleans: true when exactly one filter voted to remove.
 return lhsResult != rhsResult;


// When both tupleEntry and types are set, replaces the entry's tuple with a new Tuple built
// from the values of the selected fields after passing them through Tuples.asArray.
// NOTE(review): closing braces (and any else branches) are not visible in this excerpt.
private void flushTupleEntry() {
 if (tupleEntry != null) {
  if (types != null) {
   // One CoercibleType per declared type.
   CoercibleType<?>[] coercions = Coercions.coercibleArray(types.length, types);
   // Select the configured fields and convert them into a fresh Object[] sized to types.length.
   // NOTE(review): presumably each value is coerced to the matching entry of types -- confirm against Tuples.asArray.
   Object[] values = Tuples.asArray(tupleEntry.selectTuple(fields), coercions, types, new Object[types.length]);
   tupleEntry.setTuple(new Tuple(values));

代码示例来源:origin: cwensel/cascading

// Selects expectedFields from tupleEntry; if the selection throws a TupleException, rethrows a
// new TupleException whose message lists both the given and the expected fields.
// NOTE(review): the opening "try" line is missing from this excerpt.
private Tuple selectTupleFrom( TupleEntry tupleEntry, Fields expectedFields )
  return tupleEntry.selectTuple( expectedFields );
 catch( TupleException exception )
  // Build a message contrasting what the entry actually holds with what was expected.
  Fields givenFields = tupleEntry.getFields();
  String string = "given TupleEntry fields: " + givenFields.printVerbose();
  string += " do not match the operation declaredFields: " + expectedFields.printVerbose();
  string += ", operations must emit tuples that match the fields they declare as output";
  // The original exception is passed along as the cause.
  throw new TupleException( string, exception );

代码示例来源:origin: cwensel/cascading

// Returns true as soon as any child filter votes to remove; returns false when none do
// (including when argumentSelectors is empty).
public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall )
 TupleEntry arguments = filterCall.getArguments();
 Context context = (Context) filterCall.getContext();
 TupleEntry[] argumentEntries = context.argumentEntries;
 for( int i = 0; i < argumentSelectors.length; i++ )
  // Load the i-th entry with the argument subset chosen by the i-th selector before calling the i-th filter.
  Tuple selected = arguments.selectTuple( argumentSelectors[ i ] );
  argumentEntries[ i ].setTuple( selected );
  // Short-circuit: later filters are not invoked once one returns true.
  if( filters[ i ].isRemove( flowProcess, context.calls[ i ] ) )
   return true;
 return false;

代码示例来源:origin: cwensel/cascading

// Returns false as soon as any child filter declines to remove; returns true when every
// filter votes to remove (and also when argumentSelectors is empty, since the loop never runs).
public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall )
 TupleEntry arguments = filterCall.getArguments();
 Context context = (Context) filterCall.getContext();
 TupleEntry[] argumentEntries = context.argumentEntries;
 for( int i = 0; i < argumentSelectors.length; i++ )
  // Load the i-th entry with the argument subset chosen by the i-th selector before calling the i-th filter.
  Tuple selected = arguments.selectTuple( argumentSelectors[ i ] );
  argumentEntries[ i ].setTuple( selected );
  // Short-circuit: later filters are not invoked once one returns false.
  if( !filters[ i ].isRemove( flowProcess, context.calls[ i ] ) )
   return false;
 return true;

代码示例来源:origin: com.twitter/maple

// Converts the outgoing TupleEntry into a single Put and hands it to the OutputCollector.
// The row key comes from keyField; each group in valueFields contributes one cell per field,
// under the family name at the same index in familyNames.
public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
  throws IOException {
 TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
 OutputCollector outputCollector = sinkCall.getOutput();
 // The first object of the selected key tuple is cast to ImmutableBytesWritable and used as the row key.
 Tuple key = tupleEntry.selectTuple(keyField);
 ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0);
 Put put = new Put(keyBytes.get());
 for (int i = 0; i < valueFields.length; i++) {
  Fields fieldSelector = valueFields[i];
  TupleEntry values = tupleEntry.selectEntry(fieldSelector);
  for (int j = 0; j < values.getFields().size(); j++) {
   // NOTE(review): fields and tuple do not depend on j and could be read once before this loop.
   Fields fields = values.getFields();
   Tuple tuple = values.getTuple();
   // Each value is cast to ImmutableBytesWritable; the field name (cast to String) is the column qualifier.
   ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j);
   put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
 // Emitted with a null key; by indentation this runs once, after both loops (closing braces are not visible here).
 outputCollector.collect(null, put);

代码示例来源:origin: cwensel/cascading

// Selects one tuple per entry of `fields` from the arguments, orders them via a TreeSet, and
// writes them back in that order into the positions named by fields[ 0 ], fields[ 1 ], ...
// of a copy of the input tuple, which is then emitted.
public void operate( FlowProcess flowProcess, FunctionCall functionCall )
 // TreeSet iterates in the tuples' sort order.
 // NOTE(review): a TreeSet also drops elements that compare equal, so if two selections compare
 // equal fewer tuples are written back and the trailing field positions keep their original values -- confirm intended.
 Set<Tuple> set = new TreeSet<Tuple>();
 TupleEntry input = functionCall.getArguments();
 for( Fields field : fields )
  set.add( input.selectTuple( field ) );
 int i = 0;
 // Work on a copy so the incoming argument tuple is not modified.
 Tuple inputCopy = new Tuple( input.getTuple() );
 for( Tuple tuple : set )
  inputCopy.put( input.getFields(), fields[ i++ ], tuple );
 functionCall.getOutputCollector().add( inputCopy );

代码示例来源:origin: cascading/cascading-platform

// Reads entries from a delimited source tap, projects each to its "num" tuple, writes the
// resulting tuple stream to a sink tap, and checks that the written tap yields 20 entries.
public void testTupleEntryTupleWriter() throws Exception
 getPlatform().copyFromLocal( inputFileNums20 );
 FlowProcess flowProcess = getPlatform().getFlowProcess();
 // Source and sink both declare a single Integer field "num" and use a space delimiter.
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", inputFileNums20 );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", getOutputPath() );
 Stream<Tuple> stream = TupleEntryStream.entryStream( source, flowProcess )
  .map( entry -> entry.selectTuple( new Fields( "num" ) ) );
 // writeTuple returns a Tap, which is re-read below to count what was written.
 Tap result = TupleStream.writeTuple( stream, sink, flowProcess );
 assertEquals( 20, TupleEntryStream.entryStream( result, flowProcess ).count() );

代码示例来源:origin: cwensel/cascading

// Reads entries from a delimited source tap, projects each to its "num" tuple, writes the
// resulting tuple stream to a sink tap, and checks that the written tap yields 20 entries.
public void testTupleEntryTupleWriter() throws Exception
 getPlatform().copyFromLocal( inputFileNums20 );
 FlowProcess flowProcess = getPlatform().getFlowProcess();
 // Source and sink both declare a single Integer field "num" and use a space delimiter.
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", inputFileNums20 );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", getOutputPath() );
 Stream<Tuple> stream = TupleEntryStream.entryStream( source, flowProcess )
  .map( entry -> entry.selectTuple( new Fields( "num" ) ) );
 // writeTuple returns a Tap, which is re-read below to count what was written.
 Tap result = TupleStream.writeTuple( stream, sink, flowProcess );
 assertEquals( 20, TupleEntryStream.entryStream( result, flowProcess ).count() );
