中间件(六):RocketMQ的核心底层原理

x33g5p2x  于2021-09-28 转载在 其他  
字(5.3k)|赞(0)|评价(0)|浏览(460)

1、RocketMQ是如何基于Netty扩展出高性能网络通信架构的?

Broker有一个Reactor主线程。

Reactor主线程负责监听一个网络端口,比如监听2888等端口。

  • 短连接:如果你要给别人发送一个请求,必须要建立连接 -> 发送请求 -> 接收响应 -> 断开连接,下一次你要发送请求的时候,这个过程得重新来一遍。每次建立一个连接之后,使用这个连接发送请求的时间是很短的,很快就会断开这个连接,所以他存在时间太短了,就是短连接。
  • 长连接:你建立一个连接 -> 发送请求 -> 接收响应 -> 发送请求 -> 接收响应 -> 发送请求 -> 接收响应。大家会发现,当你建立好一个长连接之后,可以不停的发送请求和接收响应,连接不会断开,等你不需要的时候再断开就行了,这个连接会存在很长时间,所以是长连接。
  • TCP就是一个协议,所谓协议的意思就是,按照TCP这个协议规定好的步骤建立连接,按照他 规定好的步骤发送请求。比如你要建立一个TCP连接,必须先给对方发送他规定好的几个数据,然后人家按照规定返回给你几个数据,你再给人家发送几个数据,一切都按TCP的规定来。按照规定来,大家就可以建立一个TCP连接。
  • TCP长连接,就是按照这个TCP协议建立的长连接。

1.1、Producer和Broker建立一个长连接

有一个Producer他就要跟Broker建立一个TCP长连接了,此时Broker上的这个Reactor主线程。

Reactor主线程作用:一方面他会在端口上监听到这个Producer建立连接的请求,另一方面 负责跟这个Producer按照TCP协议规定的一系列步骤和规范,建立好一个长连接。

 Producer和Broker里面各会有一个SocketChannel,这两个SocketChannel就代表了他们俩建立好的这个长连接。

首先 Producer和Broker之间已经通过SocketChannel维持了一个长连接了,接着Producer会通过这个SocketChannel去发送消息给Broker。

1.2、让Producer发送消息给Broker

1.2.1、基于Reactor线程池监听连接中的请求

Reactor线程池里面默认3个线程。Reactor主线程建立好的每个连接SocketChannel,都会交给这个Reactor线程池里的其中一个线程去监听请求。

Producer发送消息:Producer通过SocketChannel发送一个消息到达Broker里的SocketChannel,此时Reactor线程池里的一个线程会监听到这个SocketChannel中有请求到达了!

1.2.2、基于Worker线程池完成一系列准备工作

 Reactor线程从SocketChannel中读取出来一个请求,这个请求在正式进行处理之前,必须就先要进行一些准备工作和预处理,比如SSL加密验证、编码解码、连接空闲检查、网络连接管理,诸如此类的一些事。

Worker线程池,他默认有8个线程,此时Reactor线程收到的这个请求会交给Worker线程池中的一个线程进行处理,会完成上述一系列的准备工作。

1.2.3、基于业务线程池完成请求的处理

SendMessage线程池:对于处理发送消息请求而言,就会把请求转交给SendMessage线程池,而且如果大家还有一点点印象的话,其实在之前讲集群部署的时候,我们讲到过这个SendMessage线程是可以配置的,你配置的越多,自然处理消息的吞吐量越高。

1.3、为什么这套网络通信框架会是高性能以及高并发的?

必须专门分配一个Reactor主线程出来,就是专门负责跟各种Producer、Consumer之类的建立长连接。 一旦连接建立好之后,大量的长连接均匀的分配给Reactor线程池里的多个线程。

 每个Reactor线程负责监听一部分连接的请求,这个也是一个优化点,通过多线程并发的监听不同连接的请求,可以有效的提升大量并发请求过来时候的处理能力,可以提升网络框架的并发能力。
接着后续对大量并发过来的请求都是基于Worker线程池进行预处理的,当Worker线程池预处理多个请求的时候,Reactor线程还是可以有条不紊的继续监听和接收大量连接的请求是否到达。

而且最终的读写磁盘文件之类的操作都是交给业务线程池来处理的,当他并发执行多个请求的磁盘读写操作的时候,不影响其他线程池同时接收请求、预处理请求,没任何的影响。
所以最终的效果就是:

  • Reactor主线程在端口上监听Producer建立连接的请求,建立长连接
  • Reactor线程池并发的监听多个连接的请求是否到达
  • Worker请求并发的对多个请求进行预处理
  • 业务线程池并发的对多个请求进行磁盘读写业务操作

