org.apache.storm.tuple.Tuple类的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(12.6k)|赞(0)|评价(0)|浏览(201)

本文整理了Java中org.apache.storm.tuple.Tuple类的一些代码示例,展示了Tuple类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Tuple类的具体详情如下:
包路径:org.apache.storm.tuple.Tuple
类名称:Tuple

Tuple介绍

[英]The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed -- the types of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result. Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If you want to use another type, you'll need to implement and register a serializer for that type.
[中]元组是Storm中的主要数据结构。元组是一个命名的值列表,其中每个值可以是任何类型。元组是动态类型的——不需要声明字段的类型。元组有getInteger和getString等帮助方法,可以在不强制转换结果的情况下获取字段值。Storm需要知道如何序列化元组中的所有值。默认情况下,Storm知道如何序列化基元类型、字符串和字节数组。如果要使用另一种类型,则需要实现并注册该类型的序列化程序。

代码示例

代码示例来源:origin: apache/storm

@Override
public byte[] mapRecord(Tuple tuple) {
  JSONObject obj = new JSONObject();
  if (this.columnFields != null) {
    for (String field : this.columnFields) {
      obj.put(field, tuple.getValueByField(field));
    }
  }
  return obj.toJSONString().getBytes();
}

代码示例来源:origin: apache/storm

public void execute(Tuple input, BasicOutputCollector collector) {
  String word = input.getStringByField("word");
  int count;
  if (wordCounter.containsKey(word)) {
    count = wordCounter.get(word) + 1;
    wordCounter.put(word, wordCounter.get(word) + 1);
  } else {
    count = 1;
  }
  wordCounter.put(word, count);
  collector.emit(new Values(word, String.valueOf(count)));
}

代码示例来源:origin: apache/storm

public void execute(Tuple tuple) {
  Object requestId = tuple.getValue(0);
  if (tuple.getSourceComponent().equals(returnComponent)) {
    returns.put(requestId, tuple);
  } else {
    results.put(requestId, tuple);
  }
  if (returns.containsKey(requestId) && results.containsKey(requestId)) {
    Tuple result = results.remove(requestId);
    Tuple returner = returns.remove(requestId);
    LOG.debug(result.getValue(1).toString());
    List<Tuple> anchors = new ArrayList<>();
    anchors.add(result);
    anchors.add(returner);
    _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
    _collector.ack(result);
    _collector.ack(returner);
  }
}

代码示例来源:origin: apache/storm

@Override
public K getKeyFromTuple(Tuple tuple) {
  //for backward compatibility, we return null when key is not present.
  return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null;
}

代码示例来源:origin: apache/storm

public TupleImpl(Tuple t) {
  this.values = t.getValues();
  this.taskId = t.getSourceTask();
  this.streamId = t.getSourceStreamId();
  this.id = t.getMessageId();
  this.context = t.getContext();
  this.srcComponent = t.getSourceComponent();
  try {
    TupleImpl ti = (TupleImpl) t;
    this._processSampleStartTime = ti._processSampleStartTime;
    this._executeSampleStartTime = ti._executeSampleStartTime;
    this._outAckVal = ti._outAckVal;
  } catch (ClassCastException e) {
    // ignore ... if t is not a TupleImpl type .. faster than checking and then casting
  }
}

代码示例来源:origin: apache/storm

private String getStreamSelector(Tuple ti) {
  switch (selectorType) {
    case STREAM:
      return ti.getSourceStreamId();
    case SOURCE:
      return ti.getSourceComponent();
    default:
      throw new RuntimeException(selectorType + " stream selector type not yet supported");
  }
}

代码示例来源:origin: apache/metron

@Test
public void testNonBatchHappyPath() throws Exception {
 ParserConfigurations configurations = getConfigurations(1);
 String sensorType = "test";
 Tuple t = mock(Tuple.class);
 when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
 WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
 bolt.prepare(new HashMap(), topologyContext, outputCollector);
 verify(writer, times(1)).init();
 bolt.execute(t);
 verify(outputCollector, times(1)).ack(t);
 verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
 verify(outputCollector, times(0)).reportError(any());
 verify(outputCollector, times(0)).fail(any());
}
@Test

