org.apache.storm.task.TopologyContext类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(126)

本文整理了Java中org.apache.storm.task.TopologyContext类的一些代码示例,展示了TopologyContext类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext类的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext

TopologyContext介绍

[英]A TopologyContext is given to bolts and spouts in their prepare() and open() methods, respectively. This object provides information about the component's place within the topology, such as task ids, inputs and outputs, etc. The TopologyContext is also used to declare ISubscribedState objects to synchronize state with StateSpouts this object is subscribed to.
[中]在'prepare()'和'open()'方法中,分别为螺栓和喷嘴提供了'TopologyContext'。此对象提供有关组件在拓扑中的位置的信息,如任务ID、输入和输出等。“TopologyContext”还用于声明“IsSubscribedState”对象,以将状态与此对象订阅的状态同步。

代码示例

代码示例来源:origin: apache/storm

@Override
  public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context) {
    int thisTaskIndex = context.getThisTaskIndex();
    int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
    Set<TopicPartition> myPartitions = new HashSet<>(allPartitionsSorted.size() / totalTaskCount + 1);
    for (int i = thisTaskIndex; i < allPartitionsSorted.size(); i += totalTaskCount) {
      myPartitions.add(allPartitionsSorted.get(i));
    }
    return myPartitions;
  }
}

代码示例来源:origin: apache/storm

public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
  return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs);
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> conf, TopologyContext topologyContext) {
  this.componentId = topologyContext.getThisComponentId();
  this.taskId = topologyContext.getThisTaskId();
}

代码示例来源:origin: apache/storm

/**
 * Gets the set of streams declared for the component of this task.
 */
public Set<String> getThisStreams() {
  return getComponentStreams(getThisComponentId());
}

代码示例来源:origin: apache/storm

/**
 * Gets the declared inputs to this component.
 *
 * @return A map from subscribed component/stream to the grouping subscribed with.
 */
public Map<GlobalStreamId, Grouping> getThisSources() {
  return getSources(getThisComponentId());
}

代码示例来源:origin: apache/storm

public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  this.source = context.getThisTaskId();
  long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
  myCount = totalCount / taskCount;
}

代码示例来源:origin: apache/storm

eventHubConfig.setTopologyName(topologyName);
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
int taskIndex = context.getThisTaskIndex();
if (totalTasks > eventHubConfig.getPartitionCount()) {
  throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
context.registerMetric("EventHubReceiver", new IMetric() {
  @Override
  public Object getValueAndReset() {

代码示例来源:origin: apache/storm

public int numPartitions() {
  return _topoContext.getComponentTasks(_topoContext.getThisComponentId()).size();
}

代码示例来源:origin: apache/storm

public static String metricName(String name, TopologyContext context) {
  StringBuilder sb = new StringBuilder("storm.topology.");
  sb.append(context.getStormId());
  sb.append(".");
  sb.append(hostName);
  sb.append(".");
  sb.append(dotToUnderScore(context.getThisComponentId()));
  sb.append(".");
  sb.append(context.getThisTaskId());
  sb.append(".");
  sb.append(context.getThisWorkerPort());
  sb.append("-");
  sb.append(name);
  return sb.toString();
}

代码示例来源:origin: apache/storm

@Before
public void setUp() throws Exception {
  mockBolt = Mockito.mock(IStatefulBolt.class);
  executor = new StatefulBoltExecutor<>(mockBolt);
  mockTopologyContext = Mockito.mock(TopologyContext.class);
  mockOutputCollector = Mockito.mock(OutputCollector.class);
  mockState = Mockito.mock(KeyValueState.class);
  Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
  Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
  GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
  Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
  mockTuple = Mockito.mock(Tuple.class);
  mockCheckpointTuple = Mockito.mock(Tuple.class);
  executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector batchCollector) {
  int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
  for (Node n : _nodes) {
    if (n.stateInfo != null) {
      State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
      context.setTaskData(n.stateInfo.id, s);

代码示例来源:origin: apache/storm

/**
 * returns the total number of input checkpoint streams across all input tasks to this component.
 */
private int getCheckpointInputTaskCount(TopologyContext context) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

代码示例来源:origin: apache/storm

@Override
public String toJSONString() {
  Map<String, Object> obj = new HashMap<>();
  obj.put("task->component", this.getTaskToComponent());
  obj.put("taskid", this.getThisTaskId());
  obj.put("componentid", this.getThisComponentId());
  List<String> streamList = new ArrayList<>();
  streamList.addAll(this.getThisStreams());
  obj.put("streams", streamList);
  obj.put("stream->outputfields", this.getThisOutputFieldsForStreams());
  for (Map.Entry<String, Map<String, Grouping>> entry : this.getThisTargets().entrySet()) {
    Map<String, Object> stringTargetMap = new HashMap<>();
    for (Map.Entry<String, Grouping> innerEntry : entry.getValue().entrySet()) {
      stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue()));
  for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
    GlobalStreamId gid = entry.getKey();
    Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
      stringSources.put(gid.get_componentId(), stringSourceMap);
    stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue()));
  obj.put("source->stream->fields", this.getThisInputFields());
  return JSONValue.toJSONString(obj);

代码示例来源:origin: apache/storm

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  _base = context.getThisTaskIndex();
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
  componentId = context.getThisComponentId();
  this.collector = collector;
}

代码示例来源:origin: org.apache.flink/flink-storm

this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector));
Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();
  for (Integer tid : this.topologyContext.getComponentTasks(inputStream
      .get_componentId())) {
    this.inputComponentIds.put(tid, inputStream.get_componentId());
    this.inputStreamIds.put(tid, inputStream.get_streamId());
    this.inputSchemas.put(tid,
        this.topologyContext.getComponentOutputFields(inputStream));

代码示例来源:origin: apache/storm

/**
 * Gets the declared output fields for the specified stream id for the component this task is a part of.
 */
public Fields getThisOutputFields(String streamId) {
  return getComponentOutputFields(getThisComponentId(), streamId);
}

代码示例来源:origin: apache/storm

/**
 * Open and activate a KafkaSpout that acts as a single-task/executor spout.
 *
 * @param <K> Kafka key type
 * @param <V> Kafka value type
 * @param spout The spout to prepare
 * @param topoConf The topoConf
 * @param topoContextMock The TopologyContext mock
 * @param collectorMock The output collector mock
 */
public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
  SpoutOutputCollector collectorMock) throws Exception {
  when(topoContextMock.getThisTaskIndex()).thenReturn(0);
  when(topoContextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
  spout.open(topoConf, topoContextMock, collectorMock);
  spout.activate();
}

代码示例来源:origin: apache/storm

/**
 * Gets information about who is consuming the outputs of this component, and how.
 *
 * @return Map from stream id to component id to the Grouping used.
 */
public Map<String, Map<String, Grouping>> getThisTargets() {
  return getTargets(getThisComponentId());
}

代码示例来源:origin: apache/storm

/**
 * Create a manager with the given context.
 */
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
  this.context = context;
  try {
    commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
      context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
    this.processingGuarantee = processingGuarantee;
  } catch (JsonProcessingException e) {
    LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
    throw new RuntimeException(e);
  }
}

相关文章

微信公众号

最新文章

更多