本文整理了Java中org.apache.storm.task.TopologyContext.getThisTaskId()
方法的一些代码示例,展示了TopologyContext.getThisTaskId()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getThisTaskId()
方法的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext
方法名:getThisTaskId
[英]Gets the task id of this task.
[中]获取此任务的任务id。
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> conf, TopologyContext topologyContext) {
this.componentId = topologyContext.getThisComponentId();
this.taskId = topologyContext.getThisTaskId();
}
代码示例来源:origin: apache/storm
/**
* Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which
* task accesses which resource in a distributed resource to ensure an even distribution.
*/
public int getThisTaskIndex() {
List<Integer> tasks = new ArrayList<>(getComponentTasks(getThisComponentId()));
Collections.sort(tasks);
for (int i = 0; i < tasks.size(); i++) {
if (tasks.get(i) == getThisTaskId()) {
return i;
}
}
throw new RuntimeException("Fatal: could not find this task id in this component");
}
代码示例来源:origin: apache/storm
/**
* Create a manager with the given context.
*/
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
this.context = context;
try {
commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
this.processingGuarantee = processingGuarantee;
} catch (JsonProcessingException e) {
LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
throw new RuntimeException(e);
}
}
代码示例来源:origin: apache/storm
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_context = context;
List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
int startIndex;
for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
if (tasks.get(startIndex) == context.getThisTaskId()) {
break;
}
}
_collector = collector;
_pending = new HashMap<String, FixedTuple>();
_serveTuples = new ArrayList<FixedTuple>();
for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
_serveTuples.add(_tuples.get(i));
}
}
代码示例来源:origin: apache/storm
public static String metricName(String name, TopologyContext context) {
StringBuilder sb = new StringBuilder("storm.topology.");
sb.append(context.getStormId());
sb.append(".");
sb.append(hostName);
sb.append(".");
sb.append(dotToUnderScore(context.getThisComponentId()));
sb.append(".");
sb.append(context.getThisTaskId());
sb.append(".");
sb.append(context.getThisWorkerPort());
sb.append("-");
sb.append(name);
return sb.toString();
}
代码示例来源:origin: apache/storm
@SuppressWarnings("unchecked")
@Override
public void prepare(Map<String, Object> conf, TopologyContext topologyContext) {
this.componentId = topologyContext.getThisComponentId();
this.taskId = topologyContext.getThisTaskId();
try {
this.host = Utils.localHostname();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
代码示例来源:origin: apache/storm
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.source = context.getThisTaskId();
long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
myCount = totalCount / taskCount;
}
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
try {
this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId());
} catch (Exception e) {
LOG.error("Unable to connect to MQTT Broker.", e);
throw new RuntimeException("Unable to connect to MQTT Broker.", e);
}
}
代码示例来源:origin: apache/storm
private KeyValueState<TaskStream, WindowState> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window";
return (KeyValueState<TaskStream, WindowState>) StateFactory.getState(namespace, topoConf, context);
}
代码示例来源:origin: apache/storm
private KeyValueState<String, Deque<Long>> getPartitionState(Map<String, Object> topoConf, TopologyContext context) {
String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-partitions";
return (KeyValueState<String, Deque<Long>>) StateFactory.getState(namespace, topoConf, context);
}
代码示例来源:origin: apache/storm
private KeyValueState<Long, WindowPartition<Tuple>> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window";
return (KeyValueState<Long, WindowPartition<Tuple>>) StateFactory.getState(namespace, topoConf, context);
}
代码示例来源:origin: apache/storm
private KeyValueState<String, Optional<?>> getWindowSystemState(Map<String, Object> topoConf, TopologyContext context) {
String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-systemstate";
return (KeyValueState<String, Optional<?>>) StateFactory.getState(namespace, topoConf, context);
}
代码示例来源:origin: apache/storm
public Tuple getTuple(String stream, List values) {
return new TupleImpl(systemTopologyContext, values, executor.getComponentId(), systemTopologyContext.getThisTaskId(), stream);
}
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
// get the last successfully committed state from state store
String namespace = context.getThisComponentId() + "-" + context.getThisTaskId();
prepare(topoConf, context, collector, StateFactory.getState(namespace, topoConf, context));
}
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
Validate.notEmpty(properties, "Producer properties can not be empty");
Validate.notNull(selector, "TopicSelector can not be null");
Validate.notNull(mapper, "TupleToMessageMapper can not be null");
producer = new DefaultMQProducer();
producer.setInstanceName(String.valueOf(context.getThisTaskId()));
RocketMqConfig.buildProducerConfigs(properties, producer);
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
this.collector = collector;
this.batchHelper = new BatchHelper(batchSize, collector);
this.messages = new LinkedList<>();
}
代码示例来源:origin: apache/storm
@Before
public void setUp() throws Exception {
mockTopologyContext = Mockito.mock(TopologyContext.class);
Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
mockOutputCollector = Mockito.mock(SpoutOutputCollector.class);
}
代码示例来源:origin: apache/storm
/**
* Loads the last saved checkpoint state the from persistent storage.
*/
private KeyValueState<String, CheckPointState> loadCheckpointState(Map<String, Object> conf, TopologyContext ctx) {
String namespace = ctx.getThisComponentId() + "-" + ctx.getThisTaskId();
KeyValueState<String, CheckPointState> state =
(KeyValueState<String, CheckPointState>) StateFactory.getState(namespace, conf, ctx);
if (state.get(TX_STATE_KEY) == null) {
CheckPointState txState = new CheckPointState(-1, COMMITTED);
state.put(TX_STATE_KEY, txState);
state.commit();
LOG.debug("Initialized checkpoint spout state with txState {}", txState);
} else {
LOG.debug("Got checkpoint spout state {}", state.get(TX_STATE_KEY));
}
return state;
}
代码示例来源:origin: apache/storm
@Before
public void setUp() throws Exception {
mockBolt = Mockito.mock(IStatefulBolt.class);
executor = new StatefulBoltExecutor<>(mockBolt);
mockTopologyContext = Mockito.mock(TopologyContext.class);
mockOutputCollector = Mockito.mock(OutputCollector.class);
mockState = Mockito.mock(KeyValueState.class);
Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
mockTuple = Mockito.mock(Tuple.class);
mockCheckpointTuple = Mockito.mock(Tuple.class);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}
代码示例来源:origin: apache/storm
private void connectMqtt() throws Exception {
String clientId = this.topologyName + "-" + this.context.getThisComponentId() + "-" +
this.context.getThisTaskId();
MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader);
this.connection = client.callbackConnection();
this.connection.listener(this);
this.connection.connect(new ConnectCallback());
while (!this.mqttConnected && !this.mqttConnectFailed) {
LOG.info("Waiting for connection...");
Thread.sleep(500);
}
if (this.mqttConnected) {
List<String> topicList = this.options.getTopics();
Topic[] topics = new Topic[topicList.size()];
QoS qos = MqttUtils.qosFromInt(this.options.getQos());
for (int i = 0; i < topicList.size(); i++) {
topics[i] = new Topic(topicList.get(i), qos);
}
connection.subscribe(topics, new SubscribeCallback());
}
}
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, TridentContext tridentContext) {
this.topologyContext = context;
List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
if (parents.size() != 1) {
throw new RuntimeException("Aggregation related operation can only have one parent");
}
Long maxTuplesCacheSize = getWindowTuplesCacheSize(topoConf);
this.tridentContext = tridentContext;
collector = new FreshCollector(tridentContext);
projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
windowStore = windowStoreFactory.create(topoConf);
windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
tridentWindowManager = storeTuplesInStore ?
new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(),
maxTuplesCacheSize, inputFields)
: new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());
tridentWindowManager.prepare();
}
内容来源于网络,如有侵权,请联系作者删除!