org.apache.storm.task.TopologyContext.getThisTaskIndex()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(10.8k)|赞(0)|评价(0)|浏览(88)

本文整理了Java中org.apache.storm.task.TopologyContext.getThisTaskIndex()方法的一些代码示例,展示了TopologyContext.getThisTaskIndex()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getThisTaskIndex()方法的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext
方法名:getThisTaskIndex

TopologyContext.getThisTaskIndex介绍

[英]Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which task accesses which resource in a distributed resource to ensure an even distribution.
[中]获取getComponentTasks(getThisComponentId())中此任务id的索引。该方法的一个示例用例是确定哪个任务访问分布式资源中的哪个资源,以确保均匀分布。

代码示例

代码示例来源:origin: apache/storm

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  _base = context.getThisTaskIndex();
}

代码示例来源:origin: apache/storm

public int getPartitionIndex() {
  return _topoContext.getThisTaskIndex();
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
  this.collector = collector;
  this.taskIndex = context.getThisTaskIndex();
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> stormConf,
    TopologyContext context) {
  executorIndex = context.getThisTaskIndex();
  sleep.prepare();
}

代码示例来源:origin: apache/storm

@Override
public Emitter<Map<Integer, List<List<Object>>>> getEmitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
  return new FeederEmitter(context.getThisTaskIndex());
}

代码示例来源:origin: apache/storm

@Override
  public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context) {
    int thisTaskIndex = context.getThisTaskIndex();
    int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
    Set<TopicPartition> myPartitions = new HashSet<>(allPartitionsSorted.size() / totalTaskCount + 1);
    for (int i = thisTaskIndex; i < allPartitionsSorted.size(); i += totalTaskCount) {
      myPartitions.add(allPartitionsSorted.get(i));
    }
    return myPartitions;
  }
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
  outputStreams = Collections.unmodifiableList(outputStreamStats.stream()
    .map((ss) -> new OutputStreamEngine(ss)).collect(Collectors.toList()));
  this.collector = collector;
  executorIndex = context.getThisTaskIndex();
  sleep.prepare();
}

代码示例来源:origin: apache/storm

/**
 * Get the input partitions in sorted order.
 */
public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
  List<TopicPartition> sortedPartitions = allPartitionInfo.stream()
    .map(map -> tpSerializer.fromMap(map))
    .sorted(TopicPartitionComparator.INSTANCE)
    .collect(Collectors.toList());
  final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions);
  LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
    allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
  return allPartitions;
}

代码示例来源:origin: apache/storm

public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
  _state = TransactionalState.newUserState(conf, txStateId);
  LOG.debug("Created {}", this);
}

代码示例来源:origin: apache/storm

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  if (_local_drpc_id == null) {
    _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                          60L, TimeUnit.SECONDS,
                          new SynchronousQueue<Runnable>());
    _futures = new LinkedList<>();
    int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
    int index = context.getThisTaskIndex();
    int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
    List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
    if (servers == null || servers.isEmpty()) {
      throw new RuntimeException("No DRPC servers configured for topology");
    }
    if (numTasks < servers.size()) {
      for (String s : servers) {
        _futures.add(_backround.submit(new Adder(s, port, conf)));
      }
    } else {
      int i = index % servers.size();
      _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
    }
  }
}

代码示例来源:origin: apache/storm

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  this.collector = collector;
  kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
  kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
}

代码示例来源:origin: apache/storm

public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _state = TransactionalState.newUserState(conf, txStateId);
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}

代码示例来源:origin: apache/storm

