RabbitMQ 教程

消息队列中间件

目录

1. RabbitMQ 概述

RabbitMQ 是一个开源的消息队列中间件,实现了高级消息队列协议(AMQP)。它提供了可靠的消息传递、灵活的路由、集群、高可用性、管理工具等特性。

1.1 RabbitMQ 的主要特性

1.2 应用场景

小提示

RabbitMQ 是一个成熟的消息队列中间件,被广泛应用于企业级应用中。它的设计理念是"可靠性和灵活性优先"。

2. 安装和配置

2.1 安装 RabbitMQ

在不同操作系统上安装 RabbitMQ:

Windows

# 使用 Chocolatey 安装
choco install rabbitmq

# 或下载安装包安装
# 访问 https://www.rabbitmq.com/install-windows.html

Linux (Ubuntu/Debian)

# 添加仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash

# 安装 RabbitMQ
sudo apt-get install rabbitmq-server

macOS

# 使用 Homebrew 安装
brew install rabbitmq

2.2 启用管理插件

# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

2.3 创建用户和设置权限

# 创建用户
rabbitmqctl add_user myuser mypassword

# 设置用户标签
rabbitmqctl set_user_tags myuser administrator

# 设置权限
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
注意

默认的管理界面访问地址是 http://localhost:15672,默认用户名和密码都是 guest。

3. 基本概念

3.1 核心组件

3.2 消息流转过程

  1. Producer 发送消息到 Exchange
  2. Exchange 根据路由规则将消息转发到相应的 Queue
  3. Queue 存储消息
  4. Consumer 从 Queue 接收消息

3.3 连接和通道

在 Java 中创建连接和通道:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
小提示

Channel 是轻量级的连接,可以在一个 Connection 中创建多个 Channel,每个 Channel 代表一个会话。

4. 交换机类型

4.1 Direct Exchange

直接交换机,根据路由键精确匹配:

// 声明交换机
channel.exchangeDeclare("direct_logs", "direct");

// 绑定队列
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "info");

4.2 Fanout Exchange

扇形交换机,广播消息到所有绑定的队列:

// 声明交换机
channel.exchangeDeclare("logs", "fanout");

// 绑定队列
channel.queueBind(queueName, "logs", "");

4.3 Topic Exchange

主题交换机,根据路由键模式匹配:

// 声明交换机
channel.exchangeDeclare("topic_logs", "topic");

// 绑定队列
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");

4.4 Headers Exchange

头交换机,根据消息属性匹配:

// 声明交换机
channel.exchangeDeclare("headers_logs", "headers");

// 绑定队列
Map<String, Object> args = new HashMap<>();
args.put("x-match", "any");
args.put("format", "pdf");
args.put("type", "report");
channel.queueBind(queueName, "headers_logs", "", args);
注意

选择合适的交换机类型对于实现消息路由策略至关重要。Direct Exchange 适用于点对点通信,Fanout Exchange 适用于广播,Topic Exchange 适用于模式匹配,Headers Exchange 适用于基于消息属性的路由。

5. 消息模式

5.1 简单模式

最基本的消息模式,一个生产者,一个消费者:

// 生产者
channel.queueDeclare("simple_queue", true, false, false, null);
String message = "Hello World!";
channel.basicPublish("", "simple_queue", null, message.getBytes());

// 消费者
channel.queueDeclare("simple_queue", true, false, false, null);
channel.basicConsume("simple_queue", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
});

5.2 工作队列模式

一个生产者,多个消费者:

// 生产者
String message = "Hello World!";
channel.basicPublish("", "task_queue", null, message.getBytes());

// 消费者
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
});

5.3 发布/订阅模式

一个生产者,多个消费者,每个消费者收到所有消息:

// 生产者
channel.exchangeDeclare("logs", "fanout");
String message = "Hello World!";
channel.basicPublish("logs", "", null, message.getBytes());

// 消费者
channel.exchangeDeclare("logs", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
});

5.4 路由模式

根据路由键将消息发送到特定的队列:

// 生产者
channel.exchangeDeclare("direct_logs", "direct");
String severity = "error";
String message = "Error message";
channel.basicPublish("direct_logs", severity, null, message.getBytes());

