RabbitMQ+SpringBoot整合

RabbitMQ+SpringBoot整合

RabbitMQ+SpringBoot整合

Exchange类型

  1. fanoutFanout-Exchange会将它接收到的消息发往所有与他绑定的Queue中。
  2. directDirect-Exchange会把它接收到的消息发往与它有绑定关系且Routingkey完全匹配的Queue中(默认)。
  3. topicTopic-Exchange与Direct-Exchange相似,不过Topic-Exchange不需要全匹配,可以部分匹配,它约定:Routingkey为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。
  4. headerHeader-Exchange不依赖于RoutingKey或绑定关系来分发消息,而是根据发送的消息内容中的headers属性进行匹配。此模式已经不再使用,本文中也不会去讲,大家知道即可。

1. Fanout-Exchange

Fanout-Exchange又称扇形交换机

ExchangeQueue建立一个绑定关系,Exchange会分发给所有和它有绑定关系的Queue中,绑定了十个Queue就把消息复制十份进行分发。

代码实现

String QUEUE_FANOUT1 = "queue.fanout1";

String QUEUE_FANOUT2 = "queue.fanout2";

String FANOUT_EXCHANG = "fanoutExchange";
@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue fanout1() {
        return new Queue(QUEUE_FANOUT1);
    }

    @Bean
    public Queue fanout2() {
        return new Queue(QUEUE_FANOUT2);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 三个构造参数:name durable autoDelete
        return new FanoutExchange(FANOUT_EXCHANG, false, false);
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(fanout1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(fanout2()).to(fanoutExchange());
    }
}

新建了两个演示用的队列,然后建了一个FanoutExchange,最后给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算完成了。

消息生产者

@Override
public void senFanoutExchange(String msg) {
    rabbitTemplate.convertAndSend(FANOUT_EXCHANG,null,msg);
    System.out.println("消息发送完毕。");
}

消息消费者

@Slf4j
@Component
public class FanoutExchangeCustomer {
    @RabbitListener(queues =QUEUE_FANOUT1 )
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = QUEUE_FANOUT2)
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

2. Direct-Exchange

Direct-Exchange是一种精准匹配的交换机

将queue绑定到不同routingKey,然后发送消息指定routingKey,就由绑定的queue消费

代码实现

String DIRECT_QUEUE1 = "queue.direct1";

String DIRECT_QUEUE2 = "queue.direct2";

String DIRECT_EXCHANGE = "directExchange";

String ROUTING_KEY_ERROR = "error";

String ROUTING_KEY_INFO = "info";

RabbitMqConfig 添加

@Bean
public Queue directQueue1() {
    return new Queue(DIRECT_QUEUE1);
}

@Bean
public Queue directQueue2() {
    return new Queue(DIRECT_QUEUE2);
}

@Bean
public DirectExchange directExchange() {
    // 三个构造参数:name durable autoDelete
    return new DirectExchange(DIRECT_EXCHANGE, false, false);
}

/**
 *     绑定到routingKey
 */
@Bean
public Binding directBinding1() {
    return BindingBuilder.bind(directQueue1()).to(directExchange()).with(ROUTING_KEY_INFO);
}

/**
 *     绑定到routingKey
 */
@Bean
public Binding directBinding2() {
    return BindingBuilder.bind(directQueue2()).to(directExchange()).with(ROUTING_KEY_ERROR);
}

生产者

    @Override
    public void sendDirect(String msg,String routingKey) {
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE,routingKey,msg);
    }
}

消费者

@Slf4j
@Component
public class RabbitDirectConsumer {
    @RabbitListener(queues = DIRECT_QUEUE1)
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = DIRECT_QUEUE2)
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

3. Topic-Exchange

Topic-Exchange是直接交换机的模糊匹配版本,Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,然后按照routingKey进行模糊匹配队列进行分发。

  • *:能够模糊匹配一个单词。
  • #:能够模糊匹配零个或多个单词。

因为加入了两个通配定义符,所以Topic交换机的routingKey也有些变化,routingKey可以使用.将单词分开。

String TOPIC_FANOUT1 = "topic.fanout1";

String TOPIC_FANOUT2 = "topic.fanout2";

String TOPIC_EXCHANG = "topicExchange";

String ROUTING_KEY_ERROR_ONE = "log.error.*";

String ROUTING_KEY_INFO_ALL = "log.info.#";

RabbitMqConfig

