Rabbitmq中的死信队列
创始人
2024-11-04 18:10:12
0

背景

        RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

原理

        死信队列和普通队列区别不是很大

        普通与死信队列都有自己独立的交换机和路由key、队列和消费者。

区别:

        1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到

普通队列中缓存起来,普通队列对应有自己独立普通消费者。

        2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费

的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机

对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

普通队列中,普通队列发现该消息一直没有被消费者消费

的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机

对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

产生死信队列的原因

工具类:

public class RabbitMqUtils {     //得到一个连接的 channel     public static Channel getChannel() throws Exception{         //创建一个连接工厂         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("127.0.0.1");         factory.setUsername("guest");         factory.setPassword("guest");         //创建连接         Connection connection = factory.newConnection();         //创建通道         Channel channel = connection.createChannel();         return channel;     } }

  1. 消息投递到MQ中存放 消息已经过期  消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。

消息 TTL 过期

生产者代码:

public class Producer {     private static final String NORMAL_EXCHANGE = "normal_exchange";     public static void main(String[] argv) throws Exception {         try (Channel channel = RabbitMqUtils.getChannel()) {             channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);             //设置消息的 TTL 时间             AMQP.BasicProperties properties = new                     AMQP.BasicProperties().builder().expiration("10000").build();             //该信息是用作演示队列个数限制             for (int i = 1; i <11 ; i++) {                 String message="info"+i;                 channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,                         message.getBytes());                 System.out.println("生产者发送消息:"+message);             }         }     } }

消费端:

普通消费:

public class Consumer01 {     //普通交换机名称     private static final String NORMAL_EXCHANGE = "normal_exchange";     //死信交换机名称     private static final String DEAD_EXCHANGE = "dead_exchange";     public static void main(String[] argv) throws Exception {         Channel channel = RabbitMqUtils.getChannel();         //声明死信和普通交换机 类型为 direct         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);         //声明死信队列         String deadQueue = "dead-queue";         channel.queueDeclare(deadQueue, false, false, false, null);         //死信队列绑定死信交换机与 routingkey         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");         //正常队列绑定死信队列信息         Map params = new HashMap<>();         //正常队列设置死信交换机 参数 key 是固定值         params.put("x-dead-letter-exchange", DEAD_EXCHANGE);         //正常队列设置死信 routing-key 参数 key 是固定值         params.put("x-dead-letter-routing-key", "lisi");          String normalQueue = "normal-queue";         channel.queueDeclare(normalQueue, false, false, false, params);         channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");         System.out.println("等待接收消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             System.out.println("Consumer01 接收到消息"+message);         };         channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {         });     } }

死信消费:

public class Consumer02 {     private static final String DEAD_EXCHANGE = "dead_exchange";     public static void main(String[] argv) throws Exception {         Channel channel = RabbitMqUtils.getChannel();         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);         String deadQueue = "dead-queue";         channel.queueDeclare(deadQueue, false, false, false, null);         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");         System.out.println("等待接收死信队列消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             System.out.println("Consumer02 接收死信队列的消息" + message);         };         channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {         });     } }

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

  2.关闭消费端->启动生产者 再次查看结果    结果原因:消费端队列关闭,消息时间过期会进入死信队列

2. 队列达到最大的长度 (队列容器已经满了)

生产者代码:

public class Producer {     private static final String NORMAL_EXCHANGE = "normal_exchange";     public static void main(String[] argv) throws Exception {         try (Channel channel = RabbitMqUtils.getChannel()) {             channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);              //该信息是用作演示队列个数限制             for (int i = 1; i <11 ; i++) {                 String message="info"+i;                 channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,                         message.getBytes());                 System.out.println("生产者发送消息:"+message);             }         }     } }

消费端:

普通消费:

public class Consumer01 {     //普通交换机名称     private static final String NORMAL_EXCHANGE = "normal_exchange";     //死信交换机名称     private static final String DEAD_EXCHANGE = "dead_exchange";     public static void main(String[] argv) throws Exception {         Channel channel = RabbitMqUtils.getChannel();         //声明死信和普通交换机 类型为 direct         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);         //声明死信队列         String deadQueue = "dead-queue";         channel.queueDeclare(deadQueue, false, false, false, null);         //死信队列绑定死信交换机与 routingkey         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");         //正常队列绑定死信队列信息         Map params = new HashMap<>();         //正常队列设置死信交换机 参数 key 是固定值         params.put("x-dead-letter-exchange", DEAD_EXCHANGE);         //正常队列设置死信 routing-key 参数 key 是固定值         params.put("x-dead-letter-routing-key", "lisi");         params.put("x-max-length",6); //设置队列长度为6          String normalQueue = "normal-queue";         channel.queueDeclare(normalQueue, false, false, false, params);         channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");         System.out.println("等待接收消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             System.out.println("Consumer01 接收到消息"+message);         };         channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {         });     } } 

死信消费:

public class Consumer02 {     private static final String DEAD_EXCHANGE = "dead_exchange";     public static void main(String[] argv) throws Exception {         Channel channel = RabbitMqUtils.getChannel();         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);         String deadQueue = "dead-queue";         channel.queueDeclare(deadQueue, false, false, false, null);         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");         System.out.println("等待接收死信队列消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             System.out.println("Consumer02 接收死信队列的消息" + message);         };         channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {         });     } }

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

