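This article collects code examples for the Java method org.apache.storm.task.TopologyContext.getComponentOutputFields() and shows how TopologyContext.getComponentOutputFields() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be a useful reference. Details of TopologyContext.getComponentOutputFields() are as follows: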
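Package: org.apache.storm.task
Class: TopologyContext
Method: getComponentOutputFields
Description: none provided in the source. As the examples show, the method returns the Fields declared for a given component and stream, identified either by a (componentId, streamId) pair or by a GlobalStreamId.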
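Before the real-world snippets, it may help to see the lookup as a nested map from component id to stream id to field names. The following is a minimal sketch using only the JDK: the class OutputFieldsLookupSketch and its declare method are hypothetical stand-ins, not Storm classes, and throwing IllegalArgumentException for an undeclared stream is an assumption of this sketch rather than documented Storm behavior.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that models the lookup; this is NOT Storm's TopologyContext.
final class OutputFieldsLookupSketch {
    // componentId -> streamId -> declared field names
    private final Map<String, Map<String, List<String>>> declared = new HashMap<>();

    // Registers the fields a component declares on one of its streams.
    void declare(String componentId, String streamId, String... fields) {
        declared.computeIfAbsent(componentId, k -> new HashMap<>())
                .put(streamId, Arrays.asList(fields));
    }

    // Returns the declared fields, or fails loudly if nothing was declared
    // (the failure mode is an assumption of this sketch).
    List<String> getComponentOutputFields(String componentId, String streamId) {
        Map<String, List<String>> streams = declared.get(componentId);
        List<String> fields = (streams == null) ? null : streams.get(streamId);
        if (fields == null) {
            throw new IllegalArgumentException(
                    "No output fields declared for " + componentId + ":" + streamId);
        }
        return fields;
    }

    public static void main(String[] args) {
        OutputFieldsLookupSketch ctx = new OutputFieldsLookupSketch();
        ctx.declare("sentence-spout", "default", "sentence");
        ctx.declare("split-bolt", "default", "word", "count");
        System.out.println(ctx.getComponentOutputFields("split-bolt", "default"));
    }
}
```

In a real topology the declarations come from each component's declareOutputFields call, and the context is handed to the bolt or spout by the framework rather than built by hand.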
Code example source: apache/storm
/**
 * Gets the declared output fields for the specified stream id for the component this task is a part of.
 */
public Fields getThisOutputFields(String streamId) {
    return getComponentOutputFields(getThisComponentId(), streamId);
}
Code example source: apache/storm
private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
    for (GlobalStreamId g : context.getThisSources().keySet()) {
        if (g.get_streamId().equals(sourceStream)) {
            return context.getComponentOutputFields(g);
        }
    }
    throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}
Code example source: apache/storm
/**
 * Gets the declared input fields for this component.
 *
 * @return A map from sources to streams to fields.
 */
public Map<String, Map<String, List<String>>> getThisInputFields() {
    Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
    for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
        String componentId = entry.getKey().get_componentId();
        Set<String> streams = getComponentStreams(componentId);
        for (String stream : streams) {
            Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
            if (streamFieldMap == null) {
                streamFieldMap = new HashMap<>();
                outputMap.put(componentId, streamFieldMap);
            }
            streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
        }
    }
    return outputMap;
}
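The get / null-check / put sequence in getThisInputFields can also be expressed with Map.computeIfAbsent. The sketch below shows that pattern with plain JDK collections only. The class InputFieldsSketch and the parameters declared and sources are hypothetical inputs standing in for what the context supplies, so treat it as an illustration of the map-building step, not as Storm code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical JDK-only illustration of building a source -> stream -> fields map.
final class InputFieldsSketch {
    /**
     * @param declared componentId -> streamId -> field names (hypothetical input)
     * @param sources  ids of the components this component subscribes to
     */
    static Map<String, Map<String, List<String>>> inputFields(
            Map<String, Map<String, List<String>>> declared, List<String> sources) {
        Map<String, Map<String, List<String>>> out = new HashMap<>();
        for (String componentId : sources) {
            // Like the snippet above, every stream of a source component is copied.
            Map<String, List<String>> streams =
                    declared.getOrDefault(componentId, Collections.emptyMap());
            for (Map.Entry<String, List<String>> e : streams.entrySet()) {
                // computeIfAbsent creates the inner map on first use.
                out.computeIfAbsent(componentId, k -> new HashMap<>())
                   .put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> splitStreams = new HashMap<>();
        splitStreams.put("default", Arrays.asList("word"));
        Map<String, Map<String, List<String>>> declared = new HashMap<>();
        declared.put("split-bolt", splitStreams);
        System.out.println(inputFields(declared, Arrays.asList("split-bolt")));
    }
}
```

A source with no declared streams simply does not appear in the result, which keeps callers from having to handle empty inner maps.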
Code example source: apache/storm
@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
        Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
        Set<String> setFields = new HashSet<String>(fields.toList());
        if (idFields == null) {
            idFields = setFields;
        } else {
            idFields.retainAll(setFields);
        }
        for (String outfield : _outFields) {
            for (String sourcefield : fields) {
                if (outfield.equals(sourcefield)) {
                    _fieldLocations.put(outfield, source);
                }
            }
        }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));
    if (_fieldLocations.size() != _outFields.size()) {
        throw new RuntimeException("Cannot find all outfields among sources");
    }
}
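The prepare method above does two things with the looked-up fields: it intersects the field sets of all sources to find the shared id fields, and it records which source supplies each requested output field. The following JDK-only sketch isolates those two steps. JoinFieldsSketch, commonFields, and locateFields are hypothetical names, sources are represented by plain strings instead of GlobalStreamId, and the empty-set result for zero sources is a guard added in this sketch that the snippet above does not have.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical JDK-only illustration of the field bookkeeping in a join bolt.
final class JoinFieldsSketch {
    // Intersects the field lists of all sources via retainAll.
    static Set<String> commonFields(Collection<List<String>> fieldsPerSource) {
        Set<String> common = null;
        for (List<String> fields : fieldsPerSource) {
            if (common == null) {
                common = new LinkedHashSet<>(fields);
            } else {
                common.retainAll(fields);
            }
        }
        // Guard for the no-sources case (an addition of this sketch).
        return common == null ? new LinkedHashSet<>() : common;
    }

    // Maps each requested output field to a source that declares it.
    static Map<String, String> locateFields(
            Map<String, List<String>> fieldsBySource, List<String> outFields) {
        Map<String, String> locations = new HashMap<>();
        for (Map.Entry<String, List<String>> e : fieldsBySource.entrySet()) {
            for (String out : outFields) {
                if (e.getValue().contains(out)) {
                    locations.put(out, e.getKey());
                }
            }
        }
        if (locations.size() != outFields.size()) {
            throw new IllegalStateException("Cannot find all outfields among sources");
        }
        return locations;
    }

    public static void main(String[] args) {
        Map<String, List<String>> bySource = new LinkedHashMap<>();
        bySource.put("users", Arrays.asList("id", "name"));
        bySource.put("orders", Arrays.asList("id", "total"));
        System.out.println(commonFields(bySource.values()));
        System.out.println(locateFields(bySource, Arrays.asList("name", "total")));
    }
}
```

If the same field is declared by several sources, the last source iterated wins, which matches the overwrite behavior of the put call in the snippet above.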
Code example source: org.apache.storm/storm-core
/**
 * Gets the declared output fields for the specified stream id for the
 * component this task is a part of.
 */
public Fields getThisOutputFields(String streamId) {
    return getComponentOutputFields(getThisComponentId(), streamId);
}
Code example source: org.apache.storm/storm-core
private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
    for (GlobalStreamId g : context.getThisSources().keySet()) {
        if (g.get_streamId().equals(sourceStream)) {
            return context.getComponentOutputFields(g);
        }
    }
    throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}
Code example source: org.apache.storm/storm-core
/**
 * Gets the declared input fields for this component.
 *
 * @return A map from sources to streams to fields.
 */
public Map<String, Map<String, List<String>>> getThisInputFields() {
    Map<String, Map<String, List<String>>> outputMap = new HashMap<>();
    for (Map.Entry<GlobalStreamId, Grouping> entry : this.getThisSources().entrySet()) {
        String componentId = entry.getKey().get_componentId();
        Set<String> streams = getComponentStreams(componentId);
        for (String stream : streams) {
            Map<String, List<String>> streamFieldMap = outputMap.get(componentId);
            if (streamFieldMap == null) {
                streamFieldMap = new HashMap<>();
                outputMap.put(componentId, streamFieldMap);
            }
            streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList());
        }
    }
    return outputMap;
}
Code example source: Paleozoic/storm_spring_boot_demo
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
        Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
        Set<String> setFields = new HashSet<String>(fields.toList());
        if (idFields == null)
            idFields = setFields;
        else
            idFields.retainAll(setFields);
        for (String outfield : _outFields) {
            for (String sourcefield : fields) {
                if (outfield.equals(sourcefield)) {
                    _fieldLocations.put(outfield, source);
                }
            }
        }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));
    if (_fieldLocations.size() != _outFields.size()) {
        throw new RuntimeException("Cannot find all outfields among sources");
    }
}
Code example source: org.apache.flink/flink-storm (excerpt)
this.inputStreamIds.put(tid, inputStream.get_streamId());
this.inputSchemas.put(tid,
    this.topologyContext.getComponentOutputFields(inputStream));