Usage and code examples of the org.apache.storm.task.TopologyContext.getComponentTasks() method


This article collects Java code examples of the org.apache.storm.task.TopologyContext.getComponentTasks() method, showing how getComponentTasks() is used in practice. The examples are drawn from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of TopologyContext.getComponentTasks() are as follows:

Package: org.apache.storm.task
Class: TopologyContext
Method: getComponentTasks

About TopologyContext.getComponentTasks

As the examples below show, getComponentTasks(componentId) returns the list of task ids allocated to the given component. Callers typically take the size of that list as the component's parallelism, or look up this task's position in the list in order to partition work across tasks.
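A minimal orientation sketch of that pattern (written for this article, not taken from apache/storm; assume it runs inside a bolt's prepare() with a TopologyContext named context):

// Minimal sketch: derive this component's parallelism and this task's
// position from getComponentTasks().
List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
int numTasks = tasks.size();                          // parallelism of this component
int myIndex = tasks.indexOf(context.getThisTaskId()); // this task's position among them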

Code examples

Code example source: apache/storm

public int numPartitions() {
  return _topoContext.getComponentTasks(_topoContext.getThisComponentId()).size();
}

Code example source: apache/storm

private int getNumTasks() {
  return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
}

Code example source: apache/storm

@Override
public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context) {
  int thisTaskIndex = context.getThisTaskIndex();
  int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
  Set<TopicPartition> myPartitions = new HashSet<>(allPartitionsSorted.size() / totalTaskCount + 1);
  for (int i = thisTaskIndex; i < allPartitionsSorted.size(); i += totalTaskCount) {
    myPartitions.add(allPartitionsSorted.get(i));
  }
  return myPartitions;
}

Code example source: apache/storm

/**
 * Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which
 * task accesses which resource in a distributed set of resources, to ensure an even distribution.
 */
public int getThisTaskIndex() {
  List<Integer> tasks = new ArrayList<>(getComponentTasks(getThisComponentId()));
  Collections.sort(tasks);
  for (int i = 0; i < tasks.size(); i++) {
    if (tasks.get(i) == getThisTaskId()) {
      return i;
    }
  }
  throw new RuntimeException("Fatal: could not find this task id in this component");
}
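The Javadoc's "even distribution" use case can be sketched like this (an illustrative fragment, not code from apache/storm; allResources and the striping scheme are assumptions, mirroring getPartitionsForThisTask() above):

// Hypothetical sketch: each task claims every numTasks-th entry of a shared,
// consistently ordered resource list, spreading the resources evenly.
int index = context.getThisTaskIndex();
int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
List<String> myResources = new ArrayList<>();
for (int i = index; i < allResources.size(); i += numTasks) {
  myResources.add(allResources.get(i));
}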

Code example source: apache/storm

public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _context = context;
  List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
  int startIndex;
  for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
    if (tasks.get(startIndex) == context.getThisTaskId()) {
      break;
    }
  }
  _collector = collector;
  _pending = new HashMap<String, FixedTuple>();
  _serveTuples = new ArrayList<FixedTuple>();
  for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
    _serveTuples.add(_tuples.get(i));
  }
}

Code example source: apache/storm

public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  this.source = context.getThisTaskId();
  long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
  myCount = totalCount / taskCount;
}

Code example source: apache/storm

private int getStreamInputTaskCount(TopologyContext context, String stream) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (stream.equals(getStreamId(inputStream))) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

Code example source: apache/storm

/**
 * Returns the total number of input tasks, across all upstream components, that send the checkpoint stream to this component.
 */
private int getCheckpointInputTaskCount(TopologyContext context) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

Code example source: apache/storm

@Override
public BatchCoordinator<Map<Integer, List<List<Object>>>> getCoordinator(String txStateId, Map<String, Object> conf,
                                     TopologyContext context) {
  int numTasks = context.getComponentTasks(
      TridentTopologyBuilder.spoutIdFromCoordinatorId(context.getThisComponentId())).size();
  return new FeederCoordinator(numTasks);
}

Code example source: apache/storm

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  if (_local_drpc_id == null) {
    _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                          60L, TimeUnit.SECONDS,
                          new SynchronousQueue<Runnable>());
    _futures = new LinkedList<>();
    int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
    int index = context.getThisTaskIndex();
    int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
    List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
    if (servers == null || servers.isEmpty()) {
      throw new RuntimeException("No DRPC servers configured for topology");
    }
    if (numTasks < servers.size()) {
      for (String s : servers) {
        _futures.add(_backround.submit(new Adder(s, port, conf)));
      }
    } else {
      int i = index % servers.size();
      _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
    }
  }
}

Code example source: apache/storm

public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
  _state = TransactionalState.newUserState(conf, txStateId);
  LOG.debug("Created {}", this);
}

Code example source: apache/storm

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
  _outputTasks = new ArrayList<>();
  for (String component : Utils.get(context.getThisTargets(),
                   _coordStream,
                   new HashMap<String, Grouping>()).keySet()) {
    _outputTasks.addAll(context.getComponentTasks(component));
  }
  _rand = new Random(Utils.secureRandomLong());
}

Code example source: apache/storm

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  this.collector = collector;
  kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
  kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
}

Code example source: apache/storm

public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _state = TransactionalState.newUserState(conf, txStateId);
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}

Code example source: apache/storm

public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
  TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
  if (_delegate instanceof TimeoutCallback) {
    callback = new TimeoutItems();
  }
  _tracked = new TimeCacheMap<>(context.maxTopologyMessageTimeout(), callback);
  _collector = collector;
  _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
  for (String component : Utils.get(context.getThisTargets(),
                   Constants.COORDINATED_STREAM_ID,
                   new HashMap<String, Grouping>())
                 .keySet()) {
    for (Integer task : context.getComponentTasks(component)) {
      _countOutTasks.add(task);
    }
  }
  if (!_sourceArgs.isEmpty()) {
    _numSourceReports = 0;
    for (Entry<String, SourceArgs> entry : _sourceArgs.entrySet()) {
      if (entry.getValue().singleCount) {
        _numSourceReports += 1;
      } else {
        _numSourceReports += context.getComponentTasks(entry.getKey()).size();
      }
    }
  }
}

Code example source: apache/storm

@Override
public void initState(T state) {
  if (stateInitialized) {
    LOG.warn("State is already initialized. Ignoring initState");
    return;
  }
  statefulWindowedBolt.initState((T) state);
  // query the streamState for each input task stream and compute recoveryStates
  for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
    for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
      WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
      if (windowState != null) {
        recoveryStates.put(new TaskStream(taskId, streamId), windowState);
      }
    }
  }
  LOG.debug("recoveryStates {}", recoveryStates);
  stateInitialized = true;
  start();
}

Code example source: apache/storm

/**
 * Open and activate a KafkaSpout that acts as a single-task/executor spout.
 *
 * @param <K> Kafka key type
 * @param <V> Kafka value type
 * @param spout The spout to prepare
 * @param topoConf The topoConf
 * @param topoContextMock The TopologyContext mock
 * @param collectorMock The output collector mock
 */
public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
  SpoutOutputCollector collectorMock) throws Exception {
  when(topoContextMock.getThisTaskIndex()).thenReturn(0);
  when(topoContextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
  spout.open(topoConf, topoContextMock, collectorMock);
  spout.activate();
}
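A possible call site for this helper in a unit test (the spout construction, config, and mocks are illustrative assumptions, not taken from apache/storm):

// Hypothetical usage: spoutConfig is assumed to exist in the test.
KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
TopologyContext contextMock = mock(TopologyContext.class);
SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
initializeSpout(spout, new HashMap<>(), contextMock, collectorMock);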

Code example source: apache/storm

@Before
public void setUp() throws Exception {
  mockBolt = Mockito.mock(IStatefulBolt.class);
  executor = new StatefulBoltExecutor<>(mockBolt);
  mockTopologyContext = Mockito.mock(TopologyContext.class);
  mockOutputCollector = Mockito.mock(OutputCollector.class);
  mockState = Mockito.mock(KeyValueState.class);
  Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
  Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
  GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
  Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
  mockTuple = Mockito.mock(Tuple.class);
  mockCheckpointTuple = Mockito.mock(Tuple.class);
  executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}

Code example source: apache/storm

@Test
public void testRecovery() throws Exception {
  mockStormConf.put(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME, "msgid");
  mockStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 5);
  mockStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 5);
  KeyValueState<TaskStream, WindowState> mockState;
  mockState = Mockito.mock(KeyValueState.class);
  Map<GlobalStreamId, Grouping> mockMap = Mockito.mock(Map.class);
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockMap);
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
  Mockito.when(mockMap.keySet()).thenReturn(Collections.singleton(new GlobalStreamId("a", "s")));
  WindowState mockWindowState = new WindowState(4, 4);
  Mockito.when(mockState.get(Mockito.any(TaskStream.class))).thenReturn(mockWindowState);
  executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
  executor.initState(null);
  List<Tuple> tuples = getMockTuples(10);
  for (Tuple tuple : tuples) {
    executor.execute(tuple);
  }
  WindowState expectedState = new WindowState(4, 9);
  Mockito.verify(mockState, Mockito.times(1)).put(Mockito.any(TaskStream.class), Mockito.eq(expectedState));
}

Code example source: apache/storm

private void setUpProcessorBolt(Processor<?> processor,
                Set<String> windowedParentStreams,
                boolean isWindowed,
                String tsFieldName) {
  ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value"));
  node.setWindowedParentStreams(windowedParentStreams);
  node.setWindowed(isWindowed);
  Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
  Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
  Map<GlobalStreamId, Grouping> mockSources = Mockito.mock(Map.class);
  GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
  Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
  Mockito.when(mockGlobalStreamId.get_streamId()).thenReturn("inputstream");
  Mockito.when(mockGlobalStreamId.get_componentId()).thenReturn("bolt0");
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
  graph.addVertex(node);
  bolt = new ProcessorBolt("bolt1", graph, Collections.singletonList(node));
  if (tsFieldName != null && !tsFieldName.isEmpty()) {
    bolt.setTimestampField(tsFieldName);
  }
  bolt.setStreamToInitialProcessors(mockStreamToProcessors);
  bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector);
}