// 消费者
channel.exchangeDeclare("direct_logs", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "error");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
});

5.5 主题模式

使用通配符路由键将消息发送到匹配的队列:

// 生产者
channel.exchangeDeclare("topic_logs", "topic");
String routingKey = "kern.critical"; // 例如:设备.严重程度
String message = "A critical kernel error";
channel.basicPublish("topic_logs", routingKey, null, message.getBytes());

// 消费者
channel.exchangeDeclare("topic_logs", "topic");
String queueName = channel.queueDeclare().getQueue();

// 可以绑定多个路由键模式
// * 匹配一个单词,# 匹配零个或多个单词
channel.queueBind(queueName, "topic_logs", "kern.*"); // 所有内核消息
channel.queueBind(queueName, "topic_logs", "*.critical"); // 所有严重错误消息
channel.queueBind(queueName, "topic_logs", "kern.#"); // 所有内核相关消息

channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        String receivedRoutingKey = envelope.getRoutingKey();
        System.out.println(" [x] Received '" + message + "' with routing key '" + 
                          receivedRoutingKey + "'");
    }
});

5.6 RPC模式

请求/响应模式,客户端发送请求并等待响应:

// 服务端(RPC服务提供者)
channel.queueDeclare("rpc_queue", false, false, false, null);
channel.basicQos(1); // 每次只处理一个请求
channel.basicConsume("rpc_queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 提取相关ID,用于响应
        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                .Builder()
                .correlationId(properties.getCorrelationId())
                .build();
        
        String response = "";
        try {
            String message = new String(body, "UTF-8");
            int n = Integer.parseInt(message);
            
            // 执行RPC操作(这里是计算斐波那契数列)
            System.out.println(" [x] Received request for fib(" + message + ")");
            response = String.valueOf(fib(n));
        } catch (Exception e) {
            response = "Error: " + e.getMessage();
        } finally {
            // 发送响应到回调队列
            channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
});

// 斐波那契函数示例
private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

// 客户端(RPC调用者)
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
        .Builder()
        .correlationId(corrId)
        .replyTo(replyQueueName)
        .build();

// 发送RPC请求
String message = "30"; // 请求计算fib(30)
channel.basicPublish("", "rpc_queue", props, message.getBytes());

// 等待响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        if (properties.getCorrelationId().equals(corrId)) {
            response.offer(new String(body, "UTF-8"));
        }
    }
});

// 获取结果(设置超时)
String result = response.poll(5, TimeUnit.SECONDS);
System.out.println(" [x] Got '" + result + "'");
小提示

RabbitMQ提供了六种主要的消息模式,每种适合不同的场景:

  • 简单模式:最基本的点对点通信
  • 工作队列模式:任务分发,适合多消费者处理耗时任务
  • 发布/订阅模式:广播消息,适合一对多通知
  • 路由模式:按照精确的路由键分发消息
  • 主题模式:按照通配符路由键分发消息,最灵活的消息路由
  • RPC模式:请求/响应,适合需要返回结果的远程调用场景

6. 队列管理

6.1 队列声明

创建持久化队列:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

6.2 消息持久化

发送持久化消息:

channel.basicPublish("", "task_queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());

6.3 公平调度

设置预取计数:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

6.4 消息确认

手动确认消息:

channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
        // 处理消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
注意

消息持久化需要同时设置队列持久化和消息持久化。在消费者处理消息时,应该使用手动确认模式以确保消息被正确处理。

7. 消息属性

7.1 基本属性

设置消息属性:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .contentType("text/plain")
    .contentEncoding("UTF-8")
    .headers(new HashMap<String, Object>())
    .deliveryMode(2) // 持久化消息
    .priority(1)
    .correlationId("correlation-id")
    .replyTo("reply-queue")
    .expiration("60000") // 消息过期时间(毫秒)
    .messageId("message-id")
    .timestamp(new Date())
    .type("message-type")
    .userId("user-id")
    .appId("app-id")
    .build();

channel.basicPublish("", "queue-name", props, message.getBytes());

7.2 消息过期时间

设置消息过期时间:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .expiration("60000") // 60秒后过期
    .build();

7.3 消息优先级

设置消息优先级:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .priority(5) // 优先级范围 0-9
    .build();
小提示

消息属性可以帮助实现更复杂的消息处理逻辑,如消息过期、优先级处理、消息追踪等。

8. 集群部署

8.1 集群配置

配置集群节点:

# 在节点1上
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

# 在节点2上
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

8.2 镜像队列

配置镜像队列:

# 设置镜像策略
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'

8.3 集群管理

查看集群状态:

# 查看集群状态
rabbitmqctl cluster_status

# 查看节点信息
rabbitmqctl list_cluster_nodes
注意

在集群环境中,应该使用镜像队列来确保消息的高可用性。同时,应该注意网络分区的情况,并配置适当的处理策略。

9. 监控和管理

9.1 管理界面

访问管理界面:

9.2 监控指标

9.3 告警配置

配置告警规则:

# 设置内存告警阈值
rabbitmqctl set_vm_memory_high_watermark 0.8

# 设置磁盘告警阈值
rabbitmqctl set_disk_free_limit "100MB"
小提示

定期监控 RabbitMQ 的状态对于及时发现和解决问题至关重要。建议配置适当的告警机制。

10. 最佳实践

10.1 性能优化

10.2 可靠性保证

10.3 安全配置

笔记

遵循最佳实践可以帮助构建可靠、高效、安全的消息队列系统。根据具体需求选择合适的技术和配置。

11. 消息确认机制

11.1 消息确认的重要性

消息确认机制是 RabbitMQ 保证消息可靠性的核心机制,它确保消息在处理过程中不会丢失。

11.2 消费者确认模式

RabbitMQ 支持三种消费者确认模式:

  1. 自动确认(Auto Ack):消息一旦被发送给消费者,立即从队列中移除
  2. 手动确认(Manual Ack):消费者处理完消息后手动发送确认
  3. 批量确认(Batch Ack):消费者可以一次性确认多条消息
// 设置为手动确认模式
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            // 处理消息
            System.out.println(" [x] Received '" + message + "'");
            // 处理完成后确认
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理失败时拒绝消息,并重新入队
            channel.basicNack(envelope.getDeliveryTag(), false, true);
            // 或者拒绝消息,不重新入队
            // channel.basicReject(envelope.getDeliveryTag(), false);
        }
    }
});

11.3 发布者确认

发布者确认(Publisher Confirms)允许发布者确认消息已被 RabbitMQ 成功接收:

// 启用发布者确认模式
channel.confirmSelect();

// 同步等待确认
channel.basicPublish("", queueName, null, message.getBytes());
if (channel.waitForConfirms()) {
    System.out.println("消息已确认");
} else {
    System.out.println("消息未确认");
}

// 异步确认
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        System.out.println("消息已确认: " + deliveryTag);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        System.out.println("消息未确认: " + deliveryTag);
    }
});

11.4 事务机制

RabbitMQ 也支持事务机制,但性能较低,一般不推荐使用:

// 开启事务
channel.txSelect();

try {
    // 发送消息
    channel.basicPublish("", queueName, null, message.getBytes());
    // 提交事务
    channel.txCommit();
} catch (Exception e) {
    // 回滚事务
    channel.txRollback();
    throw e;
}
最佳实践

对于关键业务,推荐使用手动确认模式和发布者确认(Publisher Confirms)机制,而不是使用事务。发布者确认机制性能更好,且提供了异步确认的能力。

12. 死信队列

12.1 什么是死信队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中处理无法被正常消费的消息的机制。当消息变成死信时,它会被重新发布到指定的死信交换机(Dead Letter Exchange,DLX)。

12.2 消息成为死信的情况

12.3 配置死信队列

配置一个带有死信队列的正常队列:

// 声明死信交换机
channel.exchangeDeclare("dead.letter.exchange", "direct");

// 声明死信队列
channel.queueDeclare("dead.letter.queue", true, false, false, null);

// 绑定死信队列到死信交换机
channel.queueBind("dead.letter.queue", "dead.letter.exchange", "dead.letter.routing.key");

