org.apache.storm.task.TopologyContext.getThisTaskId()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(10.7k)|赞(0)|评价(0)|浏览(111)

本文整理了Java中org.apache.storm.task.TopologyContext.getThisTaskId()方法的一些代码示例,展示了TopologyContext.getThisTaskId()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getThisTaskId()方法的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext
方法名:getThisTaskId

TopologyContext.getThisTaskId介绍

[英]Gets the task id of this task.
[中]获取此任务的任务id。

代码示例

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> conf, TopologyContext topologyContext) {
  this.componentId = topologyContext.getThisComponentId();
  this.taskId = topologyContext.getThisTaskId();
}

代码示例来源:origin: apache/storm

/**
 * Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which
 * task accesses which resource in a distributed resource to ensure an even distribution.
 */
public int getThisTaskIndex() {
  List<Integer> tasks = new ArrayList<>(getComponentTasks(getThisComponentId()));
  Collections.sort(tasks);
  for (int i = 0; i < tasks.size(); i++) {
    if (tasks.get(i) == getThisTaskId()) {
      return i;
    }
  }
  throw new RuntimeException("Fatal: could not find this task id in this component");
}

代码示例来源:origin: apache/storm

/**
 * Create a manager with the given context.
 */
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
  this.context = context;
  try {
    commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
      context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
    this.processingGuarantee = processingGuarantee;
  } catch (JsonProcessingException e) {
    LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
    throw new RuntimeException(e);
  }
}

代码示例来源:origin: apache/storm

public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _context = context;
  List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
  int startIndex;
  for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
    if (tasks.get(startIndex) == context.getThisTaskId()) {
      break;
    }
  }
  _collector = collector;
  _pending = new HashMap<String, FixedTuple>();
  _serveTuples = new ArrayList<FixedTuple>();
  for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
    _serveTuples.add(_tuples.get(i));
  }
}

代码示例来源:origin: apache/storm

public static String metricName(String name, TopologyContext context) {
  StringBuilder sb = new StringBuilder("storm.topology.");
  sb.append(context.getStormId());
  sb.append(".");
  sb.append(hostName);
  sb.append(".");
  sb.append(dotToUnderScore(context.getThisComponentId()));
  sb.append(".");
  sb.append(context.getThisTaskId());
  sb.append(".");
  sb.append(context.getThisWorkerPort());
  sb.append("-");
  sb.append(name);
  return sb.toString();
}

代码示例来源:origin: apache/storm

@SuppressWarnings("unchecked")
@Override
public void prepare(Map<String, Object> conf, TopologyContext topologyContext) {
  this.componentId = topologyContext.getThisComponentId();
  this.taskId = topologyContext.getThisTaskId();
  try {
    this.host = Utils.localHostname();
  } catch (UnknownHostException e) {
    throw new RuntimeException(e);
  }
}

代码示例来源:origin: apache/storm

public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
  _collector = collector;
  this.source = context.getThisTaskId();
  long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
  myCount = totalCount / taskCount;
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
  this.collector = collector;
  this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
  this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
  try {
    this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId());
  } catch (Exception e) {
    LOG.error("Unable to connect to MQTT Broker.", e);
    throw new RuntimeException("Unable to connect to MQTT Broker.", e);
  }
}

代码示例来源:origin: apache/storm

private KeyValueState<TaskStream, WindowState> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
  String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window";
  return (KeyValueState<TaskStream, WindowState>) StateFactory.getState(namespace, topoConf, context);
}

代码示例来源:origin: apache/storm

private KeyValueState<String, Deque<Long>> getPartitionState(Map<String, Object> topoConf, TopologyContext context) {
  String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-partitions";
  return (KeyValueState<String, Deque<Long>>) StateFactory.getState(namespace, topoConf, context);
}

代码示例来源:origin: apache/storm

private KeyValueState<Long, WindowPartition<Tuple>> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
  String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window";
  return (KeyValueState<Long, WindowPartition<Tuple>>) StateFactory.getState(namespace, topoConf, context);
}

代码示例来源:origin: apache/storm

private KeyValueState<String, Optional<?>> getWindowSystemState(Map<String, Object> topoConf, TopologyContext context) {
  String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-systemstate";
  return (KeyValueState<String, Optional<?>>) StateFactory.getState(namespace, topoConf, context);
}

代码示例来源:origin: apache/storm

