The Right Way to Shut Down a Spring Boot Application
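When releasing a new version of a Spring Boot application, how do you stop the running service correctly? Running kill -9 pid directly is clearly inappropriate, because data that is being processed may be left in an inconsistent intermediate state.
Our project has two kinds of applications: Spring Boot web applications, which involve a Servlet container, and Spring Cloud Stream consumer applications, which listen on a message queue and process events. Below I add a graceful shutdown mechanism for each of them.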
Versions
- Spring Boot 2.0.3.RELEASE
- Spring Cloud Stream 2.0.0.RELEASE
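Web applications
When stopping this kind of application, the Servlet container should first stop accepting new requests, then wait for the requests it has already received to finish before the service shuts down. Our project uses embedded Tomcat, and the implementation follows wilkinsona's approach.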
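// WebConfig.java
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Gracefully shut down the embedded Tomcat server.
 *
 * Ref: https://dzone.com/articles/graceful-shutdown-spring-boot-applications
 */
@Configuration
public class WebConfig {

    @Bean
    public GracefulShutdown gracefulShutdown() {
        return new GracefulShutdown();
    }

    @Bean
    public ConfigurableServletWebServerFactory webServerFactory(GracefulShutdown gracefulShutdown) {
        TomcatServletWebServerFactory factory = new TomcatServletWebServerFactory();
        // Register the customizer so that it captures the Connector when Tomcat starts
        factory.addConnectorCustomizers(gracefulShutdown);
        return factory;
    }
}

// GracefulShutdown.java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.catalina.connector.Connector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.tomcat.TomcatConnectorCustomizer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;

public class GracefulShutdown implements ApplicationListener<ContextClosedEvent>, TomcatConnectorCustomizer {

    private static final Logger log = LoggerFactory.getLogger(GracefulShutdown.class);

    private volatile Connector connector;

    // Seconds to wait for in-flight requests; app.shutdownTimeout is a custom
    // property that must be defined in the application configuration
    @Value("${app.shutdownTimeout}")
    int shutdownTimeout;

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        // Stop accepting new requests
        this.connector.pause();
        Executor executor = this.connector.getProtocolHandler().getExecutor();
        if (executor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
            // Let the requests that are already being processed run to completion
            threadPoolExecutor.shutdown();
            try {
                // awaitTermination returns false when the timeout elapses first
                if (!threadPoolExecutor.awaitTermination(shutdownTimeout, TimeUnit.SECONDS)) {
                    log.warn("Tomcat thread pool did not shutdown gracefully within " + shutdownTimeout
                            + " seconds. Proceeding with forceful shutdown.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void customize(Connector connector) {
        this.connector = connector;
    }
}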
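The core of the listener above is the shutdown() plus awaitTermination() pair on the Tomcat thread pool. The following is a minimal JDK-only sketch of that pattern, not the Tomcat code itself: the class name AwaitTerminationDemo and the helpers drain and poolWithTask are made up for illustration, and a sleeping task stands in for an in-flight request. It shows that awaitTermination returns true when the pool drains in time and false when the timeout elapses first, so the "did not shutdown gracefully" warning belongs to the false case.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationDemo {

    /**
     * Stops accepting new tasks, then waits up to timeoutMillis for running tasks.
     * Returns true if the pool drained in time, false if the timeout elapsed first.
     */
    static boolean drain(ExecutorService pool, long timeoutMillis) {
        pool.shutdown(); // submitted tasks keep running; new submissions are rejected
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Creates a pool with one task that sleeps for taskMillis (a stand-in for a request). */
    static ExecutorService poolWithTask(long taskMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> {
            try {
                Thread.sleep(taskMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return pool;
    }

    public static void main(String[] args) {
        // Short task, generous timeout: the pool drains, so no warning is needed.
        System.out.println("drained in time: " + drain(poolWithTask(100), 2000));

        // Long task, short timeout: this is the case in which a warning should be logged.
        ExecutorService slow = poolWithTask(2000);
        System.out.println("drained in time: " + drain(slow, 100));
        slow.shutdownNow(); // interrupt the straggler so the JVM can exit
    }
}
```

In a real service the timeout should be chosen to cover the slowest request you are willing to wait for, and it has to be shorter than whatever grace period the deployment tooling allows before it kills the process.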
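Consumer applications
Our project uses Spring Cloud Stream to listen for RabbitMQ messages. Setting up a listener is straightforward: declare the binding in the yml file, then add @EnableBinding and @StreamListener. For example, here is a consumer dedicated to sending email: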
spring:
  main:
    web-application-type: none
  cloud:
    stream:
      bindings:
        input:
          consumer:
            maxAttempts: 1
            concurrency: 5
            shutdownTimeout: 40000 # consumer shutdown timeout in ms (custom property added by the patch described below)
          destination: sendMailTopic
          content-type: application/json
          group: mailGroup
@EnableBinding(Sink.class)
public class MailHandler {

    private Logger log = LoggerFactory.getLogger(MailHandler.class);

    @Value("${spring.mail.username}")
    String sendFrom;

    @Value("${spring.profiles.active}")
    String activeProfile;

    @Autowired
    Tracer tracer;

    @Autowired
    JavaMailSender mailSender;

    @Autowired
    LogService logService;

    /**
     * Listen to sendMailTopic of RabbitMQ
     *
     * @param vo MailVO
     */
    @StreamListener(Sink.INPUT)
    public void sendMail(MailVO vo) {
        try {
            String subject = vo.getSubject() + ", Env - " + activeProfile;
            if ("admin".equals(vo.getEmailTo())) {
                this.sendMailToAdmin(vo, subject);
            } else {
                SimpleMailMessage smm = new SimpleMailMessage();
                smm.setFrom(sendFrom);
                smm.setTo(vo.getEmailTo());
                smm.setSubject(subject);
                smm.setText(vo.getContent());
                mailSender.send(smm);
            }
            log.info("Send mail success. mail content - {}", vo);
        } catch (Exception e) {
            log.error("Send mail failed. mail content - {}", vo, e);
        }
    }
}
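So how do we stop this kind of non-web application correctly?
Looking at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doShutdown, we can see that the framework already handles this case: it first cancels the consumers so that no new messages are consumed, then calls cancellationLock.await to wait for the worker threads to finish. If the wait times out and forceCloseChannel is enabled, it closes the channels of the unresponsive consumers.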
@Override
protected void doShutdown() {
    Thread thread = this.containerStoppingForAbort.get();
    if (thread != null && !thread.equals(Thread.currentThread())) {
        logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
        return;
    }

    try {
        List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
                while (consumerIterator.hasNext()) {
                    BlockingQueueConsumer consumer = consumerIterator.next();
                    consumer.basicCancel(true);
                    canceledConsumers.add(consumer);
                    consumerIterator.remove();
                    if (consumer.declaring) {
                        consumer.thread.interrupt();
                    }
                }
            }
            else {
                logger.info("Shutdown ignored - container is already stopped");
                return;
            }
        }
        logger.info("Waiting for workers to finish.");
        boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
        if (finished) {
            logger.info("Successfully waited for workers to finish.");
        }
        else {
            logger.info("Workers not finished.");
            if (isForceCloseChannel()) {
                canceledConsumers.forEach(consumer -> {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Closing channel for unresponsive consumer: " + consumer);
                    }
                    consumer.stop();
                });
            }
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
    }

    synchronized (this.consumersMonitor) {
        this.consumers = null;
        this.cancellationLock.deactivate();
    }
}
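The shape of doShutdown above (cancel the consumers, wait a bounded time for the workers, force the stragglers) can be sketched with plain JDK classes. This is only an illustration under simplifying assumptions, not the Spring AMQP implementation: WorkerDrainDemo, Worker and shutdown are hypothetical names, a CountDownLatch stands in for cancellationLock, Thread.sleep simulates handling one message, and the forced stop here is unconditional, whereas the container only closes channels when forceCloseChannel is enabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class WorkerDrainDemo {

    /** Hypothetical worker: handles messages until cancelled, finishing the current one first. */
    static class Worker implements Runnable {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final CountDownLatch done;
        private final long messageMillis;

        Worker(CountDownLatch done, long messageMillis) {
            this.done = done;
            this.messageMillis = messageMillis;
        }

        void cancel() {
            cancelled.set(true);
        }

        @Override
        public void run() {
            try {
                do {
                    Thread.sleep(messageMillis); // simulate handling one message
                } while (!cancelled.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // forced stop in the middle of a message
            } finally {
                done.countDown(); // report that this worker has stopped
            }
        }
    }

    /** Returns true if every worker finished its in-flight message within the timeout. */
    static boolean shutdown(int workerCount, long messageMillis, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(workerCount);
        List<Worker> workers = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            Worker worker = new Worker(done, messageMillis);
            Thread thread = new Thread(worker, "worker-" + i);
            workers.add(worker);
            threads.add(thread);
            thread.start();
        }

        workers.forEach(Worker::cancel); // step 1: stop taking new messages
        boolean finished = false;
        try {
            finished = done.await(timeoutMillis, TimeUnit.MILLISECONDS); // step 2: bounded wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!finished) {
            threads.forEach(Thread::interrupt); // step 3: force the unresponsive workers
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println("finished in time: " + shutdown(2, 50, 2000));
        System.out.println("finished in time: " + shutdown(2, 2000, 100));
    }
}
```

The sketch makes the role of the timeout visible: if a message takes longer to handle than the shutdown timeout, the wait gives up, which is why the timeout needs to be configurable.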
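However, stepping through the code in a debugger shows that Spring Cloud Stream does not set shutdownTimeout when it creates the MessageListenerContainer, so the value is always the default of 5 seconds, and the yml configuration offers no setting for the shutdown timeout either.
At this point the simplest and most direct fix is to patch the source: add a shutdown timeout property and apply it when the MessageListenerContainer is created. The changes are as follows: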
org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder#createConsumerEndpoint
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group,
        ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
    Assert.state(!HeaderMode.embeddedHeaders.equals(properties.getHeaderMode()),
            "the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
    String destination = consumerDestination.getName();
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(
            this.connectionFactory);
    // Added: apply the configured shutdown timeout to the listener container
    listenerContainer.setShutdownTimeout(properties.getShutdownTimeout());
    listenerContainer.setAcknowledgeMode(properties.getExtension().getAcknowledgeMode());
    listenerContainer.setChannelTransacted(properties.getExtension().isTransacted());
    listenerContainer.setDefaultRequeueRejected(properties.getExtension().isRequeueRejected());
    int concurrency = properties.getConcurrency();
    concurrency = concurrency > 0 ? concurrency : 1;
    listenerContainer.setConcurrentConsumers(concurrency);
    int maxConcurrency = properties.getExtension().getMaxConcurrency();
    if (maxConcurrency > concurrency) {
        listenerContainer.setMaxConcurrentConsumers(maxConcurrency);
    }
    listenerContainer.setPrefetchCount(properties.getExtension().getPrefetch());
    listenerContainer.setRecoveryInterval(properties.getExtension().getRecoveryInterval());
    listenerContainer.setTxSize(properties.getExtension().getTxSize());
    listenerContainer.setTaskExecutor(new SimpleAsyncTaskExecutor(consumerDestination.getName() + "-"));
    listenerContainer.setQueueNames(destination);
    listenerContainer.setAfterReceivePostProcessors(this.decompressingPostProcessor);
    listenerContainer.setMessagePropertiesConverter(
            RabbitMessageChannelBinder.inboundMessagePropertiesConverter);
    listenerContainer.setExclusive(properties.getExtension().isExclusive());
    listenerContainer.setMissingQueuesFatal(properties.getExtension().getMissingQueuesFatal());
    if (properties.getExtension().getQueueDeclarationRetries() != null) {
        listenerContainer.setDeclarationRetries(properties.getExtension().getQueueDeclarationRetries());
    }
    if (properties.getExtension().getFailedDeclarationRetryInterval() != null) {
        listenerContainer.setFailedDeclarationRetryInterval(
                properties.getExtension().getFailedDeclarationRetryInterval());
    }
    if (getApplicationEventPublisher() != null) {
        listenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
    }
    else if (getApplicationContext() != null) {
        listenerContainer.setApplicationEventPublisher(getApplicationContext());
    }
    listenerContainer.afterPropertiesSet();

    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setBeanFactory(this.getBeanFactory());
    adapter.setBeanName("inbound." + destination);
    DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
    mapper.setRequestHeaderNames(properties.getExtension().getHeaderPatterns());
    adapter.setHeaderMapper(mapper);
    ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(consumerDestination, group, properties);
    if (properties.getMaxAttempts() > 1) {
        adapter.setRetryTemplate(buildRetryTemplate(properties));
        adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
    }
    else {
        adapter.setErrorMessageStrategy(errorMessageStrategy);
        adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
    }
    return adapter;
}
package org.springframework.cloud.stream.binder;

@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class ConsumerProperties {

    /**
     * Added: shutdown timeout of the message listener container, in milliseconds.
     * Default: 30000 (30s).
     */
    private int shutdownTimeout = 30000;

    /**
     * The concurrency setting of the consumer. Default: 1.
     */
    private int concurrency = 1;

    /**
     * Whether the consumer receives data from a partitioned producer. Default: 'false'.
     */
    private boolean partitioned;

    // getters and setters ...
}
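Then publish the patched spring-cloud-stream-2.0.0.RELEASE and spring-cloud-stream-binder-rabbit-2.0.0.RELEASE to a private Maven repository so that they replace the official artifacts.
Finally, update the shutdown.sh script to use kill (which sends SIGTERM, the same as kill -15) instead of kill -9: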
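#!/bin/bash
# bash rather than sh, because the ${oxx_services[*]} array syntax below is not POSIX sh
. ./common-env.sh

shutdownService(){
    # Find the service process id
    sid=`ps -ef | grep $1 | grep -v grep | awk '{print $2}'`
    if [ -n "${sid}" ]; then
        echo "Shutdown $1"
        # Plain kill sends SIGTERM, which lets the JVM run its shutdown hooks
        kill ${sid}
    else
        echo "$1 not found"
    fi
}

# Shut down all services
for service in ${oxx_services[*]}
do
    shutdownService ${service}
done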
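The reason the signal matters: on SIGTERM the JVM runs its registered shutdown hooks before exiting, and Spring Boot by default registers one that closes the application context, which is what triggers the ContextClosedEvent and the container shutdown described earlier. SIGKILL (kill -9) cannot be caught, so none of this runs. Below is a minimal JDK-only sketch of the hook mechanism; the class name ShutdownHookDemo, the helper cleanupHook and the optional wait argument are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookDemo {

    /** Builds the hook thread; its body stands in for "close the application context". */
    static Thread cleanupHook(AtomicBoolean cleanedUp) {
        return new Thread(() -> {
            cleanedUp.set(true);
            System.out.println("Shutdown hook ran: resources released");
        }, "cleanup-hook");
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        // The JVM runs registered hooks on normal exit and on SIGTERM, but not on SIGKILL.
        Runtime.getRuntime().addShutdownHook(cleanupHook(cleanedUp));

        // Optional first argument: how long to stay alive, in milliseconds (default 0).
        long waitMillis = args.length > 0 ? Long.parseLong(args[0]) : 0;
        System.out.println("Running; try kill <pid> (hook runs) or kill -9 <pid> (hook skipped)");
        Thread.sleep(waitMillis);
    }
}
```

To try it, start the program with a wait such as 60000, then send it kill from another terminal and check that the hook message is printed; repeating the experiment with kill -9 should end the process without that message.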
My knowledge is limited, so if you find any mistakes in this post, please leave a comment to point them out.
https://github.com/spring-projects/spring-boot/issues/4657#issuecomment-161354811