// 主题交换机示例
@Bean
public Queue topicQueue1() {
    return new Queue(TOPIC_FANOUT1);
}

@Bean
public Queue topicQueue2() {
    return new Queue(TOPIC_FANOUT2);
}

@Bean
public TopicExchange topicExchange() {
    // 三个构造参数:name durable autoDelete
    return new TopicExchange(TOPIC_EXCHANG, false, false);
}

@Bean
public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY_INFO_ALL);
}

@Bean
public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY_ERROR_ONE);
}

生产者

@Override
public void sendTopic(String msg,String routingKey) {
    rabbitTemplate.convertAndSend(TOPIC_EXCHANG,routingKey,msg);
}

消费者

@Slf4j
@Component
public class RabbitTopicConsumer {
    @RabbitListener(queues = TOPIC_FANOUT1)
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = TOPIC_FANOUT2)
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

4. 注解方式

使用注解方式实现 topic

String BROADCAST_TOPIC = "topic.broadcast";

String FANOUT_TOPIC = "fanout";
@Component
public class FanoutCustomer {
  
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name= RabbitmqConstant.BROADCAST_TOPIC,type = RabbitmqConstant.FANOUT_TOPIC)
    ))
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, //创建临时队列
            exchange = @Exchange(name=RabbitmqConstant.BROADCAST_TOPIC,type = RabbitmqConstant.FANOUT_TOPIC)  //绑定交换机类型
    ))
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

5.消息可靠性

  • 生产者发出后保证到达了MQ。
  • MQ收到消息保证分发到了消息对应的Exchange。
  • Exchange分发消息入队之后保证消息的持久性。
  • 消费者收到消息之后保证消息的正确消费。

经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。

1. 生产者发送消息到MQ失败

我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。

为了解决这个问题,RabbitMQ引入了事务机制发送方确认机制(publisher confirm),由于事务机制过于耗费性能所以一般不用,这里我着重讲述发送方确认机制

这个机制很好理解,就是消息发送到MQ那端之后,MQ会回一个确认收到的消息给我们

spring:
    application:
        name: spring-boot-rabbitmq
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # 发送者开启 return 确认机制
        publisher-returns: true
        # 发送者开启 confirm 确认机制
        publisher-confirm-type: correlated
      
2. 消息入队之后MQ宕机

到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。

消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。

/**
 * value      队列名称
 * durable    是否持久化
 * exclusive  否为独占队列
 * autoDelete 是否自动删除
 */
@Bean
public Queue topicConfirmCallbackQueue() {
    // 其三个参数:durable exclusive autoDelete
    return new Queue(CONFIRM_CALLBACK_QUEUE,true);
}

/**
 * value      交换机名称
 * type       交换机类型,默认 direct
 * durable    是否持久化,默认 true
 * autoDelete 是否自动删除,默认 false
 * internal   是否为内部交换机,默认为 false
 */

@Bean
public DirectExchange topicConfirmCallbackExchange() {
    // 三个构造参数:name durable autoDelete
    return new DirectExchange(CONFIRM_CALLBACK, true, false);
}

@Bean
public Binding topicConfirmCallbackBinding() {
    return BindingBuilder.bind(topicConfirmCallbackQueue()).to(topicConfirmCallbackExchange()).with(ROUTING_KEY_INFO);
}
3. 消费者无法正常消费

最后一步会出问题的地方就在消费者端了, 就是消费者的消息确认。

spring:
    application:
        name: spring-boot-rabbitmq
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # 发送者开启 return 确认机制
        publisher-returns: true
        # 发送者开启 confirm 确认机制
        publisher-confirm-type: correlated
        # 设置消费端手动 ack
        listener:
          simple:
              acknowledge-mode: manual
              # 是否支持重试
              retry:
                  enabled: true
4. 消费者收到消息之后保证消息的正确消费。

可以将消费失败的存入数据库或者重新消费

@Slf4j
@Component
public class ReceiverMessage {