public Tuple getTuple(String stream, List values) {
  return new TupleImpl(systemTopologyContext, values, executor.getComponentId(), systemTopologyContext.getThisTaskId(), stream);
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
  // get the last successfully committed state from state store
  String namespace = context.getThisComponentId() + "-" + context.getThisTaskId();
  prepare(topoConf, context, collector, StateFactory.getState(namespace, topoConf, context));
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
  Validate.notEmpty(properties, "Producer properties can not be empty");
  Validate.notNull(selector, "TopicSelector can not be null");
  Validate.notNull(mapper, "TupleToMessageMapper can not be null");
  producer = new DefaultMQProducer();
  producer.setInstanceName(String.valueOf(context.getThisTaskId()));
  RocketMqConfig.buildProducerConfigs(properties, producer);
  try {
    producer.start();
  } catch (MQClientException e) {
    throw new RuntimeException(e);
  }
  this.collector = collector;
  this.batchHelper = new BatchHelper(batchSize, collector);
  this.messages = new LinkedList<>();
}

代码示例来源:origin: apache/storm

@Before
public void setUp() throws Exception {
  mockTopologyContext = Mockito.mock(TopologyContext.class);
  Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
  Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
  mockOutputCollector = Mockito.mock(SpoutOutputCollector.class);
}

代码示例来源:origin: apache/storm

/**
 * Loads the last saved checkpoint state the from persistent storage.
 */
private KeyValueState<String, CheckPointState> loadCheckpointState(Map<String, Object> conf, TopologyContext ctx) {
  String namespace = ctx.getThisComponentId() + "-" + ctx.getThisTaskId();
  KeyValueState<String, CheckPointState> state =
    (KeyValueState<String, CheckPointState>) StateFactory.getState(namespace, conf, ctx);
  if (state.get(TX_STATE_KEY) == null) {
    CheckPointState txState = new CheckPointState(-1, COMMITTED);
    state.put(TX_STATE_KEY, txState);
    state.commit();
    LOG.debug("Initialized checkpoint spout state with txState {}", txState);
  } else {
    LOG.debug("Got checkpoint spout state {}", state.get(TX_STATE_KEY));
  }
  return state;
}

代码示例来源:origin: apache/storm

@Before
public void setUp() throws Exception {
  mockBolt = Mockito.mock(IStatefulBolt.class);
  executor = new StatefulBoltExecutor<>(mockBolt);
  mockTopologyContext = Mockito.mock(TopologyContext.class);
  mockOutputCollector = Mockito.mock(OutputCollector.class);
  mockState = Mockito.mock(KeyValueState.class);
  Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
  Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
  GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
  Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
  mockTuple = Mockito.mock(Tuple.class);
  mockCheckpointTuple = Mockito.mock(Tuple.class);
  executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}

代码示例来源:origin: apache/storm

private void connectMqtt() throws Exception {
  String clientId = this.topologyName + "-" + this.context.getThisComponentId() + "-" +
           this.context.getThisTaskId();
  MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader);
  this.connection = client.callbackConnection();
  this.connection.listener(this);
  this.connection.connect(new ConnectCallback());
  while (!this.mqttConnected && !this.mqttConnectFailed) {
    LOG.info("Waiting for connection...");
    Thread.sleep(500);
  }
  if (this.mqttConnected) {
    List<String> topicList = this.options.getTopics();
    Topic[] topics = new Topic[topicList.size()];
    QoS qos = MqttUtils.qosFromInt(this.options.getQos());
    for (int i = 0; i < topicList.size(); i++) {
      topics[i] = new Topic(topicList.get(i), qos);
    }
    connection.subscribe(topics, new SubscribeCallback());
  }
}

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, TridentContext tridentContext) {
  this.topologyContext = context;
  List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
  if (parents.size() != 1) {
    throw new RuntimeException("Aggregation related operation can only have one parent");
  }
  Long maxTuplesCacheSize = getWindowTuplesCacheSize(topoConf);
  this.tridentContext = tridentContext;
  collector = new FreshCollector(tridentContext);
  projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
  windowStore = windowStoreFactory.create(topoConf);
  windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
  windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
  tridentWindowManager = storeTuplesInStore ?
    new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(),
                      maxTuplesCacheSize, inputFields)
    : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());
  tridentWindowManager.prepare();
}

相关文章

微信公众号

最新文章

更多