在Spring里我们可以使用Spring Cloud AWS里的SqsListener方便地在后台持续接收SQS消息。官方文档给出的样例如下:
@SqsListener("queueName")
public void queueListener(Person person) {
// ...
}
如果消费消息的过程中出现exception
怎么办呢?这点文档中并没有讲解。最简单的异常处理,显而易见,就是直接在消息处理中使用try-catch
:
@SqsListener("queueName")
public void queueListener(Person person) {
try {
// ...
} catch (Exception e) {
// exception handling
}
}
但是(凡事就怕“但是”),queueListener
中不再抛出任何exception
会导致Spring认为这条消息消费成功了,于是依据@SqsListener
的deletionPolicy
属性,Spring有可能向SQS请求删除这条消息。那么如何更优雅地处理SQS消息消费中出现的exception
呢?
我们知道,Spring常用separation of concerns原则来分解问题,所以我猜我们可以用某种方式向Spring注册一个处理异常方法。细心查阅SqsListener的文档,我们可以发现Spring在实现时使用了@MessageMapping这个来自spring-messaging的注解。从这个注解出发去继续查找文档,不难找到,spring-messaging会扫描@MessageExceptionHandler来发现自定义的异常处理方法。于是,我们在@SqsListener
所修饰的方法的同一类中同时加入用@MessageExceptionHandler
修饰的方法便可以捕捉消息消费时产生的异常,同时又不影响Spring对消息消费成功与否的判定:
public class SqsConsumer {
@SqsListener("queueName")
public void queueListener(Persone person) {
// ...
}
@MessageExceptionHandler
public void exceptionHandler(Exception e) {
// exception handling
}
}
Spring支持在调用异常处理方法时注入消息的payload
和headers
等信息帮助我们记录和处理。如果我们想更细致地控制SQS消息生命周期,Spring也支持注入Acknowledgment
来删除消息或Visibility
来更新消息的timeout
。
@MessageExceptionHandler
public void exceptionHandler(
Exception e,
// 以String注入payload,或`T message`,Spring会使用`MessageConverter`将消息转换为T类型
String payload,
// 注入全部headers,或用`@Header("MessageId") String messageId`注入具体某一条header
MessageHeaders headers,
Acknowledgment acknowledgment,
Visibility visibilityHandler
) {
// exception handling
}