    @RabbitListener(queues = CONFIRM_CALLBACK_QUEUE)
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);
            //发生异常
            double a = 1/0;
            //TODO 具体业务

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {

                log.error("消息已重复处理失败,拒绝再次接收...");

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {

                log.error("消息即将再次返回队列处理...");

                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}
5.代码实现
String CONFIRM_CALLBACK = "topic.confirm.callback";

String CONFIRM_CALLBACK_QUEUE = "topic.confirm.callback.queue";
/**
 * value      队列名称
 * durable    是否持久化
 * exclusive  否为独占队列
 * autoDelete 是否自动删除
 */
@Bean
public Queue topicConfirmCallbackQueue() {
    // 其三个参数:durable exclusive autoDelete
    return new Queue(CONFIRM_CALLBACK_QUEUE,true);
}

/**
 * value      交换机名称
 * type       交换机类型,默认 direct
 * durable    是否持久化,默认 true
 * autoDelete 是否自动删除,默认 false
 * internal   是否为内部交换机,默认为 false
 */

@Bean
public DirectExchange topicConfirmCallbackExchange() {
    // 三个构造参数:name durable autoDelete
    return new DirectExchange(CONFIRM_CALLBACK, true, false);
}

@Bean
public Binding topicConfirmCallbackBinding() {
    return BindingBuilder.bind(topicConfirmCallbackQueue()).to(topicConfirmCallbackExchange()).with(ROUTING_KEY_INFO);
}

生产者


    private final ConfirmCallbackService confirmCallbackService;

    private final ReturnCallbackService returnCallbackService;

@Override
public void confirmCallback(String msg,String routingKey) {
    /**
     * 确保消息发送失败后可以重新返回到队列中
     * 注意:yml需要配置 publisher-returns: true
     */
    rabbitTemplate.setMandatory(true);

    /**
     * 消费者确认收到消息后,手动ack回执回调处理
     */
    rabbitTemplate.setConfirmCallback(confirmCallbackService);

    /**
     * 消息投递到队列失败回调处理
     */
    rabbitTemplate.setReturnCallback(returnCallbackService);

    /**
     * 发送消息
     */
    rabbitTemplate.convertAndSend(RabbitmqConstant.CONFIRM_CALLBACK,routingKey, msg,
            message -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            },
            new CorrelationData(UUID.randomUUID().toString()));

}
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

消费者

@Slf4j
@Component
public class ReceiverMessage {

