消息队列|RabbitMQ——发布确认高级

## 发布确认高级

生产环境下发送异常问题,导致RabbitMQ重启,在重启器件生产者投递消息失败,导致消息丢失,需要手动处理。
1. SpringBoot版本发布确认
1.1 方案图 消息队列|RabbitMQ——发布确认高级
文章图片

当发送失败时,将消息存到缓存中,交换机接收到消息之后再从缓存中清除已经收到的消息。
1.2 实战 在上次博客的SpringBoot版本的基础上测试本次项目。
1.2.1 配置类
/** * @Description 发布确认绑定配置类 * @date 2022/3/13 9:24 */ @Configuration public class BingConf {public static final String CONFIRM_EXCHANGE = "confirm_exchange"; public static final String CONFIRM_QUEUE = "confirm_queue"; public static final String CONFIRM_ROTING_KEY = "confirm_routing_key"; @Bean("c_ex") public DirectExchange confirmChange(){ return new DirectExchange(CONFIRM_EXCHANGE); }@Bean("c_qu") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE).build(); }@Bean() public Binding bindingQueueAndExchange(@Qualifier("c_ex") DirectExchange exchange, @Qualifier("c_qu") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROTING_KEY); } }

配置文件新增:
# 发布到交换机触发回调 spring.rabbitmq.publisher-confirm-type=correlated

