@@ -3549,54 +3549,17 @@ RCVBUF_ALLOCATOR:属于 SocketChannal 参数
35493549
35503550
35513551
3552- ### 工作流程
3553-
3554- NameServer 是一个简单的 Topic 路由注册中心,支持 Broker 的动态注册与发现,生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表
3555-
3556- NameServer 主要包括两个功能:
3557-
3558- * Broker 管理,NameServer 接受 Broker 集群的注册信息并保存下来作为路由信息的基本数据,提供** 心跳检测** 检查 Broker 活性
3559- * 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息,然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费
3560-
3561- NameServer 特点:
3562-
3563- * NameServer 通常是集群的方式部署,各实例间相互不进行信息通讯
3564- * Broker 向每一台 NameServer 注册自己的路由信息,所以每个 NameServer 实例上面** 都保存一份完整的路由信息**
3565- * 当某个 NameServer 因某种原因下线了,Broker 仍可以向其它 NameServer 同步其路由信息
3566-
3567- BrokerServer 主要负责消息的存储、投递和查询以及服务高可用保证,在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备,也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等
3568-
3569- Broker 包含了以下几个重要子模块:
3570-
3571- * Remoting Module :整个 Broker 的实体,负责处理来自 clients 端的请求
3572-
3573- * Client Manager :负责管理客户端(Producer / Consumer )和维护 Consumer 的 Topic 订阅信息
3574-
3575- * Store Service :提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能
3576-
3577- * HA Service :高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能
3578-
3579- * Index Service :根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询
3580-
3581- ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-Broker工作流程.png)
3582-
3583-
3584-
3585- ***
3586-
3587-
3588-
35893552### 相关概念
35903553
35913554RocketMQ 主要由 Producer 、Broker 、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,NameServer 负责管理 Broker
35923555
3556+ * 代理服务器(Broker Server ):消息中转角色,负责** 存储消息、转发消息** 。在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备,也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等
3557+ * 名字服务(Name Server ):充当** 路由消息** 的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表
35933558* 消息生产者(Producer ):负责** 生产消息** ,把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送,同步和异步方式均需要 Broker 返回确认信息,单向发送不需要;可以通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟
35943559* 消息消费者(Consumer ):负责** 消费消息** ,一般是后台系统负责异步消费,一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而提供了两种消费形式:
35953560 * 拉取式消费(Pull Consumer ):应用通主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息,主动权由应用控制,一旦获取了批量消息,应用就会启动消费过程
35963561 * 推动式消费(Push Consumer ):该模式下 Broker 收到数据后会主动推送给消费端,实时性较高
3597-
35983562* 生产者组(Producer Group ):同一类 Producer 的集合,都发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,** 则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费**
3599-
36003563* 消费者组(Consumer Group ):同一类 Consumer 的集合,消费者实例必须订阅完全相同的 Topic ,消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面更容易的实现负载均衡和容错。RocketMQ 支持两种消息模式:
36013564 * 集群消费(Clustering ):相同 Consumer Group 的每个 Consumer 实例平均分摊消息
36023565 * 广播消费(Broadcasting ):相同 Consumer Group 的每个 Consumer 实例都接收全量的消息
@@ -4225,7 +4188,9 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
42254188
42264189#### 原理解析
42274190
4228- RocketMQ 分布式消息队列的消息过滤方式是在 Consumer 端订阅消息时再做消息过滤的。因为 RocketMQ 在 Producer 端写入消息和在 Consumer 端订阅消息采用** 分离存储** 的机制实现,Consumer 端订阅消息是需要通过 ConsumeQueue 这个消息消费的逻辑队列拿到一个索引,然后再从 CommitLog 里面读取真正的消息实体内容,所以绕不开其存储结构。
4191+ RocketMQ 分布式消息队列的消息过滤方式是在 Consumer 端订阅消息时再做消息过滤的,所以是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂
4192+
4193+ RocketMQ 在 Producer 端写入消息和在 Consumer 端订阅消息采用** 分离存储** 的机制实现,Consumer 端订阅消息是需要通过 ConsumeQueue 这个消息消费的逻辑队列拿到一个索引,然后再从 CommitLog 里面读取真正的消息实体内容
42294194
42304195ConsumeQueue 的存储结构如下,有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的消息过滤就是基于这个字段
42314196
@@ -4342,10 +4307,6 @@ RocketMQ 支持分布式事务消息,采用了 2PC 的思想来实现了提交
43424307
43434308RocketMQ 会开启一个定时任务,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息
43444309
4345- 在 RocketMQ 中,每条消息都会有对应的索引信息,Consumer 通过 ConsumeQueue (在 Broker 端)这个类似二级索引的结构来读取消息实体内容
4346-
4347- ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-事务工作流程.png)
4348-
43494310RocketMQ 的具体实现策略:如果写入的是事务消息,对消息的 Topic 和 Queue 等属性进行替换,同时将原来的 Topic 和 Queue 信息存储到** 消息的属性** 中,因为消息的主题被替换,所以消息不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费
43504311
43514312
@@ -4504,6 +4465,163 @@ public class Producer {
45044465
45054466## 系统机制
45064467
4468+ ### 消息存储
4469+
4470+ #### 工作流程
4471+
4472+ 分布式队列因为有高可靠性的要求,所以数据要进行持久化存储
4473+
4474+ 1. 消息生产者发送消息
4475+ 2. MQ 收到消息,将消息进行持久化,在存储中新增一条记录
4476+ 3. 返回 ACK 给生产者
4477+ 4. MQ push 消息给对应的消费者,然后等待消费者返回 ACK
4478+ 5. 如果消息消费者在指定时间内成功返回 ACK ,那么 MQ 认为消息消费成功,在存储中删除消息;如果 MQ 在指定时间内没有收到 ACK ,则认为消息消费失败,会尝试重新 push 消息,重复执行 4 、5 、6 步骤
4479+ 6. MQ 删除消息
4480+
4481+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-消息存取.png)
4482+
4483+
4484+
4485+
4486+
4487+ ***
4488+
4489+
4490+
4491+ #### 存储结构
4492+
4493+ RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成 的,消息真正的物理存储文件是 CommitLog ,ConsumeQueue 是消息的逻辑队列,类似数据库的索引节点,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件
4494+
4495+ 每条消息都会有对应的索引信息,Consumer 通过 ConsumeQueue (在 Broker 端)这个结构来读取消息实体内容
4496+
4497+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-消息存储结构.png)
4498+
4499+ * CommitLog :消息主体以及元数据的存储主体,存储 Producer 端写入的消息内容,消息内容不是定长的。消息主要是顺序写入日志文件,单个文件大小默认1G,偏移量代表下一次写入的位置,当文件写满了就继续写入下一个文件
4500+ * ConsumerQueue :消息消费队列,存储消息在 CommitLog 的索引。RocketMQ 消息消费时要遍历 CommitLog 文件,并根据主题 Topic 检索消息,这是非常低效的。引入 ConsumeQueue 作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值,每个 ConsumeQueue 文件大小约 5. 72M
4501+ * IndexFile :为了消息查询提供了一种通过 Key 或时间区间来查询消息的方法,通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程。IndexFile 的底层存储为在文件系统中实现的 HashMap 结构,故 RocketMQ 的索引文件其底层实现为 hash 索引
4502+
4503+ RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(CommitLog )来存储。混合型存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog 中)** 针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构** ,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。只要消息被持久化至磁盘文件 CommitLog 中,Producer 发送的消息就不会丢失,Consumer 也就肯定有机会去消费这条消息
4504+
4505+ 服务端支持长轮询模式,当消费者无法拉取到消息后,可以等下一次消息拉取,Broker 允许等待 30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。RocketMQ 的具体做法是,使用 Broker 端的后台服务线程 ReputMessageService 不停地分发请求并异步构建 ConsumeQueue (逻辑消费队列)和 IndexFile (索引文件)数据
4506+
4507+
4508+
4509+
4510+
4511+ ****
4512+
4513+
4514+
4515+ #### 存储优化
4516+
4517+ ##### 存储媒介
4518+
4519+ 两种持久化的方案:
4520+
4521+ * 关系型数据库 DB :IO 读写性能比较差,如果 DB 出现故障,则 MQ 的消息就无法落盘存储导致线上故障,可靠性不高
4522+
4523+ * 文件系统:消息刷盘至所部署虚拟机/ 物理机的文件系统来做持久化,分为异步刷盘和同步刷盘两种模式。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式,除非部署 MQ 机器本身或是本地磁盘挂了,一般不会出现无法持久化的问题
4524+
4525+ 注意:磁盘的顺序读写要比随机读写快很多,可以匹配上网络的速度,RocketMQ 的消息采用的顺序写
4526+
4527+ 页缓存(PageCache )是 OS 对文件的缓存,用于加速对文件的读写。程序对文件进行顺序读写的速度几乎接近于内存的读写速度,就是因为 OS 将一部分的内存用作 PageCache ,对读写访问操作进行了性能优化,
4528+
4529+ * 对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上
4530+ * 对于数据的读取,如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取(局部性原理)
4531+
4532+ 在 RocketMQ 中,ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取,在 PageCache 机制的预读取作用下,Consume Queue 文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。CommitLog 消息存储的日志数据文件读取内容时会产生较多的随机访问读取,严重影响性能。选择合适的系统 IO 调度算法和固态硬盘,比如设置调度算法为 Deadline ,随机读的性能也会有所提升
4533+
4534+
4535+
4536+ ***
4537+
4538+
4539+
4540+ ##### 内存映射
4541+
4542+ 操作系统分为用户态和内核态,文件操作、网络操作需要涉及这两种形态的切换,需要进行数据复制。一台服务器把本机磁盘文件的内容发送到客户端,分为两个步骤:
4543+
4544+ * read:读取本地文件内容
4545+
4546+ * write:将读取的内容通过网络发送出去
4547+
4548+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-文件与网络操作.png)
4549+
4550+ 补充:Prog → NET → I / O → 零拷贝部分的笔记详解相关内容
4551+
4552+ 通过使用 mmap 的方式,可以省去向用户态的内存复制,RocketMQ 充分利用零拷贝技术,提高消息存盘和网络发送的速度。
4553+
4554+ RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作,利用了 NIO 中的 FileChannel 模型将磁盘上的物理文件直接映射到用户态的内存地址中,将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率
4555+
4556+ MappedByteBuffer 内存映射的方式限制一次只能映射 1.5 ~ 2G 的文件至用户态的虚拟内存,所以 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G。RocketMQ 的文件存储使用定长结构来存储,方便一次将整个文件映射至内存
4557+
4558+
4559+
4560+ ***
4561+
4562+
4563+
4564+ #### 刷盘机制
4565+
4566+ 同步刷盘:只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应,保障 MQ 消息的可靠性,但是性能上会有较大影响,一般适用于金融业务应用该模式较多
4567+
4568+ 异步刷盘:利用 OS 的 PageCache 的优势,只要消息写入内存 PageCache 即可将成功的 ACK 返回给 Producer 端,降低了读写延迟,提高了 MQ 的性能和吞吐量。消息刷盘采用** 后台异步线程** 提交的方式进行,当内存里的消息量积累到一定程度时,触发写磁盘动作
4569+
4570+ 通过 Broker 配置文件里的 flushDiskType 参数设置采用什么方式,可以配置成 SYNC_FLUSH 、ASYNC_FLUSH 中的一个
4571+
4572+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-刷盘机制.png)
4573+
4574+
4575+
4576+ 官方文档:https: // github.com/apache/rocketmq/blob/master/docs/cn/design.md
4577+
4578+
4579+
4580+
4581+
4582+ ****
4583+
4584+
4585+
4586+ ### 通信机制
4587+
4588+ NameServer 是一个简单的 Topic 路由注册中心,支持 Broker 的动态注册与发现,生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表
4589+
4590+ NameServer 主要包括两个功能:
4591+
4592+ * Broker 管理,NameServer 接受 Broker 集群的注册信息并保存下来作为路由信息的基本数据,提供** 心跳检测** 检查 Broker 活性
4593+ * 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息,然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费
4594+
4595+ NameServer 特点:
4596+
4597+ * NameServer 通常是集群的方式部署,各实例间相互不进行信息通讯
4598+ * Broker 向每一台 NameServer 注册自己的路由信息,所以每个 NameServer 实例上面** 都保存一份完整的路由信息**
4599+ * 当某个 NameServer 因某种原因下线了,Broker 仍可以向其它 NameServer 同步其路由信息
4600+
4601+ BrokerServer 主要负责消息的存储、投递和查询以及服务高可用保证,在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备,也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等
4602+
4603+ Broker 包含了以下几个重要子模块:
4604+
4605+ * Remoting Module :整个 Broker 的实体,负责处理来自 clients 端的请求
4606+
4607+ * Client Manager :负责管理客户端(Producer / Consumer )和维护 Consumer 的 Topic 订阅信息
4608+
4609+ * Store Service :提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能
4610+
4611+ * HA Service :高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能
4612+
4613+ * Index Service :根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询
4614+
4615+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-Broker工作流程.png)
4616+
4617+
4618+
4619+
4620+
4621+ ***
4622+
4623+
4624+
45074625### 集群设计
45084626
45094627#### 集群模式
0 commit comments