RocketMQ学习笔记

x33g5p2x  于2021-12-06 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(225)

学习RocketMQ做的笔记。

1、MQ概述。

2、为什么使用MQ。

(1)优势:

1)应用解耦。

2)异步提速。

3)削峰填谷、限流削峰。

4)数据收集。

(2)劣势:

1)系统可用性降低。

2)系统复杂度提高。

3)一致性问题。

3、常见MQ产品。

(1)ActiveMQ。

(2)RabbitMQ。

(3)Kafka。

(4)RocketMQ。

4、常见MQ协议。

(1)JMS。

(2)STOMP。

(3)AMQP。

(4)MQTT。

5、基本概念。

(1)Message,消息。

(2)Topic,主题。

(3)Tag,标签。

(4)Queue,队列。

(5)消息标识MessageId|key。

6、RocketMQ工作原理图,系统架构。

(1)Producer生产者。

(2)Consumer消费者。

(3)Name Server命名服务器。

(4)Broker。

7、消息类型。

(1)同步消息。

(2)异步消息。

(3)单向消息。

(4)延时消息。

(5)批量消息。

(6)消息过滤。

8、集群搭建,集群分类。

(1)单机。

(2)集群。

(3)部署方式。

9、RocketMQ集群工作流程。

10、RocketMQ高级特性。

(1)消息的存储特性。

(2)高效读写的两个特征。

(3)消息储存结构。

(4)刷盘机制。

1)同步刷盘。

2)异步刷盘。

(5)高可用性。

(6)broker主从复制。

1)同步复制。

2)异步复制。

3)Broker的角色。

(7)负载均衡模式。读写负载均衡。

(8)消息重试。

1)顺序消息。

2)无序消息。

3)死信队列。

(9)消息重复消费。

1)消息重复消费的原因。

2)消息幂等。

3)常见的幂等方法:

1、MQ概述。

MQ(Message Queue)消息队列,是在消息的的传输过程中保存消息的容器。

多用于分布式系统之间进行通信。

MQ是一种提供消息队列的服务的中间件,消息中间件,提供了消息生产、存储、消费全过程API的软件系统。消息就是数据。

RocketMQ是一个统一消息引擎、轻量级数据处理平台。

2、为什么使用MQ。

(1)优势:

1)应用解耦。

系统的耦合性越高,容错性就越低,可维护性就越低,提高系统容错性和可维护性。

上游系统对下流系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。在系统调用之间添加MQ,实现系统之间由同步到异步的转换。

2)异步提速。

生产方发完消息,可以继续下一步业务逻辑。提升用户体验和系统吞吐量。

3)削峰填谷、限流削峰。

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢处理,避免了请求的丢失或系统被压垮。提高系统稳定性。 

4)数据收集。

分布式系统会产生海量级数据流,对这些数据进行收集。

(2)劣势:

1)系统可用性降低。

MQ有可能宕机,会对业务造成影响。(如何保证MQ的高可用性。)

2)系统复杂度提高。

系统由同步调用变成异步调用,增加了系统的复杂度。(如何保证消息没有被重复消费。怎么处理消息丢失情况。保证消息传递的顺序性。)

3)一致性问题。

A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。(如何保证消息数据处理的一致性。)

3、常见MQ产品。

(1)ActiveMQ。

基于java语言开发的产品。

(2)RabbitMQ。

基于Erlang语言开发的产品。

(3)Kafka。

Scala/java语言开发的产品,高吞吐量,常用于大数据领域的实时计算、日志采集等场景。

(4)RocketMQ。

使用Java语言开发的产品。

4、常见MQ协议。

(1)JMS。

 Java消息服务。

(2)STOMP。

面向流文本的消息协议。

(3)AMQP。

高级消息队列协议。

(4)MQTT。

消息队列遥测传输。

5、基本概念。

(1)Message,消息。

消息系统所有传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个topic主题。

(2)Topic,主题。

Topic表示一种类型消息的集合。每个主题包含若干条消息,每条消息只能属于一个主题。

一个生产者可以同时发送多种topic的消息,而一个消费者只订阅和消费一种topic消息。

(3)Tag,标签。

为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。消费者可以根据Tag实现对不同子主题的不同消费逻辑。

(4)Queue,队列。

存储消息的物理实体,一个topic中可以包含多个queue,每个queue存放的是消息,queue也被称为消息分区partition,一个topic的queue中的消息只能被一个消费组中的一个消费者消费。

一个queue中的消息不充许同一个消费者组中的多个消费者同时消费。

(5)消息标识MessageId|key。

RocketMQ中每个消息拥有唯一的MessageId,可以携带具有业务标识的key。

Key:由用户指定的业务相关的唯一标识。

