SpringAMQP

在实际业务开发中,我们不会通过 RabbitMQ 控制台手动收发消息,而是基于编程实现。由于 RabbitMQ 遵循 AMQP 协议,具备跨语言特性,但官方 Java 客户端编码复杂,因此生产环境中常结合 Spring 生态使用 ——Spring 官方提供的 SpringAMQP 框架,基于 RabbitMQ 封装了消息收发模板,且支持 SpringBoot 自动装配,大幅简化开发。

SpringAMQP 官方地址:https://spring.io/projects/spring-amqp

SpringAMQP 核心提供三大功能:

  • 自动声明队列、交换机及绑定关系(无需手动在控制台创建)
  • 基于注解的监听器模式,异步接收消息(无需手动编写监听逻辑)
  • 封装 RabbitTemplate 工具类,简化消息发送(无需处理底层连接、序列化等细节)

本章将详细讲解如何通过 SpringAMQP 实现 RabbitMQ 的消息收发,覆盖核心模型与实际业务改造。

1.1 导入 Demo 工程

课前资料提供了用于学习 SpringAMQP 的 Demo 工程,建议按以下步骤导入:

  1. 将工程压缩包解压到本地工作空间(如 D:\workspace
  2. 打开 IDEA,通过「File -> Open」选择解压后的工程根目录,完成导入

1.1.1 工程结构说明

导入后的工程包含 3 个模块,职责分工明确:

  • mq-demo:父工程,统一管理项目依赖(如 SpringAMQP、Lombok、单元测试等)
  • publisher:消息生产者模块,负责发送消息到 RabbitMQ
  • consumer:消息消费者模块,负责监听 RabbitMQ 队列并处理消息

1.1.2 核心依赖配置

父工程 mq-demopom.xml 已预先配置 SpringAMQP 相关依赖,子模块可直接继承使用,无需重复引入。核心依赖如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ客户端及Spring封装-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试依赖,用于编写消息发送测试用例-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

1.2 快速入门(简单队列模型)

实际开发中消息通常经交换机路由到队列,但为了快速上手,入门案例将跳过交换机,直接向队列发送消息(此模式仅用于测试,生产环境极少使用)。

核心流程:

  • 生产者(publisher)直接发送消息到指定队列
  • 消费者(consumer)监听该队列,异步接收并处理消息

1.2.1 步骤 1:手动创建队列(控制台操作)

首先在 RabbitMQ 管理控制台创建测试队列,操作如下:

  1. 访问 RabbitMQ 控制台(默认地址:http:// 虚拟机 IP:15672,如 http://192.168.150.101:15672
  2. 输入用户名(如 hmall)和密码(如 123)登录,进入「Queues」页面
  3. 点击「Add a new queue」,输入队列名 simple.queue,其他参数(如 Durability、Auto delete)保持默认,点击「Add queue」完成创建
  4. 刷新页面,可在队列列表中看到 simple.queue,状态为「Ready: 0」(暂无消息)

1.2.2 步骤 2:生产者发送消息(publisher 模块)

1.2.2.1 配置 MQ 连接信息

publisher 模块的 src/main/resources/application.yml 中,添加 RabbitMQ 连接配置(需替换为你的虚拟机 IP 和账号密码):

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的RabbitMQ所在虚拟机IP
port: 5672 # RabbitMQ默认通信端口(控制台端口是15672,注意区分)
virtual-host: /hmall # 虚拟主机(用于隔离不同项目的消息资源)
username: hmall # 登录RabbitMQ的用户名
password: 123 # 登录密码

1.2.2.2 编写消息发送测试类

publisher 模块的 src/test/java/com/itheima/publisher/amqp 目录下,创建 SpringAmqpTest 类,使用 RabbitTemplate 发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.itheima.publisher.amqp;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest // 启动Spring容器,自动装配RabbitTemplate
public class SpringAmqpTest {

// 自动注入SpringAMQP提供的消息发送模板
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 测试向simple.queue队列发送消息
*/
@Test
public void testSimpleQueue() {
// 1.指定目标队列名
String queueName = "simple.queue";
// 2.准备消息内容(可自定义,如文本、JSON等)
String message = "hello, spring amqp!";
// 3.发送消息:参数1=队列名,参数2=消息内容
rabbitTemplate.convertAndSend(queueName, message);
}
}

1.2.2.3 执行测试

右键运行 testSimpleQueue 方法,执行成功后,回到 RabbitMQ 控制台的「Queues」页面:

  • 刷新 simple.queue 的详情,可看到「Ready: 1」(1 条待消费消息)
  • 点击队列名进入详情页,在「Get messages」区域点击「Get Message (s)」,可查看消息内容为 hello, spring amqp!

1.2.3 步骤 3:消费者接收消息(consumer 模块)

1.2.3.1 配置 MQ 连接信息

与生产者配置一致,在 consumer 模块的 src/main/resources/application.yml 中添加 RabbitMQ 连接:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 通信端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码

1.2.3.2 编写消息监听类

consumer 模块的 src/main/java/com/itheima/consumer/listener 目录下,创建 SpringRabbitListener 类,通过 @RabbitListener 注解监听队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itheima.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component // 注入Spring容器,让Spring管理监听逻辑
public class SpringRabbitListener {

/**
* 监听simple.queue队列:一旦队列有新消息,自动触发该方法
* @param msg 队列中的消息内容(Spring自动反序列化为String类型)
* @throws InterruptedException 模拟处理耗时(可选)
*/
@RabbitListener(queues = "simple.queue") // 声明监听的队列名
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}

1.2.3.3 测试消息接收

  1. 启动 consumer 模块的主类(如 ConsumerApplication

  2. 回到 publisher 模块,再次运行 testSimpleQueue 方法发送消息

  3. 查看 consumer 模块的控制台输出,会看到:

    1
    spring 消费者接收到消息:【hello, spring amqp!】
  4. 同时 RabbitMQ 控制台的 simple.queue 队列「Ready」数量变为 0(消息已被消费)

1.3 WorkQueues 模型(工作队列模型)

当消息处理耗时较长(如生成报表、调用第三方接口),生产者发送消息的速度可能远快于消费者处理速度,导致消息堆积。此时可使用WorkQueues 模型:多个消费者绑定到同一个队列,共同分担消息处理任务,提升整体处理效率。

1.3.1 核心原理

  • 队列中的消息会被均匀分配给多个消费者(默认策略),但可通过配置实现「能者多劳」
  • 同一条消息仅会被一个消费者处理(避免重复处理)
  • 适用于「任务拆分」场景,如电商订单异步处理、日志批量消费等

1.3.2 步骤 1:创建工作队列(控制台操作)

参考 1.2.1 的步骤,在 RabbitMQ 控制台创建队列 work.queue(用于模拟消息堆积场景)。

1.3.3 步骤 2:生产者发送大量消息(模拟堆积)

publisher 模块的 SpringAmqpTest 类中,添加循环发送消息的测试方法,模拟每秒发送 50 条消息(远超单个消费者处理能力):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 测试WorkQueues模型:循环发送50条消息,模拟消息堆积
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 目标队列名
String queueName = "work.queue";
// 消息模板(后缀加序号区分不同消息)
String message = "hello, message_";
// 循环发送50条消息
for (int i = 0; i < 50; i++) {
// 发送消息:message + i 如"hello, message_0"
rabbitTemplate.convertAndSend(queueName, message + i);
// 每20毫秒发送一条,相当于每秒发送50条
Thread.sleep(20);
}
}

1.3.4 步骤 3:多个消费者接收消息(默认均匀分配)

consumer 模块的 SpringRabbitListener 类中,添加 2 个监听方法(模拟两个处理速度不同的消费者),注意通过 Thread.sleep 模拟处理耗时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.time.LocalTime;

// 消费者1:处理速度快(每次休眠20毫秒,每秒约处理50条)
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20); // 模拟处理耗时
}

