cascading.tuple.TupleEntry.selectTuple()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(108)

本文整理了Java中cascading.tuple.TupleEntry.selectTuple()方法的一些代码示例,展示了TupleEntry.selectTuple()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TupleEntry.selectTuple()方法的具体详情如下:
包路径:cascading.tuple.TupleEntry
类名称:TupleEntry
方法名:selectTuple

TupleEntry.selectTuple介绍

[英]Method selectTuple selects the fields specified in the selector from this instance. If Fields#ALL or the same fields as declared are given, this.getTuple() will be returned.

The returned Tuple will be either modifiable or unmodifiable, depending on the state of this TupleEntry instance.

See #selectTupleCopy(Fields) to guarantee a copy suitable for modifying or caching/storing in a collection.

Note this is a bug fix and change from 2.0 and 2.1. In previous versions the modifiable state was dependent on the given selector.
[中]方法selectTuple从该实例中选择选择器中指定的字段。如果字段#给出了声明的所有或相同的字段,则。将返回getTuple()。
根据TupleEntry实例的状态,返回的元组将是可修改的或不可修改的。
请参阅#选择TupleCopy(字段)以确保副本适合在集合中修改或缓存/存储。
请注意,这是一个错误修复和2.0和2.1版本的更改。在以前的版本中,可修改状态取决于给定的选择器。

代码示例

代码示例来源:origin: cwensel/cascading

/**
 * Method tupleStream returns a {@link Stream} of {@link Tuple} instances from the given
 * Tap instance.
 * <p>
 * Also see {@link cascading.tuple.TupleStream#tupleStream(Tap, FlowProcess, Fields)}.
 *
 * @param flowProcess represents the current platform configuration
 * @param selector    the fields to select from the underlying Tuple
 * @return a Stream of TupleE instances
 */
public Stream<Tuple> tupleStream( FlowProcess<? extends Config> flowProcess, Fields selector )
 {
 return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectTuple( selector ) );
 }

代码示例来源:origin: cwensel/cascading

@Override
public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall )
 {
 Context context = (Logic.Context) filterCall.getContext();
 TupleEntry lhsEntry = context.argumentEntries[ 0 ];
 TupleEntry rhsEntry = context.argumentEntries[ 1 ];
 lhsEntry.setTuple( filterCall.getArguments().selectTuple( argumentSelectors[ 0 ] ) );
 rhsEntry.setTuple( filterCall.getArguments().selectTuple( argumentSelectors[ 1 ] ) );
 boolean lhsResult = filters[ 0 ].isRemove( flowProcess, context.calls[ 0 ] );
 boolean rhsResult = filters[ 1 ].isRemove( flowProcess, context.calls[ 1 ] );
 return lhsResult != rhsResult;
 }
}

代码示例来源:origin: com.hotels/plunger

private void flushTupleEntry() {
 if (tupleEntry != null) {
  if (types != null) {
   CoercibleType<?>[] coercions = Coercions.coercibleArray(types.length, types);
   Object[] values = Tuples.asArray(tupleEntry.selectTuple(fields), coercions, types, new Object[types.length]);
   tupleEntry.setTuple(new Tuple(values));
  }
  list.add(tupleEntry.getTuple());
 }
}

代码示例来源:origin: cwensel/cascading

private Tuple selectTupleFrom( TupleEntry tupleEntry, Fields expectedFields )
 {
 try
  {
  return tupleEntry.selectTuple( expectedFields );
  }
 catch( TupleException exception )
  {
  Fields givenFields = tupleEntry.getFields();
  String string = "given TupleEntry fields: " + givenFields.printVerbose();
  string += " do not match the operation declaredFields: " + expectedFields.printVerbose();
  string += ", operations must emit tuples that match the fields they declare as output";
  throw new TupleException( string, exception );
  }
 }

代码示例来源:origin: cwensel/cascading

@Override
public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall )
 {
 TupleEntry arguments = filterCall.getArguments();
 Context context = (Context) filterCall.getContext();
 TupleEntry[] argumentEntries = context.argumentEntries;
 for( int i = 0; i < argumentSelectors.length; i++ )
  {
  Tuple selected = arguments.selectTuple( argumentSelectors[ i ] );
  argumentEntries[ i ].setTuple( selected );
  if( filters[ i ].isRemove( flowProcess, context.calls[ i ] ) )
   return true;
  }
 return false;
 }
}

代码示例来源:origin: cwensel/cascading

@Override
public boolean isRemove( FlowProcess flowProcess, FilterCall filterCall )
 {
 TupleEntry arguments = filterCall.getArguments();
 Context context = (Context) filterCall.getContext();
 TupleEntry[] argumentEntries = context.argumentEntries;
 for( int i = 0; i < argumentSelectors.length; i++ )
  {
  Tuple selected = arguments.selectTuple( argumentSelectors[ i ] );
  argumentEntries[ i ].setTuple( selected );
  if( !filters[ i ].isRemove( flowProcess, context.calls[ i ] ) )
   return false;
  }
 return true;
 }
}

代码示例来源:origin: com.twitter/maple

@Override
public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
  throws IOException {
 TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
 OutputCollector outputCollector = sinkCall.getOutput();
 Tuple key = tupleEntry.selectTuple(keyField);
 ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0);
 Put put = new Put(keyBytes.get());
 for (int i = 0; i < valueFields.length; i++) {
  Fields fieldSelector = valueFields[i];
  TupleEntry values = tupleEntry.selectEntry(fieldSelector);
  for (int j = 0; j < values.getFields().size(); j++) {
   Fields fields = values.getFields();
   Tuple tuple = values.getTuple();
   ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j);
   put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
  }
 }
 outputCollector.collect(null, put);
}

代码示例来源:origin: cwensel/cascading

public void operate( FlowProcess flowProcess, FunctionCall functionCall )
 {
 Set<Tuple> set = new TreeSet<Tuple>();
 TupleEntry input = functionCall.getArguments();
 for( Fields field : fields )
  set.add( input.selectTuple( field ) );
 int i = 0;
 Tuple inputCopy = new Tuple( input.getTuple() );
 for( Tuple tuple : set )
  inputCopy.put( input.getFields(), fields[ i++ ], tuple );
 functionCall.getOutputCollector().add( inputCopy );
 }

代码示例来源:origin: cascading/cascading-platform

@Test
public void testTupleEntryTupleWriter() throws Exception
 {
 getPlatform().copyFromLocal( inputFileNums20 );
 FlowProcess flowProcess = getPlatform().getFlowProcess();
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", inputFileNums20 );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", getOutputPath() );
 Stream<Tuple> stream = TupleEntryStream.entryStream( source, flowProcess )
  .map( entry -> entry.selectTuple( new Fields( "num" ) ) );
 Tap result = TupleStream.writeTuple( stream, sink, flowProcess );
 assertEquals( 20, TupleEntryStream.entryStream( result, flowProcess ).count() );
 }

代码示例来源:origin: cwensel/cascading

@Test
public void testTupleEntryTupleWriter() throws Exception
 {
 getPlatform().copyFromLocal( inputFileNums20 );
 FlowProcess flowProcess = getPlatform().getFlowProcess();
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", inputFileNums20 );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "num", Integer.class ), " ", getOutputPath() );
 Stream<Tuple> stream = TupleEntryStream.entryStream( source, flowProcess )
  .map( entry -> entry.selectTuple( new Fields( "num" ) ) );
 Tap result = TupleStream.writeTuple( stream, sink, flowProcess );
 assertEquals( 20, TupleEntryStream.entryStream( result, flowProcess ).count() );
 }

相关文章