RocketMQ

subtitle

Posted by Kun on April 23, 2025

RocketMQ

RocketMQ 是一个高性能、高可扩展的分布式消息队列系统,广泛应用于企业级应用中,尤其是处理大规模的消息流、事件驱动架构以及流式数据的场景。它支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型。

RocketMQ 架构概述

RocketMQ 的架构设计是基于 分布式高可用性 的。它的主要组件包括 NameServerBrokerProducerConsumerRocketMQ Console(Dashboard)。下面我将详细介绍这些组件以及它们的交互关系。

1. NameServer

NameServer 是 RocketMQ 系统中的路由服务,主要负责提供 Broker 的路由信息。它通过客户端(Producer 和 Consumer)与 Broker 之间的心跳和元数据交换,维护整个集群的 Broker 信息。

主要职责:

  • 路由信息存储:保存消息队列与 Broker 的映射关系。
  • 服务发现:Producer 和 Consumer 通过 NameServer 获取可用的 Broker 集群信息。
  • 负载均衡:当生产者或消费者与 Broker 连接时,NameServer 会根据负载均衡策略来提供相关 Broker 地址。

特点:

  • 无状态:NameServer 不存储消息数据,只负责路由信息的管理。
  • 高可用:支持多节点部署,通过定期的心跳保持集群状态。

2. Broker

Broker 是 RocketMQ 的核心组件,负责消息的存储、转发和消费。它是消息的生产者和消费者之间的中介。

主要职责:

  • 消息存储:Broker 存储所有的消息,并按顺序保存。
  • 消息转发:Broker 将生产者发布的消息转发给相应的消费者。
  • 消息调度:根据消息的消费模式(顺序消费、并发消费),Broker 控制消息的消费顺序和并发度。
  • 消息持久化:消息在 Broker 中持久化到磁盘,以确保消息不丢失。

组成:

  • Master Broker:主要的 Broker 节点,负责接收和存储消息。
  • Slave Broker:从节点,用于数据的备份和负载均衡。

分区和队列:

  • 主题(Topic):消息的分类,生产者将消息发送到指定的主题,消费者订阅该主题。
  • 队列(Queue):每个 Topic 下的具体队列,消费者根据队列来消费消息。

3. Producer

Producer 是消息的生产者,负责将消息发送到 RocketMQ 中的指定 Topic 上。

主要职责:

  • 发送消息:Producer 将消息发送到指定的 Topic。
  • 消息路由:Producer 将消息发送到 NameServer,由 NameServer 提供相关的 Broker 路由信息。
  • 消息确认:Producer 会等待 Broker 返回确认(如成功或者失败的确认)。

4. Consumer

Consumer 是消息的消费者,负责从 RocketMQ 中获取消息并进行处理。

主要职责:

  • 消费消息:Consumer 订阅指定的 Topic 并消费消息。
  • 消息确认:Consumer 消费完消息后需要向 Broker 发送消息确认,表明该消息已成功消费。
  • 并发消费:RocketMQ 支持多 Consumer 并发消费同一 Topic 下的消息。

5. RocketMQ Console (Dashboard)

RocketMQ Console(也叫 Dashboard)是 RocketMQ 提供的管理和监控工具。它提供了图形化的界面来查看 RocketMQ 集群的状态、查看队列的消息、生产者与消费者的性能指标等。

主要功能:

  • 查看 Broker 状态:监控 Broker 的健康状况、负载等。
  • 消息查看与管理:可以查看指定队列中的消息,并支持删除消息等操作。
  • 集群监控:提供集群级别的性能监控,显示生产者、消费者的消息发送和消费情况。
  • 配置管理:可以通过 Dashboard 修改集群配置。

6. 消息传递流程

RocketMQ 的消息传递流程如下:

  1. ProducerNameServer 请求可用的 Broker 地址。
  2. Producer 将消息发送到指定的 Broker
  3. Broker 将消息存储到磁盘中,并确认消息已接收。
  4. ConsumerNameServer 请求消息队列的路由信息。
  5. ConsumerBroker 获取消息并进行消费。
  6. ConsumerBroker 发送消息确认(ACK)。

7. RocketMQ 消息模型

RocketMQ 支持两种主要的消息模型:

1) 发布/订阅模式 (Pub/Sub)

  • 一个生产者将消息发布到某个 Topic,多个消费者可以订阅该 Topic 并接收消息。此模式适合广播消息。

2) 点对点模式 (P2P)

  • 每个消息有且只有一个消费者消费。消息只会被一个消费者接收。此模式适合任务分发场景。