// 消费者2:处理速度慢(每次休眠200毫秒,每秒约处理5条)
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
// 用System.err输出,便于区分两个消费者的日志
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200); // 模拟处理耗时
}

1.3.5 步骤 4:测试默认分配策略

  1. 启动 consumer 模块(两个消费者同时启动)
  2. 运行 publisher 模块的 testWorkQueue 方法发送 50 条消息
  3. 查看 consumer 控制台输出,会发现:
    • 消费者 1 和消费者 2 各处理 25 条消息(默认均匀分配)
    • 消费者 1 很快处理完 25 条,进入空闲状态
    • 消费者 2 需要长时间处理 25 条,导致整体任务耗时过长(约 5 秒)

问题分析:默认策略按「消息数量」分配,不考虑消费者处理能力,导致资源浪费(快的空闲、慢的繁忙),无法解决消息堆积问题。

1.3.6 步骤 5:配置「能者多劳」(优化分配策略)

通过 prefetch 参数控制消费者「预取消息数量」,设置为 1 时,消费者必须处理完当前消息后,才能获取下一条消息,实现「能者多劳」。

修改 consumer 模块的 application.yml,添加如下配置:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次仅预取1条消息,处理完成后再取新消息

1.3.7 步骤 6:重新测试优化效果

  1. 重启 consumer 模块(使 prefetch 配置生效)
  2. 再次运行 publishertestWorkQueue 方法发送 50 条消息
  3. 查看控制台输出,会发现:
    • 消费者 1(快)处理了约 44 条消息
    • 消费者 2(慢)仅处理了约 6 条消息
    • 整体任务耗时缩短至 1 秒左右(充分利用快消费者的能力)