// 为正常队列设置死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead.letter.exchange");
args.put("x-dead-letter-routing-key", "dead.letter.routing.key");
// 可选:设置消息TTL
args.put("x-message-ttl", 60000); // 60秒
// 可选:设置队列最大长度
args.put("x-max-length", 1000);

// 声明正常队列
channel.queueDeclare("normal.queue", true, false, false, args);

12.4 消费者拒绝导致的死信

// 消费者拒绝消息
channel.basicConsume("normal.queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            // 处理消息
            if (someCondition) {
                throw new Exception("无法处理该消息");
            }
            // 确认消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // 拒绝消息,不重新入队,消息将被发送到死信队列
            channel.basicReject(envelope.getDeliveryTag(), false);
        }
    }
});

12.5 死信队列的处理

消费处理死信队列中的消息:

// 消费死信队列
channel.basicConsume("dead.letter.queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        
        // 记录死信消息
        System.out.println("处理死信消息: " + message);
        
        // 分析消息失败原因,可以通过原始消息的属性获取
        Map<String, Object> headers = properties.getHeaders();
        if (headers != null) {
            String xDeathReason = headers.get("x-death") != null ? 
                                 headers.get("x-death").toString() : "未知原因";
            System.out.println("死信原因: " + xDeathReason);
        }
        
        // 确认死信消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
场景应用

死信队列在以下场景特别有用:

  • 处理无法消费的消息,如格式错误的消息
  • 实现消息的延迟处理(通过TTL和死信队列组合)
  • 捕获和监控异常情况
  • 实现请求的限时响应

13. Spring AMQP 集成

13.1 Spring AMQP 简介

Spring AMQP 提供了一套基于 Spring 框架的 AMQP 抽象,简化了 RabbitMQ 的使用。它包含两个核心模块:

13.2 Maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.0</version>
</dependency>

13.3 基本配置

在 Spring Boot 应用中配置 RabbitMQ:

// application.properties 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

// Java配置类
@Configuration
public class RabbitMQConfig {

    @Bean
    public Queue queue() {
        return new Queue("my-queue", true);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("my-exchange");
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("my-routing-key");
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }
}

13.4 发送消息

使用 RabbitTemplate 发送消息:

@Service
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("my-exchange", "my-routing-key", message);
    }

    public void sendObjectMessage(MyObject object) {
        rabbitTemplate.convertAndSend("my-exchange", "my-routing-key", object);
    }
}

13.5 接收消息

使用 @RabbitListener 注解接收消息:

@Component
public class MessageConsumer {

    @RabbitListener(queues = "my-queue")
    public void receiveMessage(String message) {
        System.out.println("收到消息: " + message);
    }

    @RabbitListener(queues = "my-queue")
    public void receiveObjectMessage(MyObject object) {
        System.out.println("收到对象消息: " + object);
    }
}

13.6 高级配置

13.6.1 配置确认回调

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败: " + cause);
        }
    });
    template.setReturnsCallback(returned -> {
        System.out.println("消息被退回: " + 
            "交换机: " + returned.getExchange() + 
            ", 路由键: " + returned.getRoutingKey() +
            ", 原因: " + returned.getReplyText());
    });
    return template;
}

// 启用确认回调
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;
}

13.6.2 配置死信队列

@Bean
public Queue orderQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "order.dead.letter.exchange");
    args.put("x-dead-letter-routing-key", "order.dead");
    args.put("x-message-ttl", 60000); // 1分钟
    return new Queue("order.queue", true, false, false, args);
}

@Bean
public DirectExchange orderExchange() {
    return new DirectExchange("order.exchange");
}

@Bean
public Binding orderBinding() {
    return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");
}

@Bean
public Queue deadLetterQueue() {
    return new Queue("order.dead.letter.queue", true);
}

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("order.dead.letter.exchange");
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("order.dead");
}
最佳实践

使用 Spring AMQP 可以大大简化 RabbitMQ 的使用,特别是在 Spring Boot 应用中。利用 Spring 的依赖注入和声明式编程模型,可以更方便地配置和使用消息队列。