8. RocketMQ 集群架构

RocketMQ 支持集群模式,能够水平扩展,增强高可用性。

1) Broker 集群

  • Master-Slave 架构:一个 Topic 可以分布在多个 Broker 上,Broker 之间通过主从模式来保证数据的冗余备份。
  • 负载均衡:Producer 和 Consumer 会根据路由信息选择合适的 Broker 来发送和消费消息。

2) 高可用性

  • Broker 主从复制:每个 Broker 有一个主节点和一个或多个从节点,主节点负责数据写入,从节点负责数据备份。
  • NameServer 集群:NameServer 是无状态的,可以通过多个实例部署来提高可用性。

部署

1. NameServer 部署

首先,我们需要部署 NameServer

1.1 拉取 RocketMQ 镜像

1
docker pull apache/rocketmq:5.1.0

1.2 启动 NameServer 容器

1
2
3
4
docker run -d --name rmqnamesrv \
-p 9876:9876 \
--network rmq-net \
apache/rocketmq:5.1.0 sh mqnamesrv
  • -d:后台运行
  • --name rmqnamesrv:给容器命名
  • -p 9876:9876:将本地的 9876 端口映射到容器内的 9876 端口
  • --network rmq-net:使用之前创建的网络 rmq-net
  • sh mqnamesrv:启动 NameServer

1.3 查看 NameServer 容器的日志

1
docker logs -f rmqnamesrv

确保输出中看到:

1
The Name Server boot success. serializeType=JSON

2. Broker 部署

2.1 启动 Broker 容器

1
2
3
4
5
6
docker run -d --name rmqbroker \
-p 10911:10911 \
-p 10909:10909 \
--network rmq-net \
-e NAMESRV_ADDR=rmqnamesrv:9876 \
apache/rocketmq:5.1.0 sh mqbroker -n rmqnamesrv:9876
  • -d:后台运行
  • --name rmqbroker:给容器命名
  • -p 10911:10911 -p 10909:10909:将本地端口 10911 和 10909 映射到容器内的相应端口
  • --network rmq-net:指定网络 rmq-net,这样 Broker 能够访问 NameServer
  • -e NAMESRV_ADDR=rmqnamesrv:9876:设置环境变量,指示 Broker 连接 NameServer
  • sh mqbroker -n rmqnamesrv:9876:启动 Broker,连接 NameServer

2.2 查看 Broker 容器的日志

1
docker logs -f rmqbroker

确保输出中看到:

1
The broker[broker-a, 172.20.0.3:10911] boot success. serializeType=JSON and name server is rmqnamesrv:9876

3. Dashboard 部署

3.1 启动 Dashboard 容器

1
2
3
4
5
docker run -d --name rmq-dashboard \
-p 8080:8080 \
--network rmq-net \
-e JAVA_OPTS="-Drocketmq.namesrv.addr=rmqnamesrv:9876" \
apache/rocketmq-dashboard:latest
  • -d:后台运行
  • --name rmq-dashboard:给容器命名
  • -p 8080:8080:将本地的 8080 端口映射到容器内的 8080 端口
  • --network rmq-net:指定网络 rmq-net,使其能访问 rmqnamesrv
  • -e JAVA_OPTS="-Drocketmq.namesrv.addr=rmqnamesrv:9876":指定 namesrv.addr 环境变量,确保 Dashboard 连接到 NameServer

3.2 查看 Dashboard 容器的日志

1
docker logs -f rmq-dashboard

确保输出中看到:

1
RocketMQ Console start success.

3.3 访问 Dashboard

在浏览器中访问:

1
http://<your-server-ip>:8080

使用默认的用户名和密码 admin / admin 登录。

4. Docker Compose 部署


构建docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
version: '3'
services:
  # NameServer 服务
  rmqnamesrv:
    image: apache/rocketmq:5.1.0
    container_name: rmqnamesrv
    ports:
      - "9876:9876"
    networks:
      - rmq-net
    command: sh mqnamesrv
    restart: always

  # Broker 服务
  rmqbroker:
    image: apache/rocketmq:5.1.0
    container_name: rmqbroker
    ports:
      - "10911:10911"
      - "10909:10909"
    networks:
      - rmq-net
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    command: sh mqbroker -n rmqnamesrv:9876
    restart: always

  # RocketMQ Dashboard 管理界面
  rmqdashboard:
    image: apache/rocketmq-dashboard:latest
    container_name: rmq-dashboard
    ports:
      - "8080:8080"
    networks:
      - rmq-net
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
    restart: always