这些事情全部是利用不同的线程池并发执行的!任何一个环节在执行的时候,都不会影响其他线程池在其他环节进行请求的处理!

这样的一套网络通信架构,最终实现的效果就是可以高并发、高吞吐的对大量网络连接发送过来的大量请求进行处理,这是保证Broker实现高吞吐的一个非常关键的环节,就是这套网络通信架构。因此对于这类中间件,如果你给他部署在高配置的物理机上,有几十个CPU核,那么此时你可以增加他的各种线程池的线程数量,这样就可以让各个环节同时高并发的处理大量的请求,由大量的CPU核来支持大量线程的并发工作

2、BIO、NIO、AIO以及Netty之间的关系是什

  • BIO同步阻塞
  • NIO 同步非阻塞
  • AIO 异步非阻塞
  • Netty 是基于 NIO + Reactor 模式封装的一套高性能网络框架,解决了原生 NIO 开发中许多问题,诸如 拆包粘包 自定义协议 消息序列化器 等。

3、基于mmap内存映射实现CommitLog磁盘文件的高性能读写

3.1、mmap:Broker读写磁盘文件的核心技术

 通过之前的学习,我们知道了一点,就是Broker对磁盘文件的写入主要是借助直接写入os cache来实现性能优化的,因为直接写入os cache,相当于就是写入内存一样的性能,后续等os内核中的线程异步把cache中的数据刷入磁盘文件即可。

 Broker中就是大量的使用mmap 技术去实现CommitLog这种大磁盘文件的高性能读写优化的。

参考:内存映射文件原理探索_我可能长大了-CSDN博客_内存映射

总结 

4、消息丢失问题

从 RocketMQ 全链路分析为什么用户支付后没收到红包?

  • 订单系统在完成支付之后,会推送一条消息到RocketMQ里去,然后红包系统会从RocketMQ里接收那条消息去给用户发现金红包,

 4.1、订单系统推送给RocketMQ的消息丢失

  • 订单系统在推送消息到RocketMQ的过程中,是通过网络去进行传输的,但是这个时候恰巧可能网络发生了抖动,也就是网络突然有点问题,导致这次网络通信失败了。
  • MQ确实是收到消息了,但是他的网络通信模块的代码出现了异常,可能是他内部的网络通信的bug,导致消息没成功处理。
  • 在写消息到RocketMQ的过程中,刚好遇到了某个Leader Broker自身故障,其他的Follower Broker正在尝试切换为LeaderBroker,这个过程中也可能会有异常。

不一定你发送消息出去就一定会成功,有可能就会失败,此时你的代码里可能会抛出异常,也可能不会抛出异常,这都不好说,具体要看到底什么原因导致的消息推送失败。

4.2、MQ自己导致消息丢失

消息写入MQ之后,其实MQ可能仅仅是把这个消息给写入到page cache里,也就是os自己管理的一个缓冲区,这本质也是内存你认为写成功了一个消息,但是此时仅仅进入了os cache,还没写入磁盘呢。这个时候,假如要是出现了Broker机器的崩溃,机器一旦宕机,os cache内存中的数据就没了。

4.3、磁盘故障

磁盘出现故障,比如磁盘坏了,你上面存储的数据还是会丢失

4.4、红包系统拿到了消息,也会丢失数据

默认情况下,MQ的消费者有可能会自动提交已经消费的offset,那么如果此时你还没处理这个消息派发红包的情况下,MQ的消费者可能直接自动给你提交这个消息1的offset到broker去了,标识为你已经成功处理了这个消息,接着恰巧在这个时候,我们的红包系统突然重启了,或者是宕机了,或者是可能在派发红包的时候更新数据库失败了,总之就是他突然 故障了,红包系统的机器重启了一下,然后此时内存里的消息1必然就丢失了,而且红包也没发出去。

 总结:为什么用户支付了,但是红包没发出去呢?

  • 比如订单系统推送消息到MQ就失败了,压根儿就没推送过去;
  • 或者是消息确实推送到MQ了,但是结果MQ自己机器故障,把消息搞丢了;
  • 或者是红包系统拿到了消息,但是他把消息搞丢了,结果红包还没来得及发。
  • 如果真的在生产环境里要搞明白这个问题,就必须要打更多的日志去一点点分析消息到底是在哪个环节丢失了?
  • 如果订单系统推送了消息,结果红包系统连消息都没收到,那可能消息根本就没发到MQ去,或者MQ自己搞丢了消息。
  • 如果红包系统收到了消息,结果红包没派发,那么就是红包系统搞丢了消息。

