
x33g5p2x  于2022-01-29 转载在 其他  



[英]Class TupleEntry allows a Tuple instance and its declaring Fields instance to be used as a single object.

Once a TupleEntry is created, its Fields cannot be changed, but the Tuple instance it holds can be replaced or modified. The managed Tuple should not have elements added or removed, as this will break the relationship with the associated Fields instance.

If type information is provided on the Fields instance, all setters on this class will use that information to coerce the given object to the expected type.

For example, if position is is of type long, then entry.setString(0, "9" ) will coerce the "9" to a long 9. Thus, entry.getObject(0) == 9l.

No coercion is performed with the #getObject(Comparable) and #getObject(int) methods.

To set a value without coercion, see the #setRaw(Comparable,Object) and #setRaw(int,Object)methods.


代码示例来源:origin: elastic/elasticsearch-hadoop

static Tuple coerceToString(SinkCall<?, ?> sinkCall) {
    TupleEntry entry = sinkCall.getOutgoingEntry();
    Fields fields = entry.getFields();
    Tuple tuple = entry.getTuple();
    if (fields.hasTypes()) {
      Type types[] = new Type[fields.size()];
      for (int index = 0; index < fields.size(); index++) {
        Type type = fields.getType(index);
        if (type instanceof CoercibleType<?>) {
          types[index] = String.class;
        else {
          types[index] = type;
      tuple = entry.getCoercedTuple(types);
    return tuple;

代码示例来源:origin: elastic/elasticsearch-hadoop

@SuppressWarnings({ "rawtypes" })
protected Object extractField(Object target) {
  List<String> fieldNames = getFieldNames();
  for (int i = 0; i < fieldNames.size(); i++) {
    if (target instanceof SinkCall) {
      target = ((SinkCall) target).getOutgoingEntry().getObject(fieldNames.get(i));
      if (target == null) {
        return NOT_FOUND;
    else {
      return NOT_FOUND;
  return target;

代码示例来源:origin: elastic/elasticsearch-hadoop

if (entry.getFields().isDefined()) {
  for (Comparable<?> field : entry.getFields()) {
    Object result = data;
    entry.setObject(field, result);
  List<Object> elements = Tuple.elements(entry.getTuple());

代码示例来源:origin: elastic/elasticsearch-hadoop

static void setObject(TupleEntry entry, Comparable<?> field, Object object) {
  if (object != null && entry.getFields().getType(field) instanceof CoercibleType) {
    entry.setObject(field, object.toString());
  else {
    entry.setObject(field, object);

代码示例来源:origin: elastic/elasticsearch-hadoop

if (entry.getFields().isDefined()) {
  for (Comparable<?> field : entry.getFields()) {
  List<Object> elements = Tuple.elements(entry.getTuple());

代码示例来源:origin: cwensel/cascading

public void testAverageBy() throws IOException
 getPlatform().copyFromLocal( inputFileLhs );
 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "average" ), "\t",
  new Class[]{String.class, Double.TYPE}, getOutputPath( "average" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "average" );
 pipe = new AverageBy( pipe, new Fields( "char" ), new Fields( "num" ), new Fields( "average" ), 2 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s[\\d.]+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", (double) 6 / 2 ),
  new Tuple( "b", (double) 12 / 4 ),
  new Tuple( "c", (double) 10 / 4 ),
  new Tuple( "d", (double) 6 / 2 ),
  new Tuple( "e", (double) 5 / 1 ),
 TupleEntryIterator iterator = flow.openSink();
 int count = 0;
 while( iterator.hasNext() )
  assertEquals( results[ count++ ], );

代码示例来源:origin: Big-Data-Manning/big-data-code

public void operate(FlowProcess process, FunctionCall call) {
    String url = call.getArguments().getString(0);
    String gran = call.getArguments().getString(1);
    Integer bucket = call.getArguments().getInteger(2);
    String keyStr = url + "/" + gran + "-" + bucket;
    try {
        new Tuple(keyStr.getBytes("UTF-8")));
    } catch(UnsupportedEncodingException e) {
      throw new RuntimeException(e);

代码示例来源:origin: cwensel/cascading

@Test(expected = OperationException.class)
public void testGetMissingFail() throws Exception
 TupleEntry entry = new TupleEntry( new Fields( "json", JSONCoercibleType.TYPE ), Tuple.size( 1 ) );
 entry.setObject( 0, JSONData.nested );
 JSONGetFunction function = new JSONGetFunction( new Fields( "result" ), true, "/person/foobar" );
 invokeFunction( function, entry, new Fields( "result" ) );

代码示例来源:origin: Big-Data-Manning/big-data-code

public void operate(FlowProcess process, FunctionCall call) {
    Object node1 = call.getArguments().getObject(0);
    Object node2 = call.getArguments().getObject(1);
    if(!node1.equals(node2)) {
        new Tuple(node1, node2));
        new Tuple(node2, node1));

代码示例来源:origin: cwensel/cascading

public void operate( FlowProcess flowProcess, FunctionCall functionCall )
 TupleEntry input = functionCall.getArguments();
 functionCall.getOutputCollector().add( new Tuple( Math.pow( input.getTuple().getDouble( 0 ) - input.getTuple().getDouble( 1 ), 2 ) ) );

代码示例来源:origin: cwensel/cascading

public void testSinkUnknown() throws IOException
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();
 String line =;
 assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );

代码示例来源:origin: cwensel/cascading

public void testNullsFromScheme() throws IOException
 getPlatform().copyFromLocal( inputFileComments );
 Tap source = new Hfs( new CommentScheme( new Fields( "line" ) ), inputFileComments );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Identity() );
 Tap sink = new Hfs( new TextLine( 1 ), getOutputPath( "testnulls" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( source, sink, pipe );
 validateLength( flow, 5, null );
 TupleEntryIterator iterator = flow.openSink();
 assertEquals( "not equal: tuple.get(1)", "1 a", 1 ) );
 // confirm the tuple iterator can handle nulls from the source
 validateLength( flow.openSource(), 5 );

代码示例来源:origin: cwensel/cascading

private TupleEntry getEntry( Comparable lhs, Comparable rhs )
 Fields fields = new Fields( "a", "b" );
 Tuple parameters = new Tuple( lhs, rhs );
 return new TupleEntry( fields, parameters );

代码示例来源:origin: cwensel/cascading

public void testSinkDeclaredFields() throws IOException
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.ALL );
 Tap sink = getPlatform().getTextFile( new Fields( "line" ), new Fields( "second", "first", "third" ), getOutputPath( "declaredsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();
 String line = 0 );
 assertTrue( "not equal: wrong values", line.matches( "[a-z]\t[0-9]\t[A-Z]" ) );

代码示例来源:origin: LiveRamp/cascading_ext

protected void emitTuple(CombinerAggregatorContext context, AggregatorCall aggregatorCall) {
  TupleEntry output = new TupleEntry(outputFields);

  Tuple result = context.getAggregateTuple();
  MultiCombiner.populateOutputTupleEntry(context.getDefinition(), output, result);


代码示例来源:origin: cwensel/cascading

private void useSize( TupleEntry input, TupleEntryCollector outputCollector )
 LOG.debug( "using size: {}", size );
 Tuple tuple = new Tuple( input.getTuple() ); // make clone
 Tuple group = tuple.remove( input.getFields(), groupFieldSelector );
 for( int i = 0; i < tuple.size(); i = i + size )
  Tuple result = new Tuple( group );
  result.addAll( tuple.get( Fields.offsetSelector( size, i ).getPos() ) );
  outputCollector.add( result );

代码示例来源:origin: cwensel/cascading

public void testSetNull()
 TupleEntry entryA = new TupleEntry( new Fields( "a", "b", "c" ), new Tuple( "a", "b", "c" ) );
 entryA.setTuple( null );
 assertTrue( entryA.getTuple() == null );

代码示例来源:origin: cwensel/cascading

public void operate( FlowProcess flowProcess, FunctionCall functionCall )
 Set<Tuple> set = new TreeSet<Tuple>();
 TupleEntry input = functionCall.getArguments();
 for( Fields field : fields )
  set.add( input.selectTuple( field ) );
 int i = 0;
 Tuple inputCopy = new Tuple( input.getTuple() );
 for( Tuple tuple : set )
  inputCopy.put( input.getFields(), fields[ i++ ], tuple );
 functionCall.getOutputCollector().add( inputCopy );

代码示例来源:origin: cwensel/cascading

public void doAssert( FlowProcess flowProcess, ValueAssertionCall assertionCall )
 TupleEntry input = assertionCall.getArguments();
 int pos = 0;
 for( Object element : input.getTuple() )
  if( !value.equals( element ) )
   fail( input.getFields().get( pos ), element, value, input.getTuple().print() );


public Builder<C> addTuple(Object... values) {
 values = FieldTypeValidator.validateValues(fieldMask, values);
 TupleEntry newTuple = new TupleEntry(fields, Tuple.size(fields.size()));
 newTuple.setTuple(fieldMask, new Tuple(values));
 return this;
