backtype.storm.task.TopologyContext.getThisSources()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(98)

本文整理了Java中backtype.storm.task.TopologyContext.getThisSources()方法的一些代码示例,展示了TopologyContext.getThisSources()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopologyContext.getThisSources()方法的具体详情如下:
包路径:backtype.storm.task.TopologyContext
类名称:TopologyContext
方法名:getThisSources

TopologyContext.getThisSources介绍

[英]Gets the declared inputs to this component.
[中]获取此组件的声明输入。

代码示例

代码示例来源:origin: alibaba/jstorm

private Set<GlobalStreamId> getComponentStreams(TopologyContext context) {
  Set<GlobalStreamId> streams = new HashSet<>();
  for (GlobalStreamId streamId : context.getThisSources().keySet()) {
    if (!streamId.get_streamId().equals(CheckpointSpout.CHECKPOINT_STREAM_ID)) {
      streams.add(streamId);
    }
  }
  return streams;
}

代码示例来源:origin: alibaba/jstorm

public static Set<String> getInputStreamIds(TopologyContext context) {
  Set<String> ret = new HashSet<>();
  Set<GlobalStreamId> inputs = context.getThisSources().keySet();
  for (GlobalStreamId streamId : inputs) {
    ret.add(streamId.get_streamId());
  }
  return ret;
}

代码示例来源:origin: alibaba/jstorm

private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
  for (GlobalStreamId g : context.getThisSources().keySet()) {
    if (g.get_streamId().equals(sourceStream)) {
      return context.getComponentOutputFields(g);
    }
  }
  throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}

代码示例来源:origin: alibaba/jstorm

public Map<String, List<Integer>> getThisSourceComponentTasks() {
  Map<String, List<Integer>> ret = new HashMap<>();
  Map<GlobalStreamId, Grouping> sources = getThisSources();
  Set<String> sourceComponents = new HashSet<>();
  if (sources != null) {
    for (GlobalStreamId globalStreamId : sources.keySet()) {
      sourceComponents.add(globalStreamId.get_componentId());
    }
  }
  for (String component : sourceComponents) {
    ret.put(component, getComponentTasks(component));
  }
  return ret;
}

代码示例来源:origin: alibaba/jstorm

/**
 * returns the total number of input checkpoint streams across
 * all input tasks to this component.
 */
private int getCheckpointInputTaskCount(TopologyContext context) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (CheckpointSpout.CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

代码示例来源:origin: alibaba/jstorm

int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
_pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
_numSources = context.getThisSources().size();
Set<String> idFields = null;
for (GlobalStreamId source : context.getThisSources().keySet()) {
  Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
  Set<String> setFields = new HashSet<String>(fields.toList());

代码示例来源:origin: alibaba/jstorm

for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
  GlobalStreamId gid = entry.getKey();
  Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());

代码示例来源:origin: alibaba/jstorm

@Override
public void initState(T state) {
  if (stateInitialized) {
    LOG.warn("State is already initialized. Ignoring initState");
    return;
  }
  statefulWindowedBolt.initState((T) state);
  // query the streamState for each input task stream and compute recoveryStates
  for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
    for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
      WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
      if (windowState != null) {
        recoveryStates.put(new TaskStream(taskId, streamId), windowState);
      }
    }
  }
  LOG.debug("recoveryStates {}", recoveryStates);
  stateInitialized = true;
  start();
}

代码示例来源:origin: alibaba/jstorm

