|
4 | 4 |
|
5 | 5 | > 生产者在生产消息、消费者在消费消息之前,都需要连接到 NameServer 上,从 NameServer 拉取路由信息,从而实现消息的生产、存储与消费。为生产者和消费者提供路由信息,是 NameServer 的主要功能之一,NameServer 内部由一个路由管理器维护着路由信息,并且可以动态地管理 Broker 节点信息,包含注册、剔除、发现及心跳等。本文将着重介绍 NameServer 的路由管理机制,文章中使用到的代码均来自 RocketMQ 5.0 版本,感兴趣的读者可以 clone 下来阅读源码与注释。文中的代码仓库地址:[点击跳转](https://github.com/itlemon/rocketmq-5.0.0)。 |
6 | 6 |
|
7 | | -## 一、路由信息数据结构原理 |
| 7 | +## 一、路由信息数据结构分析 |
8 | 8 |
|
9 | 9 | NameServer 是保证消息正确地从生产者到消费者的“指挥官”,它提供了路由管理,服务注册与服务发现、故障剔除等机制,这些机制的背后原理都都依赖于 NameServer 的路由功能,接下来,将详细介绍路由信息的数据结构,并按照 2m-2s 部署方式部署两组 Broker,通过打断点的形式一起看看 Broker 数据在路由管理器中是如何存储的,方便大家理解 NameServer 的路由原理。 |
10 | 10 |
|
@@ -263,43 +263,62 @@ DefaultMQProducer ->> SendMessageProcessor: 后续再发送消息,Topic路由 |
263 | 263 | #### 1.2.4 brokerLiveTable的数据状况 |
264 | 264 |
|
265 | 265 |  |
266 | | -上图中 brokerLiveTable 对应于运行时的数据结构如下所示: |
| 266 | +上图中 brokerLiveTable,它是一个 Map 结构,Map 的键是 BrokerAddrInfo 对象,Map 的值是 BrokerLiveInfo 对象,在运行时,键的数据结构为: |
267 | 267 |
|
268 | 268 | ```json |
269 | 269 | { |
270 | | - "172.20.192.218:10921": { |
271 | | - "lastUpdateTimestamp": 1613230150507, |
272 | | - "dataVersion": "dataVersionObject", |
273 | | - "channel": "channelObject", |
274 | | - "haServerAddr": "172.20.192.218:10922" |
275 | | - }, |
276 | | - "172.20.192.218:10911": { |
277 | | - "lastUpdateTimestamp": 1613230145828, |
278 | | - "dataVersion": "dataVersionObject", |
279 | | - "channel": "channelObject", |
280 | | - "haServerAddr": "172.20.192.218:10912" |
281 | | - }, |
282 | | - "172.20.192.218:10941": { |
283 | | - "lastUpdateTimestamp": 1613230166459, |
284 | | - "dataVersion": "dataVersionObject", |
285 | | - "channel": "channelObject", |
286 | | - "haServerAddr": "172.20.192.218:10942" |
287 | | - }, |
288 | | - "172.20.192.218:10931": { |
289 | | - "lastUpdateTimestamp": 1613230154587, |
290 | | - "dataVersion": "dataVersionObject", |
291 | | - "channel": "channelObject", |
292 | | - "haServerAddr": "172.20.192.218:10932" |
293 | | - } |
| 270 | + "clusterName":"testCluster", |
| 271 | + "brokerAddr":"192.168.3.113:10911", |
| 272 | + "hash":-1481157086 |
| 273 | +} |
| 274 | +``` |
| 275 | + |
| 276 | +为了方便展示,将键简化展示在下面的运行时数据结构中: |
| 277 | + |
| 278 | +```json |
| 279 | +{ |
| 280 | + "192.168.3.113:10911":{ |
| 281 | + "lastUpdateTimestamp":1678025133747, |
| 282 | + "heartbeatTimeoutMillis":120000, |
| 283 | + "dataVersion":"dataVersionObject", |
| 284 | + "channel":"channelObject", |
| 285 | + "haServerAddr":"192.168.3.113:10912" |
| 286 | + }, |
| 287 | + "192.168.3.113:10941":{ |
| 288 | + "lastUpdateTimestamp":1678025138611, |
| 289 | + "heartbeatTimeoutMillis":120000, |
| 290 | + "dataVersion":"dataVersionObject", |
| 291 | + "channel":"channelObject", |
| 292 | + "haServerAddr":"192.168.3.113:10942" |
| 293 | + }, |
| 294 | + "192.168.3.113:10931":{ |
| 295 | + "lastUpdateTimestamp":1678025130572, |
| 296 | + "heartbeatTimeoutMillis":120000, |
| 297 | + "dataVersion":"dataVersionObject", |
| 298 | + "channel":"channelObject", |
| 299 | + "haServerAddr":"192.168.3.113:10932" |
| 300 | + }, |
| 301 | + "192.168.3.113:10921":{ |
| 302 | + "lastUpdateTimestamp":1678025130599, |
| 303 | + "heartbeatTimeoutMillis":120000, |
| 304 | + "dataVersion":"dataVersionObject", |
| 305 | + "channel":"channelObject", |
| 306 | + "haServerAddr":"192.168.3.113:10922" |
| 307 | + } |
294 | 308 | } |
295 | 309 | ``` |
296 | 310 |
|
297 | | -### 3.2 路由信息注册 |
| 311 | +上面的 brokerLiveTable 中,存储的就是两组 Broker 共 $4$ 个实例注册到 NameServer 的存活信息,在探活机制中,NameServer 会检查每一个 Broker 实例,上一次更新时间戳(lastUpdateTimestamp)加上心跳超时时间(heartbeatTimeoutMillis,默认值为 $2$ 分钟),如果结果小于当前时间戳,则认为该 Broker 实例不再活跃了,将触发剔除操作,关闭通道等。 |
| 312 | + |
| 313 | +## 二、路由信息管理原理分析 |
| 314 | + |
| 315 | +### 2.1 路由信息注册 |
| 316 | + |
| 317 | +路由信息注册通常是指,将自身的信息告诉服务注册中心,在 RocketMQ 中,这里的“自身”是指 Broker,而服务注册中心指得就是 NameServer。Broker 在启动后,会向所有的 NameServer 注册自身的元信息,通常包括:集群名称(clusterName)、Broker 地址(brokerAddr)、Broker 名称(brokerName)、Broker ID(brokerId)、高可用地址(haServerAddr)、Topic相关信息(topicConfigWrapper)、过滤服务器列表、通信通道等信息等。这些元信息的注册,都是通过心跳机制来实现的,所谓的心跳机制,一般都是通过定时任务来实现的,按照一定的频率向NameServer 发送元信息数据,从而实现续约。每个 Broker 会每隔 $30$ 秒向 NameServer 发送心跳,NameServer 接收到 Broker 心跳数据后,会去实时更新 brokerLiveTable 中 BrokerLiveInfo 的 lastUpdateTimestamp 字段(上一次心跳时间戳),当然,NameServer 也有检查机制,会每隔 $10$ 秒扫描 brokerLiveTable,如果发现某个 Broker 的 lastUpdateTimestamp 字段超过 $2$min 没有更新,那么就认为该 Broker 存在故障,NameServer 会主动将其从路由表中剔除,同时关闭通信通道。 |
298 | 318 |
|
299 | | -路由信息注册通常是指,将自身的信息告诉服务注册中心,在RocketMQ中,这里的“自身”是指Broker,而服务注册中心指得就是NameServer。Broker在启动后,会向所有的NameServer注册自身的元信息,通常包括:集群名称(clusterName)、Broker地址(brokerAddr)、Broker名称(brokerName)、Broker ID(brokerId)、高可用地址(haServerAddr)、Topic相关信息(topicConfigWrapper)、过滤服务器列表、通信通道等信息等。这些元信息的注册,都是通过心跳机制来实现的,所谓的心跳机制,一般都是通过定时任务来实现的,按照一定的频率向NameServer发送元信息数据,从而实现续约。每个Broker会每隔30秒向NameServer发送心跳,NameServer接收到Broker心跳数据后,会去实时更新brokerLiveTable中BrokerLiveInfo的lastUpdateTimestamp字段(上一次心跳时间戳),当然,NameServer也有检查机制,会每隔10秒扫描brokerLiveTable,如果发现某个Broker的lastUpdateTimestamp字段超过2min没有更新,那么就认为该Broker存在故障,NameServer会主动将其从路由表中剔除,同时关闭通信通道。 |
300 | | -那么Broker是如何向NameServer进行注册的呢?下面的内容将一一揭秘。 |
| 319 | +那么 Broker 是如何向 NameServer 进行注册的呢?下面的内容将一一揭秘。 |
301 | 320 |
|
302 | | -从BrokerController的start()方法可以看出,Broker在启动的时候,会注册一个定时任务,每隔30s(默认值,可配置10~60s)向NameServer发送元数据信息。`brokerConfig.getRegisterNameServerPeriod()`的默认值是30s。 |
| 321 | +从 BrokerController 的 start() 方法可以看出,Broker 在启动的时候,会注册一个定时任务,每隔 $30$s(默认值,可配置 $10$~$60$s)向 NameServer 发送元数据信息。`brokerConfig.getRegisterNameServerPeriod()`的默认值是 $30$s。 |
303 | 322 |
|
304 | 323 | ```java |
305 | 324 | // 注册一个定时任务,默认每隔30s向NameServer发送元数据信息 |
|
0 commit comments