本文整理了Java中org.apache.storm.task.TopologyContext.getThisTaskIndex()
方法的一些代码示例,展示了TopologyContext.getThisTaskIndex()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getThisTaskIndex()
方法的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext
方法名:getThisTaskIndex
[英]Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which task accesses which resource in a distributed resource to ensure an even distribution.
[中]获取getComponentTasks(getThisComponentId())中此任务id的索引。该方法的一个示例用例是确定哪个任务访问分布式资源中的哪个资源,以确保均匀分布。
代码示例来源:origin: apache/storm
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_base = context.getThisTaskIndex();
}
代码示例来源:origin: apache/storm
public int getPartitionIndex() {
return _topoContext.getThisTaskIndex();
}
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.taskIndex = context.getThisTaskIndex();
}
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> stormConf,
TopologyContext context) {
executorIndex = context.getThisTaskIndex();
sleep.prepare();
}
代码示例来源:origin: apache/storm
@Override
public Emitter<Map<Integer, List<List<Object>>>> getEmitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
return new FeederEmitter(context.getThisTaskIndex());
}
代码示例来源:origin: apache/storm
@Override
public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context) {
int thisTaskIndex = context.getThisTaskIndex();
int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
Set<TopicPartition> myPartitions = new HashSet<>(allPartitionsSorted.size() / totalTaskCount + 1);
for (int i = thisTaskIndex; i < allPartitionsSorted.size(); i += totalTaskCount) {
myPartitions.add(allPartitionsSorted.get(i));
}
return myPartitions;
}
}
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
outputStreams = Collections.unmodifiableList(outputStreamStats.stream()
.map((ss) -> new OutputStreamEngine(ss)).collect(Collectors.toList()));
this.collector = collector;
executorIndex = context.getThisTaskIndex();
sleep.prepare();
}
代码示例来源:origin: apache/storm
/**
* Get the input partitions in sorted order.
*/
public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
List<TopicPartition> sortedPartitions = allPartitionInfo.stream()
.map(map -> tpSerializer.fromMap(map))
.sorted(TopicPartitionComparator.INSTANCE)
.collect(Collectors.toList());
final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions);
LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
return allPartitions;
}
代码示例来源:origin: apache/storm
public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
_state = TransactionalState.newUserState(conf, txStateId);
LOG.debug("Created {}", this);
}
代码示例来源:origin: apache/storm
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
if (_local_drpc_id == null) {
_backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
_futures = new LinkedList<>();
int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
int index = context.getThisTaskIndex();
int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
if (servers == null || servers.isEmpty()) {
throw new RuntimeException("No DRPC servers configured for topology");
}
if (numTasks < servers.size()) {
for (String s : servers) {
_futures.add(_backround.submit(new Adder(s, port, conf)));
}
} else {
int i = index % servers.size();
_futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
}
}
}
代码示例来源:origin: apache/storm
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
}
代码示例来源:origin: apache/storm
public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_state = TransactionalState.newUserState(conf, txStateId);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}
代码示例来源:origin: apache/storm
int taskIndex = context.getThisTaskIndex();
if (totalTasks > eventHubConfig.getPartitionCount()) {
throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> config, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
String myPartitionId = null;
if (boltConfig.getPartitionMode()) {
// We can use the task index (starting from 0) as the partition ID
myPartitionId = "" + context.getThisTaskIndex();
}
logger.info("creating sender: " + boltConfig.getConnectionString()
+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
try {
ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
if (boltConfig.getPartitionMode()) {
sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
}
} catch (Exception ex) {
collector.reportError(ex);
throw new RuntimeException(ex);
}
}
代码示例来源:origin: apache/storm
for (Node n : _nodes) {
if (n.stateInfo != null) {
State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
context.setTaskData(n.stateInfo.id, s);
代码示例来源:origin: apache/storm
/**
* Open and activate a KafkaSpout that acts as a single-task/executor spout.
*
* @param <K> Kafka key type
* @param <V> Kafka value type
* @param spout The spout to prepare
* @param topoConf The topoConf
* @param topoContextMock The TopologyContext mock
* @param collectorMock The output collector mock
*/
public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
SpoutOutputCollector collectorMock) throws Exception {
when(topoContextMock.getThisTaskIndex()).thenReturn(0);
when(topoContextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
spout.open(topoConf, topoContextMock, collectorMock);
spout.activate();
}
代码示例来源:origin: elastic/elasticsearch-hadoop
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
LinkedHashMap copy = new LinkedHashMap(conf);
copy.putAll(spoutConfig);
StormSettings settings = new StormSettings(copy);
InitializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, log);
ackReads = settings.getStormSpoutReliable();
if (ackReads) {
inTransitQueue = new LinkedHashMap<Object, Object>();
replayQueue = new LinkedList<Object[]>();
retries = new HashMap<Object, Integer>();
queueSize = settings.getStormSpoutReliableQueueSize();
tupleRetries = settings.getStormSpoutReliableRetriesPerTuple();
tupleFailure = settings.getStormSpoutReliableTupleFailureHandling();
}
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
int currentTask = context.getThisTaskIndex();
// match the partitions based on the current topology
List<PartitionDefinition> partitions = RestService.findPartitions(settings, log);
List<PartitionDefinition> assigned = RestService.assignPartitions(partitions, currentTask, totalTasks);
iterator = RestService.multiReader(settings, assigned, log);
}
代码示例来源:origin: elastic/elasticsearch-hadoop
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
LinkedHashMap copy = new LinkedHashMap(conf);
copy.putAll(boltConfig);
StormSettings settings = new StormSettings(copy);
flushOnTickTuple = settings.getStormTickTupleFlush();
ackWrites = settings.getStormBoltAck();
// trigger manual flush
if (ackWrites) {
settings.setProperty(ES_BATCH_FLUSH_MANUAL, Boolean.TRUE.toString());
// align Bolt / es-hadoop batch settings
numberOfEntries = settings.getStormBulkSize();
settings.setProperty(ES_BATCH_SIZE_ENTRIES, String.valueOf(numberOfEntries));
inflightTuples = new ArrayList<Tuple>(numberOfEntries + 1);
}
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
InitializationUtils.setValueWriterIfNotSet(settings, StormValueWriter.class, log);
InitializationUtils.setBytesConverterIfNeeded(settings, StormTupleBytesConverter.class, log);
InitializationUtils.setFieldExtractorIfNotSet(settings, StormTupleFieldExtractor.class, log);
writer = RestService.createWriter(settings, context.getThisTaskIndex(), totalTasks, log);
}
代码示例来源:origin: apache/storm
@Test
public void testRoundRobinPartitioning() {
List<TopicPartition> allPartitions = new ArrayList<>();
for(int i = 0; i < 11; i++) {
allPartitions.add(createTp(i));
}
List<TopologyContext> contextMocks = new ArrayList<>();
String thisComponentId = "A spout";
List<Integer> allTasks = Arrays.asList(new Integer[]{0, 1, 2});
for(int i = 0; i < 3; i++) {
TopologyContext contextMock = mock(TopologyContext.class);
when(contextMock.getThisTaskIndex()).thenReturn(i);
when(contextMock.getThisComponentId()).thenReturn(thisComponentId);
when(contextMock.getComponentTasks(thisComponentId)).thenReturn(allTasks);
contextMocks.add(contextMock);
}
RoundRobinManualPartitioner partitioner = new RoundRobinManualPartitioner();
Set<TopicPartition> partitionsForFirstTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(0));
assertThat(partitionsForFirstTask, is(partitionsToTps(new int[]{0, 3, 6, 9})));
Set<TopicPartition> partitionsForSecondTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(1));
assertThat(partitionsForSecondTask, is(partitionsToTps(new int[]{1, 4, 7, 10})));
Set<TopicPartition> partitionsForThirdTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(2));
assertThat(partitionsForThirdTask, is(partitionsToTps(new int[]{2, 5, 8})));
}
代码示例来源:origin: DigitalPebble/storm-crawler
@Override
public void prepare(Map conf, TopologyContext topologyContext) {
this.taskIndex = topologyContext.getThisTaskIndex();
int totalTasks = topologyContext.getComponentTasks(
topologyContext.getThisComponentId()).size();
// single task? let's not bother with the task index in the file name
if (totalTasks == 1) {
this.taskIndex = -1;
}
}
内容来源于网络,如有侵权,请联系作者删除!