org.apache.storm.task.TopologyContext.getThisSources()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(12.6k)|赞(0)|评价(0)|浏览(76)

本文整理了Java中org.apache.storm.task.TopologyContext.getThisSources()方法的一些代码示例,展示了TopologyContext.getThisSources()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getThisSources()方法的具体详情如下:
包路径:org.apache.storm.task.TopologyContext
类名称:TopologyContext
方法名:getThisSources

TopologyContext.getThisSources介绍

[英]Gets the declared inputs to this component.
[中]获取此组件的声明输入。

代码示例

代码示例来源:origin: apache/storm

private Set<GlobalStreamId> getComponentStreams(TopologyContext context) {
  Set<GlobalStreamId> streams = new HashSet<>();
  for (GlobalStreamId streamId : context.getThisSources().keySet()) {
    if (!streamId.get_streamId().equals(CheckpointSpout.CHECKPOINT_STREAM_ID)) {
      streams.add(streamId);
    }
  }
  return streams;
}

代码示例来源:origin: apache/storm

private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
  for (GlobalStreamId g : context.getThisSources().keySet()) {
    if (g.get_streamId().equals(sourceStream)) {
      return context.getComponentOutputFields(g);
    }
  }
  throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}

代码示例来源:origin: apache/storm

private int getStreamInputTaskCount(TopologyContext context, String stream) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (stream.equals(getStreamId(inputStream))) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

代码示例来源:origin: apache/storm

/**
 * returns the total number of input checkpoint streams across all input tasks to this component.
 */
private int getCheckpointInputTaskCount(TopologyContext context) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

代码示例来源:origin: apache/storm

/**
 * Gets the declared input fields for this component.
 *
 * @return A map from sources to streams to fields.
 */
public Map<String, Map<String, List<String>>> getThisInputFields() {
  Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
  for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
    String componentId = entry.getKey().get_componentId();
    Set<String> streams = getComponentStreams(componentId);
    for (String stream : streams) {
      Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
      if (streamFieldMap == null) {
        streamFieldMap = new HashMap<>();
        outputMap.put(componentId, streamFieldMap);
      }
      streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
    }
  }
  return outputMap;
}

代码示例来源:origin: apache/storm

private TopologyContext getTopologyContext() {
  TopologyContext context = Mockito.mock(TopologyContext.class);
  Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
    new GlobalStreamId("s1", "default"),
    null
  );
  Mockito.when(context.getThisSources()).thenReturn(sources);
  return context;
}

代码示例来源:origin: apache/storm

for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
  GlobalStreamId gid = entry.getKey();
  Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());

代码示例来源:origin: apache/storm

@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
  _fieldLocations = new HashMap<String, GlobalStreamId>();
  _collector = collector;
  int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
  _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
  _numSources = context.getThisSources().size();
  Set<String> idFields = null;
  for (GlobalStreamId source : context.getThisSources().keySet()) {
    Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
    Set<String> setFields = new HashSet<String>(fields.toList());
    if (idFields == null) {
      idFields = setFields;
    } else {
      idFields.retainAll(setFields);
    }
    for (String outfield : _outFields) {
      for (String sourcefield : fields) {
        if (outfield.equals(sourcefield)) {
          _fieldLocations.put(outfield, source);
        }
      }
    }
  }
  _idFields = new Fields(new ArrayList<String>(idFields));
  if (_fieldLocations.size() != _outFields.size()) {
    throw new RuntimeException("Cannot find all outfields among sources");
  }
}

代码示例来源:origin: apache/storm

@Override
public void initState(T state) {
  if (stateInitialized) {
    LOG.warn("State is already initialized. Ignoring initState");
    return;
  }
  statefulWindowedBolt.initState((T) state);
  // query the streamState for each input task stream and compute recoveryStates
  for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
    for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
      WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
      if (windowState != null) {
        recoveryStates.put(new TaskStream(taskId, streamId), windowState);
      }
    }
  }
  LOG.debug("recoveryStates {}", recoveryStates);
  stateInitialized = true;
  start();
}

代码示例来源:origin: apache/storm

@Before
public void setUp() throws Exception {
  mockBolt = Mockito.mock(IStatefulBolt.class);
  executor = new StatefulBoltExecutor<>(mockBolt);
  mockTopologyContext = Mockito.mock(TopologyContext.class);
  mockOutputCollector = Mockito.mock(OutputCollector.class);
  mockState = Mockito.mock(KeyValueState.class);
  Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
  Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
  GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
  Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
  mockTuple = Mockito.mock(Tuple.class);
  mockCheckpointTuple = Mockito.mock(Tuple.class);
  executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}

代码示例来源:origin: apache/storm