代码示例来源:origin: apache/storm

@Test
public void testCommitBeforeInitstate() throws Exception {
  Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
  Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
  Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
  Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
  executor.execute(mockCheckpointTuple);
  Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockCheckpointTuple);
  Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
  Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
  executor.execute(mockCheckpointTuple);
  Mockito.verify(mockState, Mockito.times(1)).rollback();
}

代码示例来源:origin: apache/metron

@Test
public void shouldThrowExceptionOnFailedExecute() {
 when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
 when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
 doThrow(new IllegalStateException("parserRunner.execute failed")).when(parserRunner).execute(eq("yaf"), any(), eq(parserConfigurations));
 verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
     argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
 verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
 verify(outputCollector, times(1)).ack(t1);

代码示例来源:origin: apache/metron

@SuppressWarnings("unchecked")
 @Test
 public void testExecuteShouldReportError() throws ExecutionException {
  joinBolt.withMaxCacheSize(100);
  joinBolt.withMaxTimeRetain(10000);
  joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
  when(tuple.getValueByField("key")).thenReturn(key);
  when(tuple.getValueByField("message")).thenReturn(new JSONObject());
  joinBolt.cache = mock(LoadingCache.class);
  when(joinBolt.cache.get(any())).thenThrow(new RuntimeException(new Exception("join exception")));

  joinBolt.execute(tuple);
  RuntimeException expectedExecutionException = new RuntimeException(new Exception("join exception"));
  MetronError error = new MetronError()
      .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
      .withMessage("Joining problem: {}")
      .withThrowable(expectedExecutionException)
      .addRawMessage(new JSONObject());
  verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
  verify(outputCollector, times(1)).reportError(any(ExecutionException.class));
  verify(outputCollector, times(1)).ack(eq(tuple));
  verifyNoMoreInteractions(outputCollector);
 }
}

代码示例来源:origin: apache/metron

Tuple geoTuple = mock(Tuple.class);
when(geoTuple.getValueByField("key")).thenReturn(key);
when(geoTuple.getSourceStreamId()).thenReturn("geo");
when(geoTuple.getValueByField("message")).thenReturn(geoMessage);
joinBolt.execute(geoTuple);
Tuple messageTuple = mock(Tuple.class);
when(messageTuple.getValueByField("key")).thenReturn(key);
when(messageTuple.getSourceStreamId()).thenReturn("message");
when(messageTuple.getValueByField("message")).thenReturn(sampleMessage);
joinBolt.execute(messageTuple);
Tuple hostTuple = mock(Tuple.class);
when(hostTuple.getValueByField("key")).thenReturn(key);
when(hostTuple.getSourceStreamId()).thenReturn("host");
when(hostTuple.getValueByField("message")).thenReturn(hostMessage);
joinBolt.execute(hostTuple);
Tuple hbaseEnrichmentTuple = mock(Tuple.class);
when(hbaseEnrichmentTuple.getValueByField("key")).thenReturn(key);
when(hbaseEnrichmentTuple.getSourceStreamId()).thenReturn("hbaseEnrichment");
when(hbaseEnrichmentTuple.getValueByField("message")).thenReturn(hbaseEnrichmentMessage);
joinBolt.execute(hbaseEnrichmentTuple);
Tuple stellarTuple = mock(Tuple.class);
when(stellarTuple.getValueByField("key")).thenReturn(key);
when(stellarTuple.getSourceStreamId()).thenReturn("stellar");
when(stellarTuple.getValueByField("message")).thenReturn(new JSONObject());
joinBolt.execute(stellarTuple);

代码示例来源:origin: apache/metron

@Test
public void jsonFromFieldShouldReturnJSON() {
 JSONObject actual = new JSONObject();
 actual.put("field", "value");
 Tuple tuple = mock(Tuple.class);
 when(tuple.getValueByField("tuple_field")).thenReturn(actual);
 JSONObject expected = new JSONObject();
 expected.put("field", "value");
 MessageGetStrategy messageGetStrategy = MessageGetters.JSON_FROM_FIELD.get("tuple_field");
 assertEquals(expected, messageGetStrategy.get(tuple));
}

代码示例来源:origin: apache/metron