this.streamIds = new SerializationFactory.IdDictionary(sysTopology);
this.inputStreamIds = new HashSet<>();
Set<GlobalStreamId> inputs = topologyContext.getThisSources().keySet();
for (GlobalStreamId stream : inputs) {
  inputStreamIds.add(streamIds.getStreamId(stream.get_componentId(), stream.get_streamId()));

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

public static Set<String> getInputStreamIds(TopologyContext context) {
  Set<String> ret = new HashSet<String>();
  Set<GlobalStreamId> inputs = context.getThisSources().keySet();
  for (GlobalStreamId streamId : inputs) {
    ret.add(streamId.get_streamId());
  }
  return ret;
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

private Set<GlobalStreamId> getComponentStreams(TopologyContext context) {
  Set<GlobalStreamId> streams = new HashSet<>();
  for (GlobalStreamId streamId : context.getThisSources().keySet()) {
    if (!streamId.get_streamId().equals(CheckpointSpout.CHECKPOINT_STREAM_ID)) {
      streams.add(streamId);
    }
  }
  return streams;
}

代码示例来源:origin: com.n3twork.storm/storm-core

private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
  for(GlobalStreamId g: context.getThisSources().keySet()) {
    if(g.get_streamId().equals(sourceStream)) {
      return context.getComponentOutputFields(g);
    }
  }
  throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
  for (GlobalStreamId g : context.getThisSources().keySet()) {
    if (g.get_streamId().equals(sourceStream)) {
      return context.getComponentOutputFields(g);
    }
  }
  throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

public Map<String, List<Integer>> getThisSourceComponentTasks(){
  Map<String, List<Integer>> ret = new HashMap<>();
  Map<GlobalStreamId, Grouping> sources = getThisSources();
  Set<String> sourceComponents = new HashSet<>();
  if (sources != null){
    for (GlobalStreamId globalStreamId : sources.keySet()){
      sourceComponents.add(globalStreamId.get_componentId());
    }
  }
  for (String component : sourceComponents){
    ret.put(component, getComponentTasks(component));
  }
  return ret;
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

/**
 * returns the total number of input checkpoint streams across
 * all input tasks to this component.
 */
private int getCheckpointInputTaskCount(TopologyContext context) {
  int count = 0;
  for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
    if (CheckpointSpout.CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
      count += context.getComponentTasks(inputStream.get_componentId()).size();
    }
  }
  return count;
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
  GlobalStreamId gid = entry.getKey();
  Map<String, Object> stringSourceMap = stringSources.get(gid.get_componentId());

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

@Override
public void initState(T state) {
  if (stateInitialized) {
    LOG.warn("State is already initialized. Ignoring initState");
    return;
  }
  statefulWindowedBolt.initState((T) state);
  // query the streamState for each input task stream and compute recoveryStates
  for (GlobalStreamId streamId : topologyContext.getThisSources().keySet()) {
    for (int taskId : topologyContext.getComponentTasks(streamId.get_componentId())) {
      WindowState windowState = streamState.get(new TaskStream(taskId, streamId));
      if (windowState != null) {
        recoveryStates.put(new TaskStream(taskId, streamId), windowState);
      }
    }
  }
  LOG.debug("recoveryStates {}", recoveryStates);
  stateInitialized = true;
  start();
}

代码示例来源:origin: tomdz/storm-esper

private void setupEventTypes(TopologyContext context, Configuration configuration)
{
  Set<GlobalStreamId> sourceIds = context.getThisSources().keySet();
  for (GlobalStreamId id : sourceIds) {
    String eventTypeName = getEventTypeName(id.get_componentId(), id.get_streamId());
    Fields fields = context.getComponentOutputFields(id.get_componentId(), id.get_streamId());
    TupleTypeDescriptor typeDesc = tupleTypes.get(new StreamId(id.get_componentId(), id.get_streamId()));
    Map<String, Object> props = setupEventTypeProperties(fields, typeDesc);
    configuration.addEventType(eventTypeName, props);
  }
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

this.streamIds = new SerializationFactory.IdDictionary(sysTopology);
this.inputStreamIds = new HashSet<Integer>();
Set<GlobalStreamId> inputs = topologyContext.getThisSources().keySet();
for (GlobalStreamId stream : inputs) {
  inputStreamIds.add(streamIds.getStreamId(stream.get_componentId(), stream.get_streamId()));

相关文章

微信公众号

最新文章

更多