生产者MessageId:生产者send()消息时生成MsgId。

规则:ProducerIP + 进程Pid + messageClientIDSetter类的classLoader的hashCode + 当前时间 + AutomicInteger自增计数器。

消费者Broker messageId:消息到达Broker后生成。

规则:brokerIP + 物理分区的offset(Queue中的偏移量)。

6、RocketMQ工作原理图,系统架构。

(1)Producer生产者。

消息生产者负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递。

(2)Consumer消费者。

消息消费者负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。

负载均衡:将一个topic中的不同的queue平均分配给同一个Consumer Group的不同的Consumer,把Queue的数量平均分配给消费者组。

容错:一个consumer挂掉了,该Consumer Group中的其他Consumer可以接着消费原Consumer消费的Queue。

一个topic类型的消息可以被多个消费者组同时消费。

消费者组只能消费一个topic的消息,不能同时消费多个topic消息。

一个消费者组中的消费者必须订阅完全相同的topic。

(3)Name Server命名服务器。

Name Server是一个Broker与topic路由的注册中心,支持Broker的动态注册与发现。

Broker管理:将Broker集群的信息注册到NameServer,作为路由信息的基本数据。提供心

跳检测机制,检查Broker是否存活。

路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息,用于客户端查询的队列信息。Producer和Conumser可以获取信息。

1)路由注册。

2)路由剔除。禁用broker读写,没有请求后,才可以下线。

3)路由发现。

RocketMQ的路由发现采用的是Pull模型。客户端定时拉取主题最新的路由。

Push模型:推送模型,其实时性较好,是一个“发布-订阅”模型,需要维护一个长连接 ,适用于实时性要求高的场景。Client数量不多,server数据变化较频繁。

Pull模型:拉取模型,实时性较差。

Long Poling模型:长轮询模型,保持一段比较长时间的连接。

4)客户端Name Server选择策略。

首先采用的是随机策略进行的选择,失败后采用的是轮询策略。

(4)Broker。

Broker充当着消息中转角色,负责存储消息、转发消息。

Broker存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题队列等。

部署RocketMQ的机器就叫Broker。

Broker Cluster消息服务器集群。接收消息、提供消息、消息持久化、过滤消息、高可用。

7、消息类型。

(1)同步消息。

即时性较强,重要的消息,必须有回执的消息。

例如:短信、通知(转账成功)。

send(msg);

(2)异步消息。

即时性较弱,需要有回执的消息。

例如:订单中的某些信息。

send(msg, new Callback() { });

(3)单向消息。

不需要有回执的消息。

例如:日志类消息。

sendoneway(msg);

(4)延时消息。

消息发送时并不直接发送到信息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用。

message.setDelayTimeLevel(3);

(5)批量消息。

一次发送多条消息,节约网络开销。

List<Message> msgLisit = new ArrayList<Message>();

Message msg1 = new Message(“topic1”, “hello rockermq 1”.getBytes(“UTF-8”));

msgList.add(msg1);

SendResult result = Producer.send(msgList);

(6)消息过滤。

语法过滤(属性过滤、语法过滤、SQL过滤),按照消息的某些属性过渡。

基本语法:

数值比较,例如:>,>=,<,<=,BETWEEN,=。

字符比较,例如:=,<>,IN。

IS NULL或者IS NOT NULL。

逻辑符号AND,OR,NOT。

8、集群搭建,集群分类。

(1)单机。

一个broker提供服务,宕机后服务瘫痪。

(2)集群。

多个broker提供,单机宕机后消息无法及时被消费。

多个master多个slave,master到slave消息同步方式,同步、异步。

(3)部署方式。

2主2从同步,两个name server,两个主broker,两个从broker。

2主2从异步。

2主无从。

两台虚拟机可以部署2主2从同步。

9、RocketMQ集群工作流程。

(1)Name Server启动,开启监听,等待broker、producer与consumer的连接。

(2)Broker启动,根据配置信息,连接所有的Name Server,并保持长连接。

如果broker中有现存数据,Name Server将保存topic与broker关系。

(3)producer发信息,连接某个Name Server,并建立长连接。

(4)Producer发消息。Topic不存在,由Name Server直接分配。Topic存在,由Name Server

创建Topic与broker关系,并分配。

(5)Producer在broker的topic选择一个消息队列,从列表中选择。

(6)Producer与broker建立长连接,用于发送消息。

(7)Producer发送消息。

10、RocketMQ高级特性。

(1)消息的存储特性。

[1] 消息生产者生成消息发送消息到MQ。

[2] MQ收到消息,将消息进行持久化,存储消息到数据库或文件系统。

[3] MQ返回ACK给生产者。ACK(Acknowledge character)。

[4] MQ push消息给对应的消费者。

