服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - 编程技术 - 从RocketMQ的Broker源码层面验证一下这两个点

从RocketMQ的Broker源码层面验证一下这两个点

2021-03-15 23:51SH的全栈笔记 编程技术

Producer从启动到发送消息的整个过程,从源码级别分析了Producer在发送消息到Broker的时候,是如何拿到Broker的数据的,如何从多个MessageQueue中选择对应的Queue发送消息。

 从RocketMQ的Broker源码层面验证一下这两个点

本篇博客会从源码层面,验证在RocketMQ基础概念剖析,并分析一下Producer的底层源码中提到的结论,分别是:

  • Broker在启动时,会将自己注册到所有的NameServer上
  • Broker在启动之后,会每隔30S向NameServer发送心跳

之前的文章中,我们知道了RocketMQ中的一些核心概念,例如Broker、NameServer、Topic和Tag等等。Producer从启动到发送消息的整个过程,从源码级别分析了Producer在发送消息到Broker的时候,是如何拿到Broker的数据的,如何从多个MessageQueue中选择对应的Queue发送消息。

但是由于篇幅原因,文章开头提到的两个已知结论在上篇博客里并没没有对其进行验证,这次就从源码层面来验证一下。

一开头就看到Broker主从架构相关的源码

在上篇博客中提到过,Broker为了保证自身的高可用,会采取一主一从的架构。即使Master Broker因为意外原因挂了,Slave Broker上还有一份完整的数据,Broker可以继续提供服务。

从RocketMQ的Broker源码层面验证一下这两个点

isEnableDLegerCommitLog中提到的DLeger可以先不管,我们目前只需要知道其默认返回的结果是false。所以Broker首次启动的时候,就会执行被If包裹住的逻辑。

RocketMQ本身是有主从架构的,但是功能不够完善,如果Master Broker出现了故障,需要人工的将Slave Broker切换成Master。

就有点类似于手动的将一台Redis设置成另一台Redis的Slave节点,如果此时Redis的Master挂了,还需要手动的进行切换一样。为了解决这个问题,Redis搞出了Sentinel,可以在发生故障的时候自动的实现故障转移。所以RocketMQ在4.5版本之后推出的Dleger差不多也是这么个东西,除此之外,Dleger还可以实现多副本。

不使用Dleger时,主从数据如何进行同步

先给出结论,在RocketMQ的主从架构下,主从同步采取的是Slave主动拉取的方式。

如果当前执行注册的Broker角色是Slave,那就会使用ScheduledExecutorService启动一个周期性的定时任务,每隔10秒就会去Master同步一次,同步的数据包括Topic的相关配置、Consumer的消费偏移量、延迟消息的Offset、订阅组的相关数据和配置。

从RocketMQ的Broker源码层面验证一下这两个点

ScheduledExecutorService的作用和原理下面会做简单介绍。

首次启动时强制进行Broker注册

从RocketMQ的Broker源码层面验证一下这两个点

因为是首次启动,所以参数forceRegister被直接设置成了true。

使用ScheduledExecutorService启动定时任务

通过入口进来之后,Broker会启动一个定时任务,周期性的去注册。ScheduledExecutorService底层就是一个newSingleThreadScheduledExecutor,只有一个线程的线程池,其关键的参数corePoolSize值为1,然后按照指定的频率周期性的执行某个任务。

从RocketMQ的Broker源码层面验证一下这两个点

ScheduledExecutorService主要的功能有两个,分别是:

  • ScheduledExecutorService 以固定的频率执行任务
  • ScheduledExecutorService 执行完之后,间隔制定的时间后再执行下一个任务

使用scheduleAtFixedRate实现心跳机制

此处我们使用的是scheduleAtFixedRate,如下图。

至于执行的频率,我们能够配置的范围最大不能超过一分钟,也就是说这个范围是在10-60秒之间,默认30秒执行一次,这也就验证了每30秒,Broker会向NameServer发送一次心跳。

获取执行频率的这个判断有点意思,甚至看起来有那么一丝丝简洁,但是理解其具体可配置的时间范围可能需要花点时间。在实际业务性代码中,个人建议还是不要这么写,业务中代码的可读性和可维护性我认为是需要放在首位的。

