专注Java教育14年 全国咨询/投诉热线:400-8080-105
动力节点LOGO图
始于2009,口口相传的Java黄埔军校
首页 hot资讯 使用RabbitMQ配置死信队列

使用RabbitMQ配置死信队列

更新时间:2022-05-10 10:18:20 来源:动力节点 浏览2756次

当发生以下情况之一时,来自消息队列的可能是“死信”:

消息被拒绝并且重新排队设置为 false

消息的 TTL 过期

超出队列长度限制

为了通过示例进行演示,我选择了第一种情况,即消息被拒绝。生产者将PaymentOrders作为消息发送,这些消息将由消费者处理。当PaymentOrder付款人账户资金不足时,消息将被拒绝。

生产者

生产者是一个 Spring Boot 应用程序,它使用Spring AMQP库向PaymentOrderRabbitMQ 发送消息。

生产者的 API

生产者 API 的第一部分是定义交换器的名称、路由密钥、传入和死信队列。

public class Constants {
    public static final String EXCHANGE_NAME = "payment-orders.exchange";
    public static final String ROUTING_KEY_NAME = "payment-orders";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
}

第二部分是定义消息格式。我们在此示例中使用 JSON。以下 JSON 文档显示了我们如何建模PaymentOrder

{
  "from":"SA54 22PS JCLV 7LWT 7LHY EBLO",
  "to":"IT23 K545 5414 339G WLPI 2YF6 VBP",
  "amount":54.75
}

请注意,最好不要使用自定义序列化格式(如有效负载的 Java 序列化),因为这意味着您需要有一个基于 Java 的使用者。好的做法是将有效负载格式化为 JSON。每个平台和/或语言都可以解析 JSON。

生产者配置

我们需要配置 AMQP 基础设施。死信队列配置封装在传入队列声明中。

有一个死信交换direct(DLX) 的概念,它是类型topic或的正常交换fanout。如果在处理从队列中获取的消息期间发生故障,RabbitMQ 会检查是否为该队列配置了死信交换。如果通过x-dead-letter-exchange参数配置了一个,那么它将使用原始路由密钥将失败的消息路由到它。可以通过x-dead-letter-routing-key参数覆盖此路由键。

在此示例中,我们使用default exchange(no-name) 作为 the dead letter exchange,并使用死信队列名称作为新的路由键。这将起作用,因为任何队列都绑定到默认交换,绑定键等于队列名称。

@Configuration
public class AmqpConfig {
    @Bean
    DirectExchange exchange() {
        return new DirectExchange(Constants.EXCHANGE_NAME);
    }
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }
    @Bean
    Binding binding() {
        return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME);
    }
    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build();
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

用于队列和交换的构建器 API 非常方便,并且从 Spring AMQP 库的 1.6 版本开始可用。

在 RabbitMQ 管理控制台中,DLX和DLK标签指示在传入队列上设置了dead letter exchange和dead letter routing key参数。

生产者逻辑

生产者每 5 秒生成一次随机PaymentOrder消息,这些消息被发送到 RabbitMQ 进行进一步处理。SpringAmqpTemplate是自动配置的,它可以连接到我们的组件中。由于消息格式是 JSON ,Jackson2JsonMessageConverter因此定义了它将自动关联到 auto-configured AmqpTemplate。

@Component
public class Producer {
    private AmqpTemplate amqpTemplate;
    public Producer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }
    @Scheduled(fixedDelay = 1000L)
    public void send() {
        PaymentOrder paymentOrder = new PaymentOrder(
                Iban.random().toFormattedString(),
                Iban.random().toFormattedString(),
                new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, BigDecimal.ROUND_FLOOR));
        amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder);
    }
}

消费者

对于这个简单的示例,消费者也是一个 Spring Boot 应用程序,但在实际应用程序中,消费者和生产者不必在同一平台/语言上。

消费者 API

消费者 API 的第一部分是指定它连接到哪个队列。

public class Constants {
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
}

第二部分是适应生产者定义的消息格式。请注意,在这种情况下,两个应用程序都是基于 Java 的,因此我可以创建一个包含PaymentOrder类文件的 jar 文件并与消费者和生产者共享它。然而,这是不好的做法,因为它引入了基于共享库的紧密耦合。更好的方法是使用一些代码重复(PaymentOrder在这种情况下为类)并通过同意消息格式来使用更松散的耦合方法。

public class PaymentOrder {
    String from;
    String to;
    BigDecimal amount;
    @JsonCreator
    public PaymentOrder(@JsonProperty("from") String from,
                        @JsonProperty("to") String to,
                        @JsonProperty("amount") BigDecimal amount) {
        this.from = from;
        this.to = to;
        this.amount = amount;
    }
    // getters and toString()
}

消费者配置

消费者只关心从中获取消息的队列。传入队列必须存在,否则消费者将无法启动。请注意,dead letter queue消费者启动时不必存在 ,但在消息需要“死信”时它应该存在。如果它丢失,则消息将被静默丢弃。

@Configuration
public class AmqpConfig {
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

默认情况下启用重新排队。为了“死信”消息,您需要将以下属性设置为 false。

spring:
  rabbitmq:
    listener:
      default-requeue-rejected: false

但是,如果您想在某些错误情况下启用重新排队,最好保持启用重新排队并利用AmqpRejectAndDontRequeueException将发送basic.reject带有 requeue=false 的选项。

消费逻辑

每当传入队列上有消息可用时,将使用反序列化的实例process调用该方法。在这里,我们通过抛出一个扩展异常PaymentOrder来模拟消息拒绝。InsufficientFundsExceptionAmqpRejectAndDontRequeueException

@Component
public class Consumer {
    @RabbitListener(queues = Constants.INCOMING_QUEUE_NAME)
    public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException {
        if (new Random().nextBoolean()) {
            throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom());
        }
    }
}

下图显示了一条消息的示例,该PaymentOrder消息被拒绝并最终进入dead letter queue

有时它有助于自动重试失败的操作,以防它可能在后续尝试中成功。RetryTemplateSpring AMQP 库在Spring Retry项目(从 Spring Batch 中提取)的帮助下提供了对此的支持。Spring Boot 使配置变得非常容易,RetryTemplate如下面的示例所示。

spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        initial-interval: 2000
        max-attempts: 2
        multiplier: 1.5
        max-interval: 5000

使用上述配置,重试功能已启用(默认情况下禁用),最多应有 2 次尝试传递消息,第一次和第二次尝试之间应为 2 秒,稍后与上一次重试间隔乘以 1.5 和最多 5 秒。运行您将在日志中看到的消费者

2016-09-07 21:56:53.396  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.399  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.401  WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])

结论

如您所见,使用 RabbitMQ 配置死信队列非常简单。如果大家想了解更多相关知识,不妨来关注一下动力节点的RabbitMQ教程,里面有更丰富的知识等着大家去学习,希望对大家能够有所帮助。

提交申请后,顾问老师会电话与您沟通安排学习

免费课程推荐 >>
技术文档推荐 >>