TIPS
本文基於Spring Cloud Greenwich SR1,理論支持Finchley及更高版本。
本節詳細探討Spring Cloud Stream的錯誤處理。
應用處理
局部處理(通用)
配置:
spring: cloud: stream: bindings: input: destination: my-destination group: my-group output: destination: my-destination
代碼:
@Slf4j @SpringBootApplication @EnableBinding({Processor.class}) @EnableScheduling public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException("x"); } @ServiceActivator(inputChannel = "my-destination.my-group.errors") public void handleError(ErrorMessage message) { Throwable throwable = message.getPayload(); log.error("截獲異常", throwable); Message> originalMessage = message.getOriginalMessage(); assert originalMessage != null; log.info("原始消息體 = {}", new String((byte[]) originalMessage.getPayload())); } @Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource test() { return () -> new GenericMessage<>("adfdfdsafdsfa"); } }
全局處理(通用)
@StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException("x"); } @StreamListener("errorChannel") public void error(Message> message) { ErrorMessage errorMessage = (ErrorMessage) message; System.out.println("Handling ERROR: " + errorMessage); }
系統處理
系統處理方式,因消息中間件不同而異。如果應用沒有配置錯誤處理,那麼error將會被傳播給binder,binder將error回傳給消息中間件。消息中間件可以丟棄消息、requeue(重新排隊,從而重新處理)或將失敗的消息發送給DLQ(死信隊列)。
丟棄
默認情況下,錯誤消息將被丟棄。雖然在某些情況下可以接受,但這種方式一般不適用於生產。
DLQ(RabbitMQ)
TIPS
•雖然RocketMQ也支持DLQ,但目前RocketMQ控制檯並不支持在界面上操作,將死信放回消息隊列,讓客戶端重新處理。所以使用很不方便,而且用法也和本節有一些差異。
•如使用RocketMQ,建議參考上面
應用處理
一節的用法,也可額外訂閱這個Topic %DLQ%+consumerGroup
•個人給RocketMQ控制檯提的Issue:https://github.com/apache/rocketmq/issues/1334
配置:
spring: cloud: stream: bindings: input: destination: my-destination group: my-group output: destination: my-destination rabbit: bindings: input: consumer: auto-bind-dlq: true
代碼:
@StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException("x"); } @Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource test() { return () -> new GenericMessage<>("adfdfdsafdsfa"); }
這樣,消息消費失敗後,就會放入死信隊列。在控制檯操作一下,即可將死信放回消息隊列,這樣,客戶端就可以重新處理。
如果想獲取原始錯誤的異常堆棧,可添加如下配置:
spring: cloud: stream: rabbit: bindings: input: consumer: republish-to-dlq: true
requeue(RabbitMQ)
Rabbit/Kafka的binder依賴RetryTemplate實現重試,從而提升消息處理的成功率。然而,如果設置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那麼RetryTemplate則不再重試。此時可通過requeue方式處理異常。
添加如下配置:
# 默認是3,設為1則禁用重試 spring.cloud.stream.bindings..consumer.max-attempts=1 # 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息) spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
這樣,失敗的消息將會被重新提交到同一個handler進行處理,直到handler拋出 AmqpRejectAndDontRequeueException 異常為止。
RetryTemplate(通用)
配置方式
RetryTemplate重試也是錯誤處理的一種手段。
spring: cloud: stream: bindings: : consumer: # 最多嘗試處理幾次,默認3 maxAttempts: 3 # 重試時初始避退間隔,單位毫秒,默認1000 backOffInitialInterval: 1000 # 重試時最大避退間隔,單位毫秒,默認10000 backOffMaxInterval: 10000 # 避退乘數,默認2.0 backOffMultiplier: 2.0 # 當listen拋出retryableExceptions未列出的異常時,是否要重試 defaultRetryable: true # 異常是否允許重試的map映射 retryableExceptions: java.lang.RuntimeException: true java.lang.IllegalStateException: false
測試代碼:
@StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException(body); } private AtomicInteger count = new AtomicInteger(0); @Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource test() { return () -> new GenericMessage<>(count.getAndAdd(1) + ""); }
編碼方式
多數場景下,使用配置方式定製重試行為都是可以滿足需求的,但配置方式可能無法滿足一些複雜需求。此時可使用編碼方式配置RetryTemplate:
@Configuration class RetryConfiguration { @StreamRetryTemplate public RetryTemplate sinkConsumerRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(retryPolicy()); retryTemplate.setBackOffPolicy(backOffPolicy()); return retryTemplate; } private ExceptionClassifierRetryPolicy retryPolicy() { BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier( Collections.singletonList(IllegalAccessException.class )); keepRetryingClassifier.setTraverseCauses(true); SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3); AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy(); ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy(); retryPolicy.setExceptionClassifier( classifiable -> keepRetryingClassifier.classify(classifiable) ? alwaysRetryPolicy : simpleRetryPolicy); return retryPolicy; } private FixedBackOffPolicy backOffPolicy() { final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(2); return backOffPolicy; } }
然後添加配置:
spring.cloud.stream.bindings..consumer.retry-template-name=myRetryTemplate
注意:
Spring Cloud Stream 2.2才支持設置retry-template-name
乾貨分享
最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下代碼,即可獲得百度盤地址,無套路領取!
•001:《Java併發與高併發解決方案》學習筆記;
•002:《深入JVM內核——原理、診斷與優化》學習筆記;
•003:《Java面試寶典》
•004:《Docker開源書》
•005:《Kubernetes開源書》
•006:《DDD速成(領域驅動設計速成)》