networks:
  rmq-net:
    driver: bridge

小内存服务器优化方案

  1. 内存 & JVM 优化(最重要)

RocketMQ 对内存较敏感,尤其是 Broker 节点,建议在 docker-compose 里配置合适的 JVM 参数。

优化示例:

1
2
3
4
yaml复制编辑rmqbroker:
  environment:
    - NAMESRV_ADDR=rmqnamesrv:9876
    - JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
JVM 参数说明: 参数 说明   —————- —————-   -Xms / -Xmx 初始/最大堆内存   -Xmn 年轻代大小   -XX:+UseG1GC 使用 G1 GC   -XX:MaxGCPauseMillis=200 最大 GC 暂停时间

Namesrv 占用资源小,可以设置得更低:

1
2
3
yaml复制编辑rmqnamesrv:
  environment:
    - JAVA_OPT_EXT=-Xms256m -Xmx256m -Xmn128m -XX:+UseG1GC

2. 数据挂载(持久化)

Broker 存储消息在容器内部不安全,挂掉数据会丢。需要挂载数据卷。

推荐:

1
2
3
4
yaml复制编辑rmqbroker:
  volumes:
    - ./data/store:/home/rocketmq/store
    - ./logs:/home/rocketmq/logs

Dashboard 没有数据挂载需求,可以不挂。


3. 日志管理

建议挂载日志目录方便查看:

1
2
3
4
5
6
7
yaml复制编辑rmqnamesrv:
  volumes:
    - ./logs:/home/rocketmq/logs

rmqbroker:
  volumes:
    - ./logs:/home/rocketmq/logs

4. 健康检查(推荐)

自动检测服务健康,防止异常不自恢复:

1
2
3
4
5
6
yaml复制编辑rmqnamesrv:
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9876"]
    interval: 30s
    timeout: 10s
    retries: 3

Dashboard:

1
2
3
4
5
6
yaml复制编辑rmqdashboard:
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:8080"]
    interval: 30s
    timeout: 10s
    retries: 3

5. 最佳实践汇总版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
yaml复制编辑version: '3'

services:
  rmqnamesrv:
    image: apache/rocketmq:5.1.0
    container_name: rmqnamesrv
    ports:
      - "9876:9876"
    networks:
      - rmq-net
    command: sh mqnamesrv
    environment:
      - JAVA_OPT_EXT=-Xms256m -Xmx256m -Xmn128m -XX:+UseG1GC
    volumes:
      - ./logs:/home/rocketmq/logs
    restart: always

  rmqbroker:
    image: apache/rocketmq:5.1.0
    container_name: rmqbroker
    ports:
      - "10911:10911"
      - "10909:10909"
    networks:
      - rmq-net
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
      - JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
    command: sh mqbroker -n rmqnamesrv:9876
    volumes:
      - ./data/store:/home/rocketmq/store
      - ./logs:/home/rocketmq/logs
    restart: always

  rmqdashboard:
    image: apache/rocketmq-dashboard:latest
    container_name: rmq-dashboard
    ports:
      - "8080:8080"
    networks:
      - rmq-net
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
    restart: always

networks:
  rmq-net:
    driver: bridge

6. 后续优化建议
优化项 描述
broker.conf 配置 Broker 高级参数,如刷盘策略、消息大小限制等
集群部署 多个 Broker,Master-Slave 架构
Prometheus + Grafana 监控 RocketMQ 性能指标
分布式存储 挂载 NFS/云盘 进行数据备份

项目引入

Spring 项目是基于 Spring Boot,RocketMQ 官方提供了 spring-boot-starter 来快速集成

1. 引入依赖

如果是 Maven 项目,添加 RocketMQ 依赖:

1
2
3
4
5
6
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>  <!-- 根据实际情况选择版本 -->
</dependency>

2.配置application.yml

1
2
3
4
5
rocketmq:
  name-server: 127.0.0.1:9876 # NameServer 地址
  producer:
    group: my-producer-group   # 自定义生产者组名

3. 生产者示例(发送消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制编辑import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send")
    public String sendMessage() {
        rocketMQTemplate.send("test-topic", MessageBuilder.withPayload("Hello RocketMQ").build());
        return "消息已发送";
    }
}

4. 消费者示例(接收消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制编辑import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = "test-topic",
        consumerGroup = "my-consumer-group")
public class TestConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("收到消息:" + message);
    }
}