int taskIndex = context.getThisTaskIndex();
if (totalTasks > eventHubConfig.getPartitionCount()) {
  throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> config, TopologyContext context,
          OutputCollector collector) {
  this.collector = collector;
  String myPartitionId = null;
  if (boltConfig.getPartitionMode()) {
    // We can use the task index (starting from 0) as the partition ID
    myPartitionId = "" + context.getThisTaskIndex();
  }
  logger.info("creating sender: " + boltConfig.getConnectionString()
        + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
  try {
    ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
    if (boltConfig.getPartitionMode()) {
      sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
    }
  } catch (Exception ex) {
    collector.reportError(ex);
    throw new RuntimeException(ex);
  }
}

代码示例来源:origin: apache/storm

for (Node n : _nodes) {
  if (n.stateInfo != null) {
    State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
    context.setTaskData(n.stateInfo.id, s);

代码示例来源:origin: apache/storm

/**
 * Open and activate a KafkaSpout that acts as a single-task/executor spout.
 *
 * @param <K> Kafka key type
 * @param <V> Kafka value type
 * @param spout The spout to prepare
 * @param topoConf The topoConf
 * @param topoContextMock The TopologyContext mock
 * @param collectorMock The output collector mock
 */
public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
  SpoutOutputCollector collectorMock) throws Exception {
  when(topoContextMock.getThisTaskIndex()).thenReturn(0);
  when(topoContextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
  spout.open(topoConf, topoContextMock, collectorMock);
  spout.activate();
}

代码示例来源:origin: elastic/elasticsearch-hadoop

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  this.collector = collector;
  LinkedHashMap copy = new LinkedHashMap(conf);
  copy.putAll(spoutConfig);
  StormSettings settings = new StormSettings(copy);
  InitializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, log);
  ackReads = settings.getStormSpoutReliable();
  if (ackReads) {
    inTransitQueue = new LinkedHashMap<Object, Object>();
    replayQueue = new LinkedList<Object[]>();
    retries = new HashMap<Object, Integer>();
    queueSize = settings.getStormSpoutReliableQueueSize();
    tupleRetries = settings.getStormSpoutReliableRetriesPerTuple();
    tupleFailure = settings.getStormSpoutReliableTupleFailureHandling();
  }
  int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
  int currentTask = context.getThisTaskIndex();
  // match the partitions based on the current topology
  List<PartitionDefinition> partitions = RestService.findPartitions(settings, log);
  List<PartitionDefinition> assigned = RestService.assignPartitions(partitions, currentTask, totalTasks);
  iterator = RestService.multiReader(settings, assigned, log);
}

代码示例来源:origin: elastic/elasticsearch-hadoop

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  this.collector = collector;
  LinkedHashMap copy = new LinkedHashMap(conf);
  copy.putAll(boltConfig);
  StormSettings settings = new StormSettings(copy);
  flushOnTickTuple = settings.getStormTickTupleFlush();
  ackWrites = settings.getStormBoltAck();
  // trigger manual flush
  if (ackWrites) {
    settings.setProperty(ES_BATCH_FLUSH_MANUAL, Boolean.TRUE.toString());
    // align Bolt / es-hadoop batch settings
    numberOfEntries = settings.getStormBulkSize();
    settings.setProperty(ES_BATCH_SIZE_ENTRIES, String.valueOf(numberOfEntries));
    inflightTuples = new ArrayList<Tuple>(numberOfEntries + 1);
  }
  int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
  InitializationUtils.setValueWriterIfNotSet(settings, StormValueWriter.class, log);
  InitializationUtils.setBytesConverterIfNeeded(settings, StormTupleBytesConverter.class, log);
  InitializationUtils.setFieldExtractorIfNotSet(settings, StormTupleFieldExtractor.class, log);
  writer = RestService.createWriter(settings, context.getThisTaskIndex(), totalTasks, log);
}

代码示例来源:origin: apache/storm

@Test
public void testRoundRobinPartitioning() {
  List<TopicPartition> allPartitions = new ArrayList<>();
  for(int i = 0; i < 11; i++) {
    allPartitions.add(createTp(i));
  }
  List<TopologyContext> contextMocks = new ArrayList<>();
  String thisComponentId = "A spout";
  List<Integer> allTasks = Arrays.asList(new Integer[]{0, 1, 2});
  for(int i = 0; i < 3; i++) {
    TopologyContext contextMock = mock(TopologyContext.class);
    when(contextMock.getThisTaskIndex()).thenReturn(i);
    when(contextMock.getThisComponentId()).thenReturn(thisComponentId);
    when(contextMock.getComponentTasks(thisComponentId)).thenReturn(allTasks);
    contextMocks.add(contextMock);
  }
  RoundRobinManualPartitioner partitioner = new RoundRobinManualPartitioner();
  
  Set<TopicPartition> partitionsForFirstTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(0));
  assertThat(partitionsForFirstTask, is(partitionsToTps(new int[]{0, 3, 6, 9})));
  
  Set<TopicPartition> partitionsForSecondTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(1));
  assertThat(partitionsForSecondTask, is(partitionsToTps(new int[]{1, 4, 7, 10})));
  
  Set<TopicPartition> partitionsForThirdTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(2));
  assertThat(partitionsForThirdTask, is(partitionsToTps(new int[]{2, 5, 8})));
}

代码示例来源:origin: DigitalPebble/storm-crawler

@Override
public void prepare(Map conf, TopologyContext topologyContext) {
  this.taskIndex = topologyContext.getThisTaskIndex();
  int totalTasks = topologyContext.getComponentTasks(
      topologyContext.getThisComponentId()).size();
  // single task? let's not bother with the task index in the file name
  if (totalTasks == 1) {
    this.taskIndex = -1;
  }
}

相关文章

微信公众号

最新文章

更多