举例:比如快递站有 2 个快递员,A 每小时能送 50 件,B 每小时能送 5 件。默认策略给 A、B 各 25 件,A1 小时完成后闲置,B 需要 5 小时;优化后 A 多送、B 少送,整体 2 小时内完成,效率大幅提升。

1.3.8 WorkQueues 模型总结

  • 适用场景:消息处理耗时较长,需要多个消费者分担任务
  • 核心配置:prefetch: 1 实现「能者多劳」,避免资源浪费
  • 关键特性:同一个队列的消息仅被一个消费者处理,保证消息不重复消费

1.4 交换机类型(核心概念)

前面的案例跳过了交换机,但生产环境中消息必须通过交换机路由到队列。交换机本身不存储消息,仅负责按规则转发,若没有匹配的队列,消息会直接丢失。

RabbitMQ 支持 4 种交换机类型,常用前 3 种:

交换机类型 核心特点 适用场景
Fanout 广播模式:将消息转发给所有绑定的队列 全量通知(如系统公告、状态同步)
Direct 精准路由:按「RoutingKey 完全匹配」转发消息 单目标通知(如订单状态变更仅通知订单服务)
Topic 通配符路由:按「RoutingKey 通配符匹配」转发 多目标筛选通知(如「中国新闻」「日本新闻」分类转发)
Headers 头匹配:按消息头属性匹配,不依赖 RoutingKey 复杂属性筛选(极少使用)

1.5 Fanout 交换机(广播模式)

Fanout 交换机(也称「扇出交换机」)会将消息转发给所有绑定它的队列,无论 RoutingKey 是什么(甚至可以不指定 RoutingKey),适合「全量通知」场景(如用户注册后,同时通知短信服务、邮件服务、积分服务)。

