backtype.storm.task.TopologyContext类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(137)

本文整理了Java中backtype.storm.task.TopologyContext类的一些代码示例,展示了TopologyContext类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。TopologyContext类的具体详情如下:
包路径:backtype.storm.task.TopologyContext
类名称:TopologyContext

TopologyContext介绍

[英]A TopologyContext is given to bolts and spouts in their "prepare" and "open" methods, respectively. This object provides information about the component's place within the topology, such as task ids, inputs and outputs, etc.

The TopologyContext is also used to declare ISubscribedState objects to synchronize state with StateSpouts this object is subscribed to.
[中]TopologyContext会分别在bolt的“prepare”方法和spout的“open”方法中传入。此对象提供有关组件在拓扑中所处位置的信息,例如任务ID、输入和输出等。
TopologyContext还用于声明ISubscribedState对象,以便与该对象所订阅的StateSpout同步状态。

代码示例

代码示例来源:origin: alibaba/jstorm

/**
 * Creates a {@code PartitionConsumer} for each partition handled by this task.
 *
 * Partitions are visited starting at this task's index and stepping by the
 * number of tasks of this component, up to {@code config.numPartitions}.
 * Every consumer is appended to {@code partitionConsumers} and indexed by
 * partition number in a freshly created {@code partitionConsumerMap}.
 *
 * @param conf    configuration map handed to each consumer
 * @param context topology context used to read the task index and task count
 */
private void createPartitionConsumers(Map conf, TopologyContext context) {
  partitionConsumerMap = new HashMap<Integer, PartitionConsumer>();
  final int stride = context.getComponentTasks(context.getThisComponentId()).size();
  int partition = context.getThisTaskIndex();
  while (partition < config.numPartitions) {
    PartitionConsumer consumer = new PartitionConsumer(conf, config, partition, zkState);
    partitionConsumers.add(consumer);
    partitionConsumerMap.put(partition, consumer);
    partition += stride;
  }
}

代码示例来源:origin: alibaba/jstorm

/**
 * Stores the collector and this task's id, then derives this task's share of
 * {@code totalCount} by dividing it by the number of tasks of this component.
 *
 * @param conf      configuration map (not read here)
 * @param context   topology context supplying the task id and task list
 * @param collector collector kept for later use
 */
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  source = context.getThisTaskId();
  final String componentId = context.getThisComponentId();
  final long peers = context.getComponentTasks(componentId).size();
  myCount = totalCount / peers;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Forwards the metric registration to the wrapped topology context.
 *
 * @param name                 name passed through to the topology context
 * @param metric               metric instance passed through to the topology context
 * @param timeBucketSizeInSecs bucket size value passed through to the topology context
 * @return whatever the topology context's {@code registerMetric} returns
 */
public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
  final T registered = _topoContext.registerMetric(name, metric, timeBucketSizeInSecs);
  return registered;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Captures the task id, topology id and component id of the given context.
 *
 * @param context topology context of the task this client belongs to
 */
public MetricClient(TopologyContext context) {
  this.taskId = context.getThisTaskId();
  this.topologyId = context.getTopologyId();
  this.componentId = context.getThisComponentId();
}

代码示例来源:origin: alibaba/jstorm

/**
 * Returns the output fields declared on the given stream by the component
 * that this task belongs to.
 *
 * @param streamId id of the stream to look up
 * @return the result of {@code getComponentOutputFields} for this component and stream
 */
public Fields getThisOutputFields(String streamId) {
  final String ownComponentId = getThisComponentId();
  return getComponentOutputFields(ownComponentId, streamId);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Records the topology master id, this spout's task id and the collector.
 *
 * @param conf      configuration map (not read here)
 * @param context   topology context supplying both ids
 * @param collector collector kept for later use
 */
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  topologyMasterId = context.getTopologyMasterId();
  spoutTaskId = context.getThisTaskId();
  this.collector = collector;
}

代码示例来源:origin: alibaba/jstorm

/**
 * Creates the TPS counter, labelled "componentId:taskId", and logs that
 * preparation finished for this component.
 *
 * @param conf    configuration map (not read here)
 * @param context topology context supplying the component id and task id
 */