值得注意的是,此处启动心跳,给了一个10秒的延迟,因为在不使用Dleger的情况下,在之前的逻辑中已经执行过一次注册了。如果不做延迟,那么几乎是同一个时间就会有两次注册操作,而这明显是不符合预期的;同时forceRegister也从true变成了通过函数isForceRegister来进行获取。

调用registerBrokerAll注册

定时任务注册完成之后,之后的每次触发都会执行registerBrokerAll方法来执行注册,你可能会有疑问,我当前不就是一个Broker吗,怎么名字有个后缀All?那是因为NameServer会有多个,Broker启动的时候会将自己注册到所有的NameServer上去。当然,口说无凭,我们继续看下去。

继续往里走,如果当前满足注册条件,则会实际的执行注册操作。那具体满足什么条件呢?由变量forceRegister和一个needRegister方法来决定,forceRegister默认是true,所以当第一执行这个逻辑的时候是一定会执行注册操作的。

从RocketMQ的Broker源码层面验证一下这两个点

通过对比数据版本判断当前Broker是否需要进行注册

感兴趣的话,可以继续跟随文章了解一下,needRegister是根据什么来判断是否需要注册的。

首先,Broker一旦注册到了NameServer之后,由于Producer不停的在写入数据,Consumer也在不停的消费数据,Broker也可能因为故障导致MessageQueue等关键路由信息发生变动,NameServer中的数据和Broker中实际的数据就会不一致,如果不及时更新,Producer拉取到的路由数据就可能有误。

所以每次定时任务触发的时候会去对比NameServer和Broker的数据,如果发现数据版本不一致,Broker会重新进行注册,将最新的数据更新到NameServer。说直白一点,就是做一个数据定时更新。以下红框中的代码就是数据对比的核心代码。

从RocketMQ的Broker源码层面验证一下这两个点

当Broker和所有的NameServer节点一一完成数据对比之后,就会进行结果判定,但凡有一个节点数据不一致,都需要进行重新注册,把最新的数据更新到NameServer,核心判断逻辑同样用红框标出。

从RocketMQ的Broker源码层面验证一下这两个点

至此,其实我们就已经完成了 Broker在启动的时候会向所有NameServer进行注册 的验证。但是由于后续仍然有值得关注发光点,我们继续后续的源码阅读。

使用CountDownLatch获取所有注册异步任务的返回结果

除此之外,还值得注意的是在needRegister中,对于和多个NameServer的交互,RocketMQ是通过线程池异步实现的,同时使用了CountDownLatch来等待所有的请求结束,返回结果给主线程。

从RocketMQ的Broker源码层面验证一下这两个点

既然聊到了CountDownLatch,就顺带提一下。假设我们有5个互不依赖的计算任务,如果快速的计算出结果并返回呢?那当然是5个任务并发执行,这就需要通过新开线程实现,结果就无法一起返回了。

而CountDownLatch可以让主线程等待,等待这5个计算任务全部结束之后,唤醒主线程再继续后面的逻辑。这就是CountDownLatch的作用,如果平时只是单纯的CRUD功能的话,可能连CountDownLatch是什么都做不知道,这也是为什么大厂面试会问这些问题,因为在大厂的复杂业务背景下,你必须要会使用它们。

指定需要注册之后,接下来就是核心的注册方法了,核心逻辑由registerBrokerAll来实现。Broker同样会去每一个NameServer节点上注册自己,并且为了提前执行的效率,同样开线程采用了异步的方式。在获取所有结果时,同样的使用了CountDownLatch。

从RocketMQ的Broker源码层面验证一下这两个点

使用CopyOnWriteArrayList存储注册请求的返回

除此之外,用于保存注册结果的列表,使用的是CopyOnWriteArrayList,被面试虐过的同学应该熟悉。我们知道此处开启了多线程去不同的NameServer注册,写入注册结果的时候,多线程对同一个列表进行写入,会产生线程安全的问题。

从RocketMQ的Broker源码层面验证一下这两个点

而我们知道ArrayList是非线程安全的,这也是为什么此处要使用CopyOnWriteArrayList来保存注册结果。为什么CopyOnWriteArrayList能够保证线程安全?