5、发送消息零丢失方案:RocketMQ事务消息的实现流程分析

5.1、Rocket事务消息机制原理

事务消息的功能,凭借这个事务级的消息机制,就可以让我们确保订单系统推送给出去的消息一定会成功写入MQ里,绝对不会半路就搞丢了。

5.2、发送half消息到MQ,试探MQ是否正常

第一件事,不是先让订单系统做一些增删改操作,而是先发一个half消息给MQ以及收到他的成功的响应,初步先跟MQ做个联系和沟通,确认一下MQ还活着,MQ也知道你后续可能想发送一条很关键的不希望丢失的消息给他了!
首先要让订单系统去发送一条half消息到MQ去,这个half消息本质就是一个订单支付成功
的消息,只不过你可以理解为他这个消息的状态是half状态,这个时候红包系统是看不见这个half消息的,然后我们去等待接收这个half消息写入成功的响应通知。

5.2.1、万一half消息写入失败

可能你发现报错了,可能MQ就挂了,或者这个时候网络就是故障了,所以导致你的half消息都没发送成功,总之你现在肯定没法跟MQ通信了。
 这个时候你的订单系统就应该执行一系列的回滚操作,比如对订单状态做一个更新,让状态变成“关闭交易”,同时通知支付系统自动进行退款,这才是正确的做法。因为你订单虽然支付了,但是包括派发红包、发送优惠券之类的后续操作是无法执行的,所以此时必然应该把钱款退还给用户,说交易失败了。

5.2.2、half消息成功后,订单系统完成自己工作

half消息成功,则订单系统就应该在自己本地的数据库里执行一些增删改操作了,因为一旦half消息写成功了,就说明MQ肯定已经收到这条消息了,MQ还活着,而且目前你是可以跟MQ正常沟通的。

5.2.3、订单系统的本地事务执行失败了怎么办?

订单系统的数据库当时也有网络异常,或者数据库挂了,总而言之,就是你想把订单更新为“已完成”这个状态,是干不成了。
 直接让订单系统发送一个rollback请求给MQ就可以了。这个意思就是说,你可以把之前我发给你的half 消息给删除掉了,因为我自己这里都出问题了,已经无力跟你继续后续的流程了。

5.2.4、如果订单系统完成了本地事务之后,接着干什么?

如果订单系统成功完成了本地的事务操作,比如把订单状态都更新为“已完成”了。
发送一个commit请求给MQ,要求让MQ对之前的half消息进行commit操作,让红包系统可以看见这个订单支付成功消息。 所谓的half消息实际就是订单支付成功的消息,只不过他的状态是half也就是他是half状态的时候,红包系统是看不见他的,没法获取到这条消息,必须等到订单系统执行commit请求,消息被commit之后,红包系统才可以看到和获取这条消息进行后续处理

5.2.5、half 消息是如何对消费者不可见的?

本质原因就是RocketMQ一旦发现你发送的是一个half消息,他不会把这个half消息的offset写入OrderPaySuccessTopic的ConsumeQueue里去。 他会把这条half消息写入到自己内部的“RMQ_SYS_TRANS_HALF_TOPIC”这个Topic对应的一个ConsumeQueue里去。 

6、kafka消息零丢失方案

且在基于Kafka作为消息中间件的消息零丢失方案中,对于发送消息这块,因为Kafka本身不具备RocketMQ这种事务消息的高级功能,所以一般我们都是对Kafka会采用同步发消息 + 反复重试多次的方案,去保证消息成功投递到Kafka的。

7、MQ确保数据零丢失的方案总结

只要你把Broker的刷盘策略调整为同步刷盘,那么绝对不会因为机器宕机而丢失数据;只要你采用了主从架构的Broker集群,那么一条消息写入成功,就意味着多个Broker机器都写入了,此时任何一台机器的磁盘故障,数 据也是不会丢失的。 最起码只要Broker层面保证写入的数据不丢失,那就一定可以让红包系统消费到这条消息了!

相关文章

微信公众号

最新文章

更多