RabbitMQ | 延迟队列

1.概述
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。
需求:
下单后,30分钟未支付,取消订单,回滚库存。
新用户注册成功30分钟后,发送短信问候。
- 实现:
使用延迟队列实现

很可惜,在RabbitMQ中并未提供延迟队列功能
我们可以采用以下方案实现:
方案1:借助消息超时时间+死信队列
方案2:给RabbitMQ安装插件


注:使用消息超时时间+死信队列,前面已经演示过了
2.延迟插件
一、插件简介
- 官网地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
- 延迟极限:最多两天
二、插件安装
1、确定卷映射目录
docker inspect rabbitmq运行结果:
"Mounts": [
{
"Type": "volume",
"Name": "rabbitmq-plugin",
"Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data",
"Destination": "/plugins",
"Driver": "local",
"Mode": "z",
"RW": true,
"Propagation": ""
},
{
"Type": "volume",
"Name": "2869cda20d9d3b5ca4f271a42029d93b4e8f57ba16d363e72548bd8611d05c98",
"Source": "/var/lib/docker/volumes/2869cda20d9d3b5ca4f271a42029d93b4e8f57ba16d363e72548bd8611d05c98/_data",
"Destination": "/var/lib/rabbitmq",
"Driver": "local",
"Mode": "",
"RW": true,
"Propagation": ""
}
]
容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data
2、下载延迟插件
官方文档说明页地址:https://www.rabbitmq.com/community-plugins.html

下载插件安装文件:
# 从 GitHub 官方 releases 下载 RabbitMQ 延迟消息插件 v3.12.0 的 .ez 文件
# 该文件将保存在当前工作目录下
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez# 运行一个临时的 Alpine 容器
# --rm:容器退出后自动删除,不留痕迹
# -v rabbitmq-plugin:/target:将 Docker 卷 'rabbitmq-plugin' 挂载到容器的 /target 目录
# -v D:\LenovoHZB\Downloads\6:/source:将 Windows 主机上的插件存放目录挂载到容器的 /source 目录
# alpine:使用轻量级 Alpine Linux 镜像
# cp /source/rabbitmq_delayed_message_exchange-3.12.0.ez /target/:将插件文件从 /source 复制到 /target(即卷中)
docker run --rm -v rabbitmq-plugin:/target -v D:\LenovoHZB\Downloads\6:/source alpine cp /source/rabbitmq_delayed_message_exchange-3.12.0.ez /target/

3、启用插件
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出Docker容器
exit
# 重启Docker容器
docker restart rabbitmq
4、确认
确认点1:查看当前节点已启用插件的列表:Overview->Nodes->Advanced->Plugins

确认点2:如果创建新交换机时可以在type中看到x-delayed-message选项,那就说明插件安装好了

三、创建交换机
rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message类型才可以,创建方式如下:

关于x-delayed-type参数的理解:
原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?
这里就额外使用x-delayed-type来指定交换机本身的类型
四、代码测试
1、生产者端代码
// 定义交换机、队列、绑定关系
public static final String EXCHANGE_DELAY = "exchange.delay.video";
public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
public static final String QUEUE_DELAY = "queue.delay.video";
@Test
public void testSendDelayMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DELAY, // 延迟交换机
ROUTING_KEY_DELAY, // 延迟路由键
"咱是一个延迟消息...[" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]", (message) -> { // 后置处理器
message.getMessageProperties().setHeader("x-delay", "10000"); // 单位毫秒
return message;
});
}2、消费者端代码
①情况A:资源已创建
package fun.xingji.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@Slf4j
public class MyDelayedListener {
public static final String QUEUE_DELAY = "queue.delay.video";
@RabbitListener(queues = {QUEUE_DELAY})
public void process(String dataString,Message message,Channel channel) throws IOException {
log.info("[生产者]" + dataString);
log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}②情况B:资源未创建
package fun.xingji.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@Slf4j
public class MyDelayedListener {
// 定义交换机、队列、绑定关系
public static final String EXCHANGE_DELAY = "exchange.delay.video";
public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
public static final String QUEUE_DELAY = "queue.delay.video";
// 定义延迟交换机、普通队列、绑定关系
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = QUEUE_DELAY),
exchange = @Exchange(
name = EXCHANGE_DELAY,
// 延迟交换机
type = "x-delayed-message",
// 延迟交换机类型
arguments = {
@Argument(name = "x-delayed-type", value = "direct")
}
),
key = {
ROUTING_KEY_DELAY
}
)
}
)
public void delayedMessageHandle(String msg, Message message, Channel channel) throws IOException {
// 处理消息
log.info("[生产者]" + msg);
log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}3、执行效果
①交换机类型

②生产者端效果
注意:使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行

③消费者端效果

贡献者
更新日志
37c26-优化MySQL文章封面于
