RabbitMQ死信队列和延时队列
RabbitMQ本身是具有死信队列
和死信交换机
属性的,延时队列
可以通过死信队列和死信交换机来实现。在电商行业中,通常都会有一个需求:订单超时未支付,自动取消该订单。那么通过RabbitMQ实现的延时队列就是实现该需求的一种方式。
1、死信队列
死信
顾名思义,就是死掉的信息,英文是Dead Letter。死信交换机(Dead-Letter-Exchange)
和普通交换机没有区别,都是可以接受信息并转发到与之绑定并能路由到的队列,区别在于死信交换机
是转发死信
的,而和该死信交换机
绑定的队列就是死信队列
。说的再通俗一点,死信交换机和死信队列其实都只是普通的交换机和队列,只不过接受、转发的信息是死信
,其他操作并没有区别。
1.1 死信的条件
称为死信
的信息,需要如下几个条件:
- 消息被消费者拒绝(通过basic.reject 或者 back.nack),并且设置 requeue=false。
- 消息过期,因为队列设置了TTL(Time To Live)时间。
- 消息被丢弃,因为超过了队列的长度限制。
这时以上几个条件的方式基本上都有2种:1)rabbitmqctl
命令行设置policy(策略)参数; 2)硬编码,也就是在代码中设置。
1.2 消费者拒绝
1.2.1 编码方式
硬编码就是在代码中编写业务队列声明时对应的参数:
x-dead-letter-exchange
:死信交换机,必须x-dead-letter-routing-key
:死信交换机转发到死信队列的路由键,可选
Producer:
1 | // producer |
Consumer:
1 | /** |
总结:
1) consumer方使用basic.reject
或者basic.nack
都会将消息转发到匹配的死信队列(requeue=false),区别在于basic.reject
相比basic.nack
少一个参数mutil
,表示是否批量back。而且nack的数量可以再web端看到。
2) 使用x-letter-dead-exchange
设置死信交换机,这个是必须设置的。x-letter-dead-routing-key
设置死信交换机转发到死信队列的路由键,相当于重新定义了publish的路由键,该参数可选,可以根据具体业务判断是否需要设置。
1.1.2 策略方式
策略方式需要在rabbitmq的服务器上执行如下命令:
1 | rabbitmqctl set_policy {策略名称} ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues |
例如:
1 | rabbitmqctl set_policy dlx "dead.*" '{"dead-letter-exchange":"test-dead-letter-exchange"}' --apply-to queues |
表示给所有的 以dead.开头 的队列设置死信交换机 test-dead-letter-exchange ,策略名字为 dlx。然后我们在rabbitmq的web界面新建一个名字为test-dead-letter-exchange
的exchange,并且新建名为dead.order.queue
和dead.other.queue
的queue,绑定test-dead-letter-exchange
,路由键分别为:order.#
和other.#
。
说明:由于我们要模拟的死信转发到死信队列的情况,所以这两个新建的queue都设置了ttl为10000ms,也就是10s。
我们看到消息在10s之后成功到了dead.order.queue
,说明我们的配置生效。这里我将这个过程画个图:
1.3 设置过期时间
我们可以给 队列 或者 消息 设置过期时间。队列 的过期时间,类似于 autoDelete
参数,表示队列在指定时长内如果没有使用的话会被删除,队列没有使用者,队列最近未重新声明(重新声明续订租约),以及basic.get
至少在过期期间未被调用,例如,这可以用于RPC样式的回复队列,其中可以创建许多可能永远不会被耗尽的队列。消息 的过期时间我们可以在发消息时设置在消息体,也可以给这个队列设置一个消息过期时间,其实就是两种方式,一种设置在队列上,另一种是设置在消息上。
1.3.1 编码方式
1)设置消息体过期时间
1 | // 设置消息属性 |
2)设置队列的消息过期时间
1 | // 设置参数 |
我们可以看到,创建好队列之后会有个TTL
标识,x-message-ttl
标识该队列设置的消息过期时间为5s。
3)设置队列的过期时间
1 | // 设置队列参数 |
注意:无论队列中是否存在消息,如果没有操作队列,就会被自动删除。
1.3.2 策略方式
1)设置队列的消息过期时间
1 | rabbitmqctl set_policy --vhost /adu TTL ".*" '{"message-ttl":60000}' --apply-to queues |
表示在 /adu
虚拟主机下增加一个名称为 TTL 策略,设所有队列 message-ttl
消息过期时间60s。
2)设置队列的过期时间
1 | rabbitmqctl set_policy --vhost /adu expiry ".*" '{"expires":1800000}' --apply-to queues |
表示在/adu
虚拟主机下增加一个名称为 expiry 的策略,设置所有的队列过期时间为180s。
1.4 超过队列长度
默认情况下,队列没有长度限制(但是总归是有硬盘和内存的限制的)。我们可以显示的设置队列的长度,可以是消息的数量限制,也可以是队列总消息内容的占用内存长度,或者两种都设置。一个队列的最大长度,可以使用 策略 或者 编码 的方式进行设置,或者在创建队列时web界面设置。如果 策略
方式和 编码
方式都设置了,则 值更小的
会生效。
如果队列设置了队列长度限制,那么当队列中的消息达到最大长度时,默认的 溢出 规则为 丢弃最老的消息(队列头部)。我们可以改变这个规则,使用 overflow
参数来配置。overflow
可选值为 x-reject-publish
或者x-reject-publish-dls
,两者都表示拒绝接受新消息,区别在于 reject-publish-dlx
也会导致死信拒绝消息1。
此处有个疑问:reject-publish-dlx和reject-publish的区别问题。针对官网的翻译,我觉得
reject-publish-dlx
是与之绑定的死信队列不会收到消息,reject-publish
相反会收到消息。但是做实验的时候刚好和我理解的相反?熟悉的铁子们回复说一下哈呀。
1.4.1 编码方式
使用x-max-length
和x-max-length-bytes
参数设置。
1 | // 设置参数 |
1.4.2 策略方式
1 | rabbitmqctl set_policy --vhost /adu limit "^five_msg" '{"dead-letter-exchange":"test-dead-letter-exchange","max-length":5,"overflow":"reject-publish-dlx"}' --apply-to queues |
表示在 /adu
虚拟主机下增加一个名称为 limit 策略,设所有以 five_msg 开头的队列 消息最多为5个、消息溢出策略为拒绝、设置死信交换机 。
设置
max-length-bytes
也是同样的方式。
2、延时队列
延时队列,顾名思义就是存放延时消息的队列,也就是说消费者在一定的延时后才会收到消息。典型的应用场景就是如上所述的订单超时未支付自动取消。
2.1 借助死信队列实现
其实在介绍完 死信队列 之后,就能大概看出来如何使用 死信队列 来实现延时队列了。就是使用消息的TTL
属性,将过期的消息转发到死信队列中,业务监听死信队列的消息就行了。这种情况适合给队列设置消息过期时间的情况,就是队列中所有的消息都是同一个过期时间,到期按照顺序转发到死信队列中,不会有问题。
如果消息的过期时间是在发消息的时候设置在消息体上的,可能会出问题。比如按顺序发送msg1和msg2两条消息,msg1的过期时间为5s,msg2的过期时间为2s。正常理解下,结果肯定是msg2先到死信队列被消费,但是结果却是两条消息都在5s时转发到死信队列被消费。其实比较好理解,因为队列的特性就是 先进先出 ,即使msg2先到了过期时间,但是msg1在它之前阻塞,只有msg1被消费了,msg2才能到队头被消费。 我们画个图:
2.2 借助RabbitMQ插件实现
rabbitmq提供了一个插件 rabbitmq_delayed_message_exchange
让我们能够实现 延迟队列 的效果,同时能够解决 通过死信队列实现延迟队列 出现的消息阻塞问题。该插件从RabbitMQ的3.6.12开始支持,要确认当前自己的rabbitmq版本是否支持该插件。
2.2.1 下载插件
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载该插件后,将 rabbitmq_delayed_message_exchange-3.9.0.ez
包放到 RabbitMQ安装目录的plugins 目录下:
2.2.2 启用插件
执行控制台命令,重启rabbitmq服务:
1 | # 1.列出所有插件 |
在此之后,web界面的 exchanges 便可以创建type为 x-delayed-message
的交换机,或者在代码中声明该类型的交换机。要是用其延时功能,需要在发消息的时候加一个 header :x-delay=xxx
,表示延时xxx毫秒。
1 | // 声明延时交换机,type=x-delayed-message,x-delayed-type=direct|fanout|topic |
3、总结
上面我们提到了使用RabbitMQ实现延时队列功的方案:1)借助本事的死信队列实现,监听死信队列;2)借助插件实现。优缺点如下:
- 死信队列实现方式,需要在队列上设置消息过期时间,不灵活;需要再多用一个死信队列,占用空间;rabbitmq本事自带死信队列,实现方便。
- 插件实现方式,需要下载安装插件,要考虑版本兼容性;代码逻辑简单,容易上手。
回到我们开头的需求:订单支付超时自动取消。这个功能主要就是需要一个延时队列,那通过rabbitmq实现延时队列只是一种方式,还可以通过其他方式实现,比如Java的 DelayQueue
、Quartz定时任务
、Redis的zset
、时间轮
等都可以实现,具体方案还是要结合项目和具体方式的优缺点来选择。比如项目中使用到了RabbitMQ,那使用RabbitMQ实现延迟队列就是比较好的方式,那具体选择插件方式还是死信队列方式,还需要看项目中对该功能的灵活程度来选择。
参考:
- https://www.rabbitmq.com/dlx.html
- https://www.cnblogs.com/williamwsj/p/8108970.html
- https://www.jianshu.com/p/256d2eaf1786
- https://www.rabbitmq.com/community-plugins.html
- https://blog.csdn.net/zhenghongcs/article/details/106700446
最后:因小的才疏学浅,如有问题,请不吝指出,感谢感谢~