backtype.storm.task.TopologyContext.getComponentTasks()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(120)

本文整理了Java中backtype.storm.task.TopologyContext.getComponentTasks()方法的一些代码示例,展示了TopologyContext.getComponentTasks()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。TopologyContext.getComponentTasks()方法的具体详情如下:
包路径:backtype.storm.task.TopologyContext
类名称:TopologyContext
方法名:getComponentTasks

TopologyContext.getComponentTasks介绍

[英]Gets the task ids allocated for the given component id. The task ids are always returned in ascending order.
[中]获取为给定组件id分配的任务id。任务id始终按升序返回。

代码示例

代码示例来源:origin: alibaba/jstorm

/**
 * Maps every requested component id to the set of task ids that the
 * context reports for it.
 *
 * @param context    topology context queried through getComponentTasks
 * @param components component ids to resolve
 * @return a freshly built map holding one entry per component id
 */
private Map<String, Set<Integer>> componentToComponentTasks(TopologyContext context, Set<String> components) {
  Map<String, Set<Integer>> tasksByComponent = new HashMap<>();
  for (String componentId : components) {
    // Copy into a set so the result does not share the context's collection.
    Set<Integer> taskIds = new HashSet<Integer>(context.getComponentTasks(componentId));
    tasksByComponent.put(componentId, taskIds);
  }
  return tasksByComponent;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Collects the task ids of every component that appears as a target in
 * getThisTargets().
 *
 * @return a new map from target component id to the value returned by
 *         getComponentTasks for that id
 */
public Map<String, List<Integer>> getThisTargetComponentTasks() {
  // Gather the distinct component ids across all grouping maps first.
  Set<String> targetIds = new HashSet<>();
  for (Map<String, Grouping> groupingByComponent : getThisTargets().values()) {
    targetIds.addAll(groupingByComponent.keySet());
  }

  Map<String, List<Integer>> tasksByTarget = new HashMap<>();
  for (String targetId : targetIds) {
    tasksByTarget.put(targetId, getComponentTasks(targetId));
  }
  return tasksByTarget;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Reports how many tasks the context lists for this task's own component.
 *
 * @return size of getComponentTasks(getThisComponentId())
 */
public int numPartitions() {
  String ownComponentId = _topoContext.getThisComponentId();
  return _topoContext.getComponentTasks(ownComponentId).size();
}

代码示例来源:origin: alibaba/jstorm

/**
 * Stores the context and collector, resets the pending map, and selects the
 * subset of _tuples this task will serve: starting at this task's position
 * in the component's task list and stepping by the number of tasks.
 *
 * @param conf      configuration map (not read here)
 * @param context   topology context used to locate this task among its peers
 * @param collector output collector kept for later use
 */
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  _context = context;
  List<Integer> componentTasks = context.getComponentTasks(context.getThisComponentId());
  final int stride = componentTasks.size();

  // Find this task's position; ends at stride when the id is not present.
  int offset = 0;
  while (offset < stride && componentTasks.get(offset) != context.getThisTaskId()) {
    offset++;
  }

  _collector = collector;
  _pending = new HashMap<String, FixedTuple>();
  _serveTuples = new ArrayList<FixedTuple>();
  for (int position = offset; position < _tuples.size(); position += stride) {
    _serveTuples.add(_tuples.get(position));
  }
}

代码示例来源:origin: alibaba/jstorm

/**
 * Collects the task ids of every component that appears as a source in
 * getThisSources(). A null source map yields an empty result.
 *
 * @return a new map from source component id to the value returned by
 *         getComponentTasks for that id
 */
public Map<String, List<Integer>> getThisSourceComponentTasks() {
  Set<String> sourceIds = new HashSet<>();
  Map<GlobalStreamId, Grouping> inputs = getThisSources();
  if (inputs != null) {
    for (GlobalStreamId stream : inputs.keySet()) {
      sourceIds.add(stream.get_componentId());
    }
  }

  Map<String, List<Integer>> tasksBySource = new HashMap<>();
  for (String sourceId : sourceIds) {
    tasksBySource.put(sourceId, getComponentTasks(sourceId));
  }
  return tasksBySource;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Returns the position of this task's id within a sorted copy of
 * getComponentTasks(getThisComponentId()).
 * A typical use is deciding which task handles which slice of a shared
 * resource so that the work is spread evenly.
 *
 * @return zero-based index of this task among its component's tasks
 * @throws RuntimeException if this task id is not in the component's task list
 */
public int getThisTaskIndex() {
  List<Integer> sortedTasks = new ArrayList<>(getComponentTasks(getThisComponentId()));
  Collections.sort(sortedTasks);
  int position = 0;
  for (Integer taskId : sortedTasks) {
    if (taskId == getThisTaskId()) {
      return position;
    }
    position++;
  }
  throw new RuntimeException("Fatal: could not find this task id in this component");
}

代码示例来源:origin: alibaba/jstorm

/**
 * Records the collector and this task's id, then derives this task's share
 * of totalCount by dividing it by the number of tasks in the component.
 *
 * @param conf      configuration map (not read here)
 * @param context   topology context supplying the task id and peer task list
 * @param collector output collector kept for later use
 */
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  this.source = context.getThisTaskId();
  String ownComponentId = context.getThisComponentId();
  long taskCount = context.getComponentTasks(ownComponentId).size();
  // NOTE(review): whether this truncates depends on the declared types of
  // totalCount and myCount, which are not visible here.
  myCount = totalCount / taskCount;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Creates a PartitionConsumer for each partition assigned to this task and
 * registers it in both partitionConsumers and partitionConsumerMap.
 * Partitions are picked starting at this task's index and stepping by the
 * number of tasks in the component, up to config.numPartitions.
 *
 * @param conf    configuration map handed to each PartitionConsumer
 * @param context topology context supplying the task index and task count
 */
private void createPartitionConsumers(Map conf, TopologyContext context) {
  partitionConsumerMap = new HashMap<Integer, PartitionConsumer>();
  int stride = context.getComponentTasks(context.getThisComponentId()).size();
  int partition = context.getThisTaskIndex();
  while (partition < config.numPartitions) {
    PartitionConsumer consumer = new PartitionConsumer(conf, config, partition, zkState);
    partitionConsumers.add(consumer);
    partitionConsumerMap.put(partition, consumer);
    partition += stride;
  }
}

代码示例来源:origin: alibaba/jstorm

/**
 * Sums the task counts of every source component that feeds this component
 * on the checkpoint stream.
 *
 * @param context topology context supplying the sources and their tasks
 * @return total number of tasks across all checkpoint-stream inputs
 */
private int getCheckpointInputTaskCount(TopologyContext context) {
  int total = 0;
  for (GlobalStreamId source : context.getThisSources().keySet()) {
    // Only inputs on the checkpoint stream contribute to the total.
    if (!CheckpointSpout.CHECKPOINT_STREAM_ID.equals(source.get_streamId())) {
      continue;
    }
    total += context.getComponentTasks(source.get_componentId()).size();
  }
  return total;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Caches the task ids of the "TMUdfBolt" component and the collector
 * provided by the topology master context.
 *
 * @param tmContext source of the topology context and the collector
 */
@Override
public void init(TopologyMasterContext tmContext) {
  boltTasks = tmContext.getContext().getComponentTasks("TMUdfBolt");
  collector = tmContext.getCollector();
}

代码示例来源:origin: alibaba/jstorm

/**
 * Builds a FeederCoordinator sized to the number of tasks of the spout
 * component whose id is derived from this component's id.
 *
 * @param txStateId transactional state id (not read here)
 * @param conf      configuration map (not read here)
 * @param context   topology context supplying component ids and tasks
 * @return a new FeederCoordinator constructed with the spout's task count
 */
@Override
public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
  String coordinatorId = context.getThisComponentId();
  String spoutId = TridentTopologyBuilder.spoutIdFromCoordinatorId(coordinatorId);
  int numTasks = context.getComponentTasks(spoutId).size();
  return new FeederCoordinator(numTasks);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Captures the grouping configuration for one output stream towards one
 * target component, resolves the target's task ids, and determines the
 * group type.
 *
 * @param _topology_context context used to look up target tasks, the tasks
 *                          of this worker, and this task's id
 * @param _out_fields       output fields stored for later use
 * @param _thrift_grouping  thrift grouping definition stored and passed to
 *                          Thrift.groupingType
 * @param targetComponent   id of the component whose tasks are the targets
 * @param streamId          stream id, stored and used in the log line
 * @param workerData        passed to parseGroupType
 */
public MkGrouper(TopologyContext _topology_context, Fields _out_fields, Grouping _thrift_grouping,
         String targetComponent, String streamId, WorkerData workerData) {
  this.topologyContext = _topology_context;
  this.outFields = _out_fields;
  this.thriftGrouping = _thrift_grouping;
  this.streamId = streamId;
  this.targetComponent = targetComponent;
  // Keep a private, sorted copy of the target task ids rather than the
  // list handed back by the context.
  List<Integer> outTasks = topologyContext.getComponentTasks(targetComponent);
  this.outTasks = new ArrayList<>();
  this.outTasks.addAll(outTasks);
  Collections.sort(this.outTasks);
  this.localTasks = _topology_context.getThisWorkerTasks();
  this.fields = Thrift.groupingType(thriftGrouping);
  // NOTE(review): parseGroupType presumably reads the fields assigned above,
  // so it should stay after them — confirm against its implementation.
  this.groupType = this.parseGroupType(workerData);
  String id = _topology_context.getThisTaskId() + ":" + streamId;
  LOG.info(id + " groupType is " + groupType + ", outTasks is " + this.outTasks + ", localTasks" + localTasks);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Opens the delegate with a stream-overriding collector, gathers the task
 * ids of every component targeted on _coordStream, and seeds the random
 * generator.
 *
 * @param conf      configuration map forwarded to the delegate
 * @param context   topology context forwarded to the delegate and queried
 *                  for targets and their tasks
 * @param collector collector wrapped before being handed to the delegate
 */
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  _delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
  _outputTasks = new ArrayList<>();
  // Fall back to an empty grouping map when _coordStream has no targets.
  Map<String, Grouping> coordTargets =
      Utils.get(context.getThisTargets(), _coordStream, new HashMap<String, Grouping>());
  for (String targetId : coordTargets.keySet()) {
    _outputTasks.addAll(context.getComponentTasks(targetId));
  }
  _rand = new Random(Utils.secureRandomLong());
}

代码示例来源:origin: alibaba/jstorm

public Emitter(String txStateId, Map conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
  _state = TransactionalState.newUserState(conf, txStateId);             
}

代码示例来源:origin: alibaba/jstorm

public Emitter(Map conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}

代码示例来源:origin: alibaba/jstorm

public Emitter(String txStateId, Map conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _state = TransactionalState.newUserState(conf, txStateId); 
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}

代码示例来源:origin: alibaba/jstorm

/**
 * Obtains the spout's emitter, records this task's index and peer count,
 * opens the user state keyed by the transactional id in conf, and restores
 * a RotatingTransactionalState for each existing partition whose number is
 * congruent to this task's index modulo the task count.
 *
 * @param conf    configuration map; read for Config.TOPOLOGY_TRANSACTIONAL_ID
 * @param context topology context supplying task index and task list
 */
public Emitter(Map conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
  String transactionalId = (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID);
  _state = TransactionalState.newUserState(conf, transactionalId, getComponentConfiguration());
  for (String partitionName : _state.list("")) {
    int partition = Integer.parseInt(partitionName);
    boolean ownedByThisTask = (partition - _index) % _numTasks == 0;
    if (!ownedByThisTask) {
      continue;
    }
    _partitionStates.put(partition, new RotatingTransactionalState(_state, partitionName));
  }
}

代码示例来源:origin: alibaba/mdrill

/**
 * Obtains the spout's emitter, records this task's index and peer count,
 * opens the user state keyed by the transactional id in conf, and restores
 * a RotatingTransactionalState for each existing partition whose number is
 * congruent to this task's index modulo the task count.
 *
 * @param conf    configuration map; read for Config.TOPOLOGY_TRANSACTIONAL_ID
 * @param context topology context supplying task index and task list
 */
public Emitter(Map conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _index = context.getThisTaskIndex();
  String ownComponentId = context.getThisComponentId();
  _numTasks = context.getComponentTasks(ownComponentId).size();
  String transactionalId = (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID);
  _state = TransactionalState.newUserState(conf, transactionalId, getComponentConfiguration());
  List<String> storedPartitions = _state.list("");
  for (String storedName : storedPartitions) {
    int partition = Integer.parseInt(storedName);
    // Skip partitions that belong to a different task of this component.
    if ((partition - _index) % _numTasks != 0) {
      continue;
    }
    _partitionStates.put(partition, new RotatingTransactionalState(_state, storedName));
  }
}

代码示例来源:origin: alibaba/mdrill

public Emitter(Map conf, TopologyContext context) {
  _emitter = _spout.getEmitter(conf, context);
  _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration()); 
  _index = context.getThisTaskIndex();
  _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}

代码示例来源:origin: alibaba/jstorm

/**
 * Stores the context, derives keyRangeNum from the configured key-range
 * number and the component's task count, and initialises the state of every
 * key range that keyRangesByTaskIndex assigns to this task.
 *
 * @param context topology context supplying configuration, task list,
 *                task index and task id
 */
@Override
public void init(TopologyContext context) {
  this.context = context;
  int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
  this.keyRangeNum = JStormUtils.getScaleOutNum(ConfigExtension.getKeyRangeNum(context.getStormConf()), taskCount);
  int taskIndex = context.getThisTaskIndex();
  Collection<Integer> assignedRanges = keyRangesByTaskIndex(keyRangeNum, taskCount, taskIndex);
  for (Integer range : assignedRanges) {
    initKeyRangeState(range);
  }
  LOG.info("Finish KeyRangeState init for task-{} with key range: {}", context.getThisTaskId(), assignedRanges);
}

相关文章

微信公众号

最新文章

更多