1.2.2 生产者控制器
/** * @Description 消费者控制类 * @date 2022/3/13 9:31 */ @Slf4j @RestController @RequestMapping("/confirm") public class SendMessageController {@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/confirm/{message}") public void sendMsg(@PathVariable String message){ rabbitTemplate.convertAndSend(BingConf.CONFIRM_EXCHANGE, BingConf.CONFIRM_ROTING_KEY,message); log.info("消息内容:{}",message); } }

1.2.3 消费者监听
/** * @Description 接收消息 * @date 2022/3/13 9:36 */ @Slf4j @Component public class ConfirmConsumer {/** * 正常情况监听消息 * @param message */ @RabbitListener(queues = BingConf.CONFIRM_QUEUE) public void receiveConfirmMessage(Message message){ log.info("接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8)); }}

1.2.4 异常接口回调
上方情况仅仅完成了对正常情况下消息的接收和发送,但出现异常时需要进行新的处理。
/** * @Description 消息处理回调接口 * @date 2022/3/13 9:45 */ @Slf4j @Component public class CallBackConf implements RabbitTemplate.ConfirmCallback {@Autowired private RabbitTemplate rabbitTemplate; /** * 将当前类注入到 setConfirmCallback 中 */ @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); }/** * 确认回调 * @param correlationData 保存回调消息ID及相关内容 * @param ack 是否接收成功 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("接收消息成功,ID:{}", msgId); }else { log.error("接收消息失败,原因:{},ID:{}",cause,msgId); } }}

1.2.5 测试 测试交换机:
访问:http://localhost:8080/confirm/confirm/testConfirm进行发送消息,正常情况下会接收到消息。
测试交换机接收不到:
在发送部分修改交换机名字:
public void sendMsg(@PathVariable String message){ CorrelationData correlationData = https://www.it610.com/article/new CorrelationData("1"); rabbitTemplate.convertAndSend(BingConf.CONFIRM_EXCHANGE + "1", BingConf.CONFIRM_ROTING_KEY,message,correlationData); log.info("消息内容:{}",message); }

再次进行发送:
消息队列|RabbitMQ——发布确认高级
文章图片

会消失发送失败404找不到交换机,并且异常回调也完美回调成功。
测试队列接收不到:
在发送部分修改RotingKey:
public void sendMsg(@PathVariable String message){ CorrelationData correlationData = https://www.it610.com/article/new CorrelationData("1"); rabbitTemplate.convertAndSend(BingConf.CONFIRM_EXCHANGE, BingConf.CONFIRM_ROTING_KEY + "1",message,correlationData); log.info("消息内容:{}",message); }

结果:
消息队列|RabbitMQ——发布确认高级
文章图片

显然只有交换机收到了消息,而队列没有接收到交换机发来的消息。
1.3 回退消息
上方实战中有一个很明显的问题,如果发送到交换机成功,但是出现其他问题,交换机发送到消费者时出现问题,不会进行异常处理,所以消费者不知道当前消息丢失了。
使用Mandatory参数将不能达到目的地的消息回退给生产者。
1.3.1 完善方法 在配置文件中新增:
spring.rabbitmq.publisher-returns=true

表示开启回退
修改CallBackConf额外实现RabbitTemplate.ReturnsCallback接口。
重写returnedMessage方法:
/** * 接收失败回退消息 * @param returned */ @Override public void returnedMessage(ReturnedMessage returned) { log.info("消息:{},被交换机{}回退:原因:{},RoutingKey:{}", returned.getMessage(), returned.getExchange(), returned.getReplyText(), returned.getRoutingKey()); }

【消息队列|RabbitMQ——发布确认高级】修改init方法:
/** * 接收失败回退消息 * @param returned 回退对象 */ @Override public void returnedMessage(ReturnedMessage returned) { log.error("\n消息:{}\n被交换机{}回退:\n原因:{},\nRoutingKey:{}", returned.getMessage(), returned.getExchange(), returned.getReplyText(), returned.getRoutingKey()); }

再次访问错误RoutingKey的路径:
消息队列|RabbitMQ——发布确认高级
文章图片

这样就实现了如何接收到发送失败的消息。
1.4 备份交换机
上述使用Mandatory参数仅仅能实现记录到接收失败的消息,无法达到再次发送消息的目的,这里可以使用备份交换机再次进行发送。在和Mandatory同时使用情况下,备份交换机优先级更高。
如果普通交换机的消息处理失败,可以转发给备份交换机。
消息队列|RabbitMQ——发布确认高级
文章图片

1.4.1 修改配置类
新增了备份交换机、备份队列、异常队列的声明、绑定;修改了普通交换机转发到备份交换机。
/** * @Description 发布确认绑定配置类 * @date 2022/3/13 9:24 */ @Configuration public class BingConf {public static final String CONFIRM_EXCHANGE = "confirm_exchange"; public static final String CONFIRM_QUEUE = "confirm_queue"; public static final String CONFIRM_ROTING_KEY = "confirm_routing_key"; public static final String BACKUP_EXCHANGE = "backup_exchange"; public static final String BACKUP_QUEUE = "backup_queue"; public static final String WARNING_QUEUE = "warning_queue"; @Bean("c_ex") public DirectExchange confirmChange(){ return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE). withArgument("alternate-exchange",BACKUP_EXCHANGE) .build(); }@Bean("b_ex") public FanoutExchange backupChange(){ return new FanoutExchange(BACKUP_EXCHANGE); }@Bean("c_qu") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE).build(); }@Bean("b_qu") public Queue backupQueue(){ return QueueBuilder.durable(BACKUP_QUEUE).build(); }@Bean("w_qu") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE).build(); }@Bean() public Binding bindingQueueAndExchange(@Qualifier("c_ex") DirectExchange exchange, @Qualifier("c_qu") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROTING_KEY); }@Bean() public Binding bindingBackupQueueAndBackupExchange(@Qualifier("b_ex") FanoutExchange exchange, @Qualifier("b_qu") Queue backQueue){ return BindingBuilder.bind(backQueue).to(exchange); }@Bean() public Binding bindingWarningQueueAndBackupExchange(@Qualifier("b_ex") FanoutExchange exchange, @Qualifier("w_qu") Queue warningQueue){ return BindingBuilder.bind(warningQueue).to(exchange); } }

1.4.2 新增报警消费者和备份消费者
/** * @Description 备份消费者 * @date 2022/3/13 9:36 */ @Slf4j @Component public class BackupConsumer {@RabbitListener(queues = BingConf.BACKUP_QUEUE) public void receiveConfirmMessage(Message message){ log.info("备份消费者接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8)); }}

/** * @Description 报警消费者 * @date 2022/3/13 9:36 */ @Slf4j @Component public class WarningConsumer {@RabbitListener(queues = BingConf.WARNING_QUEUE) public void receiveConfirmMessage(Message message){ log.info("报警消费者接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8)); }}

访问错误RotingKey:
消息队列|RabbitMQ——发布确认高级
文章图片

    推荐阅读