Spring Cloud Stream 錯誤處理詳解

TIPS

本文基於Spring Cloud Greenwich SR1,理論支持Finchley及更高版本。

本節詳細探討Spring Cloud Stream的錯誤處理。

應用處理

局部處理(通用)

配置:

spring:
 cloud:
 stream:
 bindings:
 input:
 destination: my-destination
 group: my-group
 output:
 destination: my-destination

代碼:

@Slf4j
@SpringBootApplication
@EnableBinding({Processor.class})
@EnableScheduling
public class ConsumerApplication {
 public static void main(String[] args) {
 SpringApplication.run(ConsumerApplication.class, args);
 }
 @StreamListener(value = Processor.INPUT)
 public void handle(String body) {
 throw new RuntimeException("x");
 }
 @ServiceActivator(inputChannel = "my-destination.my-group.errors")
 public void handleError(ErrorMessage message) {
 Throwable throwable = message.getPayload();
 log.error("截獲異常", throwable);
 Message> originalMessage = message.getOriginalMessage();
 assert originalMessage != null;
 log.info("原始消息體 = {}", new String((byte[]) originalMessage.getPayload()));
 }
 @Bean
 @InboundChannelAdapter(value = Processor.OUTPUT,
 poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
 public MessageSource test() {
 return () -> new GenericMessage<>("adfdfdsafdsfa");
 }
}

全局處理(通用)

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
 throw new RuntimeException("x");
}
@StreamListener("errorChannel")
public void error(Message> message) {
 ErrorMessage errorMessage = (ErrorMessage) message;
 System.out.println("Handling ERROR: " + errorMessage);
}

系統處理

系統處理方式,因消息中間件不同而異。如果應用沒有配置錯誤處理,那麼error將會被傳播給binder,binder將error回傳給消息中間件。消息中間件可以丟棄消息、requeue(重新排隊,從而重新處理)或將失敗的消息發送給DLQ(死信隊列)。

丟棄

默認情況下,錯誤消息將被丟棄。雖然在某些情況下可以接受,但這種方式一般不適用於生產。

DLQ(RabbitMQ)

TIPS

•雖然RocketMQ也支持DLQ,但目前RocketMQ控制檯並不支持在界面上操作,將死信放回消息隊列,讓客戶端重新處理。所以使用很不方便,而且用法也和本節有一些差異。

•如使用RocketMQ,建議參考上面

應用處理

一節的用法,也可額外訂閱這個Topic %DLQ%+consumerGroup

•個人給RocketMQ控制檯提的Issue:https://github.com/apache/rocketmq/issues/1334

配置:

spring:
 cloud:
 stream:
 bindings:
 input:
 destination: my-destination
 group: my-group
 output:
 destination: my-destination
 rabbit:
 bindings:
 input:
 consumer:
 auto-bind-dlq: true

代碼:

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
 throw new RuntimeException("x");
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
 poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource test() {
 return () -> new GenericMessage<>("adfdfdsafdsfa");
}

這樣,消息消費失敗後,就會放入死信隊列。在控制檯操作一下,即可將死信放回消息隊列,這樣,客戶端就可以重新處理。

如果想獲取原始錯誤的異常堆棧,可添加如下配置:

spring:
 cloud:
 stream:
 rabbit:
 bindings:
 input:
 consumer:
 republish-to-dlq: true

requeue(RabbitMQ)

Rabbit/Kafka的binder依賴RetryTemplate實現重試,從而提升消息處理的成功率。然而,如果設置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那麼RetryTemplate則不再重試。此時可通過requeue方式處理異常。

添加如下配置:

# 默認是3,設為1則禁用重試
spring.cloud.stream.bindings..consumer.max-attempts=1
# 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

這樣,失敗的消息將會被重新提交到同一個handler進行處理,直到handler拋出 AmqpRejectAndDontRequeueException 異常為止。

RetryTemplate(通用)

配置方式

RetryTemplate重試也是錯誤處理的一種手段。

spring:
 cloud:
 stream:
 bindings:
 :
 consumer:
 # 最多嘗試處理幾次,默認3
 maxAttempts: 3
 # 重試時初始避退間隔,單位毫秒,默認1000
 backOffInitialInterval: 1000
 # 重試時最大避退間隔,單位毫秒,默認10000
 backOffMaxInterval: 10000
 # 避退乘數,默認2.0
 backOffMultiplier: 2.0
 # 當listen拋出retryableExceptions未列出的異常時,是否要重試
 defaultRetryable: true
 # 異常是否允許重試的map映射
 retryableExceptions:
 java.lang.RuntimeException: true
 java.lang.IllegalStateException: false
 

測試代碼:

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
 throw new RuntimeException(body);
}
private AtomicInteger count = new AtomicInteger(0);
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
 poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource test() {
 return () -> new GenericMessage<>(count.getAndAdd(1) + "");
}

編碼方式

多數場景下,使用配置方式定製重試行為都是可以滿足需求的,但配置方式可能無法滿足一些複雜需求。此時可使用編碼方式配置RetryTemplate:

@Configuration
class RetryConfiguration {
 @StreamRetryTemplate
 public RetryTemplate sinkConsumerRetryTemplate() {
 RetryTemplate retryTemplate = new RetryTemplate();
 retryTemplate.setRetryPolicy(retryPolicy());
 retryTemplate.setBackOffPolicy(backOffPolicy());
 return retryTemplate;
 }
 private ExceptionClassifierRetryPolicy retryPolicy() {
 BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
 Collections.singletonList(IllegalAccessException.class
 ));
 keepRetryingClassifier.setTraverseCauses(true);
 SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
 AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
 ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
 retryPolicy.setExceptionClassifier(
 classifiable -> keepRetryingClassifier.classify(classifiable) ?
 alwaysRetryPolicy : simpleRetryPolicy);
 return retryPolicy;
 }
 private FixedBackOffPolicy backOffPolicy() {
 final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
 backOffPolicy.setBackOffPeriod(2);
 return backOffPolicy;
 }
}

然後添加配置:

spring.cloud.stream.bindings..consumer.retry-template-name=myRetryTemplate

注意

Spring Cloud Stream 2.2才支持設置retry-template-name

乾貨分享

最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下代碼,即可獲得百度盤地址,無套路領取!

•001:《Java併發與高併發解決方案》學習筆記;

•002:《深入JVM內核——原理、診斷與優化》學習筆記;

•003:《Java面試寶典》

•004:《Docker開源書》

•005:《Kubernetes開源書》

•006:《DDD速成(領域驅動設計速成)》


分享到:


相關文章: