Storm可靠性保证测试

x33g5p2x  于2020-09-30 发布在 Storm  
字(6.2k)|赞(0)|评价(0)|浏览(598)

Storm是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。

在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。

在实时计算中,用户不仅仅关心时效性的问题,同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制,文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。

1. Storm 的消息保证机制

Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理

1.1 消息完全处理

每个从 Spout(Storm 中数据源节点)发出的 Tuple(Storm 中的最小消息单元)可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
    .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
    .fieldsGrouping("split", new Fields("word"));

这个 Topology 从 Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的 Bolt 将收到的句子分割成单独的单词,并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple,从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树,下图是一棵 Tuple 树示例:

上图中所有的 Tuple 都被成功处理了,我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为 30 秒),有至少一个 Tuple 处理失败或超时,则认为整棵 Tuple 树处理失败,即从 Spout 发出的 Tuple 处理失败。

1.2 如何实现不同层次的消息保证机制

<img src="img/storm_tuple_guarantee.jpg" style="zoom:67%;" />

Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。

Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义,Storm 在 At Least Once 的基础上进行了一次封装(Trident),从而实现 Exactly Once 语义。

Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:

  • 关闭 ACK 机制,即 Acker 数目设置为 0
  • Spout 不实现可靠性传输
    • Spout 发送消息是使用不带 message ID 的 API
    • 不实现 fail 函数
  • Bolt 不把处理成功或失败的消息发送给 Acker

如果需要实现 At Least Once 语义,则需要同时保证如下几条:

  • 开启 ACK 机制,即 Acker 数目大于 0
  • Spout 实现可靠性传输保证
    • Spout 发送消息时附带 message 的 ID
    • 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
  • Bolt 在处理成功或失败后需要调用相应的方法通知 Acker

实现 Exactly Once 语义,则需要在 At Least Once 的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现。

下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游接收到的消息。从图中可以知道,At Most Once 中,消息可能会丢失(上游发送了两个 A,下游只收到一个 A);At Least Once 中,消息不会丢失,可能重复(上游只发送了一个B ,下游收到两个B);Exactly Once 中,消息不丢失、不重复,因此需要在 At Least Once 的基础上保存相应的状态,表示上游的哪些消息已经成功发送到下游,防止同一条消息发送多次给下游的情况。

2. 测试目的

Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制,我们希望通过相关测试,达到如下目的:

  • 三种消息保证机制的表现,是否与官方的描述相符;
  • At Most Once 语义下,消息的丢失率和什么有关系、关系如何;
  • At Least Once 语义下,消息的重复率和什么有关系、关系如何。

3. 测试环境

本文的测试环境如下: 每个 worker(worker 为一个 物理 JVM 进程,用于运行实际的 Storm 作业)分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。

三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据,经由相应 Bolt 进行处理,然后发送到 Kafka,并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计。

测试数据为 Kafka 上顺序保存的一系列纯数字,数据量分别有十万、五十万、一百万等,每个数字在每个测试样例中出现且仅出现一次。

4. 测试场景

对于三种不同的消息保证机制,我们分别设置了不同的测试场景,来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点,在下面的测试中,所有的节点单独运行在独立的 Worker 上。

4.1 At Most Once

从背景中可以得知,如果希望实现 At Most Once 语义,将 Acker 的数目设置为 0 即可,本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。

4.1.1 输入数据

保存在 Kafka 上的一系列纯数字,数据量从十万到五百万不等,每个测试样例中,同一个数字在 Kafka 中出现且仅出现一次。

4.1.2 测试结果

异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
050000050000000%0
01000000100000000%0
02000000200000000%0
03000000300000000%0
异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
1300000027749402250607.50%0
23000000230708769291323.09%0
33000000208282391717730.57%0
430000001420725157927552.64%0

4.1.4 结论

不发生异常的情况下,消息能够不丢不重;Bolt 发生异常的情况下,消息会丢失,不会重复,其中消息的丢失数目异常次数正相关。与官方文档描述相符,符合预期。

4.2 At Least Once

