这是一个基于消息的分布式事务的一部分,主要通过消息来实现,生产者把消息发到队列后,由消费方去执行剩下的逻辑,而当消费方处理失败后,我们需要进行重试,即为了最现数据的最终一致性,在rabbitmq里,它有消息重试和重试次数的配置,但当你配置之后,你的TTL达到
后,消息不能自动放入死信队列,所以这块需要手工处理一下.
rabbitmq关于消息重试的配置
rabbitmq: host: xxx port: xxx username: xxx password: xxx virtual-host: xxx
###开启消息确认机制 confirms publisher-confirms: true publisher-returns: true listener:
simple: acknowledge-mode: manual #设置确认方式 prefetch: 1 #每次处理1条消息
retry.max-attempts: 3 # 最大重试次数 retry.enabled: true
#是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息) retry.initial-interval: 2000
#重试间隔时间(单位毫秒) default-requeue-rejected: true
#该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true,需要手动basicNack时这些参数谅失效了
手工实现消息重试并放入死信的方式
定义队列的相关配置
/** * 创建普通交换机. */ @Bean public TopicExchange lindExchange() { //消息持久化 return
(TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
} @Bean public TopicExchange deadExchange() { return (TopicExchange)
ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true).build(); } /** *
基于消息事务的处理方式,当消费失败进行重试,有时间间隔,当达到超时时间,就发到死信队列,等待人工处理. * @return */ @Bean public
Queue testQueue() { //设置死信交换机 return
QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange",
LIND_DL_EXCHANGE) //毫秒 .withArgument("x-message-ttl", CONSUMER_EXPIRE)
//设置死信routingKey .withArgument("x-dead-letter-routing-key",
LIND_DEAD_QUEUE).build(); } @Bean public Queue deadQueue() { return new
Queue(LIND_DEAD_QUEUE); } @Bean public Binding bindBuildersRouteKey() { return
BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER); } @Bean
public Binding bindDeadBuildersRouteKey() { return
BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE); }
消费者实现的代码
/** * 延时队列:不应该有RabbitListener订阅者,应该让它自己达到超时时间后自动转到死信里去消费 *
消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到TTL超时时间,再转到死信,证明这个信息有问题需要人工干预 * * @param
message */ @RabbitListener(queues = MqConfig.QUEUE) public void
testSubscribe(Message message, Channel channel) throws IOException,
InterruptedException { try { System.out.println(LocalDateTime.now() +
":Subscriber:" + new String(message.getBody(), "UTF-8"));
//当程序处理出现问题时,消息使用basicReject上报 int a = 0; int b = 1 / a;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
catch (Exception ex) { //出现异常手动放回队列 Thread.sleep(2000);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,
true); } } /** * 死信队列. * * @param message */ @RabbitListener(queues =
MqConfig.LIND_DEAD_QUEUE) public void dealSubscribe(Message message, Channel
channel) throws IOException { System.out.println("Dead Subscriber:" + new
String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
消费者这块,也可以直接声明队列和绑定交换机,直接在注解上添加 QueueBinding即可.
@RabbitListener(bindings = {@QueueBinding(value = @Queue( name =
MqConfig.QUEUE, durable = "true",arguments = {@Argument(name =
"x-dead-letter-exchange", value = MqConfig.LIND_DL_EXCHANGE), @Argument(name =
"x-message-ttl", value = MqConfig.CONSUMER_EXPIRE,type="java.lang.Long"),
@Argument(name = "x-dead-letter-routing-key", value =
MqConfig.LIND_DEAD_QUEUE)}), exchange = @Exchange(value = MqConfig.EXCHANGE,
durable = "true",type="topic") )}) public void testSubscribe(Message message,
Channel channel) throws IOException, InterruptedException { }
这边尝试让消费者执行出错,然后走到catch里使用basicNack方法把消息从新放里队列里,并让线程让休息2秒,以避免频繁操作,之后就是我们希望看到的代码
2019-12-20T17:21:31.190:Subscriber:send a message to mq
2019-12-20T17:21:33.200:Subscriber:send a message to mq
2019-12-20T17:21:35.206:Subscriber:send a message to mq
2019-12-20T17:21:37.213:Subscriber:send a message to mq
2019-12-20T17:21:39.221:Subscriber:send a message to mq Dead Subscriber:send a
message to mq
这就是一个消息队列的补偿机制,使用死信队列也可以实现延时消息的机制,有时间再给大家分享!
热门工具 换一换