public void prepare(Map conf, TopologyContext context) {
  final String componentId = context.getThisComponentId();
  final String counterName = componentId + ":" + context.getThisTaskId();
  tpsCounter = new TpsCounter(counterName);
  LOG.info("Successfully do parepare " + componentId);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Stores the context, computes the key-range count from the configured value
 * and the component's task count, and initialises a state for every key range
 * that {@code keyRangesByTaskIndex} returns for this task's index.
 *
 * @param context topology context of the current task
 */
@Override
public void init(TopologyContext context) {
  this.context = context;

  final String componentId = context.getThisComponentId();
  final int taskCount = context.getComponentTasks(componentId).size();
  this.keyRangeNum =
      JStormUtils.getScaleOutNum(ConfigExtension.getKeyRangeNum(context.getStormConf()), taskCount);

  final Collection<Integer> ownedRanges =
      keyRangesByTaskIndex(keyRangeNum, taskCount, context.getThisTaskIndex());
  for (final int range : ownedRanges) {
    initKeyRangeState(range);
  }

  LOG.info("Finish KeyRangeState init for task-{} with key range: {}", context.getThisTaskId(), ownedRanges);
}

代码示例来源:origin: apache/eagle

/**
 * Computes the partition id of the current task as its position within the
 * task-id list of its own component.
 *
 * @param context topology context of the current task
 * @return zero-based index of this task's id in the list returned by
 *         {@code getComponentTasks} for this task's component
 * @throws IllegalStateException if this task's id is not present in that list
 */
private int calculatePartitionId(TopologyContext context) {
  final int thisGlobalTaskId = context.getThisTaskId();
  final String componentName = context.getComponentId(thisGlobalTaskId);
  final List<Integer> globalTaskIds = context.getComponentTasks(componentName);

  // indexOf compares with equals(), so the boxed task id matches by value.
  final int index = globalTaskIds.indexOf(Integer.valueOf(thisGlobalTaskId));
  if (index < 0) {
    // Carry the offending values so a failure can be diagnosed from the log alone.
    throw new IllegalStateException(
        "Task " + thisGlobalTaskId + " not found among tasks " + globalTaskIds
            + " of component '" + componentName + "'");
  }
  return index;
}

代码示例来源:origin: alibaba/jstorm

/**
 * For every node that has state info, builds its state through the node's
 * state factory and stores it as task data on the context.
 *
 * NOTE(review): this excerpt is truncated; the closing braces and any
 * statements that follow are not shown here.
 */
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector batchCollector) {
  // Number of tasks that run the same component as this task.
  int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
  for (Node n : _nodes) {
    // Nodes without state info are skipped.
    if (n.stateInfo != null) {
      // The factory receives this task's index and the component's task count;
      // the resulting state is registered under the node's state id.
      State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
      context.setTaskData(n.stateInfo.id, s);

代码示例来源:origin: alibaba/jstorm

/**
 * Records task identity and collectors, sets up commit bookkeeping when the
 * delegate is an {@code ICommitter}, and finally prepares the delegate.
 *
 * @param conf      configuration map, forwarded to {@code mkCommitDir} and the delegate
 * @param context   topology context supplying task id, component id and message timeout
 * @param collector output collector, stored both directly and wrapped in a
 *                  {@code BasicOutputCollector}
 */
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  final int thisTaskId = context.getThisTaskId();
  taskId = String.valueOf(thisTaskId);
  taskName = context.getThisComponentId() + "_" + thisTaskId;

  basicCollector = new BasicOutputCollector(collector);
  this.collector = collector;

  final boolean delegateCommits = delegate instanceof ICommitter;
  if (delegateCommits) {
    isCommiter = true;
    commited = new TimeCacheMap<>(context.maxTopologyMessageTimeout());
    mkCommitDir(conf);
  }

  delegate.prepare(conf, context);
}

代码示例来源:origin: alibaba/jstorm

// NOTE(review): this excerpt starts in the middle of a method and is not
// syntactically complete as shown; the comments describe only the visible lines.
List<String> fields_group = Thrift.fieldGrouping(thriftGrouping);
    Fields fields = new Fields(fields_group);
    // Read the key-range field grouping flag from the topology configuration.
    Map conf = topologyContext.getStormConf();
    boolean enableKeyRangeHash = ConfigExtension.isEnableKeyRangeFieldGroup(conf);
    // NOTE(review): the body of this `if` is missing from the excerpt.
    if (enableKeyRangeHash) 
} else if (Grouping._Fields.SHUFFLE.equals(fields)) {
  // SHUFFLE: use the shuffle grouper type with an MkShuffer for the target component.
  grouperType = GrouperType.shuffle;
  shuffer = new MkShuffer(topologyContext.getThisComponentId(), targetComponent, workerData);
} else if (Grouping._Fields.NONE.equals(fields)) {
  // NONE: build a custom grouper keyed on this task's component and the stream id.
  int myTaskId = topologyContext.getThisTaskId();
  String componentId = topologyContext.getComponentId(myTaskId);
  GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
  customGrouper = new MkCustomGrouper(topologyContext, g, stream, outTasks, myTaskId);
  // NOTE(review): the next four statements repeat the four above verbatim.
  // Redeclaring the same locals in one scope would not compile, so this is
  // presumably an artifact of lines elided from the excerpt; confirm upstream.
  int myTaskId = topologyContext.getThisTaskId();
  String componentId = topologyContext.getComponentId(myTaskId);
  GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
  customGrouper = new MkCustomGrouper(topologyContext, g, stream, outTasks, myTaskId);
} else if (Grouping._Fields.LOCAL_OR_SHUFFLE.equals(fields)) {
  // LOCAL_OR_SHUFFLE is configured exactly like SHUFFLE in the visible code.
  grouperType = GrouperType.shuffle;
  shuffer = new MkShuffer(topologyContext.getThisComponentId(), targetComponent, workerData);
} else if (Grouping._Fields.LOCAL_FIRST.equals(fields)) {
  // LOCAL_FIRST is configured exactly like SHUFFLE in the visible code.
  grouperType = GrouperType.shuffle;
  shuffer = new MkShuffer(topologyContext.getThisComponentId(), targetComponent, workerData);

代码示例来源:origin: alibaba/jstorm

/**
 * Returns the number of tasks of the component this task belongs to.
 *
 * @return size of the task list the topology context reports for this component
 */
public int numPartitions() {
  final String ownComponentId = _topoContext.getThisComponentId();
  return _topoContext.getComponentTasks(ownComponentId).size();
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

// NOTE(review): this excerpt starts in the middle of an if/else chain and is
// not syntactically complete as shown; the comments describe only the visible lines.
} else if (Grouping._Fields.SHUFFLE.equals(fields)) {
  // SHUFFLE: use the shuffle grouper type with an MkShuffer for the target component.
  grouperType = GrouperType.shuffle;
  shuffer = new MkShuffer(topology_context.getThisComponentId(), targetComponent, workerData);
} else if (Grouping._Fields.NONE.equals(fields)) {
  // NONE: build a custom grouper keyed on this task's component and the stream id.
  int myTaskId = topology_context.getThisTaskId();
  String componentId = topology_context.getComponentId(myTaskId);
  GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
  custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
  // NOTE(review): the next four statements repeat the four above verbatim.
  // Redeclaring the same locals in one scope would not compile, so this is
  // presumably an artifact of lines elided from the excerpt; confirm upstream.
  int myTaskId = topology_context.getThisTaskId();
  String componentId = topology_context.getComponentId(myTaskId);
  GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
  custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
} else if (Grouping._Fields.LOCAL_OR_SHUFFLE.equals(fields)) {
  // LOCAL_OR_SHUFFLE is configured exactly like SHUFFLE in the visible code.
  grouperType = GrouperType.shuffle;
  shuffer = new MkShuffer(topology_context.getThisComponentId(), targetComponent, workerData);
} else if (Grouping._Fields.LOCAL_FIRST.equals(fields)) {
  // LOCAL_FIRST is configured exactly like SHUFFLE in the visible code.
  grouperType = GrouperType.shuffle;
  shuffer = new MkShuffer(topology_context.getThisComponentId(), targetComponent, workerData);

代码示例来源:origin: alibaba/jstorm

/**
 * Builds a grouper for one (target component, stream) pair.
 *
 * Stores the constructor arguments, takes a sorted private copy of the target
 * component's task ids, records this worker's tasks, resolves the grouping
 * type and logs the resulting setup.
 *
 * @param _topology_context topology context of the emitting task
 * @param _out_fields       output fields stored on this grouper
 * @param _thrift_grouping  thrift grouping from which the grouping type is derived
 * @param targetComponent   id of the component whose tasks are the targets
 * @param streamId          id of the stream being grouped
 * @param workerData        worker data handed to {@code parseGroupType}
 */
public MkGrouper(TopologyContext _topology_context, Fields _out_fields, Grouping _thrift_grouping,
         String targetComponent, String streamId, WorkerData workerData) {
  this.topologyContext = _topology_context;
  this.outFields = _out_fields;
  this.thriftGrouping = _thrift_grouping;
  this.streamId = streamId;
  this.targetComponent = targetComponent;

  // Copy before sorting so the list owned by the context is left untouched.
  this.outTasks = new ArrayList<>(_topology_context.getComponentTasks(targetComponent));
  Collections.sort(this.outTasks);

  this.localTasks = _topology_context.getThisWorkerTasks();
  this.fields = Thrift.groupingType(_thrift_grouping);
  this.groupType = parseGroupType(workerData);

  final String logPrefix = _topology_context.getThisTaskId() + ":" + streamId;
  LOG.info(logPrefix + " groupType is " + this.groupType + ", outTasks is " + this.outTasks
      + ", localTasks" + this.localTasks);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Stores the collaborators used when sending tuples and precomputes the task
 * id and the debug prefix " emit from componentId:taskId ".
 *
 * @param _storm_conf               storm configuration map
 * @param _component                id of the emitting component
 * @param _stream_component_grouper groupers keyed by two string levels, as stored
 * @param _topology_context         topology context of the emitting task
 * @param _task_stats               task metrics object stored on this instance
 */
public TaskSendTargets(Map<Object, Object> _storm_conf, String _component,
            Map<String, Map<String, MkGrouper>> _stream_component_grouper,
            TopologyContext _topology_context, TaskBaseMetric _task_stats) {
  this.stormConf = _storm_conf;
  this.componentId = _component;
  this.streamComponentGrouper = _stream_component_grouper;
  this.topologyContext = _topology_context;
  this.taskStats = _task_stats;

  this.taskId = _topology_context.getThisTaskId();
  this.debugIdStr = " emit from " + this.componentId + ":" + this.taskId + " ";
}

代码示例来源:origin: alibaba/jstorm

/**
 * Describes the consumers of this component's outputs and the grouping each uses.
 *
 * @return map from stream id to component id to the Grouping used, as
 *         returned by {@code getTargets} for this component
 */
public Map<String, Map<String, Grouping>> getThisTargets() {
  final String ownComponentId = getThisComponentId();
  return getTargets(ownComponentId);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Returns the streams declared for the component that this task belongs to.
 *
 * @return the result of {@code getComponentStreams} for this component
 */
public Set<String> getThisStreams() {
  final String ownComponentId = getThisComponentId();
  return getComponentStreams(ownComponentId);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Returns the inputs declared for this component.
 *
 * @return map from subscribed component/stream to the grouping subscribed
 *         with, as returned by {@code getSources} for this component
 */
public Map<GlobalStreamId, Grouping> getThisSources() {
  final String ownComponentId = getThisComponentId();
  return getSources(ownComponentId);
}

代码示例来源:origin: alibaba/jstorm

/**
 * Instantiates the state for one key range, names it "componentId/keyRange",
 * initialises it with the stored context and registers it in
 * {@code keyRangeToState}.
 *
 * @param keyRange key range the new state is created for
 */
private void initKeyRangeState(int keyRange) {
  final Object instance = Utils.newInstance("com.alibaba.jstorm.hdfs.transaction.RocksDbHdfsState");
  IRichCheckpointKvState<K, V, String> rangeState = (IRichCheckpointKvState<K, V, String>) instance;

  final String stateName = context.getThisComponentId() + "/" + keyRange;
  rangeState.setStateName(stateName);
  rangeState.init(context);

  keyRangeToState.put(keyRange, rangeState);
}

相关文章

微信公众号

最新文章

更多