#storm:如何为同一数据源设置各种度量

tf7tbtn2  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(273)

我试图设置storm来聚合一个流,但是在同一个流上有不同的(drpc可用)度量。
e、 g.流由具有发送者、接收者、消息到达的通道和消息传递的网关的消息组成。我在决定如何组织一个或多个拓扑结构时遇到了问题,这些拓扑结构可以为我提供例如按网关和/或按通道的消息总数。除了总数,每分钟的计数也不错。
其基本思想是拥有一个接受消息传递事件的喷口,并从中根据需要聚合数据。目前我正在研究trident和drpc,我提出了两种可能的拓扑结构来解决这个阶段的问题。不能决定哪种方法更好,如果有的话?!
整个来源可在此要点。它有三个类:
无规喷口
用于发出消息数据
模拟真实数据源
分离主义
为所需的每个度量创建单独的drpc流
此外,还会为每个度量创建一个单独的查询状态
它们都使用相同的喷口示例
组合拓扑
创建一个包含所有所需指标的drpc流
为每个度量创建单独的查询状态
每个查询状态提取所需的度量并为其分组结果
现在,对于问题和疑问:
分离主义
是否需要使用同一个spout示例,或者我可以每次都说new randommessagespout()?
我喜欢这样的想法,即我不需要按照所有的度量来持久化分组数据,而只需要我们以后需要提取的分组
喷口发出的数据是否由所有状态/查询组合实际处理,例如不是第一个出现的?
这是否会在运行时支持动态添加新的状态/查询组合?
组合拓扑
我真的不喜欢这样的想法,即我需要按所有指标分组来持久化数据,因为我不需要所有的组合
令人惊讶的是,所有的度量总是返回相同的数据
e、 通道和网关查询返回状态度量数据
我发现这总是状态定义中按第一个字段分组的数据
本主题解释了这种行为背后的原因
但我想知道这是否是一个很好的方式做的事情放在第一位(并会找到一个方法围绕这个问题,如果需要的话)
statequery中的snapshotget与tuplecollectionget
使用snapshotget时,事情往往是可行的,但并非总是如此,只有TupleCollection才能解决问题
有没有关于正确方法的建议?
我想这是一个很长的问题/主题,但任何帮助都是非常感谢的!另外,如果我完全忽略了体系结构,那么关于如何实现这一点的建议将是最受欢迎的。提前感谢:-)

zynd9foi

zynd9foi1#

实际上你不能把一条小溪分成两段 SeparateTopology 通过调用 newStream() 使用相同的喷口示例,因为这样会创建相同喷口的新示例 RandomMessageSpout 喷口,这将导致多个单独的喷口示例向拓扑发送重复的值(喷口并行化只有在带有分区喷口的storm中才可能,其中每个喷口示例处理整个数据集的一个分区(例如,一个kafka分区)。
正确的方法是修改 CombinedTopology 根据需要为每个度量将流拆分为多个流(见下文),然后执行 groupBy() 根据这个指标的领域和 persistentAggregate() 在每一条新分支的河流上。
从三叉戟常见问题,
“each”返回一个流对象,您可以将其存储在变量中。然后可以在同一个流上运行多个eaches来拆分它,例如:

Stream s = topology.each(...).groupBy(...).aggregate(...)
Stream branch1 = s.each(...)
Stream branch2 = s.each(...)

请参阅storm邮件列表上的此线程,以及此线程以获取更多信息。

相关问题