Using the backtype.storm.tuple.Tuple class, with code examples

x33g5p2x, reposted on 2022-01-29 under Other

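This article collects Java code examples for the backtype.storm.tuple.Tuple class and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected projects, and are intended as reference material. Details of the Tuple class:
Package path: backtype.storm.tuple.Tuple
Class name: Tuple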

About Tuple

The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed: the types of the fields do not need to be declared. Tuples have helper methods such as getInteger and getString to get field values without having to cast the result. Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If you want to use another type, you'll need to implement and register a serializer for that type. See https://github.com/nathanmarz/storm/wiki/Serialization for more info.
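To make the "named list of values" idea concrete, here is a minimal, self-contained sketch. `NamedTupleSketch` is a hypothetical stand-in written only for illustration, not Storm's `Tuple` implementation. It mimics how positional access, by-field access, and cast-free helpers such as `getString` and `getInteger` relate to one another.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration only; this is NOT Storm's Tuple class.
final class NamedTupleSketch {
    private final List<String> fields;  // field names, in declaration order
    private final List<Object> values;  // values, positionally aligned with fields

    NamedTupleSketch(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values must have the same length");
        }
        this.fields = fields;
        this.values = values;
    }

    // Dynamically typed access by position: the caller gets an Object.
    Object getValue(int i) {
        return values.get(i);
    }

    // Access by name resolves the field name to a position first.
    Object getValueByField(String field) {
        int i = fields.indexOf(field);
        if (i < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(i);
    }

    // Typed helpers perform the cast so callers do not have to.
    String getString(int i) {
        return (String) values.get(i);
    }

    Integer getInteger(int i) {
        return (Integer) values.get(i);
    }

    public static void main(String[] args) {
        NamedTupleSketch t = new NamedTupleSketch(
                Arrays.asList("word", "count"),
                Arrays.<Object>asList("storm", 3));
        System.out.println(t.getString(0) + " " + t.getInteger(1));
        System.out.println(t.getValueByField("count"));
    }
}
```

Because the values are stored as plain `Object`s, a typed helper called on a field of the wrong type fails with a `ClassCastException` at run time rather than at compile time. That is the trade-off of dynamic typing the description above refers to.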

Code examples

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple) {
  String sentence = tuple.getString(0);
  for (String word : sentence.split("\\s+")) {
    collector.emit(new Values(word));
  }
}

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
  String arg = tuple.getString(0);
  Object retInfo = tuple.getValue(1);
  collector.emit(new Values(arg + "!!!", retInfo));
}

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple) {
  List<Object> id = tuple.select(_idFields);
  GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
  if (!_pending.containsKey(id)) {
    _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
  }
  Map<GlobalStreamId, Tuple> parts = _pending.get(id);
  if (parts.containsKey(streamId))
    throw new RuntimeException("Received same side of single join twice");
  parts.put(streamId, tuple);
  if (parts.size() == _numSources) {
    _pending.remove(id);
    List<Object> joinResult = new ArrayList<Object>();
    for (String outField : _outFields) {
      GlobalStreamId loc = _fieldLocations.get(outField);
      joinResult.add(parts.get(loc).getValueByField(outField));
    }
    _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
    
    for (Tuple part : parts.values()) {
      _collector.ack(part);
    }
  }
}

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple) {
  if (tuple.getSourceComponent().equals(_wordComponent)) {
    this.word = tuple.getString(1);
  } else {
    intSet.add(tuple.getInteger(1));
  }
}

Example source: alibaba/jstorm

private long getRootId(Tuple input) {
  int rootIdIndex = input.getValues().size() - 1;
  return (long) input.getValue(rootIdIndex);
}

Example source: alibaba/jstorm

public static boolean isTickTuple(Tuple tuple) {
  return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
      && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

Example source: mayconbordin/storm-applications

@Override
public void execute(Tuple input) {
  String deviceID = input.getStringByField(Field.DEVICE_ID);
  double movingAverageInstant = input.getDoubleByField(Field.MOVING_AVG);
  double nextDouble = input.getDoubleByField(Field.VALUE);
  
  if (Math.abs(nextDouble - movingAverageInstant) > spikeThreshold * movingAverageInstant) {
    collector.emit(input, new Values(deviceID, movingAverageInstant, nextDouble, "spike detected"));
  }
  
  collector.ack(input);
}

Example source: Symantec/hendrix

@Override
public void execute(Tuple tuple) {
  Alert alert = (Alert) tuple.getValueByField(Constants.FIELD_ALERT);
  collector.emit(alert.getMedia(), tuple, new Values(alert));
  collector.ack(tuple);
}

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple) {
  String sourceIp = tuple.getString(1);
  Integer sourcePort = tuple.getInteger(2);
  if (differentNode) {
    if (ip.equals(sourceIp)) {
      fail(tuple, _collector);
      return;
    } else if (port.equals(sourcePort)) {
      fail(tuple, _collector);
      return;
    }
    _collector.emit(tuple, new Values(tuple.getValue(0), ip, port));
    _collector.ack(tuple);
    return;
  } else {
    if (!ip.equals(sourceIp)) {
      fail(tuple, _collector);
      return;
    }
    _collector.emit(tuple, new Values(tuple.getValue(0), ip, port));
    _collector.ack(tuple);
  }
  
}

Example source: davidkiss/storm-twitter-word-count

@Override
public void execute(Tuple input) {
  String lang = (String) input.getValueByField("lang");
  String word = (String) input.getValueByField("word");
  if (!IGNORE_LIST.contains(word)) {
    collector.emit(new Values(lang, word));
  }
}

Example source: alibaba/jstorm

public void execute(Tuple tuple) {
  Object requestId = tuple.getValue(0);
  if (tuple.getSourceComponent().equals(returnComponent)) {
    returns.put(requestId, tuple);
  } else {
    results.put(requestId, tuple);
  }
  if (returns.containsKey(requestId) && results.containsKey(requestId)) {
    Tuple result = results.remove(requestId);
    Tuple returner = returns.remove(requestId);
    LOG.debug(result.getValue(1).toString());
    List<Tuple> anchors = new ArrayList<>();
    anchors.add(result);
    anchors.add(returner);
    _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
    _collector.ack(result);
    _collector.ack(returner);
  }
}

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple) {
  _rootLogger.debug("root: This is a DEBUG message");
  _rootLogger.info("root: This is an INFO message");
  _rootLogger.warn("root: This is a WARN message");
  _rootLogger.error("root: This is an ERROR message");
  
  _logger.debug("myapp: This is a DEBUG message");
  _logger.info("myapp: This is an INFO message");
  _logger.warn("myapp: This is a WARN message");
  _logger.error("myapp: This is an ERROR message");
  
  _subLogger.debug("myapp.sub: This is a DEBUG message");
  _subLogger.info("myapp.sub: This is an INFO message");
  _subLogger.warn("myapp.sub: This is a WARN message");
  _subLogger.error("myapp.sub: This is an ERROR message");
  
  _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
  _collector.ack(tuple);
  counter.incrementAndGet();
}

Example source: alibaba/jstorm

public void execute(Tuple tuple) {
  if (tuple.getValues().size() == 0) {
    _expectExcevieNum--;
  } else {
    JStormUtils.sleepMs(1);
    _collector.emit(tuple, new Values(tuple.getValues()));
    _collector.ack(tuple);
  }
}

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple) {
  Object obj = tuple.getValue(0);
  long count = tuple.getLong(1);
  int source = tuple.getSourceTask();
  Map<Integer, Long> subCounts = counts.get(obj);
  if (subCounts == null) {
    subCounts = new HashMap<Integer, Long>();
    counts.put(obj, subCounts);
  }
  // Update the current count for this object
  subCounts.put(source, count);
  // Output the sum of all the known counts so far for this key
  long sum = 0;
  for (Long val : subCounts.values()) {
    sum += val;
  }
  collector.emit(new Values(obj, sum));
}
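The bookkeeping in the example above can be isolated from Storm to show why each source task's count is overwritten rather than added. The following is a plain-Java sketch under that reading. `PartialCountMerger` and its `update` method are hypothetical names introduced here, and no Storm types are involved.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the bookkeeping only; hypothetical class, no Storm types.
final class PartialCountMerger {
    // key -> (source task id -> latest count reported by that task)
    private final Map<Object, Map<Integer, Long>> counts = new HashMap<>();

    // Records the latest count from one source task and returns the sum
    // over all source tasks seen so far for this key.
    long update(Object key, int sourceTask, long count) {
        Map<Integer, Long> subCounts = counts.computeIfAbsent(key, k -> new HashMap<>());
        subCounts.put(sourceTask, count);  // overwrite the previous report, do not add to it
        long sum = 0;
        for (long val : subCounts.values()) {
            sum += val;
        }
        return sum;
    }

    public static void main(String[] args) {
        PartialCountMerger merger = new PartialCountMerger();
        System.out.println(merger.update("storm", 1, 2L)); // task 1 reports 2
        System.out.println(merger.update("storm", 2, 5L)); // task 2 reports 5
        System.out.println(merger.update("storm", 1, 4L)); // task 1 replaces 2 with 4
    }
}
```

Each upstream task is assumed to report its own running total for the key, so the latest value per task replaces the earlier one and the global figure is the sum across tasks.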

Example source: alibaba/jstorm

@Override
public void execute(TupleWindow inputWindow) {
  int sum = 0;
  List<Tuple> tuplesInWindow = inputWindow.get();
  LOG.debug("Events in current window: " + tuplesInWindow.size());
  if (tuplesInWindow.size() > 0) {
    /*
     * Since this is a tumbling window calculation, we use all the
     * tuples in the window to compute the avg.
     */
    for (Tuple tuple : tuplesInWindow) {
      sum += (int) tuple.getValue(0);
    }
    collector.emit(new Values(sum / tuplesInWindow.size()));
  }
}
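Note that `sum / tuplesInWindow.size()` in the example above is integer division, so the emitted average is truncated. If a fractional average is wanted (an assumption, since the original may truncate on purpose), the calculation could be sketched as follows. `WindowAverageSketch` is a hypothetical helper that works on a plain list of integers standing in for the window's values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical helper; operates on plain integers rather than Storm tuples.
final class WindowAverageSketch {
    // Returns the arithmetic mean of the window, or empty if the window has no values.
    static OptionalDouble average(List<Integer> window) {
        if (window.isEmpty()) {
            return OptionalDouble.empty();
        }
        long sum = 0;  // long avoids int overflow on large windows
        for (int v : window) {
            sum += v;
        }
        // Cast before dividing so the result keeps its fractional part.
        return OptionalDouble.of((double) sum / window.size());
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(1, 2, 4)));
        System.out.println(average(Collections.<Integer>emptyList()));
    }
}
```

Returning an `OptionalDouble` mirrors the original's behavior of emitting nothing for an empty window, leaving the caller to decide whether to emit.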

Example source: mayconbordin/storm-applications

@Override
public void execute(Tuple input) {
  CallDetailRecord cdr = (CallDetailRecord) input.getValueByField(Field.RECORD);
  boolean newCallee = input.getBooleanByField(Field.NEW_CALLEE);

  if (cdr.isCallEstablished() && newCallee) {
    String caller = input.getStringByField(Field.CALLING_NUM);
    long timestamp = cdr.getAnswerTime().getMillis() / 1000;

    filter.add(caller, 1, timestamp);
    double rate = filter.estimateCount(caller, timestamp);

    collector.emit(new Values(caller, timestamp, rate, cdr));
  }
}

Example source: alibaba/jstorm

// Excerpt only: the source page keeps just the lines that touch the tuple.
// Omitted lines are marked with "// ..." and are not reconstructed here.
// ...
  LOG.info("Receive one Ticket Tuple " + input.getSourceComponent());
  return;
// ...
if (input.getSourceStreamId().equals(SequenceTopologyDef.CONTROL_STREAM_ID)) {
  String str = (input.getStringByField("CONTROL"));
  LOG.warn(str);
  return;
// ...
  Long tupleId = input.getLong(0);
  if (tupleId <= lastTupleId) {
// ...
  tradeCustomer = (TradeCustomer) input.getValue(1);
} catch (Exception e) {
  LOG.error(input.getSourceComponent() + "  " + input.getSourceTask() + " " + input.getSourceStreamId()
      + " target " + input);
  throw new RuntimeException(e);
// ...
customerSum.addAndGet(tradeCustomer.getCustomer().getValue());
collector.ack(input);

Example source: alibaba/jstorm

public void execute(Tuple input) {
  String word = (String) input.getValues().get(0);
  int count = (Integer) input.getValues().get(1);
  _counts.put(word, count);
  int globalCount = 0;
  for (String w : _counts.keySet()) {
    globalCount += _counts.get(w);
  }
  _collector.emit(tuple(globalCount));
  _collector.ack(input);
}

Example source: alibaba/jstorm

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
  String word = tuple.getString(0);
  Integer count = counts.get(word);
  if (count == null)
    count = 0;
  count++;
  counts.put(word, count);
  collector.emit(new Values(word, count));
}

Example source: alibaba/jstorm

public void execute(Tuple input) {
  String component = input.getSourceComponent();
  Map<String, List<FixedTuple>> captured = emitted_tuples.get(_name);
  if (!captured.containsKey(component)) {
    captured.put(component, new ArrayList<FixedTuple>());
  }
  captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
  _collector.ack(input);
}