1.5.1 核心流程

  1. 创建 1 个 Fanout 交换机(如 hmall.fanout
  2. 创建多个队列(如 fanout.queue1fanout.queue2),并绑定到该交换机
  3. 生产者发送消息到 Fanout 交换机
  4. 交换机将消息广播到所有绑定的队列,每个队列的消费者都能收到消息

1.5.2 步骤 1:手动创建交换机与队列(控制台操作)

1.5.2.1 创建队列

在 RabbitMQ 控制台创建 2 个队列:

  • 队列 1:fanout.queue1
  • 队列 2:fanout.queue2
    (创建步骤同 1.2.1,参数默认)

1.5.2.2 创建 Fanout 交换机

  1. 进入 RabbitMQ 控制台的「Exchanges」页面,点击「Add a new exchange」
  2. 输入交换机名 hmall.fanout,选择类型「Fanout」,其他参数默认,点击「Add exchange」

1.5.2.3 绑定队列到交换机

  1. 在交换机列表中点击 hmall.fanout,进入交换机详情页
  2. 在「Bindings」区域,「To queue」选择 fanout.queue1,点击「Bind」
  3. 重复步骤 2,将 fanout.queue2 也绑定到 hmall.fanout
  4. 绑定完成后,在交换机详情页可看到 2 个绑定关系

1.5.3 步骤 2:生产者发送广播消息

publisher 模块的 SpringAmqpTest 类中,添加发送 Fanout 消息的测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 测试Fanout交换机:发送广播消息
*/
@Test
public void testFanoutExchange() {
// 1.指定Fanout交换机名
String exchangeName = "hmall.fanout";
// 2.准备广播消息(如系统公告)
String message = "hello, everyone! 这是一条系统公告~";
// 3.发送消息:参数1=交换机名,参数2=RoutingKey(Fanout模式可省略,填空字符串),参数3=消息内容
rabbitTemplate.convertAndSend(exchangeName, "", message);
}

1.5.4 步骤 3:消费者接收广播消息

consumer 模块的 SpringRabbitListener 类中,添加 2 个监听方法,分别监听 2 个队列:

1
2
3
4
5
6
7
8
9
10
11
// 监听fanout.queue1,接收广播消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

// 监听fanout.queue2,接收广播消息
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

1.5.5 步骤 4:测试广播效果

  1. 启动 consumer 模块

  2. 运行 publishertestFanoutExchange 方法发送消息

  3. 查看 consumer 控制台输出,会发现两个消费者都收到了相同的消息:

    1
    2
    消费者1接收到Fanout消息:【hello, everyone! 这是一条系统公告~】
    消费者2接收到Fanout消息:【hello, everyone! 这是一条系统公告~】

1.5.6 Fanout 交换机总结

  • 转发规则:无视 RoutingKey,消息广播到所有绑定的队列
  • 关键特性:交换机不存储消息,若没有绑定队列,消息会丢失
  • 适用场景:全量通知(如服务启停通知、全局配置更新)

1.6 Direct 交换机(精准路由模式)

Fanout 交换机的广播模式无法筛选消息,而 Direct 交换机支持「精准路由」:队列绑定交换机时需指定「BindingKey」,生产者发送消息时需指定「RoutingKey」,仅当两者完全匹配时,消息才会被转发到该队列。

1.6.1 核心流程

  1. 创建 1 个 Direct 交换机(如 hmall.direct
  2. 创建多个队列,每个队列绑定交换机时指定 1 个或多个 BindingKey(如 redblue
  3. 生产者发送消息时指定 RoutingKey
  4. 交换机仅将消息转发给 BindingKey 与 RoutingKey 完全匹配的队列

1.6.2 案例需求

  • 交换机:hmall.direct(类型 Direct)
  • 队列 1:direct.queue1,绑定 BindingKey 为 redblue
  • 队列 2:direct.queue2,绑定 BindingKey 为 redyellow
  • 测试场景 1:发送 RoutingKey 为 red 的消息,验证两个队列是否都收到
  • 测试场景 2:发送 RoutingKey 为 blue 的消息,验证仅队列 1 收到

1.6.3 步骤 1:手动创建交换机与队列(控制台操作)

1.6.3.1 创建队列

在控制台创建 2 个队列:direct.queue1direct.queue2(步骤同 1.2.1)。

1.6.3.2 创建 Direct 交换机

  1. 进入「Exchanges」页面,点击「Add a new exchange」
  2. 输入交换机名 hmall.direct,选择类型「Direct」,点击「Add exchange」

1.6.3.3 绑定队列与 BindingKey

  1. 点击 hmall.direct 进入交换机详情页,在「Bindings」区域绑定队列:
    • 绑定 direct.queue1:「To queue」选 direct.queue1,「Routing key」填 red,点击「Bind」;重复此操作,再绑定 blue
    • 绑定 direct.queue2:「To queue」选 direct.queue2,「Routing key」填 red,点击「Bind」;重复此操作,再绑定 yellow
  2. 绑定完成后,交换机详情页会显示 4 条绑定关系(2 个队列 ×2 个 BindingKey)

1.6.4 步骤 2:消费者接收消息

consumer 模块的 SpringRabbitListener 类中,添加 2 个监听方法:

1
2
3
4
5
6
7
8
9
10
11
// 监听direct.queue1
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

// 监听direct.queue2
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

1.6.5 步骤 3:测试场景 1(RoutingKey=red)

publisher 模块的 SpringAmqpTest 类中,添加测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 测试Direct交换机:发送RoutingKey=red的消息
*/
@Test
public void testSendDirectExchangeRed() {
// 交换机名
String exchangeName = "hmall.direct";
// 消息内容(模拟红色警报)
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息:RoutingKey=red
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

测试结果:启动 consumer 后运行测试,两个消费者都会收到消息(因为两个队列的 BindingKey 都包含 red):

1
2
消费者1接收到direct.queue1的消息:【红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!】
消费者2接收到direct.queue2的消息:【红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!】

1.6.6 步骤 4:测试场景 2(RoutingKey=blue)

修改测试方法的 RoutingKey 为 blue

1
2
3
4
5
6
7
@Test
public void testSendDirectExchangeBlue() {
String exchangeName = "hmall.direct";
String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
// 发送消息:RoutingKey=blue
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

测试结果:仅监听 direct.queue1 的消费者 1 收到消息(仅 direct.queue1 的 BindingKey 包含 blue):

1
消费者1接收到direct.queue1的消息:【最新报道,哥斯拉是居民自治巨型气球,虚惊一场!】

1.6.7 Direct 交换机总结

  • 与 Fanout 的差异:Fanout 广播所有队列,Direct 仅转发给 BindingKey 匹配的队列
  • 灵活扩展:若多个队列绑定相同的 BindingKey(如 red),则这些队列会同时收到消息(类似 Fanout 的局部广播)
  • 适用场景:精准路由(如订单支付成功后,仅通知订单服务更新状态)

1.7 Topic 交换机(通配符路由模式)

Direct 交换机的 RoutingKey 需完全匹配,而 Topic 交换机支持「通配符匹配」,更灵活。BindingKey 由多个单词组成(单词间用 . 分隔,如 china.news),支持两个通配符:

  • #:匹配 1 个或多个单词(如 china.# 可匹配 china.newschina.weather.today
  • *:匹配恰好 1 个单词(如 china.* 可匹配 china.news,但不能匹配 china.weather.today

1.7.1 核心流程

  1. 创建 1 个 Topic 交换机(如 hmall.topic
  2. 创建多个队列,绑定交换机时指定带通配符的 BindingKey(如 china.##.news
  3. 生产者发送消息时指定包含多个单词的 RoutingKey(如 china.newsjapan.weather
  4. 交换机将消息转发给 BindingKey 与 RoutingKey 匹配的队列

1.7.2 案例需求

  • 交换机:hmall.topic(类型 Topic)
  • 队列 1:topic.queue1,绑定 BindingKey 为 china.#(匹配所有以 china. 开头的 RoutingKey,如 china.newschina.weather
  • 队列 2:topic.queue2,绑定 BindingKey 为 #.news(匹配所有以 .news 结尾的 RoutingKey,如 china.newsjapan.news
  • 测试场景:发送 RoutingKey 为 china.news 的消息,验证两个队列是否都收到

1.7.3 步骤 1:手动创建交换机与队列(控制台操作)

  1. 创建 2 个队列:topic.queue1topic.queue2
  2. 创建 Topic 交换机:hmall.topic(类型选「Topic」)
  3. 绑定队列与 BindingKey:
    • topic.queue1 绑定 china.#
    • topic.queue2 绑定 #.news

1.7.4 步骤 2:生产者发送消息

publisher 模块的 SpringAmqpTest 类中,添加测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 测试Topic交换机:发送RoutingKey=china.news的消息
*/
@Test
public void testSendTopicExchange() {
// 交换机名
String exchangeName = "hmall.topic";
// 消息内容(模拟中国新闻)
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息:RoutingKey=china.news(包含2个单词,符合Topic格式)
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

1.7.5 步骤 3:消费者接收消息

consumer 模块的 SpringRabbitListener 类中,添加 2 个监听方法:

1
2
3
4
5
6
7
8
9
10
11
// 监听topic.queue1(BindingKey=china.#)
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

// 监听topic.queue2(BindingKey=#.news)
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

1.7.6 测试结果

启动 consumer 后运行测试,两个消费者都会收到消息:

  • topic.queue1china.# 匹配 china.news(以 china. 开头)
  • topic.queue2#.news 匹配 china.news(以 .news 结尾)

控制台输出:

1
2
消费者1接收到topic.queue1的消息:【喜报!孙悟空大战哥斯拉,胜!】
消费者2接收到topic.queue2的消息:【喜报!孙悟空大战哥斯拉,胜!】

1.7.7 Topic 交换机总结

  • 与 Direct 的差异:Direct 需完全匹配,Topic 支持通配符,更灵活
  • RoutingKey 格式:必须是多个单词(用 . 分隔),否则通配符无法生效
  • 适用场景:多维度筛选通知(如按「地区 + 类型」分类的新闻推送、日志按「服务名 + 级别」分类)

1.8 代码声明队列与交换机(生产环境推荐)

前面的案例通过 RabbitMQ 控制台手动创建队列和交换机,但生产环境中,队列、交换机的定义应与代码绑定(避免运维手动创建时出错)。SpringAMQP 支持通过代码自动声明:

  • 启动项目时,Spring 会检查队列 / 交换机是否存在,不存在则自动创建
  • 存在则跳过(不会覆盖已有配置)

1.8.1 核心 API

SpringAMQP 提供以下核心类用于声明组件:

组件 对应的类 / 接口 说明
队列 org.springframework.amqp.core.Queue 用于定义队列(名称、是否持久化等)
Fanout 交换机 org.springframework.amqp.core.FanoutExchange 用于定义 Fanout 类型交换机
Direct 交换机 org.springframework.amqp.core.DirectExchange 用于定义 Direct 类型交换机
Topic 交换机 org.springframework.amqp.core.TopicExchange 用于定义 Topic 类型交换机
绑定关系 org.springframework.amqp.core.Binding 用于定义队列与交换机的绑定(含 BindingKey)
构建工具 BindingBuilderExchangeBuilder 简化交换机、绑定关系的创建

1.8.2 方式 1:基于 @Bean 声明(通用方式)

以 Fanout 交换机为例,在 consumer 模块创建配置类,通过 @Bean 声明队列、交换机及绑定关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration // 标识为配置类
public class FanoutConfig {

/**
* 1. 声明Fanout交换机
* @return FanoutExchange实例
*/
@Bean
public FanoutExchange fanoutExchange() {
// 参数:交换机名、是否持久化(重启RabbitMQ后是否保留)、是否自动删除(无绑定时是否删除)
return new FanoutExchange("hmall.fanout", true, false);
}

/**
* 2. 声明第一个队列(fanout.queue1)
*/
@Bean
public Queue fanoutQueue1() {
// 参数:队列名、是否持久化、是否排他(仅当前连接可见)、是否自动删除(无消费者时是否删除)
return new Queue("fanout.queue1", true, false, false);
}

/**
* 3. 绑定队列1与Fanout交换机
* (方法参数会自动从Spring容器注入:fanoutQueue1(上面的@Bean)、fanoutExchange(上面的@Bean))
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
// Fanout交换机无需BindingKey,直接绑定
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

/**
* 4. 声明第二个队列(fanout.queue2)
*/
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2", true, false, false);
}

/**
* 5. 绑定队列2与Fanout交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

效果:启动 consumer 模块后,Spring 会自动在 RabbitMQ 中创建 hmall.fanout 交换机、fanout.queue1fanout.queue2,并完成绑定。

1.8.3 方式 2:基于 @Bean 声明 Direct 交换机(多 BindingKey 场景)

Direct 交换机需为队列绑定多个 BindingKey(如 redblue),可通过多个 Binding Bean 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

/**
* 1. 声明Direct交换机(用ExchangeBuilder简化创建)
*/
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("hmall.direct") // 交换机名
.durable(true) // 持久化
.build();
}

/**
* 2. 声明队列1(direct.queue1)
*/
@Bean
public Queue directQueue1() {
return new Queue("direct.queue1", true);
}

/**
* 3. 绑定队列1与BindingKey=red
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

/**
* 4. 绑定队列1与BindingKey=blue
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}

/**
* 5. 声明队列2(direct.queue2)
*/
@Bean
public Queue directQueue2() {
return new Queue("direct.queue2", true);
}

/**
* 6. 绑定队列2与BindingKey=red
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}

/**
* 7. 绑定队列2与BindingKey=yellow
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}

1.8.4 方式 3:基于注解声明(简化方式)

通过 @RabbitListener 注解的 bindings 属性,可在监听方法上直接声明队列、交换机及绑定关系,无需单独编写配置类,更简洁。

1.8.4.1 注解声明 Direct 交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;

// 监听direct.queue1,同时声明队列、交换机及BindingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"), // 声明队列(持久化)
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT, durable = "true"), // 声明Direct交换机(持久化)
key = {"red", "blue"} // 绑定的BindingKey(多个用逗号分隔)
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

// 监听direct.queue2,同时声明队列、交换机及BindingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT, durable = "true"),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

1.8.4.2 注解声明 Topic 交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 监听topic.queue1,声明队列与Topic交换机(BindingKey=china.#)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC, durable = "true"),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

// 监听topic.queue2,声明队列与Topic交换机(BindingKey=#.news)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2", durable = "true"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC, durable = "true"),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

推荐场景:若队列仅对应一个消费者,优先使用注解声明(代码更集中);若多个消费者共享一个队列,建议用 @Bean 声明(避免重复创建)。

1.9 消息转换器(解决 JDK 序列化问题)

SpringAMQP 发送消息时,会将消息对象序列化为字节数组;接收消息时,再将字节数组反序列化为 Java 对象。默认使用JDK 序列化,但存在明显缺陷:

  • 消息体积大(包含类结构信息)
  • 可读性差(控制台查看是乱码)
  • 有安全漏洞(可能反序列化恶意类)

生产环境中推荐使用JSON 序列化,需手动配置 Jackson2JsonMessageConverter

1.9.1 步骤 1:测试默认 JDK 序列化(问题演示)

1.9.1.1 声明测试队列(代码方式)

consumer 模块创建配置类,声明队列 object.queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.itheima.consumer.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConfig {

// 声明测试队列(用于验证消息序列化)
@Bean
public Queue objectQueue() {
return new Queue("object.queue", true);
}
}

启动 consumer 模块,Spring 会自动创建 object.queue 队列。

1.9.1.2 发送对象消息(Map 类型)

publisher 模块的 SpringAmqpTest 类中,添加发送 Map 对象的测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.util.HashMap;
import java.util.Map;

/**
* 测试默认JDK序列化:发送Map对象
*/
@Test
public void testSendMap() throws InterruptedException {
// 准备Map类型消息(模拟用户信息)
Map<String, Object> msg = new HashMap<>();
msg.put("name", "柳岩");
msg.put("age", 21);
// 发送消息到object.queue
rabbitTemplate.convertAndSend("object.queue", msg);
}

1.9.1.3 查看 JDK 序列化的问题

运行测试方法后,进入 RabbitMQ 控制台的 object.queue 详情页,点击「Get Message (s)」查看消息:

  • 消息体是乱码的字节数组(如 ¬í 4 java.util.HashMap...
  • 体积较大(仅简单 Map 就占用数百字节)
  • 无法直接查看消息内容,调试困难

1.9.2 步骤 2:配置 JSON 消息转换器

1.9.2.1 导入 Jackson 依赖(若未引入)

若项目未引入 SpringMVC(spring-boot-starter-web),需手动添加 Jackson 依赖(publisherconsumer 都需添加):

1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

(若已引入 spring-boot-starter-web,则无需重复添加,Web 依赖已包含 Jackson)

1.9.2.2 配置 JSON 转换器(代码方式)

publisherconsumer 模块的启动类中,添加 MessageConverter@Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class PublisherApplication {

public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class, args);
}

/**
* 配置JSON消息转换器
*/
@Bean
public MessageConverter messageConverter(){
// 1.创建Jackson2JsonMessageConverter实例
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动生成消息ID(用于消息幂等性判断,避免重复消费)
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
}

注意consumer 模块的启动类需做相同配置,确保接收时能反序列化为 JSON。

1.9.3 步骤 3:测试 JSON 序列化效果

  1. 先删除 object.queue 中已有的 JDK 序列化消息(控制台操作)
  2. 重新运行 publishertestSendMap 方法发送 Map 对象
  3. 进入 RabbitMQ 控制台查看 object.queue 的消息:
    • 消息体为清晰的 JSON 格式:{"name":"柳岩","age":21}
    • 体积大幅减小(仅数十字节)
    • 包含自动生成的 messageId(如 spring-amqp-123456),便于后续幂等性处理

1.9.4 步骤 4:消费者接收 JSON 消息

consumer 模块的 SpringRabbitListener 类中,添加监听 object.queue 的方法,直接用 Map 接收消息(Spring 会自动将 JSON 反序列化为 Map):

1
2
3
4
5
6
7
// 监听object.queue,接收JSON格式的Map消息
@RabbitListener(queues = "object.queue")
public void listenObjectQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
// 可直接操作Map中的数据,如获取name和age
System.out.println("用户姓名:" + msg.get("name") + ",年龄:" + msg.get("age"));
}

测试结果:启动 consumer 后发送消息,控制台输出:

1
2
消费者接收到object.queue消息:【{name=柳岩, age=21}】
用户姓名:柳岩,年龄:21

1.9.5 消息转换器总结

  • 推荐使用 Jackson2JsonMessageConverter,解决 JDK 序列化的缺陷
  • 需在生产者和消费者两端同时配置,确保序列化 / 反序列化格式一致
  • 支持复杂对象(如自定义 POJO),Spring 会自动将 JSON 与 POJO 属性映射(需保证字段名一致)

1.10 业务改造(余额支付异步通知)

基于前面的知识,我们对「余额支付功能」进行改造:将支付成功后同步调用订单服务(OpenFeign),改为异步发送消息(RabbitMQ),降低服务间耦合,提高系统容错性。

1.10.1 改造需求

  • 交换机:pay.direct(Direct 类型,用于支付相关消息路由)
  • 队列:trade.pay.success.queue(订单服务监听的队列,用于接收支付成功通知)
  • 绑定关系:队列 trade.pay.success.queue 绑定到 pay.direct,BindingKey 为 pay.success
  • 流程:
    1. 支付服务(pay-service)支付成功后,发送消息到 pay.direct,RoutingKey 为 pay.success,消息内容为订单 ID
    2. 订单服务(trade-service)监听 trade.pay.success.queue,接收消息后更新订单状态为「已支付」

1.10.2 步骤 1:通用配置(支付服务与订单服务)

两个服务都需添加 SpringAMQP 依赖和 MQ 连接配置。

1.10.2.1 添加依赖

pay-servicetrade-servicepom.xml 中,添加 SpringAMQP 依赖:

1
2
3
4
5
<!--SpringAMQP依赖,用于消息收发-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.10.2.2 配置 MQ 连接

在两个服务的 application.yml 中,添加 RabbitMQ 连接配置(替换为实际环境信息):

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: 192.168.150.101 # 虚拟机IP
port: 5672 # 通信端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
listener:
simple:
prefetch: 1 # 能者多劳配置(仅消费者服务需添加,即trade-service)

1.10.3 步骤 2:订单服务(trade-service)接收消息

订单服务作为消费者,需监听 trade.pay.success.queue 队列,接收支付成功消息并更新订单状态。

1.10.3.1 编写消息监听类

trade-servicecom.hmall.trade.listener 包下,创建 PayStatusListener 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.hmall.trade.listener;

import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component // 注入Spring容器
@RequiredArgsConstructor // Lombok注解,自动注入构造方法(替代@Autowired)
public class PayStatusListener {

// 注入订单服务接口,用于更新订单状态
private final IOrderService orderService;

/**
* 监听支付成功消息:声明队列、交换机及绑定关系
* @param orderId 消息内容(支付服务发送的订单ID)
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"), // 声明持久化队列
exchange = @Exchange(name = "pay.direct", type = ExchangeTypes.DIRECT, durable = "true"), // 声明Direct交换机
key = "pay.success" // 绑定BindingKey
))
public void listenPaySuccess(Long orderId){
// 调用订单服务方法,更新订单状态为「已支付」
orderService.markOrderPaySuccess(orderId);
System.out.println("订单服务已接收支付成功通知,订单ID:" + orderId + ",状态已更新为已支付");
}
}

1.10.3.2 订单服务方法(markOrderPaySuccess)

假设 IOrderService 接口已存在 markOrderPaySuccess 方法,用于更新订单状态(核心逻辑如下):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 更新订单状态为已支付
* @param orderId 订单ID
*/
@Override
@Transactional // 事务保证,确保状态更新与数据库操作一致性
public void markOrderPaySuccess(Long orderId) {
// 1.查询订单(判断订单是否存在、状态是否为待支付)
Order order = getById(orderId);
if (order == null || !OrderStatus.WAIT_PAY.equals(order.getStatus())) {
throw new BizIllegalException("订单不存在或已支付");
}
// 2.更新订单状态为已支付,并记录支付时间
order.setStatus(OrderStatus.PAID);
order.setPayTime(LocalDateTime.now());
updateById(order);
}

1.10.4 步骤 3:支付服务(pay-service)发送消息

支付服务作为生产者,需在支付成功后发送消息到 pay.direct 交换机。

1.10.4.1 注入 RabbitTemplate

在支付服务的支付订单实现类(PayOrderServiceImpl)中,注入 RabbitTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class PayOrderServiceImpl implements IPayOrderService {

// 注入RabbitTemplate,用于发送消息
private final RabbitTemplate rabbitTemplate;

// 其他依赖注入(如PayOrderMapper、UserClient等)
private final PayOrderMapper payOrderMapper;
private final UserClient userClient;

// 构造方法注入(Lombok的@RequiredArgsConstructor可简化)
@Autowired
public PayOrderServiceImpl(RabbitTemplate rabbitTemplate, PayOrderMapper payOrderMapper, UserClient userClient) {
this.rabbitTemplate = rabbitTemplate;
this.payOrderMapper = payOrderMapper;
this.userClient = userClient;
}

// 其他方法...
}

1.10.4.2 改造支付成功逻辑

修改 tryPayOrderByBalance 方法,删除原有的 OpenFeign 同步调用,改为发送 RabbitMQ 消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import com.hmall.common.exception.BizIllegalException;
import com.hmall.pay.domain.dto.PayOrderDTO;
import com.hmall.pay.domain.po.PayOrder;
import com.hmall.pay.domain.enums.PayStatus;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;

@Slf4j // 日志注解
@Service
public class PayOrderServiceImpl implements IPayOrderService {

// 其他代码...

@Override
@Transactional // 事务保证:扣减余额、更新支付单状态在同一事务
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {
// 1.查询支付单(根据支付单ID)
PayOrder po = getById(payOrderDTO.getId());
if (po == null) {
throw new BizIllegalException("支付单不存在");
}

// 2.判断支付单状态(仅待支付状态可继续)
if (!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())) {
throw new BizIllegalException("交易已支付或关闭!");
}

// 3.调用用户服务,扣减用户余额(需验证支付密码)
userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());

// 4.更新支付单状态为「已支付」,记录支付时间
boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());
if (!success) {
throw new BizIllegalException("交易已支付或关闭!");
}

// 5.发送支付成功消息(替代原有的OpenFeign同步调用)
try {
// 交换机名:pay.direct,RoutingKey:pay.success,消息内容:订单ID(po.getBizOrderNo())
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
log.info("支付成功消息发送完成,支付单ID:{},订单ID:{}", po.getId(), po.getBizOrderNo());
} catch (Exception e) {
// 消息发送失败仅记录日志,不抛出异常(避免影响支付主流程)
log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
// 后续可通过定时任务重试发送失败的消息(保证最终一致性)
}
}

/**
* 更新支付单状态为已支付
* @param payOrderId 支付单ID
* @param payTime 支付时间
* @return 是否更新成功
*/
private boolean markPayOrderSuccess(Long payOrderId, LocalDateTime payTime) {
PayOrder payOrder = new PayOrder();
payOrder.setId(payOrderId);
payOrder.setStatus(PayStatus.SUCCESS); // 已支付状态
payOrder.setPayTime(payTime);
// 更新条件:仅当状态为待支付时才更新(避免重复更新)
return payOrderMapper.updateById(payOrder) > 0;
}

// 其他方法...
}

1.10.5 改造效果与优势

  1. 降低耦合:支付服务无需依赖订单服务的 Feign 接口,服务间通过消息通信,减少直接依赖
  2. 提高容错:若订单服务暂时不可用,消息会暂存于 RabbitMQ,待服务恢复后再消费,避免同步调用时的服务降级
  3. 异步解耦:支付服务发送消息后即可返回,无需等待订单服务处理完成,提升支付接口响应速度
  4. 最终一致性:通过消息队列保证消息不丢失(后续可配合消息确认机制、定时重试),确保订单状态最终会更新