基于Springboot跟rabbitmq实现的死信队列
技术:springboot2.4.5 + rabbitmq3.7 + maven3.0.5
概述
RabbitMQ是流行的开源消息队列系统,使用erlang语言开发。为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。但由于对死信队列的概念及配置不熟悉,导致曾一度陷入百度的汪洋大海,无法自拔,很多文章都看起来可行,但是实际上却并不能帮我解决实际问题。最终,在官网文档中找到了我想要的答案,通过官网文档的学习,才发现对于死信队列存在一些误解,导致配置死信队列之路困难重重。
详细
一、运行效果
二、实现过程
①、先创建一个Springboot项目。然后在pom文件中添加 spring-boot-starter-amqp
和 spring-boot-starter-web
的依赖,接下来创建一个Config类,这里是关键:
package com.zyf.rabbitmqdeadletterdemo.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA"; public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB"; public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA"; public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB"; // 声明业务Exchange @Bean("businessExchange") public FanoutExchange businessExchange(){ return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明业务队列A @Bean("businessQueueA") public Queue businessQueueA(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build(); } // 声明业务队列B @Bean("businessQueueB") public Queue businessQueueB(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build(); } // 声明死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 声明死信队列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 声明业务队列A绑定关系 @Bean public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 声明业务队列B绑定关系 @Bean public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 声明死信队列A绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } }
②、接下来,是业务队列的消费代码:
@Slf4j@Componentpublic class BusinessMessageReceiver { @RabbitListener(queues = BUSINESS_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到业务消息A:{}", msg); boolean ack = true; Exception exception = null; try { if (msg.contains("deadletter")){ throw new RuntimeException("dead letter exception"); } } catch (Exception e){ ack = false; exception = e; } if (!ack){ log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } @RabbitListener(queues = BUSINESS_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { System.out.println("收到业务消息B:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
③、然后配置死信队列的消费者:
@Componentpublic class DeadLetterMessageReceiver { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { System.out.println("收到死信消息A:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { System.out.println("收到死信消息B:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
④、为了方便测试,写一个简单的消息生产者,并通过controller层来生产消息。
@Componentpublic class BusinessMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg){ rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg); } }
@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController { @Autowired private BusinessMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg){ sender.sendMsg(msg); } }
三、项目结构图
四、补充总结
死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
总结一下死信消息的生命周期:
业务消息被投入业务队列
消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
被nck或reject的消息由RabbitMQ投递到死信交换机中
死信交换机将消息投入相应的死信队列
死信队列的消费者消费死信消息
死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。
本实例支付的费用只是购买源码的费用,如有疑问欢迎在文末留言交流,如需作者在线代码指导、定制等,在作者开启付费服务后,可以点击“购买服务”进行实时联系,请知悉,谢谢
手机上随时阅读、收藏该文章 ?请扫下方二维码