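This article collects code examples for the Java method org.apache.storm.task.TopologyContext.getStormId() and shows how TopologyContext.getStormId() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they serve as practical reference material. Details of the TopologyContext.getStormId() method are as follows: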
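Fully qualified class name: org.apache.storm.task.TopologyContext
Class name: TopologyContext
Method name: getStormId
Description: none provided in the source. In the examples that follow, the return value is used as a String that identifies the running topology.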
Code example source: origin: apache/storm
@Override
public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    try {
        url = new URL((String) registrationArgument);
        this.errorReporter = errorReporter;
        serializer = new KryoValuesSerializer(topoConf);
        topologyId = context.getStormId();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
Code example source: origin: apache/storm
/**
 * Create a manager with the given context.
 */
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    this.context = context;
    try {
        commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
            context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
        this.processingGuarantee = processingGuarantee;
    } catch (JsonProcessingException e) {
        LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
        throw new RuntimeException(e);
    }
}
Code example source: origin: apache/storm
public static String metricName(String name, TopologyContext context) {
    StringBuilder sb = new StringBuilder("storm.topology.");
    sb.append(context.getStormId());
    sb.append(".");
    sb.append(hostName);
    sb.append(".");
    sb.append(dotToUnderScore(context.getThisComponentId()));
    sb.append(".");
    sb.append(context.getThisTaskId());
    sb.append(".");
    sb.append(context.getThisWorkerPort());
    sb.append("-");
    sb.append(name);
    return sb.toString();
}
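To make the resulting name format concrete, here is a minimal standalone sketch of the same concatenation. It does not depend on Storm: ContextView is a hypothetical stand-in for the four TopologyContext getters used above, hostName is passed as a parameter instead of being read from a field, and the body of dotToUnderScore is an assumption (replace each dot with an underscore). The topology ID, component ID, and host name in main are made-up sample values.

```java
// Hypothetical stand-in for the TopologyContext getters that metricName uses.
interface ContextView {
    String getStormId();
    String getThisComponentId();
    int getThisTaskId();
    int getThisWorkerPort();
}

class MetricNameSketch {
    // Assumed behaviour of the helper referenced in the original snippet.
    static String dotToUnderScore(String s) {
        return s.replace('.', '_');
    }

    // Same field order as the original: id, host, component, task, port, then "-" and the metric name.
    static String metricName(String name, String hostName, ContextView context) {
        return "storm.topology."
            + context.getStormId() + "."
            + hostName + "."
            + dotToUnderScore(context.getThisComponentId()) + "."
            + context.getThisTaskId() + "."
            + context.getThisWorkerPort() + "-"
            + name;
    }

    // Builds a context that returns fixed sample values.
    static ContextView fixedContext(String stormId, String componentId, int taskId, int port) {
        return new ContextView() {
            public String getStormId() { return stormId; }
            public String getThisComponentId() { return componentId; }
            public int getThisTaskId() { return taskId; }
            public int getThisWorkerPort() { return port; }
        };
    }

    public static void main(String[] args) {
        ContextView ctx = fixedContext("wordcount-1-1700000000", "split.bolt", 7, 6700);
        // prints storm.topology.wordcount-1-1700000000.host1.split_bolt.7.6700-emitted
        System.out.println(metricName("emitted", "host1", ctx));
    }
}
```

Because the topology ID is the first variable segment, metrics from a redeployed topology end up under a different prefix than those of the previous deployment.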
Code example source: origin: apache/storm
@Override
public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
    String stormId = context.getStormId();
    int port = context.getThisWorkerPort();
    /*
     * Include the topology name & worker port in the file name so that
     * multiple event loggers can log independently.
     */
    String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);
    Path path = Paths.get(workersArtifactRoot, "events.log");
    File dir = path.toFile().getParentFile();
    if (!dir.exists()) {
        dir.mkdirs();
    }
    initLogWriter(path);
    setUpFlushTask();
}
Code example source: origin: apache/storm
/**
 * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
 *
 * @param tp The topic partition the commit metadata belongs to.
 * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
 * @param offsetManagers The offset managers.
 * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
 */
public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
    Map<TopicPartition, OffsetManager> offsetManagers) {
    try {
        if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
            && offsetManagers.containsKey(tp)
            && offsetManagers.get(tp).hasCommitted()) {
            return true;
        }
        final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
        return committedMetadata.getTopologyId().equals(context.getStormId());
    } catch (IOException e) {
        LOG.warn("Failed to deserialize expected commit metadata [{}]."
            + " This error is expected to occur once per partition, if the last commit to each partition"
            + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
            + "Defaulting to behavior compatible with earlier version", committedOffset);
        LOG.trace("", e);
        return false;
    }
}
Code example source: origin: apache/storm (truncated excerpt)
if (isFirstPollSinceExecutorStarted(tp)) {
    boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
        || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
    if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
        LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
Code example source: origin: apache/storm (truncated excerpt)
        records.get(0).offset(),
        records.get(records.size() - 1).offset(),
        topologyContext.getStormId());
} else {
    currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
Code example source: origin: apache/storm
@BeforeEach
public void setUp() {
    when(topologyContextMock.getStormId()).thenReturn(topologyId);
    consumer.assign(Collections.singleton(partition));
    consumer.updateBeginningOffsets(Collections.singletonMap(partition, firstOffsetInKafka));
    consumer.updateEndOffsets(Collections.singletonMap(partition, firstOffsetInKafka + recordsInKafka));
    List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstOffsetInKafka, recordsInKafka);
    records.forEach(record -> consumer.addRecord(record));
}
Code example source: origin: apache/storm (truncated excerpt)
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
if (!topologyContext.getStormId().equals(currBatchMeta.getTopologyId())
    && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
    LOG.debug("Skipping re-emit of batch that was originally emitted by another topology,"
Code example source: origin: apache/storm
@Test
public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception {
    when(topologyContext.getStormId()).thenReturn("topology-1");
    final int messageCount = 2;
    prepareSpout(messageCount);
    nextTuple_verifyEmitted_ack_resetCollector(0);
    // Commits offsets during deactivation
    spout.deactivate();
    verifyAllMessagesCommitted(1);
    // Restart topology with a different topology id
    setUp();
    when(topologyContext.getStormId()).thenReturn("topology-2");
    // Initialize spout using the same populated data (i.e. same kafkaUnitRule)
    SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
    // Emit all messages and check that they are emitted. Ack the messages too
    for (int i = 0; i < messageCount; i++) {
        nextTuple_verifyEmitted_ack_resetCollector(i);
    }
    commitAndVerifyAllMessagesCommitted(messageCount);
}
Code example source: origin: apache/storm
@Test
public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception {
    when(topologyContext.getStormId()).thenReturn("topology-1");
    final int messageCount = 2;
    prepareSpout(messageCount);
    nextTuple_verifyEmitted_ack_resetCollector(0);
    // Commits offsets during deactivation
    spout.deactivate();
    verifyAllMessagesCommitted(1);
    // Restart topology with the same topology id, which mimics the behavior of partition reassignment
    setUp();
    // Initialize spout using the same populated data (i.e. same kafkaUnitRule)
    SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
    nextTuple_verifyEmitted_ack_resetCollector(1);
    commitAndVerifyAllMessagesCommitted(messageCount);
}
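The two tests above exercise one rule: the EARLIEST strategy is applied again only when the topology ID returned by getStormId() differs from the ID recorded with the previous commit or batch. The sketch below isolates that decision, modeled on the isFirstPollSinceTopologyWasDeployed condition in the earlier excerpt. It is an illustration under assumptions, not the spout's actual code: the Strategy enum is a hypothetical two-value stand-in, and seekToBeginning is an invented helper name.

```java
class FirstPollSketch {
    // Hypothetical stand-in for the spout's first-poll offset strategy setting.
    enum Strategy { EARLIEST, UNCOMMITTED_EARLIEST }

    /**
     * Returns true when the consumer should seek to the beginning of the partition.
     * lastTopologyId is the ID stored with the previous commit, or null if there is none.
     */
    static boolean seekToBeginning(Strategy strategy, String lastTopologyId, String currentTopologyId) {
        boolean firstPollSinceDeploy =
            lastTopologyId == null || !currentTopologyId.equals(lastTopologyId);
        return strategy == Strategy.EARLIEST && firstPollSinceDeploy;
    }

    public static void main(String[] args) {
        // Redeployed under a new ID: EARLIEST is enforced again.
        System.out.println(seekToBeginning(Strategy.EARLIEST, "topology-1", "topology-2")); // true
        // Same ID (for example after partition reassignment): resume from the committed offset.
        System.out.println(seekToBeginning(Strategy.EARLIEST, "topology-1", "topology-1")); // false
        // No earlier commit metadata: treated as a fresh deployment.
        System.out.println(seekToBeginning(Strategy.EARLIEST, null, "topology-1")); // true
    }
}
```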
Code example source: origin: apache/metron
public void initialize(TopologyContext context) {
    _callback = createCallback(callbackClazz);
    _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
                                .with(EmitContext.Type.UUID, context.getStormId());
    _callback.initialize(_context);
}
Code example source: origin: org.apache.storm/storm-kafka
@Override
public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
    return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asOpaqueEmitter();
}
Code example source: origin: org.apache.storm/storm-core
@Override
public void prepare(Map<String, Object> stormConf, Map<String, Object> arguments, TopologyContext context) {
    String workersArtifactDir; // workers artifact directory
    String stormId = context.getStormId();
    int port = context.getThisWorkerPort();
    if ((workersArtifactDir = (String) stormConf.get(Config.STORM_WORKERS_ARTIFACTS_DIR)) == null) {
        workersArtifactDir = "workers-artifacts";
    }
    /*
     * Include the topology name & worker port in the file name so that
     * multiple event loggers can log independently.
     */
    Path path = Paths.get(workersArtifactDir, stormId, Integer.toString(port), "events.log");
    if (!path.isAbsolute()) {
        path = Paths.get(getLogDir(stormConf), workersArtifactDir,
            stormId, Integer.toString(port), "events.log");
    }
    File dir = path.toFile().getParentFile();
    if (!dir.exists()) {
        dir.mkdirs();
    }
    initLogWriter(path);
    setUpFlushTask();
}
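The path logic in the snippet above can be tried in isolation. The following sketch reproduces only the path construction, with the log directory passed in as a plain string instead of being read from the Storm configuration. EventLogPathSketch and eventLogPath are hypothetical names, and the arguments in main are made-up sample values. Although the original comment says "topology name", the value placed in the path is the ID returned by getStormId().

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class EventLogPathSketch {
    /**
     * Builds <artifactsDir>/<stormId>/<port>/events.log and, when that path is
     * relative, places it under logDir, mirroring the snippet above.
     */
    static Path eventLogPath(String logDir, String artifactsDir, String stormId, int port) {
        Path path = Paths.get(artifactsDir, stormId, Integer.toString(port), "events.log");
        if (!path.isAbsolute()) {
            path = Paths.get(logDir, artifactsDir, stormId, Integer.toString(port), "events.log");
        }
        return path;
    }

    public static void main(String[] args) {
        // One directory per topology ID and worker port keeps event loggers from colliding.
        System.out.println(eventLogPath("logs", "workers-artifacts", "demo-1-1700000000", 6700));
        System.out.println(eventLogPath("logs", "workers-artifacts", "demo-1-1700000000", 6701));
    }
}
```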
Code example source: origin: org.apache.storm/storm-kafka
@Override
public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
    return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asTransactionalEmitter();
}
Code example source: origin: org.apache.storm/storm-kafka (truncated excerpt)
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
    _collector = collector;
    String topologyInstanceId = context.getStormId();
    Map<String, Object> stateConf = new HashMap<>(conf);
    List<String> zkServers = _spoutConfig.zkServers;
Code example source: origin: flipkart-incubator/storm-mysql
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector spoutOutputCollector) {
    Preconditions.checkNotNull(this.spoutConfig.getZkBinLogStateConfig(),
        "Zookeeper Config cannot be null");
    Preconditions.checkNotNull(this.spoutConfig.getMysqlConfig(),
        "Mysql Config cannot be null");
    LOGGER.info("Initiating MySql Spout with config {}", this.spoutConfig.toString());
    this.collector = spoutOutputCollector;
    this.topologyInstanceId = context.getStormId();
    this.topologyName = conf.get(Config.TOPOLOGY_NAME).toString();
    this.databaseName = this.spoutConfig.getMysqlConfig().getDatabase();
    this.sidelineStrategy = this.spoutConfig.getFailureConfig().getSidelineStrategy();
    this.sidelineStrategy.initialize(conf, context);
    initializeAndRegisterAllMetrics(context, this.spoutConfig.getMetricsTimeBucketSizeInSecs());
    txQueue = this.clientFactory.initializeBuffer(this.spoutConfig.getBufferCapacity());
    zkClient = this.clientFactory.getZkClient(conf, this.spoutConfig.getZkBinLogStateConfig());
    mySqlClient = this.clientFactory.getMySqlClient(this.spoutConfig.getMysqlConfig());
    openReplicatorClient = this.clientFactory.getReplicatorClient(mySqlClient, zkClient);
    begin();
}