Broker有一个Reactor主线程。
Reactor主线程负责监听一个网络端口,比如监听2888等端口。
有一个Producer他就要跟Broker建立一个TCP长连接了,此时Broker上的这个Reactor主线程。
Reactor主线程作用:一方面他会在端口上监听到这个Producer建立连接的请求,另一方面 负责跟这个Producer按照TCP协议规定的一系列步骤和规范,建立好一个长连接。
Producer和Broker里面各会有一个SocketChannel,这两个SocketChannel就代表了他们俩建立好的这个长连接。
首先 Producer和Broker之间已经通过SocketChannel维持了一个长连接了,接着Producer会通过这个SocketChannel去发送消息给Broker。
Reactor线程池里面默认3个线程。Reactor主线程建立好的每个连接SocketChannel,都会交给这个Reactor线程池里的其中一个线程去监听请求。
Producer发送消息:Producer通过SocketChannel发送一个消息到达Broker里的SocketChannel,此时Reactor线程池里的一个线程会监听到这个SocketChannel中有请求到达了!
Reactor线程从SocketChannel中读取出来一个请求,这个请求在正式进行处理之前,必须就先要进行一些准备工作和预处理,比如SSL加密验证、编码解码、连接空闲检查、网络连接管理,诸如此类的一些事。
Worker线程池,他默认有8个线程,此时Reactor线程收到的这个请求会交给Worker线程池中的一个线程进行处理,会完成上述一系列的准备工作。
SendMessage线程池:对于处理发送消息请求而言,就会把请求转交给SendMessage线程池,而且如果大家还有一点点印象的话,其实在之前讲集群部署的时候,我们讲到过这个SendMessage线程是可以配置的,你配置的越多,自然处理消息的吞吐量越高。
必须专门分配一个Reactor主线程出来,就是专门负责跟各种Producer、Consumer之类的建立长连接。 一旦连接建立好之后,大量的长连接均匀的分配给Reactor线程池里的多个线程。
每个Reactor线程负责监听一部分连接的请求,这个也是一个优化点,通过多线程并发的监听不同连接的请求,可以有效的提升大量并发请求过来时候的处理能力,可以提升网络框架的并发能力。
接着后续对大量并发过来的请求都是基于Worker线程池进行预处理的,当Worker线程池预处理多个请求的时候,Reactor线程还是可以有条不紊的继续监听和接收大量连接的请求是否到达。
而且最终的读写磁盘文件之类的操作都是交给业务线程池来处理的,当他并发执行多个请求的磁盘读写操作的时候,不影响其他线程池同时接收请求、预处理请求,没任何的影响。
所以最终的效果就是:
这些事情全部是利用不同的线程池并发执行的!任何一个环节在执行的时候,都不会影响其他线程池在其他环节进行请求的处理!
这样的一套网络通信架构,最终实现的效果就是可以高并发、高吞吐的对大量网络连接发送过来的大量请求进行处理,这是保证Broker实现高吞吐的一个非常关键的环节,就是这套网络通信架构。因此对于这类中间件,如果你给他部署在高配置的物理机上,有几十个CPU核,那么此时你可以增加他的各种线程池的线程数量,这样就可以让各个环节同时高并发的处理大量的请求,由大量的CPU核来支持大量线程的并发工作
通过之前的学习,我们知道了一点,就是Broker对磁盘文件的写入主要是借助直接写入os cache来实现性能优化的,因为直接写入os cache,相当于就是写入内存一样的性能,后续等os内核中的线程异步把cache中的数据刷入磁盘文件即可。
Broker中就是大量的使用mmap 技术去实现CommitLog这种大磁盘文件的高性能读写优化的。
参考:内存映射文件原理探索_我可能长大了-CSDN博客_内存映射
从 RocketMQ 全链路分析为什么用户支付后没收到红包?
不一定你发送消息出去就一定会成功,有可能就会失败,此时你的代码里可能会抛出异常,也可能不会抛出异常,这都不好说,具体要看到底什么原因导致的消息推送失败。
消息写入MQ之后,其实MQ可能仅仅是把这个消息给写入到page cache里,也就是os自己管理的一个缓冲区,这本质也是内存你认为写成功了一个消息,但是此时仅仅进入了os cache,还没写入磁盘呢。这个时候,假如要是出现了Broker机器的崩溃,机器一旦宕机,os cache内存中的数据就没了。
磁盘出现故障,比如磁盘坏了,你上面存储的数据还是会丢失
默认情况下,MQ的消费者有可能会自动提交已经消费的offset,那么如果此时你还没处理这个消息派发红包的情况下,MQ的消费者可能直接自动给你提交这个消息1的offset到broker去了,标识为你已经成功处理了这个消息,接着恰巧在这个时候,我们的红包系统突然重启了,或者是宕机了,或者是可能在派发红包的时候更新数据库失败了,总之就是他突然 故障了,红包系统的机器重启了一下,然后此时内存里的消息1必然就丢失了,而且红包也没发出去。
事务消息的功能,凭借这个事务级的消息机制,就可以让我们确保订单系统推送给出去的消息一定会成功写入MQ里,绝对不会半路就搞丢了。
第一件事,不是先让订单系统做一些增删改操作,而是先发一个half消息给MQ以及收到他的成功的响应,初步先跟MQ做个联系和沟通,确认一下MQ还活着,MQ也知道你后续可能想发送一条很关键的不希望丢失的消息给他了!
首先要让订单系统去发送一条half消息到MQ去,这个half消息本质就是一个订单支付成功
的消息,只不过你可以理解为他这个消息的状态是half状态,这个时候红包系统是看不见这个half消息的,然后我们去等待接收这个half消息写入成功的响应通知。
可能你发现报错了,可能MQ就挂了,或者这个时候网络就是故障了,所以导致你的half消息都没发送成功,总之你现在肯定没法跟MQ通信了。
这个时候你的订单系统就应该执行一系列的回滚操作,比如对订单状态做一个更新,让状态变成“关闭交易”,同时通知支付系统自动进行退款,这才是正确的做法。因为你订单虽然支付了,但是包括派发红包、发送优惠券之类的后续操作是无法执行的,所以此时必然应该把钱款退还给用户,说交易失败了。
half消息成功,则订单系统就应该在自己本地的数据库里执行一些增删改操作了,因为一旦half消息写成功了,就说明MQ肯定已经收到这条消息了,MQ还活着,而且目前你是可以跟MQ正常沟通的。
订单系统的数据库当时也有网络异常,或者数据库挂了,总而言之,就是你想把订单更新为“已完成”这个状态,是干不成了。
直接让订单系统发送一个rollback请求给MQ就可以了。这个意思就是说,你可以把之前我发给你的half 消息给删除掉了,因为我自己这里都出问题了,已经无力跟你继续后续的流程了。
如果订单系统成功完成了本地的事务操作,比如把订单状态都更新为“已完成”了。
发送一个commit请求给MQ,要求让MQ对之前的half消息进行commit操作,让红包系统可以看见这个订单支付成功消息。 所谓的half消息实际就是订单支付成功的消息,只不过他的状态是half也就是他是half状态的时候,红包系统是看不见他的,没法获取到这条消息,必须等到订单系统执行commit请求,消息被commit之后,红包系统才可以看到和获取这条消息进行后续处理
本质原因就是RocketMQ一旦发现你发送的是一个half消息,他不会把这个half消息的offset写入OrderPaySuccessTopic的ConsumeQueue里去。 他会把这条half消息写入到自己内部的“RMQ_SYS_TRANS_HALF_TOPIC”这个Topic对应的一个ConsumeQueue里去。
且在基于Kafka作为消息中间件的消息零丢失方案中,对于发送消息这块,因为Kafka本身不具备RocketMQ这种事务消息的高级功能,所以一般我们都是对Kafka会采用同步发消息 + 反复重试多次的方案,去保证消息成功投递到Kafka的。
只要你把Broker的刷盘策略调整为同步刷盘,那么绝对不会因为机器宕机而丢失数据;只要你采用了主从架构的Broker集群,那么一条消息写入成功,就意味着多个Broker机器都写入了,此时任何一台机器的磁盘故障,数 据也是不会丢失的。 最起码只要Broker层面保证写入的数据不丢失,那就一定可以让红包系统消费到这条消息了!
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/mingyuli/article/details/120520963
内容来源于网络,如有侵权,请联系作者删除!