@Test
public void executeShouldHandleTickTuple() throws Exception {
 when(t1.getSourceComponent()).thenReturn("__system");
 when(t1.getSourceStreamId()).thenReturn("__tick");
 ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
 ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
  put("yaf", writerHandler);
 }}) {
  @Override
  public ParserConfigurations getConfigurations() {
   return parserConfigurations;
  }
 };
 parserBolt.setMessageGetStrategy(messageGetStrategy);
 parserBolt.setOutputCollector(outputCollector);
 parserBolt.execute(t1);
 verify(writerHandler, times(1)).flush(parserConfigurations, messageGetStrategy);
 verify(outputCollector, times(1)).ack(t1);
}

代码示例来源:origin: apache/storm

private void setUpPunctuation(Tuple punctuation) {
    Mockito.when(punctuation.size()).thenReturn(1);
    Mockito.when(punctuation.getValue(0)).thenReturn(WindowNode.PUNCTUATION);
    Mockito.when(punctuation.getSourceComponent()).thenReturn("bolt0");
    Mockito.when(punctuation.getSourceStreamId()).thenReturn("inputstream");
  }
}

代码示例来源:origin: apache/storm

@Test
public void testHandleTupleBeforeInit() throws Exception {
  Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
  executor.execute(mockTuple);
  Mockito.verify(mockBolt, Mockito.times(0)).execute(Mockito.any(Tuple.class));
}

代码示例来源:origin: apache/metron

@Test
public void testNonBatchErrorPath() throws Exception {
 ParserConfigurations configurations = getConfigurations(1);
 String sensorType = "test";
 Tuple t = mock(Tuple.class);
 when(t.getValueByField(eq("message"))).thenThrow(new IllegalStateException());
 WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
 bolt.prepare(new HashMap(), topologyContext, outputCollector);
 verify(writer, times(1)).init();
 bolt.execute(t);
 verify(outputCollector, times(1)).ack(t);
 verify(writer, times(0)).write(eq(sensorType), any(), any(), any());
 verify(outputCollector, times(1)).reportError(any());
 verify(outputCollector, times(0)).fail(any());
}
@Test

代码示例来源:origin: apache/storm

public static Tuple mockTuple(String componentId, String streamId) {
    Tuple tuple = Mockito.mock(Tuple.class);
    Mockito.when(tuple.getSourceComponent()).thenReturn(componentId);
    Mockito.when(tuple.getSourceStreamId()).thenReturn(streamId);
    return tuple;
  }
}

代码示例来源:origin: apache/metron

@Test
public void testSourceTypeMissing() throws Exception {
 // setup the bolt
 BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>(
    "zookeeperUrl", "INDEXING")
     .withBulkMessageWriter(bulkMessageWriter)
     .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
     .withMessageGetterField("message");
 bulkMessageWriterBolt.setCuratorFramework(client);
 bulkMessageWriterBolt.setZKCache(cache);
 bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
     new FileInputStream(sampleSensorIndexingConfigPath));
 // initialize the bolt
 bulkMessageWriterBolt.declareOutputFields(declarer);
 Map stormConf = new HashMap();
 bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
 // create a message with no source type
 JSONObject message = (JSONObject) new JSONParser().parse(sampleMessageString);
 message.remove("source.type");
 when(tuple.getValueByField("message")).thenReturn(message);
 // the tuple should be handled as an error and ack'd
 bulkMessageWriterBolt.execute(tuple);
 verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
 verify(outputCollector, times(1)).ack(tuple);
}

代码示例来源:origin: apache/metron

public Tuple toTuple(byte[] record) {
  Tuple ret = mock(Tuple.class);
  when(ret.getStringByField("topic")).thenReturn(sensorType);
  when(ret.getBinary(eq(0))).thenReturn(record);
  return ret;
 }
}

代码示例来源:origin: apache/metron

@Test
public void objectFromFieldShouldReturnObject() {
 Object actual = "object";
 Tuple tuple = mock(Tuple.class);
 when(tuple.getValueByField("tuple_field")).thenReturn(actual);
 Object expected = "object";
 MessageGetStrategy messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("tuple_field");
 assertEquals(expected, messageGetStrategy.get(tuple));
}

相关文章