---
url: /blog/o31n0s24/index.md
---
![延迟队列](./RabbitMQ.jpg)

## 1.概述

* 延迟队列存储的对象肯定是对应的延时消息，所谓”延时消息”是指当消息被发送以后，并不想让消费者立即拿到消息，而是等待指定时间后，消费者才拿到这个消息进行消费。

* 场景：在订单系统中，一个用户下单之后通常有30分钟的时间进行支付，如果30分钟之内没有支付成功，那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。

* 需求：

1. **==下单后，30分钟未支付，取消订单，回滚库存。==**

2. **==新用户注册成功30分钟后，发送短信问候。==**

* 实现：

使用延迟队列实现

![image-20240806095835083](./assets/image-20240806095835083.png)

很可惜，在RabbitMQ中并未提供延迟队列功能

我们可以采用以下方案实现：

方案1：借助消息超时时间+死信队列

方案2：给RabbitMQ安装插件

![image-20240806100015448](./assets/image-20240806100015448.png)

![延迟队列](./第9章-延迟队列/img-4.jpg)

**==注：使用消息超时时间+死信队列，前面已经演示过了==**

## 2.延迟插件

### 一、插件简介

* 官网地址：https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
* 延迟极限：最多两天

### 二、插件安装

#### 1、确定卷映射目录

```shell
docker inspect rabbitmq
```

**运行结果：**

```json
"Mounts": [
            {
                "Type": "volume",
                "Name": "rabbitmq-plugin",
                "Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data",
                "Destination": "/plugins",
                "Driver": "local",
                "Mode": "z",
                "RW": true,
                "Propagation": ""
            },
            {
                "Type": "volume",
                "Name": "2869cda20d9d3b5ca4f271a42029d93b4e8f57ba16d363e72548bd8611d05c98",
                "Source": "/var/lib/docker/volumes/2869cda20d9d3b5ca4f271a42029d93b4e8f57ba16d363e72548bd8611d05c98/_data",
                "Destination": "/var/lib/rabbitmq",
                "Driver": "local",
                "Mode": "",
                "RW": true,
                "Propagation": ""
            }
]
```

> **`容器内/plugins目录`对应的`宿主机目录`是：`/var/lib/docker/volumes/rabbitmq-plugin/_data`**

#### 2、下载延迟插件

官方文档说明页地址：https://www.rabbitmq.com/community-plugins.html

![image-20231107180045135](./assets/image-20231107180045135.png)

下载插件安装文件：

```shell
# 从 GitHub 官方 releases 下载 RabbitMQ 延迟消息插件 v3.12.0 的 .ez 文件
# 该文件将保存在当前工作目录下
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
```

```shell
# 运行一个临时的 Alpine 容器
# --rm：容器退出后自动删除，不留痕迹
# -v rabbitmq-plugin:/target：将 Docker 卷 'rabbitmq-plugin' 挂载到容器的 /target 目录
# -v D:\LenovoHZB\Downloads\6:/source：将 Windows 主机上的插件存放目录挂载到容器的 /source 目录
# alpine：使用轻量级 Alpine Linux 镜像
# cp /source/rabbitmq_delayed_message_exchange-3.12.0.ez /target/：将插件文件从 /source 复制到 /target（即卷中）
docker run --rm -v rabbitmq-plugin:/target -v D:\LenovoHZB\Downloads\6:/source alpine cp /source/rabbitmq_delayed_message_exchange-3.12.0.ez /target/
```

![延迟队列](./第9章-延迟队列/img-1.jpg)

![延迟队列](./第9章-延迟队列/img-2.jpg)

#### 3、启用插件

```shell
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash

# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了，可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 退出Docker容器
exit

# 重启Docker容器
docker restart rabbitmq
```

![延迟队列](./第9章-延迟队列/img-3.jpg)

#### 4、确认

**==确认点1：查看当前节点已启用插件的列表：Overview->Nodes->Advanced->Plugins==**

![image-20240321115348525](./assets/image-20240321115348525.png)

**==确认点2：如果创建新交换机时可以在type中看到`x-delayed-message`选项，那就说明插件安装好了==**

![image-20231107181914265](./assets/image-20231107181914265.png)

### 三、创建交换机

rabbitmq\_delayed\_message\_exchange插件在工作时要求交换机是x-delayed-message类型才可以，创建方式如下：

![image-20240319163915574](./assets/image-20240319163915574.png)

关于x-delayed-type参数的理解：

> 原本指定交换机类型的地方使用了x-delayed-message这个值，那么这个交换机除了支持延迟消息之外，到底是direct、fanout、topic这些类型中的哪一个呢？
>
> 这里就额外使用x-delayed-type来指定交换机本身的类型

### 四、代码测试

#### 1、生产者端代码

```java
 // 定义交换机、队列、绑定关系
public static final String EXCHANGE_DELAY = "exchange.delay.video";
public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
public static final String QUEUE_DELAY = "queue.delay.video";

@Test
public void testSendDelayMessage() {
        rabbitTemplate.convertAndSend(
                EXCHANGE_DELAY, // 延迟交换机
                ROUTING_KEY_DELAY, // 延迟路由键
                "咱是一个延迟消息...[" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]", (message) -> { // 后置处理器
                    message.getMessageProperties().setHeader("x-delay", "10000"); // 单位毫秒
                    return message;
                });
}
```

#### 2、消费者端代码

##### ①情况A：资源已创建

```java
package fun.xingji.mq.listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;  
  
@Component  
@Slf4j
public class MyDelayedListener {
    
    public static final String QUEUE_DELAY = "queue.delay.video";
    
    @RabbitListener(queues = {QUEUE_DELAY})
    public void process(String dataString,Message message,Channel channel) throws IOException {  
        log.info("[生产者]" + dataString);
        log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}
```

##### ②情况B：资源未创建

```java
package fun.xingji.mq.listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@Slf4j
public class MyDelayedListener {

    // 定义交换机、队列、绑定关系
    public static final String EXCHANGE_DELAY = "exchange.delay.video";
    public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
    public static final String QUEUE_DELAY = "queue.delay.video";

    // 定义延迟交换机、普通队列、绑定关系
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = QUEUE_DELAY),
                            exchange = @Exchange(
                                    name = EXCHANGE_DELAY,
                                    // 延迟交换机
                                    type = "x-delayed-message",
                                    // 延迟交换机类型
                                    arguments = {
                                            @Argument(name = "x-delayed-type", value = "direct")
                                    }
                            ),
                            key = {
                                    ROUTING_KEY_DELAY
                            }
                    )
            }
    )
    public void delayedMessageHandle(String msg, Message message, Channel channel) throws IOException {
        // 处理消息
        log.info("[生产者]" + msg);
        log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
        // 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
```

#### 3、执行效果

##### ①交换机类型

![image-20240319171359652](./assets/image-20240319171359652.png)

##### ②生产者端效果

\==注意：使用rabbitmq\_delayed\_message\_exchange插件后，即使`消息成功发送到队列`上，也会`导致returnedMessage()方法执行`==

![image-20240321115605608](./assets/image-20240321115605608.png)

##### ③消费者端效果

![延迟队列](./第9章-延迟队列/img-5.jpg)