@Test
public void testRecovery() throws Exception {
  mockStormConf.put(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME, "msgid");
  mockStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 5);
  mockStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 5);
  KeyValueState<TaskStream, WindowState> mockState;
  mockState = Mockito.mock(KeyValueState.class);
  Map<GlobalStreamId, Grouping> mockMap = Mockito.mock(Map.class);
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockMap);
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
  Mockito.when(mockMap.keySet()).thenReturn(Collections.singleton(new GlobalStreamId("a", "s")));
  WindowState mockWindowState = new WindowState(4, 4);
  Mockito.when(mockState.get(Mockito.any(TaskStream.class))).thenReturn(mockWindowState);
  executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
  executor.initState(null);
  List<Tuple> tuples = getMockTuples(10);
  for (Tuple tuple : tuples) {
    executor.execute(tuple);
  }
  WindowState expectedState = new WindowState(4, 9);
  Mockito.verify(mockState, Mockito.times(1)).put(Mockito.any(TaskStream.class), Mockito.eq(expectedState));
}

代码示例来源:origin: apache/storm

private void setUpProcessorBolt(Processor<?> processor,
                Set<String> windowedParentStreams,
                boolean isWindowed,
                String tsFieldName) {
  ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value"));
  node.setWindowedParentStreams(windowedParentStreams);
  node.setWindowed(isWindowed);
  Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
  Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
  Map<GlobalStreamId, Grouping> mockSources = Mockito.mock(Map.class);
  GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
  Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
  Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
  Mockito.when(mockGlobalStreamId.get_streamId()).thenReturn("inputstream");
  Mockito.when(mockGlobalStreamId.get_componentId()).thenReturn("bolt0");
  Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
  graph.addVertex(node);
  bolt = new ProcessorBolt("bolt1", graph, Collections.singletonList(node));
  if (tsFieldName != null && !tsFieldName.isEmpty()) {
    bolt.setTimestampField(tsFieldName);
  }
  bolt.setStreamToInitialProcessors(mockStreamToProcessors);
  bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector);
}

代码示例来源:origin: org.apache.storm/storm-core

private Set<GlobalStreamId> getComponentStreams(TopologyContext context) {
  Set<GlobalStreamId> streams = new HashSet<>();
  for (GlobalStreamId streamId : context.getThisSources().keySet()) {
    if (!streamId.get_streamId().equals(CheckpointSpout.CHECKPOINT_STREAM_ID)) {
      streams.add(streamId);
    }
  }
  return streams;
}

代码示例来源:origin: org.apache.storm/storm-core

private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
  for(GlobalStreamId g: context.getThisSources().keySet()) {
    if(g.get_streamId().equals(sourceStream)) {
      return context.getComponentOutputFields(g);
    }
  }
  throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}

代码示例来源:origin: org.apache.storm/storm-core

/**
 * returns the total number of input checkpoint streams across
 * all input tasks to this component.
 */
private int getCheckpointInputTaskCount(TopologyContext context) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

代码示例来源:origin: org.apache.storm/storm-core

/**
 * Gets the declared input fields for this component.
 *
 * @return A map from sources to streams to fields.
 */
public Map<String, Map<String, List<String>>> getThisInputFields() {
  Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
  for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
    String componentId = entry.getKey().get_componentId();
    Set<String> streams = getComponentStreams(componentId);
    for (String stream : streams) {
      Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
      if (streamFieldMap == null) {
        streamFieldMap = new HashMap<>();
        outputMap.put(componentId, streamFieldMap);
      }
      streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
    }
  }
  return outputMap;
}

代码示例来源:origin: Paleozoic/storm_spring_boot_demo

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
 _fieldLocations = new HashMap<String, GlobalStreamId>();
 _collector = collector;
 int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
 _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
 _numSources = context.getThisSources().size();
 Set<String> idFields = null;
 for (GlobalStreamId source : context.getThisSources().keySet()) {
  Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
  Set<String> setFields = new HashSet<String>(fields.toList());
  if (idFields == null)
   idFields = setFields;
  else
   idFields.retainAll(setFields);
  for (String outfield : _outFields) {
   for (String sourcefield : fields) {
    if (outfield.equals(sourcefield)) {
     _fieldLocations.put(outfield, source);
    }
   }
  }
 }
 _idFields = new Fields(new ArrayList<String>(idFields));
 if (_fieldLocations.size() != _outFields.size()) {
  throw new RuntimeException("Cannot find all outfields among sources");
 }
}

代码示例来源:origin: org.apache.storm/storm-core

for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
  GlobalStreamId gid = entry.getKey();
  Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());

代码示例来源:origin: org.apache.storm/storm-core

@Override
public void initState(T state) {
  if (stateInitialized) {
    LOG.warn("State is already initialized. Ignoring initState");
    return;
  }
  statefulWindowedBolt.initState((T) state);
  // query the streamState for each input task stream and compute recoveryStates
  for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
    for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
      WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
      if (windowState != null) {
        recoveryStates.put(new TaskStream(taskId, streamId), windowState);
      }
    }
  }
  LOG.debug("recoveryStates {}", recoveryStates);
  stateInitialized = true;
  start();
}

代码示例来源:origin: org.apache.flink/flink-storm

Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();

相关文章

微信公众号

最新文章

更多