         结果原因:消费端队列只能存储6条消息,剩下的消息进入死信队列

3. 消费者消费多次消息失败,就会转移存放到死信队列中

注意:与其他不同的是,需要开启手动应答 

生产者代码:

public class Producer {     private static final String NORMAL_EXCHANGE = "normal_exchange";     public static void main(String[] argv) throws Exception {         try (Channel channel = RabbitMqUtils.getChannel()) {             channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);             //该信息是用作演示队列个数限制             for (int i = 1; i <11 ; i++) {                 String message="info"+i;                 channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,                         message.getBytes());                 System.out.println("生产者发送消息:"+message);             }         }     } }

消费端:

普通消费:

public class Consumer01 {     //普通交换机名称     private static final String NORMAL_EXCHANGE = "normal_exchange";     //死信交换机名称     private static final String DEAD_EXCHANGE = "dead_exchange";     public static void main(String[] argv) throws Exception {         Channel channel = RabbitMqUtils.getChannel();         //声明死信和普通交换机 类型为 direct         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);         //声明死信队列         String deadQueue = "dead-queue";         channel.queueDeclare(deadQueue, false, false, false, null);         //死信队列绑定死信交换机与 routingkey         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");         //正常队列绑定死信队列信息         Map params = new HashMap<>();         //正常队列设置死信交换机 参数 key 是固定值         params.put("x-dead-letter-exchange", DEAD_EXCHANGE);         //正常队列设置死信 routing-key 参数 key 是固定值         params.put("x-dead-letter-routing-key", "lisi");          String normalQueue = "normal-queue";         channel.queueDeclare(normalQueue, false, false, false, params);         channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");         System.out.println("等待接收消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             if(message.equals("info5")){                 System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");                 //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中                 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);             }else {                 System.out.println("Consumer01 接收到消息"+message);                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);             }         };         boolean autoAck = false;         channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {         });     } }

死信消费:

public class Consumer02 {     private static final String DEAD_EXCHANGE = "dead_exchange";     public static void main(String[] argv) throws Exception {         Channel channel = RabbitMqUtils.getChannel();         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);         String deadQueue = "dead-queue";         channel.queueDeclare(deadQueue, false, false, false, null);         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");         System.out.println("等待接收死信队列消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             System.out.println("Consumer02 接收死信队列的消息" + message);         };         channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {         });     } }

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

         结果原因:消费端队列消息中为info5时,消费端拒绝签收,只能进入死信队列

相关内容

热门资讯

两秒就懂!微信里面拼三张房卡哪... 微信游戏中心:拼三张房卡,添加微信【71319951】,进入游戏中心或相关小程序,搜索“微信拼三张房...
秒懂教程“金花房卡在哪有这么购... 长虹大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来...
安卓系统的防火墙,保障安全的智... 你知道吗?在智能手机的世界里,安卓系统就像是个大花园,里面种满了各种各样的应用。但是,花园虽美,安全...
两秒就懂!微信里面炸金花房卡在... 微信游戏中心:炸金花房卡,添加微信【56001354】,进入游戏中心或相关小程序,搜索“微信炸金花房...
一分钟了解“牛牛链接房卡那里有... 九尾大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来...
两秒就懂!微信牛牛房卡如何购买... 微信游戏中心:牛牛房卡,添加微信【66336574】,进入游戏中心或相关小程序,搜索“微信牛牛房卡”...
终于找到“微信链接牛牛群房卡怎... 新二号牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房...
两秒就懂!怎么创建牛牛房间房卡... 微信游戏中心:牛牛房卡,添加微信【71319951】,进入游戏中心或相关小程序,搜索“微信牛牛房卡”...
终于找到“金花链接房卡在哪里弄... 人皇大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:15984933许多玩家在游戏中会购买房卡来...
两秒就懂!炸金花房卡如何充值,... 微信游戏中心:炸金花房卡,添加微信【56001354】,进入游戏中心或相关小程序,搜索“微信炸金花房...
给大家讲解“金花房卡在哪有这么... 悠悠大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡...
安卓系统应用程序更新,新功能与... 你有没有发现,你的安卓手机最近是不是总在提醒你更新应用程序呢?这可不是什么坏事哦,今天就来聊聊这个话...
两秒就懂!创建斗牛链接房间房卡... 微信游戏中心:斗牛房卡,添加微信【66336574】,进入游戏中心或相关小程序,搜索“微信斗牛房卡”...
ia实测“炸金花房卡专卖店联系... 新上游牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡...
两秒就懂!炸金花房卡如何购买,... 微信游戏中心:炸金花房卡,添加微信【71319951】,进入游戏中心或相关小程序,搜索“微信炸金花房...
终于找到“微信牛牛房卡链接在哪... 海贝之城是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来...
ia实测“哪里有卖微信炸金花房... 微信炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡...
两秒就懂!拼三张微信链接房卡,... 微信游戏中心:拼三张房卡,添加微信【56001354】,进入游戏中心或相关小程序,搜索“微信拼三张房...
微信牛牛房卡客服微信/购买金花... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡来享...
两秒就懂!微信牛牛房卡专卖店联... 微信游戏中心:斗牛房卡,添加微信【66336574】,进入游戏中心或相关小程序,搜索“微信斗牛房卡”...