RocketMQ
RocketMQ 是一个高性能、高可扩展的分布式消息队列系统,广泛应用于企业级应用中,尤其是处理大规模的消息流、事件驱动架构以及流式数据的场景。它支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型。
RocketMQ 架构概述
RocketMQ 的架构设计是基于 分布式 和 高可用性 的。它的主要组件包括 NameServer、Broker、Producer、Consumer 和 RocketMQ Console(Dashboard)。下面我将详细介绍这些组件以及它们的交互关系。
1. NameServer
NameServer 是 RocketMQ 系统中的路由服务,主要负责提供 Broker 的路由信息。它通过客户端(Producer 和 Consumer)与 Broker 之间的心跳和元数据交换,维护整个集群的 Broker 信息。
主要职责:
- 路由信息存储:保存消息队列与 Broker 的映射关系。
- 服务发现:Producer 和 Consumer 通过 NameServer 获取可用的 Broker 集群信息。
- 负载均衡:当生产者或消费者与 Broker 连接时,NameServer 会根据负载均衡策略来提供相关 Broker 地址。
特点:
- 无状态:NameServer 不存储消息数据,只负责路由信息的管理。
- 高可用:支持多节点部署,通过定期的心跳保持集群状态。
2. Broker
Broker 是 RocketMQ 的核心组件,负责消息的存储、转发和消费。它是消息的生产者和消费者之间的中介。
主要职责:
- 消息存储:Broker 存储所有的消息,并按顺序保存。
- 消息转发:Broker 将生产者发布的消息转发给相应的消费者。
- 消息调度:根据消息的消费模式(顺序消费、并发消费),Broker 控制消息的消费顺序和并发度。
- 消息持久化:消息在 Broker 中持久化到磁盘,以确保消息不丢失。
组成:
- Master Broker:主要的 Broker 节点,负责接收和存储消息。
- Slave Broker:从节点,用于数据的备份和负载均衡。
分区和队列:
- 主题(Topic):消息的分类,生产者将消息发送到指定的主题,消费者订阅该主题。
- 队列(Queue):每个 Topic 下的具体队列,消费者根据队列来消费消息。
3. Producer
Producer 是消息的生产者,负责将消息发送到 RocketMQ 中的指定 Topic 上。
主要职责:
- 发送消息:Producer 将消息发送到指定的 Topic。
- 消息路由:Producer 将消息发送到 NameServer,由 NameServer 提供相关的 Broker 路由信息。
- 消息确认:Producer 会等待 Broker 返回确认(如成功或者失败的确认)。
4. Consumer
Consumer 是消息的消费者,负责从 RocketMQ 中获取消息并进行处理。
主要职责:
- 消费消息:Consumer 订阅指定的 Topic 并消费消息。
- 消息确认:Consumer 消费完消息后需要向 Broker 发送消息确认,表明该消息已成功消费。
- 并发消费:RocketMQ 支持多 Consumer 并发消费同一 Topic 下的消息。
5. RocketMQ Console (Dashboard)
RocketMQ Console(也叫 Dashboard)是 RocketMQ 提供的管理和监控工具。它提供了图形化的界面来查看 RocketMQ 集群的状态、查看队列的消息、生产者与消费者的性能指标等。
主要功能:
- 查看 Broker 状态:监控 Broker 的健康状况、负载等。
- 消息查看与管理:可以查看指定队列中的消息,并支持删除消息等操作。
- 集群监控:提供集群级别的性能监控,显示生产者、消费者的消息发送和消费情况。
- 配置管理:可以通过 Dashboard 修改集群配置。
6. 消息传递流程
RocketMQ 的消息传递流程如下:
- Producer 向 NameServer 请求可用的 Broker 地址。
- Producer 将消息发送到指定的 Broker。
- Broker 将消息存储到磁盘中,并确认消息已接收。
- Consumer 向 NameServer 请求消息队列的路由信息。
- Consumer 从 Broker 获取消息并进行消费。
- Consumer 向 Broker 发送消息确认(ACK)。
7. RocketMQ 消息模型
RocketMQ 支持两种主要的消息模型:
1) 发布/订阅模式 (Pub/Sub):
- 一个生产者将消息发布到某个 Topic,多个消费者可以订阅该 Topic 并接收消息。此模式适合广播消息。
2) 点对点模式 (P2P):
- 每个消息有且只有一个消费者消费。消息只会被一个消费者接收。此模式适合任务分发场景。
8. RocketMQ 集群架构
RocketMQ 支持集群模式,能够水平扩展,增强高可用性。
1) Broker 集群:
- Master-Slave 架构:一个 Topic 可以分布在多个 Broker 上,Broker 之间通过主从模式来保证数据的冗余备份。
- 负载均衡:Producer 和 Consumer 会根据路由信息选择合适的 Broker 来发送和消费消息。
2) 高可用性:
- Broker 主从复制:每个 Broker 有一个主节点和一个或多个从节点,主节点负责数据写入,从节点负责数据备份。
- NameServer 集群:NameServer 是无状态的,可以通过多个实例部署来提高可用性。
部署
1. NameServer 部署
首先,我们需要部署 NameServer。
1.1 拉取 RocketMQ 镜像
1
docker pull apache/rocketmq:5.1.0
1.2 启动 NameServer 容器
1
2
3
4
docker run -d --name rmqnamesrv \
-p 9876:9876 \
--network rmq-net \
apache/rocketmq:5.1.0 sh mqnamesrv
-d
:后台运行--name rmqnamesrv
:给容器命名-p 9876:9876
:将本地的 9876 端口映射到容器内的 9876 端口--network rmq-net
:使用之前创建的网络rmq-net
sh mqnamesrv
:启动 NameServer
1.3 查看 NameServer 容器的日志
1
docker logs -f rmqnamesrv
确保输出中看到:
1
The Name Server boot success. serializeType=JSON
2. Broker 部署
2.1 启动 Broker 容器
1
2
3
4
5
6
docker run -d --name rmqbroker \
-p 10911:10911 \
-p 10909:10909 \
--network rmq-net \
-e NAMESRV_ADDR=rmqnamesrv:9876 \
apache/rocketmq:5.1.0 sh mqbroker -n rmqnamesrv:9876
-d
:后台运行--name rmqbroker
:给容器命名-p 10911:10911 -p 10909:10909
:将本地端口 10911 和 10909 映射到容器内的相应端口--network rmq-net
:指定网络rmq-net
,这样 Broker 能够访问 NameServer-e NAMESRV_ADDR=rmqnamesrv:9876
:设置环境变量,指示 Broker 连接 NameServersh mqbroker -n rmqnamesrv:9876
:启动 Broker,连接 NameServer
2.2 查看 Broker 容器的日志
1
docker logs -f rmqbroker
确保输出中看到:
1
The broker[broker-a, 172.20.0.3:10911] boot success. serializeType=JSON and name server is rmqnamesrv:9876
3. Dashboard 部署
3.1 启动 Dashboard 容器
1
2
3
4
5
docker run -d --name rmq-dashboard \
-p 8080:8080 \
--network rmq-net \
-e JAVA_OPTS="-Drocketmq.namesrv.addr=rmqnamesrv:9876" \
apache/rocketmq-dashboard:latest
-d
:后台运行--name rmq-dashboard
:给容器命名-p 8080:8080
:将本地的 8080 端口映射到容器内的 8080 端口--network rmq-net
:指定网络rmq-net
,使其能访问rmqnamesrv
-e JAVA_OPTS="-Drocketmq.namesrv.addr=rmqnamesrv:9876"
:指定namesrv.addr
环境变量,确保 Dashboard 连接到 NameServer
3.2 查看 Dashboard 容器的日志
1
docker logs -f rmq-dashboard
确保输出中看到:
1
RocketMQ Console start success.
3.3 访问 Dashboard
在浏览器中访问:
1
http://<your-server-ip>:8080
使用默认的用户名和密码 admin
/ admin
登录。
4. Docker Compose 部署
构建docker-compose
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
version: '3'
services:
# NameServer 服务
rmqnamesrv:
image: apache/rocketmq:5.1.0
container_name: rmqnamesrv
ports:
- "9876:9876"
networks:
- rmq-net
command: sh mqnamesrv
restart: always
# Broker 服务
rmqbroker:
image: apache/rocketmq:5.1.0
container_name: rmqbroker
ports:
- "10911:10911"
- "10909:10909"
networks:
- rmq-net
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
command: sh mqbroker -n rmqnamesrv:9876
restart: always
# RocketMQ Dashboard 管理界面
rmqdashboard:
image: apache/rocketmq-dashboard:latest
container_name: rmq-dashboard
ports:
- "8080:8080"
networks:
- rmq-net
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
restart: always
networks:
rmq-net:
driver: bridge
小内存服务器优化方案
-
内存 & JVM 优化(最重要)
RocketMQ 对内存较敏感,尤其是 Broker 节点,建议在 docker-compose 里配置合适的 JVM 参数。
优化示例:
1
2
3
4
yaml复制编辑rmqbroker:
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
- JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
JVM 参数说明: 参数 说明 —————- —————- -Xms / -Xmx 初始/最大堆内存 -Xmn 年轻代大小 -XX:+UseG1GC 使用 G1 GC -XX:MaxGCPauseMillis=200 最大 GC 暂停时间
Namesrv 占用资源小,可以设置得更低:
1
2
3
yaml复制编辑rmqnamesrv:
environment:
- JAVA_OPT_EXT=-Xms256m -Xmx256m -Xmn128m -XX:+UseG1GC
2. 数据挂载(持久化)
Broker 存储消息在容器内部不安全,挂掉数据会丢。需要挂载数据卷。
推荐:
1
2
3
4
yaml复制编辑rmqbroker:
volumes:
- ./data/store:/home/rocketmq/store
- ./logs:/home/rocketmq/logs
Dashboard 没有数据挂载需求,可以不挂。
3. 日志管理
建议挂载日志目录方便查看:
1
2
3
4
5
6
7
yaml复制编辑rmqnamesrv:
volumes:
- ./logs:/home/rocketmq/logs
rmqbroker:
volumes:
- ./logs:/home/rocketmq/logs
4. 健康检查(推荐)
自动检测服务健康,防止异常不自恢复:
1
2
3
4
5
6
yaml复制编辑rmqnamesrv:
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9876"]
interval: 30s
timeout: 10s
retries: 3
Dashboard:
1
2
3
4
5
6
yaml复制编辑rmqdashboard:
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080"]
interval: 30s
timeout: 10s
retries: 3
5. 最佳实践汇总版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
yaml复制编辑version: '3'
services:
rmqnamesrv:
image: apache/rocketmq:5.1.0
container_name: rmqnamesrv
ports:
- "9876:9876"
networks:
- rmq-net
command: sh mqnamesrv
environment:
- JAVA_OPT_EXT=-Xms256m -Xmx256m -Xmn128m -XX:+UseG1GC
volumes:
- ./logs:/home/rocketmq/logs
restart: always
rmqbroker:
image: apache/rocketmq:5.1.0
container_name: rmqbroker
ports:
- "10911:10911"
- "10909:10909"
networks:
- rmq-net
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
- JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
command: sh mqbroker -n rmqnamesrv:9876
volumes:
- ./data/store:/home/rocketmq/store
- ./logs:/home/rocketmq/logs
restart: always
rmqdashboard:
image: apache/rocketmq-dashboard:latest
container_name: rmq-dashboard
ports:
- "8080:8080"
networks:
- rmq-net
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
restart: always
networks:
rmq-net:
driver: bridge
6. 后续优化建议
优化项 | 描述 |
---|---|
broker.conf | 配置 Broker 高级参数,如刷盘策略、消息大小限制等 |
集群部署 | 多个 Broker,Master-Slave 架构 |
Prometheus + Grafana | 监控 RocketMQ 性能指标 |
分布式存储 | 挂载 NFS/云盘 进行数据备份 |
项目引入
Spring 项目是基于 Spring Boot,RocketMQ 官方提供了 spring-boot-starter
来快速集成
1. 引入依赖
如果是 Maven 项目,添加 RocketMQ 依赖:
1
2
3
4
5
6
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version> <!-- 根据实际情况选择版本 -->
</dependency>
2.配置application.yml
1
2
3
4
5
rocketmq:
name-server: 127.0.0.1:9876 # NameServer 地址
producer:
group: my-producer-group # 自定义生产者组名
3. 生产者示例(发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制编辑import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String sendMessage() {
rocketMQTemplate.send("test-topic", MessageBuilder.withPayload("Hello RocketMQ").build());
return "消息已发送";
}
}
4. 消费者示例(接收消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制编辑import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "my-consumer-group")
public class TestConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息:" + message);
}
}