本文整理了Java中org.apache.storm.task.TopologyContext.getComponentTasks()
方法的一些代码示例,展示了TopologyContext.getComponentTasks()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getComponentTasks()
方法的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext
方法名:getComponentTasks
暂无
代码示例来源:origin: apache/storm
public int numPartitions() {
return _topoContext.getComponentTasks(_topoContext.getThisComponentId()).size();
}
代码示例来源:origin: apache/storm
private int getNumTasks() {
return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
}
代码示例来源:origin: apache/storm
@Override
public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context) {
int thisTaskIndex = context.getThisTaskIndex();
int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
Set<TopicPartition> myPartitions = new HashSet<>(allPartitionsSorted.size() / totalTaskCount + 1);
for (int i = thisTaskIndex; i < allPartitionsSorted.size(); i += totalTaskCount) {
myPartitions.add(allPartitionsSorted.get(i));
}
return myPartitions;
}
}
代码示例来源:origin: apache/storm
/**
* Gets the index of this task id in getComponentTasks(getThisComponentId()). An example use case for this method is determining which
* task accesses which resource in a distributed resource to ensure an even distribution.
*/
public int getThisTaskIndex() {
List<Integer> tasks = new ArrayList<>(getComponentTasks(getThisComponentId()));
Collections.sort(tasks);
for (int i = 0; i < tasks.size(); i++) {
if (tasks.get(i) == getThisTaskId()) {
return i;
}
}
throw new RuntimeException("Fatal: could not find this task id in this component");
}
代码示例来源:origin: apache/storm
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_context = context;
List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
int startIndex;
for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
if (tasks.get(startIndex) == context.getThisTaskId()) {
break;
}
}
_collector = collector;
_pending = new HashMap<String, FixedTuple>();
_serveTuples = new ArrayList<FixedTuple>();
for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
_serveTuples.add(_tuples.get(i));
}
}
代码示例来源:origin: apache/storm
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.source = context.getThisTaskId();
long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
myCount = totalCount / taskCount;
}
代码示例来源:origin: apache/storm
private int getStreamInputTaskCount(TopologyContext context, String stream) {
int count = 0;
for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
if (stream.equals(getStreamId(inputStream))) {
count += context.getComponentTasks(inputStream.get_componentId()).size();
}
}
return count;
}
代码示例来源:origin: apache/storm
/**
* returns the total number of input checkpoint streams across all input tasks to this component.
*/
private int getCheckpointInputTaskCount(TopologyContext context) {
int count = 0;
for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
count += context.getComponentTasks(inputStream.get_componentId()).size();
}
}
return count;
}
代码示例来源:origin: apache/storm
@Override
public BatchCoordinator<Map<Integer, List<List<Object>>>> getCoordinator(String txStateId, Map<String, Object> conf,
TopologyContext context) {
int numTasks = context.getComponentTasks(
TridentTopologyBuilder.spoutIdFromCoordinatorId(
context.getThisComponentId()))
.size();
return new FeederCoordinator(numTasks);
}
代码示例来源:origin: apache/storm
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
if (_local_drpc_id == null) {
_backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
_futures = new LinkedList<>();
int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
int index = context.getThisTaskIndex();
int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
if (servers == null || servers.isEmpty()) {
throw new RuntimeException("No DRPC servers configured for topology");
}
if (numTasks < servers.size()) {
for (String s : servers) {
_futures.add(_backround.submit(new Adder(s, port, conf)));
}
} else {
int i = index % servers.size();
_futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
}
}
}
代码示例来源:origin: apache/storm
public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
_state = TransactionalState.newUserState(conf, txStateId);
LOG.debug("Created {}", this);
}
代码示例来源:origin: apache/storm
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
_outputTasks = new ArrayList<>();
for (String component : Utils.get(context.getThisTargets(),
_coordStream,
new HashMap<String, Grouping>()).keySet()) {
_outputTasks.addAll(context.getComponentTasks(component));
}
_rand = new Random(Utils.secureRandomLong());
}
代码示例来源:origin: apache/storm
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
}
代码示例来源:origin: apache/storm
public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_state = TransactionalState.newUserState(conf, txStateId);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}
代码示例来源:origin: apache/storm
public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
if (_delegate instanceof TimeoutCallback) {
callback = new TimeoutItems();
}
_tracked = new TimeCacheMap<>(context.maxTopologyMessageTimeout(), callback);
_collector = collector;
_delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
for (String component : Utils.get(context.getThisTargets(),
Constants.COORDINATED_STREAM_ID,
new HashMap<String, Grouping>())
.keySet()) {
for (Integer task : context.getComponentTasks(component)) {
_countOutTasks.add(task);
}
}
if (!_sourceArgs.isEmpty()) {
_numSourceReports = 0;
for (Entry<String, SourceArgs> entry : _sourceArgs.entrySet()) {
if (entry.getValue().singleCount) {
_numSourceReports += 1;
} else {
_numSourceReports += context.getComponentTasks(entry.getKey()).size();
}
}
}
}
代码示例来源:origin: apache/storm
@Override
public void initState(T state) {
if (stateInitialized) {
LOG.warn("State is already initialized. Ignoring initState");
return;
}
statefulWindowedBolt.initState((T) state);
// query the streamState for each input task stream and compute recoveryStates
for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
if (windowState != null) {
recoveryStates.put(new TaskStream(taskId, streamId), windowState);
}
}
}
LOG.debug("recoveryStates {}", recoveryStates);
stateInitialized = true;
start();
}
代码示例来源:origin: apache/storm
/**
* Open and activate a KafkaSpout that acts as a single-task/executor spout.
*
* @param <K> Kafka key type
* @param <V> Kafka value type
* @param spout The spout to prepare
* @param topoConf The topoConf
* @param topoContextMock The TopologyContext mock
* @param collectorMock The output collector mock
*/
public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
SpoutOutputCollector collectorMock) throws Exception {
when(topoContextMock.getThisTaskIndex()).thenReturn(0);
when(topoContextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
spout.open(topoConf, topoContextMock, collectorMock);
spout.activate();
}
代码示例来源:origin: apache/storm
@Before
public void setUp() throws Exception {
mockBolt = Mockito.mock(IStatefulBolt.class);
executor = new StatefulBoltExecutor<>(mockBolt);
mockTopologyContext = Mockito.mock(TopologyContext.class);
mockOutputCollector = Mockito.mock(OutputCollector.class);
mockState = Mockito.mock(KeyValueState.class);
Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
mockTuple = Mockito.mock(Tuple.class);
mockCheckpointTuple = Mockito.mock(Tuple.class);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}
代码示例来源:origin: apache/storm
@Test
public void testRecovery() throws Exception {
mockStormConf.put(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME, "msgid");
mockStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 5);
mockStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 5);
KeyValueState<TaskStream, WindowState> mockState;
mockState = Mockito.mock(KeyValueState.class);
Map<GlobalStreamId, Grouping> mockMap = Mockito.mock(Map.class);
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockMap);
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
Mockito.when(mockMap.keySet()).thenReturn(Collections.singleton(new GlobalStreamId("a", "s")));
WindowState mockWindowState = new WindowState(4, 4);
Mockito.when(mockState.get(Mockito.any(TaskStream.class))).thenReturn(mockWindowState);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
executor.initState(null);
List<Tuple> tuples = getMockTuples(10);
for (Tuple tuple : tuples) {
executor.execute(tuple);
}
WindowState expectedState = new WindowState(4, 9);
Mockito.verify(mockState, Mockito.times(1)).put(Mockito.any(TaskStream.class), Mockito.eq(expectedState));
}
代码示例来源:origin: apache/storm
private void setUpProcessorBolt(Processor<?> processor,
Set<String> windowedParentStreams,
boolean isWindowed,
String tsFieldName) {
ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value"));
node.setWindowedParentStreams(windowedParentStreams);
node.setWindowed(isWindowed);
Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
Map<GlobalStreamId, Grouping> mockSources = Mockito.mock(Map.class);
GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
Mockito.when(mockGlobalStreamId.get_streamId()).thenReturn("inputstream");
Mockito.when(mockGlobalStreamId.get_componentId()).thenReturn("bolt0");
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
graph.addVertex(node);
bolt = new ProcessorBolt("bolt1", graph, Collections.singletonList(node));
if (tsFieldName != null && !tsFieldName.isEmpty()) {
bolt.setTimestampField(tsFieldName);
}
bolt.setStreamToInitialProcessors(mockStreamToProcessors);
bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector);
}
内容来源于网络,如有侵权,请联系作者删除!