基础
什么是消息队列:具备生产者,消费者,消息队列的场景
应用场景:
- 异步(电商订单的创建、支付、发货流程)
- 解耦
- 削峰填谷(淘宝的双十一)
需解决的问题:
- 消息重复(唯一ID,幂等)
- 消息丢失(ack确认机制,死信队列)
- 消息堆积(增加消费者,增加消费能力,增加集群分担)
- 高可用(集群,主从,多副本)
- 高性能(集群,分区,多机部署,负载均衡)
RocketMQ
整体架构视频:小白debug的视频
面试题:
- CSDN上找的一篇,够用
- 知乎上找的,够用
官方文档**【中文】**【写的比较好的官方文档】
什么是RocketMQ
是阿里云开发,后并入Apche的开源分布式消息队列框架。支持高并发,低延迟,以及集群部署能力。
有哪些结构
Producer:
生产者,可以是生产者集群,想消息队列中发送消息。需要通过NameServer建立连接。发送消息时,需要配置目标Broker的IP,以及Topic名称,Tag标记(相当于Topic的二级标记)。
发送方式:
- 普通发送
- 批量发送
- 顺序发送
- 延迟返送
- 事务发送
Broker:
消息数据的物理节点,负责消息的持久化以及高可用。
支持的集群部署方式
- 一个Master,多个Slave
- 多个Master,多个Slave
也就是Master和Slave之间可以多对多,一对多的关系
Broker节点创建是,需要像NameServer注册,并发送所有的Topic信息
Consumer:
消费者,消费者需要分组,不同的分组有不同的消费策略
- 集群消费:一个Group只有一个Consumer可以接收消息,因此有负载均衡策略。
- 广播消费:每个Group的每个Consumer都会接收消息。
NameServer:
注册中心,负责Broker、Producer、Consumer之间的协调。
Broker: 向NameServer注册节点信息,并且同步所有Topic信息。
Producer: 向NameServer获取Broker和Topic信息,并于对应的Broker中的Master建立链接,发送消息。
Consumer: 向NameServer获取目标Topic的Broker地址信息,用于接收消息。
Consumer的负载均衡策略
注意: Consumer有两种模式,集群和广播模式,其中只有集群模式有均衡策略的必要,因为广播模式会向Group中的所有Consumer发送消息。
策略有:
- 平均策略(默认)
- Broker选择策略,也就是机器选择策略
- 一致性hash(避免增删Broker时数据的迁移)
消息堆积如何处理
方式1:增加消费者
如果Topic的数量大于Consumer,可以适当增加消费者提高消费的吞吐量。
但不是无脑增加消费者就能解决,因为如果消费者超过Topic数量,再怎么增加也没有用,因为多个消费者会变成资源争抢。
方式2:增加消费者的消费能力
如果不能增加消费者,那可以提高消费效率来减少消息堆积,例如 优化代码提高代码性能
方式3:辅助措施
- 增加系统监控,提前发现,提前做出响应
- 增加Broker节点,专门增加一个消费者,只是将消息搬运到新的Broker,不做任何处理
有哪些消费方式
两种:Push、Pull
Push:
类似监听注册机制,监听某个Topic绑定一个消费函数,当收到消息就会触发函数。
缺点:如果消费能力不足,可能导致消息堆积
Pull:
主动绑定某个Topic获取消息
优点:可以主动控制消息的消费,减少消息堆积
如果让你考虑实现一个消息队列中间件,你会怎么实现
- 考虑实现多个主题的消息队列
- 为了提升并发性能,将每个主题的消息队列拆分到多个服务器,提高并发量
- 为了提高可用性,对每个主题增加主从备份,并设置一致性策略,例如Raft,Quorum,Zookeeper等
- 为了避免节点宕机问题,需要进行持久化,将每个队列存储到文件,并定期刷到磁盘中
- 为了解决多节点随机写的效率问题,集中将多个队列数据放到一个文件,对这个文件进行一次性刷盘,利用顺序写提高刷盘性能
- 由于增加了集群,因此需要增加一个注册发现服务,也就是注册中心,可以使用Zookeeper或者直接类似RocketMQ的自研NameServer
- 需要考虑和处理一些问题,例如
- 消息堆积问题
- 集群的监控问题
- 消息的消费策略
- 负载均衡问题
- 容错机制
什么是零拷贝,RocketMQ使用了什么方式
知识点:什么是零拷贝
零拷贝是一个相对的概念,针对用户空间于内核空间的交互,使用mmap (内存映射)+write操作让用户空间和内核空间实现零拷贝,但是内核空间依然有3次拷贝,因此这里的零拷贝相对的是用户和内核空间。
第二种方式是使用 sendfile,此方法也是内核方法,可以直接在内核空间发起数据拷贝,从磁盘拷贝到内核空间在拷贝到IO,并且这里的拷贝不会有CPU参与,而是使用DMA控制器(Direct Memory Access)。这里指的是CPU零拷贝。
其中Kafka使用了Sendfile,RocketMQ使用了mmap+write方式
因此kafka的性能要比RocketMQ高一些
为什么RocketMQ不使用sendfile呢?
因为sendfile返回值是一个count
而mmap可以返回数据映射,可以方便当出现消息无法消费时,放入死信队列,或者其他操作
Kafka
简单说明和部署文档
为什么kafka要弃用zookeeper
抖音视频:为什么kafka要弃用zookeeper
kafkazookeeper_198">为什么kafka要弃用zookeeper
- Zookeeper需要单独部署,维护麻烦
- Kafka只用到Zookeeper部分功能,因此过于臃肿
- 关键原因,Kafka遇到性能瓶颈,使用Zookeeper的进行元数据管理,在数据写入,以及集群管理上导致性能相对较差。
RocketMQ和Kafka
共同点和不同点
架构基本相同:
- 都有Broker节点
- 都有Topic主题,Queue和Prtitions(本质都是队列)
- 都使用中间层进行Broker集群关联,RockerMQ使用自研的NameServer,Kafka使用Zookeeper(最新已弃用Zookeeper,使用Kraft一致性协议)
不同点:
- Broker的协调方式不同,Kraft一致性协议具有去中心化的特点,更加简洁轻量,部署更方便。而RocketMQ使用自研的NameServer进行协调。
- 底层的拷贝方式不同,Kafka使用sendfile进行零拷贝,相比RocketMQ使用的mmap+write的方式,效率更高。但是RocketMQ正式基于mmap内存映射的方式,能够提高对消息的操作性,因此提供了更多可用功能,例如死信队列。
- 持久化方式不同,kafka使用segment的方式对每个队列进行持久化,这样如果队列很多,会有随机写的问题。而RocketMQ利用CommitLog的方式,通过顺序写,提高持久化性能。
怎么选择(各自的使用场景和优缺点)
维护频率:
RabbitMQ相对于Kafka和RocketMQ更新频率更慢,因此在后续bug的修复以及维护上后两者更具有优势。
可维护性:
RabbitMQ使用Erlang进行开发,维护更麻烦。而Kafka和RocketMQ都是基于Java开发(kafka底层是scala开发),由于Java市场更广因此更易于维护。
分布式架构:
RabbitMQ只有主从架构,中间有一个Exchang交换器,架构更简单,性能和可用性都不错。对于小公司来说,更适合。
Kafka和RocketMQ都是分布式架构,针对的都是高性能,高可用,因此如果没有那么大的流量需求会有大材小用的感觉,也会增加维护成本。
性能上:
RabbitMQ是万/秒级别的吞吐量,kafka和RocketMQ是10万/秒级别,并且由于kafka使用sendfile技术,甚至能达到17万/秒,因此可以根据自己的业务情况进行选择。
做技术就是一种权衡,简单易维护的性能可能欠佳,性能高的可能较难维护,要量力而行,不要一味追求某一种。
功能上:
RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|
消息协议 | AMQP, MQTT, STOMP, HTTP | 自定义协议 (Kafka协议) | 自定义协议 (RocketMQ协议) |
消息持久化 | 支持(消息和队列) | 支持(默认持久化日志存储) | 支持(存储在磁盘上的主题日志) |
消息确认机制 | 支持(同步/异步确认) | 支持(消费者确认消息) | 支持(消费消息确认机制) |
消息路由 | 支持多种路由模式(Direct, Fanout, Topic, Headers) | 基于消息键(分区) | 支持标签(Tag)与消息队列的绑定 |
高可用性和容错 | 集群模式、镜像队列 | 多副本机制(分区副本) | 主从模式,支持跨地域部署 |
负载均衡 | 支持(基于消费者的公平分发) | 基于分区的负载均衡 | 基于消息队列分配的负载均衡 |
延迟 | 支持(但通常较高,适用于低延迟场景) | 较低的消息传递延迟 | 较低的消息传递延迟 |
消息顺序 | 按队列保证顺序(但不跨队列) | 保证分区内的顺序 | 保证队列内的顺序 |
吞吐量 | 较低(适合较小规模应用) | 非常高(适用于大规模数据流) | 高吞吐量(适合大规模分布式系统) |
消息过滤 | 支持(基于路由键和主题) | 不支持(消息只能基于分区进行选择) | 支持(基于Tag和MessageKey过滤) |
集群与分布式支持 | 支持(集群和高可用队列) | 支持(跨数据中心的分布式集群) | 支持(多集群支持,跨地域高可用) |
消息重复消费 | 支持(通过消息确认机制) | 支持(但不能完全避免,取决于消费端实现) | 支持(可以配置消息消费的重复策略) |
消息过期与死信队列(DLQ) | 支持(死信队列功能) | 支持(过期消息可删除,但不支持死信队列) | 支持(有死信队列功能) |
流量控制与背压 | 支持(基于消息队列的长度) | 支持(基于内存/磁盘容量的压力控制) | 支持(基于生产者和消费者的流控) |
监控和管理工具 | 提供管理控制台、API、Prometheus 集成 | 提供JMX、Prometheus、Kafka Manager 等工具 | 提供管理控制台、Prometheus 集成 |
客户端支持 | 支持多语言客户端(Java, Python, Ruby, Go等) | 支持多语言客户端(Java, Python, Go等) | 支持多语言客户端(Java, Python, Go等) |
集成与扩展 | 插件支持(如延迟消息、Shovel、Federation等) | 支持多种消费者和生产者接口 | 支持多种消费者和生产者接口 |
主要用途 | 实时消息处理,任务队列,分布式系统中的通信 | 大数据流处理,日志收集,数据传输 | 分布式消息传递,金融、电商系统中的消息通信 |
Docker部署RocketMQ
保姆级教程
Docker部署Kafka
使用kraft部署kafka
使用zookeeper部署kafka