本文整理了Java中org.apache.storm.task.TopologyContext.getThisSources()
方法的一些代码示例,展示了TopologyContext.getThisSources()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getThisSources()
方法的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext
方法名:getThisSources
[英]Gets the declared inputs to this component.
[中]获取此组件的声明输入。
代码示例来源:origin: apache/storm
private Set<GlobalStreamId> getComponentStreams(TopologyContext context) {
Set<GlobalStreamId> streams = new HashSet<>();
for (GlobalStreamId streamId : context.getThisSources().keySet()) {
if (!streamId.get_streamId().equals(CheckpointSpout.CHECKPOINT_STREAM_ID)) {
streams.add(streamId);
}
}
return streams;
}
代码示例来源:origin: apache/storm
private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
for (GlobalStreamId g : context.getThisSources().keySet()) {
if (g.get_streamId().equals(sourceStream)) {
return context.getComponentOutputFields(g);
}
}
throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}
代码示例来源:origin: apache/storm
private int getStreamInputTaskCount(TopologyContext context, String stream) {
int count = 0;
for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
if (stream.equals(getStreamId(inputStream))) {
count += context.getComponentTasks(inputStream.get_componentId()).size();
}
}
return count;
}
代码示例来源:origin: apache/storm
/**
* returns the total number of input checkpoint streams across all input tasks to this component.
*/
private int getCheckpointInputTaskCount(TopologyContext context) {
int count = 0;
for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
count += context.getComponentTasks(inputStream.get_componentId()).size();
}
}
return count;
}
代码示例来源:origin: apache/storm
/**
* Gets the declared input fields for this component.
*
* @return A map from sources to streams to fields.
*/
public Map<String, Map<String, List<String>>> getThisInputFields() {
Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
String componentId = entry.getKey().get_componentId();
Set<String> streams = getComponentStreams(componentId);
for (String stream : streams) {
Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
if (streamFieldMap == null) {
streamFieldMap = new HashMap<>();
outputMap.put(componentId, streamFieldMap);
}
streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
}
}
return outputMap;
}
代码示例来源:origin: apache/storm
private TopologyContext getTopologyContext() {
TopologyContext context = Mockito.mock(TopologyContext.class);
Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
new GlobalStreamId("s1", "default"),
null
);
Mockito.when(context.getThisSources()).thenReturn(sources);
return context;
}
代码示例来源:origin: apache/storm
for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
GlobalStreamId gid = entry.getKey();
Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
代码示例来源:origin: apache/storm
@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
_fieldLocations = new HashMap<String, GlobalStreamId>();
_collector = collector;
int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
_pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
_numSources = context.getThisSources().size();
Set<String> idFields = null;
for (GlobalStreamId source : context.getThisSources().keySet()) {
Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
Set<String> setFields = new HashSet<String>(fields.toList());
if (idFields == null) {
idFields = setFields;
} else {
idFields.retainAll(setFields);
}
for (String outfield : _outFields) {
for (String sourcefield : fields) {
if (outfield.equals(sourcefield)) {
_fieldLocations.put(outfield, source);
}
}
}
}
_idFields = new Fields(new ArrayList<String>(idFields));
if (_fieldLocations.size() != _outFields.size()) {
throw new RuntimeException("Cannot find all outfields among sources");
}
}
代码示例来源:origin: apache/storm
@Override
public void initState(T state) {
if (stateInitialized) {
LOG.warn("State is already initialized. Ignoring initState");
return;
}
statefulWindowedBolt.initState((T) state);
// query the streamState for each input task stream and compute recoveryStates
for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
if (windowState != null) {
recoveryStates.put(new TaskStream(taskId, streamId), windowState);
}
}
}
LOG.debug("recoveryStates {}", recoveryStates);
stateInitialized = true;
start();
}
代码示例来源:origin: apache/storm
@Before
public void setUp() throws Exception {
mockBolt = Mockito.mock(IStatefulBolt.class);
executor = new StatefulBoltExecutor<>(mockBolt);
mockTopologyContext = Mockito.mock(TopologyContext.class);
mockOutputCollector = Mockito.mock(OutputCollector.class);
mockState = Mockito.mock(KeyValueState.class);
Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
mockTuple = Mockito.mock(Tuple.class);
mockCheckpointTuple = Mockito.mock(Tuple.class);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}
代码示例来源:origin: apache/storm
@Test
public void testRecovery() throws Exception {
mockStormConf.put(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME, "msgid");
mockStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 5);
mockStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 5);
KeyValueState<TaskStream, WindowState> mockState;
mockState = Mockito.mock(KeyValueState.class);
Map<GlobalStreamId, Grouping> mockMap = Mockito.mock(Map.class);
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockMap);
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
Mockito.when(mockMap.keySet()).thenReturn(Collections.singleton(new GlobalStreamId("a", "s")));
WindowState mockWindowState = new WindowState(4, 4);
Mockito.when(mockState.get(Mockito.any(TaskStream.class))).thenReturn(mockWindowState);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
executor.initState(null);
List<Tuple> tuples = getMockTuples(10);
for (Tuple tuple : tuples) {
executor.execute(tuple);
}
WindowState expectedState = new WindowState(4, 9);
Mockito.verify(mockState, Mockito.times(1)).put(Mockito.any(TaskStream.class), Mockito.eq(expectedState));
}
代码示例来源:origin: apache/storm
private void setUpProcessorBolt(Processor<?> processor,
Set<String> windowedParentStreams,
boolean isWindowed,
String tsFieldName) {
ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value"));
node.setWindowedParentStreams(windowedParentStreams);
node.setWindowed(isWindowed);
Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
Map<GlobalStreamId, Grouping> mockSources = Mockito.mock(Map.class);
GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
Mockito.when(mockGlobalStreamId.get_streamId()).thenReturn("inputstream");
Mockito.when(mockGlobalStreamId.get_componentId()).thenReturn("bolt0");
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
graph.addVertex(node);
bolt = new ProcessorBolt("bolt1", graph, Collections.singletonList(node));
if (tsFieldName != null && !tsFieldName.isEmpty()) {
bolt.setTimestampField(tsFieldName);
}
bolt.setStreamToInitialProcessors(mockStreamToProcessors);
bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector);
}
代码示例来源:origin: org.apache.storm/storm-core
private Set<GlobalStreamId> getComponentStreams(TopologyContext context) {
Set<GlobalStreamId> streams = new HashSet<>();
for (GlobalStreamId streamId : context.getThisSources().keySet()) {
if (!streamId.get_streamId().equals(CheckpointSpout.CHECKPOINT_STREAM_ID)) {
streams.add(streamId);
}
}
return streams;
}
代码示例来源:origin: org.apache.storm/storm-core
private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
for(GlobalStreamId g: context.getThisSources().keySet()) {
if(g.get_streamId().equals(sourceStream)) {
return context.getComponentOutputFields(g);
}
}
throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}
代码示例来源:origin: org.apache.storm/storm-core
/**
* returns the total number of input checkpoint streams across
* all input tasks to this component.
*/
private int getCheckpointInputTaskCount(TopologyContext context) {
int count = 0;
for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
count += context.getComponentTasks(inputStream.get_componentId()).size();
}
}
return count;
}
代码示例来源:origin: org.apache.storm/storm-core
/**
* Gets the declared input fields for this component.
*
* @return A map from sources to streams to fields.
*/
public Map<String, Map<String, List<String>>> getThisInputFields() {
Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
String componentId = entry.getKey().get_componentId();
Set<String> streams = getComponentStreams(componentId);
for (String stream : streams) {
Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
if (streamFieldMap == null) {
streamFieldMap = new HashMap<>();
outputMap.put(componentId, streamFieldMap);
}
streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
}
}
return outputMap;
}
代码示例来源:origin: Paleozoic/storm_spring_boot_demo
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_fieldLocations = new HashMap<String, GlobalStreamId>();
_collector = collector;
int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
_pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
_numSources = context.getThisSources().size();
Set<String> idFields = null;
for (GlobalStreamId source : context.getThisSources().keySet()) {
Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
Set<String> setFields = new HashSet<String>(fields.toList());
if (idFields == null)
idFields = setFields;
else
idFields.retainAll(setFields);
for (String outfield : _outFields) {
for (String sourcefield : fields) {
if (outfield.equals(sourcefield)) {
_fieldLocations.put(outfield, source);
}
}
}
}
_idFields = new Fields(new ArrayList<String>(idFields));
if (_fieldLocations.size() != _outFields.size()) {
throw new RuntimeException("Cannot find all outfields among sources");
}
}
代码示例来源:origin: org.apache.storm/storm-core
for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
GlobalStreamId gid = entry.getKey();
Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());
代码示例来源:origin: org.apache.storm/storm-core
@Override
public void initState(T state) {
if (stateInitialized) {
LOG.warn("State is already initialized. Ignoring initState");
return;
}
statefulWindowedBolt.initState((T) state);
// query the streamState for each input task stream and compute recoveryStates
for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
if (windowState != null) {
recoveryStates.put(new TaskStream(taskId, streamId), windowState);
}
}
}
LOG.debug("recoveryStates {}", recoveryStates);
stateInitialized = true;
start();
}
代码示例来源:origin: org.apache.flink/flink-storm
Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();
内容来源于网络,如有侵权,请联系作者删除!