这归功于COW(Copy On Write),读请求时共用同一个List,涉及到写请求时,会复制出一个List,并在写入数据的时候加入独占锁。比起直接对所有操作加锁,读写锁的形式分离了读、写请求,使其互不影响,只对写请求加锁,降低了加锁的消耗,提升了整体操作的并发。

上面并发执行的注册操作,具体做了哪些事情呢?先看代码。

从RocketMQ的Broker源码层面验证一下这两个点

上面就是单个注册的所有逻辑,可以看到在构建完请求之后,有一个oneway的判断。

oneway值为false,表示单向通信,Broker不关心NameServer的返回,也不会触发任何回调函数。接下来Broker就会把已经写进request body的所有数据发送给NameServer。请求数据统一由一个叫TopicConfigSerializeWrapper的Wrapper给包裹住。

从RocketMQ的Broker源码层面验证一下这两个点

其可以看为两部分:

  • 存在该Broker节点上的所有Topic的数据
  • 数据版本

然后带着这些数据,Broker会同步的调用invokeSync发送请求给NameServe,并且在执行之后触发实现特定功能的回调函数。

EOF

至此,我们完成了对开篇所提结论的验证,同时也发现了RocketMQ的主从架构、Master和Slave同步数据的方式、心跳机制的实现等等,也基本从源码中看完了Broker启动的所有流程。看这些老哥写的源码还是挺有意思的,之后有时间随缘再看看NameServer端相关的源码吧。

原文地址:https://mp.weixin.qq.com/s/S2BXGEYbws2CXIfY5vbRug

延伸 · 阅读

精彩推荐
  • 编程技术让开发效率倍增的 VS Code 插件

    让开发效率倍增的 VS Code 插件

    今天来分享一些提升开发效率的实用 VS Code 插件!Better Comments 扩展可以帮助我们在代码中创建更人性化的注释,有不同形式和颜色的注释供我们选择。 ...

    前端充电宝7132022-04-21
  • 编程技术Delphi - Indy idMessage和idSMTP实现邮件的发送

    Delphi - Indy idMessage和idSMTP实现邮件的发送

    这篇文章主要介绍了Delphi - Indy idMessage和idSMTP实现邮件的发送,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下...

    JJ_JeremyWu6592020-09-22
  • 编程技术简单、好懂的Svelte实现原理

    简单、好懂的Svelte实现原理

    本文会围绕一张流程图和两个Demo讲解,正确的食用方式是用电脑打开本文,跟着流程图、Demo一边看、一边敲、一边学...

    魔术师卡颂4822021-11-10
  • 编程技术2021年值得关注的React PDF 库

    2021年值得关注的React PDF 库

    今天,许多网络应用程序为其用户提供内置的PDF浏览选项。然而,选择一个并不容易,因为它们的功能远远超过显示PDF。在这篇文章中,我将评估5个React的...

    TianTianUp5232021-06-21
  • 编程技术AIOps,SRE工程师手中的利器

    AIOps,SRE工程师手中的利器

    AIOps开始成为一种极为重要的站点可靠性工程工具。它能够高效吸纳观察数据、参与数据以及来自第三方工具的数据,判断系统运行状态并保证其处于最佳...

    至顶网5972021-03-08
  • 编程技术从Context源码实现谈React性能优化

    从Context源码实现谈React性能优化

    这篇文章主要介绍Context的实现原理,源码层面掌握React组件的render时机,从而写出高性能的React组件,源码层面了解shouldComponentUpdate、React.memo、PureComponen...

    魔术师卡颂5312020-12-20
  • 编程技术用户态 Tcpdump 如何实现抓到内核网络包的?

    用户态 Tcpdump 如何实现抓到内核网络包的?

    在网络包的发送和接收过程中,绝大部分的工作都是在内核态完成的。那么问题来了,我们常用的运行在用户态的程序 tcpdump 是那如何实现抓到内核态的包...

    开发内功修炼11612021-09-08
  • 编程技术真正聪明的程序员,总有办法不加班

    真正聪明的程序员,总有办法不加班

    工作效率提升了,就可以少加班了,聪明的程序员,总会有一堆可以提升编码效率的工具?当一种工具满足不了工作需求,就去探索新的,今天纬小创就给...

    今日头条12482021-03-04