[5] 消费者消费消息后,返回ACK给MQ,MQ认为消费消息成功。如果没有在指定时间接收到消费者返回ACK,MQ认定消息消费失败,重新执行4、5、6步骤。

[6] MQ从数据库或文件系统中删除消息。

(2)高效读写的两个特征。

RocketMQ高效的消息存储与读写方式,高速读写的原因。

[1] 磁盘读写方式,rocketMQ申请预留一块连续的磁盘空间进行读写。

[2] 零拷贝。跳过了数据读写的中间状态,跳过了用户态,不区分数据是哪个用户的。

 (3)消息储存结构。

MQ数据存储区域包含如下内容。

消息数据存储区域:topic、queueId、message。

消费逻辑队列:minoffset、maxoffset、consumeroffset。

索引:key索引、创建时间索引。

(4)刷盘机制。

FlushDisk Type 刷盘类型:AsyNC_FluSH异步刷盘,SyNC_FluSH同步刷盘。

同步刷盘:安全性高,效率低,速度慢,适用于对数据安全要求较高的业务。

异步刷盘:安全性低,效率高,速度快,适用于对数据处理速度要求较高的业务。

**  1)同步刷盘。**

[1] 生产者发送消息到MQ,MQ接到消息数据。

[2] MQ挂起生产者发送消息的线程。

[3] MQ将消息数据写入内存。

[4] 内存数据写入磁盘。

[5] 磁盘存储成功后返回success。

[6] MQ恢复挂起的生产者线程。

[7] 发送ACK到生产者。

2)异步刷盘。

[1] 生产者发送消息到MQ,MQ接收到消息数据。

[2] MQ将消息数据写入内存。

[3] 发送ACK到生产者。

(5)高可用性。

Name Server:无状态,name server之间不进行通信。全服务注册,每一个broker都会向每一个name server服务注册,每一个name server都会知道全部的broker的状态。

消息服务器broker:主从架构。2M-2S,两个master两个slave,两主两从。

消息生产者:如果多个生产者要发送消息给相同的topic,绑定到多个不同的group组。

Master挂掉后,其他master仍可以正常进行消息接收。

消息消费:rocketMQ会根据master的压力,确定由master还是由slave负责数据读取工作。

(6)broker主从复制。

1)同步复制。

Master接到消息后,先复制到slave,然后反馈给生产者写操作成功。

优点:数据安全,不丢失数据,出现故障容易恢复。

缺点:影响数据吞吐量,整体性能低。

2)异步复制。

Master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave。

优点:数据吞吐量大,操作延迟低,性能高。

缺点:不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失。

3)Broker的角色。

ASYNC_MASTER:异步复制master。

SYNC_MASTER:同步双写master。

SLAVE:从服务。

(7)负载均衡模式。读写负载均衡。

Producer负载均衡。

内部实现了不同的broker集群对同一个topic对应消息队列的负载均衡。

Consumer负载均衡。

平均分配。

循环平均分配,可以解决broker宕机造成消费者负载不均衡问题。

(8)消息重试。

当消息消费后未正常返回消费成功的信息,将启动消息重试机制。

消息重试机制。

1)顺序消息。

当消费者消费消息失败后,rocketMQ每隔1秒会自动进行消息重试。

应用会出现消息消费被堵塞的情况。

2)无序消息。

无序消息包括普通消息、定时消息、延时消息、事务消息。

无序消息适用于负载均衡模式下的消息消费。

为保障无序消息的消费,MQ设定了合理的消息重试间隔时长。

每次重试失败后,会增加时间间隔再次重试。间隔时间越来越长,最大达到间隔2小时,默认重试16次。

3)死信队列。

当消息消费重试到达指定的次数后,默认16次。RocketMQ将无法正常消费的消息称为死信队列。死信消息会被保存到一个全新的队列死信队列中。不会被再次重复消费,死信队列消息有效期3天,达到时限后被清除。

(9)消息重复消费。

1)消息重复消费的原因。

生产者发送了重复的消息:网络闪断,生产者宕机。

消息服务器投递了重复的消息:网络闪断。

2)消息幂等。

接口幂等:无论接口调用了几次,在后端服务产生的效果就一次,效果是一样的。

消息幂等:对同一条消息,无论消费多少次,结果保持一致,称为消息幂等性。

解决方法:使用id作为消息的key。在消费消息时,客户端对key作判断,未使用过放行,使用过抛弃。

3)常见的幂等方法:

新增:不幂等。例如: insert into order values(...)

查询:幂等。

删除:幂等。例如: delete from table where id=1

修改:不幂等。例如: update account set balance = balance+100 where no=1

修改:幂等。例如: update account set balance=100 where no=1

相关文章

微信公众号

最新文章

更多

目录