    @RabbitListener(queues = CONFIRM_CALLBACK_QUEUE)
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);
            //发生异常
            double a = 1/0;
            //TODO 具体业务

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {

                log.error("消息已重复处理失败,拒绝再次接收...");

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {

                log.error("消息即将再次返回队列处理...");

                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

6. 死信队列

1.什么是死信队列

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  1. 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

2.如何配置死信队列
  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列
	  // RabbitmqConstant添加
    String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
    String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

		// RabbitMqConfig 添加

    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange() {
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 声明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 声明业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }

    // 声明业务队列B
    @Bean("businessQueueB")
    public Queue businessQueueB() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }

    // 声明死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // 声明死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB() {
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // 声明业务队列A绑定关系
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 声明业务队列B绑定关系
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 声明死信队列A绑定关系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // 声明死信队列B绑定关系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }

消费者

@Slf4j
@Component
public class BusinessMessageReceiver {
    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息A:{}", msg);
        boolean ack = true;
        Exception exception = null;
        try {
            if (msg.contains("deadletter")) {
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e) {
            ack = false;
            exception = e;
        }
        if (!ack) {
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到业务消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}
@Component
public class DeadLetterMessageReceiver {

    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

这里记得将default-requeue-rejected属性设置为false。

rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 发送者开启 return 确认机制
    publisher-returns: true
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated
    # 设置消费端手动 ack
    listener:
      simple:
          default-requeue-rejected: false
          acknowledge-mode: manual
          # 是否支持重试
          retry:
              enabled: true

生产者

		@GetMapping("sendmsg") public void sendMsg(String msg){ sendService.sendMsg(msg); }

	
    /**
     * @param msg
     * @return void
     * @description 死信队列
     * @author yz
     * @date 2021/10/18 11:26 下午
     * @method sendMsg
     */
    void sendMsg(String msg);


    @Override
    public void sendMsg(String msg) {
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }

调用

### 正常
GET http://localhost:8080/sendmsg?msg=hello word

日志

2021-10-19 00:19:49.691  INFO 1847 --- [nio-8080-exec-3] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-xnAuIjQEQI6V4AI6oLgmag identity=d2235db] started
收到业务消息B:hello word
2021-10-19 00:19:49.697  INFO 1847 --- [ntContainer#1-1] c.e.s.consumer.BusinessMessageReceiver   : 收到业务消息A:hello word

表示两个Consumer都正常收到了消息。这代表正常消费的消息,ack后正常返回。然后我们再来测试nck的消息。

### 失败
GET http://localhost:8080/sendmsg?msg=deadletter

日志

收到业务消息B:deadletter
收到业务消息A:deadletter
消息消费发生异常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
收到死信消息A:deadletter

可以看到,死信队列的Consumer接受到了这个消息,所以流程到此为止就打通了。

3.死信消息的变化

那么“死信”被丢到死信队列中后,会发生什么变化呢?

如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。

举个栗子:

如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。

另外,由于被抛到了死信交换机,所以消息的Exchange Name也会被替换为死信交换机的名称。

消息的Header中,也会添加很多奇奇怪怪的字段,修改一下上面的代码,在死信队列的消费者中添加一行日志输出:

log.info("死信消息properties:{}", message.getMessageProperties());

Header中看起来有很多信息,实际上并不多,只是值比较长而已。下面就简单说明一下Header中的值:

字段名 含义
x-first-death-exchange 第一次被抛入的死信交换机的名称
x-first-death-reason 第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为falseexpired :消息过期。maxlen : 队列内消息数量超过队列最大容量
x-first-death-queue 第一次成为死信前所在队列名称
x-death 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新
4. 死信队列应用场景

通过上面的信息,我们已经知道如何使用死信队列了,那么死信队列一般在什么场景下使用呢?

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息(没错,以前就是这么干的= =)。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

7.延迟队列

那么什么时候需要用延时队列呢?考虑一下以下场景:

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
1.rabbitmq安装插件实现延时队列

下载rabbitmq_delayed_message_exchange插件,下载前请确认自己的 RabbitMQ 版本,下载对应版本的插件。

2.安装插件
#拿到CONTAINER ID
docker ps
#宿主机上传文件到容器
docker cp /Users/yz/Downloads/rabbitmq_delayed_message_exchange-3.8.0.ez 7bc42bc3bf07:/plugins
#进入容器
docker exec -it 7bc42bc3bf07 bash
#进入文件夹
cd plugins
#安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@7bc42bc3bf07:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@7bc42bc3bf07...
Plugin configuration unchanged.
#退出
exit
#重启
docker restart 7bc42bc3bf07

容器启动成功之后,登录 RabbitMQ的管理界面,找到Exchanges Tab页。点击 add a new...,在 Type 里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

3.代码实现
// RabbitmqConstant 添加
String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";
// RabbitMqConfig 添加
@Bean
public Queue immediateQueue() {
    return new Queue(DELAYED_QUEUE_NAME);
}

@Bean
public CustomExchange customExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}

@Bean
public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
                             @Qualifier("customExchange") CustomExchange customExchange) {
    return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
// 消息生产者
@GetMapping("delayMsg")
public void delayMsg2(String msg, Integer delayTime) {
    log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime);
    sendService.sendDelayMsg(msg, delayTime);
}

  /**
     * @description 延时队列
     * @author yz
     * @date 2021/10/27 12:37 上午
     * @method  sendDelayMsg
     * @param msg
    * @param delayTime
     * @return void
     */
    void sendDelayMsg(String msg, Integer delayTime);
    
    
    
    @Override
    public void sendDelayMsg(String msg, Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
            a.getMessageProperties().setDelay(delayTime);
            return a;
        });
    }
//消息消费者
@Component
@Slf4j
public class DelayMessageReceiver {
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},延时队列收到消息:{}", new Date(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

测试

### 20秒
http://localhost:8080/delayMsg?msg=msg1&delayTime=20000

### 2秒
http://localhost:8080/delayMsg?msg=msg2&delayTime=2000


日志

2021-10-27 00:41:25.250  INFO 1988 --- [nio-8080-exec-5] c.e.s.controller.SenController           : 当前时间:Wed Oct 27 00:41:25 CST 2021,收到请求,msg:msg1,delayTime:20000
2021-10-27 00:41:45.244  INFO 1988 --- [ntContainer#4-1] c.e.s.consumer.DelayMessageReceiver      : 当前时间:Wed Oct 27 00:41:45 CST 2021,延时队列收到消息:msg1
2021-10-27 00:41:50.399  INFO 1988 --- [nio-8080-exec-7] c.e.s.controller.SenController           : 当前时间:Wed Oct 27 00:41:50 CST 2021,收到请求,msg:msg2,delayTime:2000
2021-10-27 00:41:52.421  INFO 1988 --- [ntContainer#4-1] c.e.s.consumer.DelayMessageReceiver      : 当前时间:Wed Oct 27 00:41:52 CST 2021,延时队列收到消息:msg2