为了实现 At Least Once 语义,需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 Spout;Bolt 通过继承 BaseBasicBolt,自动帮我们建立 Tuple 树以及消息处理之后通知 Acker;将 Acker 的数目设置为 1,即打开 ACK 机制,这样整个 Topology 即可提供 At Least Once 的语义。

4.2.1 测试数据

Kafka 上保存的十万到五十万不等的纯数字,其中每个测试样例中,每个数字在 Kafka 中出现且仅出现一次。

4.2.2 测试结果

Acker 发生异常的情况

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量最大积压量
0100000100000--02000(默认值)
0200000200000--02000
0300000300000--02000
0400000400000--02000
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量最大积压量
11000001000002200002000
21000001000002400102000
31000001000002600002000
41000001000002800002000

Spout 发生异常的情况

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
0100000100000--0
0200000200000--0
0300000300000--0
0400000400000--0
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
1100000100000220520
2100000100000244140
4100000100000290080
6100000100000296900

Bolt 发生异常的情况

调用 emit 函数之前发生异常

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
0100000100000--0
0200000200000--0
0300000300000--0
0400000400000--0
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
1100000100000--0
2200000200000--0
4300000300000--0
8400000400000--0
10400000400000--0

调用 emit 函数之后发生异常

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
0100000100000--0
0200000200000--0
0300000300000--0
0400000400000--0
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
1100000100000220
2200000200000230
4300000300000250
8400000400000290
104000004000002110

4.2.3 结论

从上面的表格中可以得到,消息不会丢失,可能发生重复,重复的数目与异常的情况相关。

  • 不发生任何异常的情况下,消息不会重复不会丢失。
  • Spout 发生异常的情况下,消息的重复数目约等于 spout.max.pending(Spout 的配置项,每次可以发送的最多消息条数) * NumberOfException(异常次数)。
  • Acker 发生异常的情况下,消息重复的数目等于 spout.max.pending * NumberOfException。
  • Bolt 发生异常的情况:
    • emit 之前发生异常,消息不会重复。
    • emit 之后发生异常,消息重复的次数等于异常的次数

结论与官方文档所述相符,每条消息至少发送一次,保证数据不会丢失,但可能重复,符合预期。

4.3 Exactly Once

对于 Exactly Once 的语义,利用 Storm 中的 Trident 来实现。

4.3.1 测试数据

Kafka 上保存的一万到一百万不等的数字,每个数字在每次测试样例中出现且仅出现一次。

4.3.2 测试结果

Spout 发生异常情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Acker 发生异常的情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Bolt发生异常情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

4.3.3 结论

在所有情况下,最终结果集中的消息不会丢失,不会重复,与官方文档中的描述相符,符合预期。

5. 总结

对 Storm 提供的三种不同消息保证机制,用户可以根据自己的需求选择不同的消息保证机制。

5.1 不同消息可靠性保证的使用场景

对于 Storm 提供的三种消息可靠性保证,优缺点以及使用场景如下所示:

可靠性保证层次优点缺点使用场景
At most once处理速度快数据可能丢失都处理速度要求高,且对数据丢失容忍度高的场景
At least once数据不会丢失数据可能重复不能容忍数据丢失,可以容忍数据重复的场景
Exactly once数据不会丢失,不会重复处理速度慢对数据不丢不重性质要求非常高,且处理速度要求没那么高,比如支付金额

5.2 如何实现不同层次的消息可靠性保证

对于 At Least Once 的保证需要做如下几步:

  • 需要开启 ACK 机制,即 Topology 中的 Acker 数量大于零;
  • Spout 是可靠的。即 Spout 发送消息的时候需要附带 msgId,并且实现失败消息重传功能(fail 函数 ,可以参考下面的 Spout 代码);
  • Bolt 在发送消息时,需要调用 emit(inputTuple, outputTuple)进行建立 anchor 树(参考下面建立 anchor 树的代码),并且在成功处理之后调用 ack ,处理失败时调用 fail 函数,通知 Acker。

不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证,如果希望得到 Exactly Once 的消息可靠性保证,可以使用 Trident 进行实现。

5.3 不同层次的可靠性保证如何实现

相关文章