14. 错误处理与重试机制

14.1 基本错误处理

使用 try-catch 捕获消费者处理消息时的异常:

@RabbitListener(queues = "my-queue")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    try {
        // 处理消息
        System.out.println("处理消息: " + message);
        // 确认消息
        channel.basicAck(tag, false);
    } catch (Exception e) {
        try {
            // 拒绝消息,重新入队
            channel.basicNack(tag, false, true);
        } catch (IOException ex) {
            // 处理通道异常
        }
    }
}

14.2 Spring Retry 集成

使用 Spring Retry 实现消息处理的重试机制:

// 添加依赖
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

// 配置重试
@Configuration
@EnableRetry
public class RetryConfig {
}

// 在服务中使用重试
@Service
public class OrderService {

    @Retryable(
        value = {OrderProcessException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void processOrder(Order order) throws OrderProcessException {
        // 处理订单
        if (someCondition) {
            throw new OrderProcessException("订单处理失败");
        }
    }

    @Recover
    public void recoverOrderProcess(OrderProcessException e, Order order) {
        // 重试失败后的恢复处理
        System.out.println("重试失败,执行补偿逻辑: " + order);
    }
}

14.3 Spring AMQP 重试机制

配置 Spring AMQP 的重试机制:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory,
        MessageConverter messageConverter) {
        
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(messageConverter);
    
    // 配置消费者确认模式
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
    // 配置重试
    RetryInterceptorBuilder retryInterceptorBuilder = RetryInterceptorBuilder.stateless()
        .maxAttempts(3)
        .backOffOptions(1000, 2.0, 10000)
        .recoverer(new RejectAndDontRequeueRecoverer()); // 重试失败后拒绝消息
    
    factory.setAdviceChain(new Advice[] {retryInterceptorBuilder.build()});
    
    return factory;
}

14.4 错误处理策略

不同的错误处理策略:

  1. 重试后重新入队:适用于临时性错误,如网络抖动
  2. 重试后发送到死信队列:适用于处理失败的消息
  3. 重试后丢弃:适用于不重要的消息
  4. 记录错误并继续:适用于可容忍错误的场景
// 重试后发送到死信队列
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
    return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error.routing.key");
}
最佳实践

为不同类型的消息和错误设计不同的处理策略。对于重要的业务消息,应该实现重试机制和死信处理;对于非关键消息,可以采用更简单的策略。始终记录错误信息,以便后续分析和改进。

15. 消费者预取与负载均衡

15.1 预取计数的作用

预取计数(prefetch count)控制 RabbitMQ 一次发送给消费者的未确认消息数量,对消费者负载均衡和性能有重要影响。

15.2 设置预取计数

使用原生 API 设置预取计数:

// 每次只预取一条消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);

// 也可以设置全局模式
boolean global = true; // 适用于通道上的所有消费者
channel.basicQos(prefetchCount, global);

15.3 在 Spring AMQP 中设置预取

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
        
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    
    // 设置预取计数
    factory.setPrefetchCount(10);
    
    return factory;
}

// 或者在应用配置中设置
// application.properties
spring.rabbitmq.listener.simple.prefetch=10

15.4 负载均衡考虑

实现有效的负载均衡:

  1. 根据消费者能力设置预取:处理能力强的消费者可以设置更高的预取值
  2. 考虑消息处理时间:处理时间长的消息应该设置较小的预取值
  3. 动态调整:根据系统负载情况动态调整预取值
  4. 使用手动确认:结合手动确认模式更精确地控制消息流

15.5 并发消费者

在 Spring AMQP 中配置并发消费者:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
        
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    
    // 设置并发消费者数量
    factory.setConcurrentConsumers(5); // 初始并发消费者数
    factory.setMaxConcurrentConsumers(10); // 最大并发消费者数
    
    return factory;
}

// 或者在应用配置中设置
// application.properties
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
性能调优

在调整预取计数和并发消费者时,建议从较小的值开始,然后根据实际负载和性能指标逐步调整。监控系统的消息吞吐量、处理延迟和资源使用情况,找到最佳配置。

返回首页