Usage and code examples of the org.apache.storm.task.TopologyContext.getStormId() method


This article collects Java code examples of the org.apache.storm.task.TopologyContext.getStormId() method, showing how it is used in practice. The examples are extracted from curated open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as solid references. Details of TopologyContext.getStormId() are as follows:
Package path: org.apache.storm.task.TopologyContext
Class name: TopologyContext
Method name: getStormId

About TopologyContext.getStormId

Returns the unique ID assigned to the running topology: the topology name with a unique nonce appended at submission time. The ID changes each time the topology is (re)submitted, so several of the examples below compare it against stored state to detect redeployment.
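As a minimal usage sketch (the bolt and field names here are illustrative, not taken from the examples below): a component typically reads the ID once in prepare() or open() and keeps it for logging or for tagging persisted state.

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class TopologyIdAwareBolt extends BaseRichBolt {
    private OutputCollector collector;
    private String topologyId;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Capture the deployment-unique topology ID once, e.g. "myTopology-1-1638316800"
        this.topologyId = context.getStormId();
    }

    @Override
    public void execute(Tuple input) {
        // The stored topologyId can be used here to tag logs or persisted state
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This sketch emits nothing
    }
}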

Code examples

Code example source: apache/storm

@Override
public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { 
  try {
    url = new URL((String)registrationArgument);
    this.errorReporter = errorReporter;
    serializer = new KryoValuesSerializer(topoConf);
    topologyId = context.getStormId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Code example source: apache/storm

/**
 * Create a manager with the given context.
 */
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
  this.context = context;
  try {
    commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
      context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
    this.processingGuarantee = processingGuarantee;
  } catch (JsonProcessingException e) {
    LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
    throw new RuntimeException(e);
  }
}

Code example source: apache/storm

public static String metricName(String name, TopologyContext context) {
  StringBuilder sb = new StringBuilder("storm.topology.");
  sb.append(context.getStormId());
  sb.append(".");
  sb.append(hostName);
  sb.append(".");
  sb.append(dotToUnderScore(context.getThisComponentId()));
  sb.append(".");
  sb.append(context.getThisTaskId());
  sb.append(".");
  sb.append(context.getThisWorkerPort());
  sb.append("-");
  sb.append(name);
  return sb.toString();
}
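The name produced by this helper has the shape storm.topology.&lt;stormId&gt;.&lt;hostName&gt;.&lt;componentId&gt;.&lt;taskId&gt;.&lt;workerPort&gt;-&lt;name&gt;; including getStormId() keeps metrics from different deployments of the same topology apart.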

Code example source: apache/storm

@Override
public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
  String stormId = context.getStormId();
  int port = context.getThisWorkerPort();
  /*
   * Include the topology name & worker port in the file name so that
   * multiple event loggers can log independently.
   */
  String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);
  Path path = Paths.get(workersArtifactRoot, "events.log");
  File dir = path.toFile().getParentFile();
  if (!dir.exists()) {
    dir.mkdirs();
  }
  initLogWriter(path);
  setUpFlushTask();
}

Code example source: apache/storm

/**
 * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
 *
 * @param tp The topic partition the commit metadata belongs to.
 * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
 * @param offsetManagers The offset managers.
 * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
 */
public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
  Map<TopicPartition, OffsetManager> offsetManagers) {
  try {
    if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
      && offsetManagers.containsKey(tp)
      && offsetManagers.get(tp).hasCommitted()) {
      return true;
    }
    final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
    return committedMetadata.getTopologyId().equals(context.getStormId());
  } catch (IOException e) {
    LOG.warn("Failed to deserialize expected commit metadata [{}]."
      + " This error is expected to occur once per partition, if the last commit to each partition"
      + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
      + "Defaulting to behavior compatible with earlier version", committedOffset);
    LOG.trace("", e);
    return false;
  }
}
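This check is the counterpart of the CommitMetadataManager constructor above: the spout embeds getStormId() in the commit metadata it writes to Kafka, and since the ID changes on every redeploy, comparing it with the current context's ID tells whether a commit came from this deployment of the topology or from an earlier one.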

Code example source: apache/storm

if (isFirstPollSinceExecutorStarted(tp)) {
  boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
    || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
  if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
    LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
    // ... seek to the beginning of the partition
  }
  // ... remaining offset strategies elided in this excerpt
}

Code example source: apache/storm

// The leading lines of this excerpt are truncated in the source; the surrounding
// if/else is reconstructed here so the fragment parses as written.
if (!records.isEmpty()) {
  currentBatch = new KafkaTridentSpoutBatchMetadata(
    records.get(0).offset(),
    records.get(records.size() - 1).offset(),
    topologyContext.getStormId());
} else {
  currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
}

Code example source: apache/storm

@BeforeEach
public void setUp() {
  when(topologyContextMock.getStormId()).thenReturn(topologyId);
  consumer.assign(Collections.singleton(partition));
  consumer.updateBeginningOffsets(Collections.singletonMap(partition, firstOffsetInKafka));
  consumer.updateEndOffsets(Collections.singletonMap(partition, firstOffsetInKafka + recordsInKafka));
  List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstOffsetInKafka, recordsInKafka);
  records.forEach(record -> consumer.addRecord(record));
}

Code example source: apache/storm

Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
if (!topologyContext.getStormId().equals(currBatchMeta.getTopologyId())
  && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
  LOG.debug("Skipping re-emit of batch that was originally emitted by another topology,"
    + " ..."); // log message truncated in the source excerpt
  // ...
}

Code example source: apache/storm

@Test
public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception {
  when(topologyContext.getStormId()).thenReturn("topology-1");

  final int messageCount = 2;
  prepareSpout(messageCount);

  nextTuple_verifyEmitted_ack_resetCollector(0);

  //Commits offsets during deactivation
  spout.deactivate();

  verifyAllMessagesCommitted(1);

  // Restart topology with a different topology id
  setUp();
  when(topologyContext.getStormId()).thenReturn("topology-2");
  // Initialize spout using the same populated data (i.e same kafkaUnitRule)
  SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);

  //Emit all messages and check that they are emitted. Ack the messages too
  for (int i = 0; i < messageCount; i++) {
    nextTuple_verifyEmitted_ack_resetCollector(i);
  }

  commitAndVerifyAllMessagesCommitted(messageCount);
}

Code example source: apache/storm

@Test
public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception {
  when(topologyContext.getStormId()).thenReturn("topology-1");
  final int messageCount = 2;
  prepareSpout(messageCount);
  nextTuple_verifyEmitted_ack_resetCollector(0);
  //Commits offsets during deactivation
  spout.deactivate();
  verifyAllMessagesCommitted(1);
  // Restart topology with the same topology id, which mimics the behavior of partition reassignment
  setUp();
  // Initialize spout using the same populated data (i.e same kafkaUnitRule)
  SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
  nextTuple_verifyEmitted_ack_resetCollector(1);
  commitAndVerifyAllMessagesCommitted(messageCount);
}

Code example source: apache/metron

public void initialize(TopologyContext context) {
  _callback = createCallback(callbackClazz);
  _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
                              .with(EmitContext.Type.UUID, context.getStormId());
  _callback.initialize(_context);
}

Code example source: org.apache.storm/storm-kafka

@Override
public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
  return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asOpaqueEmitter();
}

Code example source: org.apache.storm/storm-core

@Override
public void prepare(Map<String, Object> stormConf, Map<String, Object> arguments, TopologyContext context) {
  String workersArtifactDir; // workers artifact directory
  String stormId = context.getStormId();
  int port = context.getThisWorkerPort();
  if ((workersArtifactDir = (String) stormConf.get(Config.STORM_WORKERS_ARTIFACTS_DIR)) == null) {
    workersArtifactDir = "workers-artifacts";
  }
  /*
   * Include the topology name & worker port in the file name so that
   * multiple event loggers can log independently.
   */
  Path path = Paths.get(workersArtifactDir, stormId, Integer.toString(port), "events.log");
  if (!path.isAbsolute()) {
    path = Paths.get(getLogDir(stormConf), workersArtifactDir,
             stormId, Integer.toString(port), "events.log");
  }
  File dir = path.toFile().getParentFile();
  if (!dir.exists()) {
    dir.mkdirs();
  }
  initLogWriter(path);
  setUpFlushTask();
}

Code example source: org.apache.storm/storm-kafka

@Override
public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
  return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asTransactionalEmitter();
}

Code example source: org.apache.storm/storm-kafka

@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
  _collector = collector;
  String topologyInstanceId = context.getStormId();
  Map<String, Object> stateConf = new HashMap<>(conf);
  List<String> zkServers = _spoutConfig.zkServers;
  // ... remainder of open() elided in this excerpt
}

Code example source: flipkart-incubator/storm-mysql

@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector spoutOutputCollector) {
  Preconditions.checkNotNull(this.spoutConfig.getZkBinLogStateConfig(),
      "Zookeeper Config cannot be null");
  Preconditions.checkNotNull(this.spoutConfig.getMysqlConfig(),
      "Mysql Config cannot be null");
  LOGGER.info("Initiating MySql Spout with config {}", this.spoutConfig.toString());
  this.collector          = spoutOutputCollector;
  this.topologyInstanceId = context.getStormId();
  this.topologyName       = conf.get(Config.TOPOLOGY_NAME).toString();
  this.databaseName       = this.spoutConfig.getMysqlConfig().getDatabase();
  this.sidelineStrategy   = this.spoutConfig.getFailureConfig().getSidelineStrategy();
  this.sidelineStrategy.initialize(conf, context);
  initializeAndRegisterAllMetrics(context, this.spoutConfig.getMetricsTimeBucketSizeInSecs());
  txQueue = this.clientFactory.initializeBuffer(this.spoutConfig.getBufferCapacity());
  zkClient = this.clientFactory.getZkClient(conf, this.spoutConfig.getZkBinLogStateConfig());
  mySqlClient = this.clientFactory.getMySqlClient(this.spoutConfig.getMysqlConfig());
  openReplicatorClient = this.clientFactory.getReplicatorClient(mySqlClient, zkClient);
  begin();
}
