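In our project we use rabbitmq as a message queue to buffer update messages headed for the database.
While watching the rabbitmq queue at noon, we noticed that it always held 26 unacked messages. Our database updates are not frequent, averaging 1-2 per second, so a steady backlog of 26 unacked messages had to mean something was wrong. The consumer count was also exactly 26 higher than our normal 30 connections, so we concluded that 26 connections were blocked because their message could never be acked.
Opening the log of the module that updates the database, we found the following error: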
org.springframework.transaction.TransactionSystemException: Could not commit JDBC transaction; nested exception is com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:275)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:761)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150)
    at com.yidian.commerce.update.server.biz.BaseProcessBuilder.updateDbWithTransaction(BaseProcessBuilder.java:47)
    at com.yidian.commerce.update.server.biz.BaseProcessBuilder.access$000(BaseProcessBuilder.java:20)
    at com.yidian.commerce.update.server.biz.BaseProcessBuilder$1.process(BaseProcessBuilder.java:31)
    at com.yidian.commerce.common.utils.rabbitmq.MQAccessBuilder$2.consume(MQAccessBuilder.java:155)
    at com.yidian.commerce.common.utils.rabbitmq.ThreadPoolConsumer$1.run(ThreadPoolConsumer.java:102)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
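Judging from the exception, the transaction commit failed because of a network timeout. A network error like this should have been caught by our code.
Going back to the code that updates the database: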
DetailRes updateDbWithTransaction(final T dbUpdateModel, final TransactionTemplate transactionTemplate) {
    return transactionTemplate.execute(new TransactionCallback<DetailRes>() {
        @Override
        public DetailRes doInTransaction(TransactionStatus status) {
            try {
                updateDb(dbUpdateModel);
                return new DetailRes(true, "");
            } catch (DataIntegrityViolationException e) {
                log.info("idempotent rollback transaction: " + status);
                // duplicate instructionId: roll back and report success
                status.setRollbackOnly();
                return new DetailRes(true, "");
            } catch (Exception e) {
                e.printStackTrace();
                status.setRollbackOnly();
                log.info("rollback transaction: " + status);
                return new DetailRes(false, e.toString());
            }
        }
    });
}
We catch exceptions inside doInTransaction, so in principle an exception like this should not escape. So we went on to read the code of transactionTemplate.execute:
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
        return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
    }
    else {
        TransactionStatus status = this.transactionManager.getTransaction(this);
        T result;
        try {
            result = action.doInTransaction(status);
        }
        catch (RuntimeException ex) {
            // Transactional code threw application exception -> rollback
            rollbackOnException(status, ex);
            throw ex;
        }
        catch (Error err) {
            // Transactional code threw error -> rollback
            rollbackOnException(status, err);
            throw err;
        }
        catch (Throwable ex) {
            // Transactional code threw unexpected exception -> rollback
            rollbackOnException(status, ex);
            throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
        }
        this.transactionManager.commit(status);
        return result;
    }
}
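Notice that the statement this.transactionManager.commit(status); is not inside any try/catch. It runs after doInTransaction has already returned, so the catch blocks in our callback never see a failure raised during commit. That is the root of the escaping exception.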
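The effect can be reproduced without Spring or a database. The following is only a minimal sketch: `CommitOutsideTryDemo`, its simplified `execute`, and `unguarded` are hypothetical stand-ins written for illustration, not Spring's classes. They keep the same shape as the listing above, with the callback guarded and the commit step placed after the try block.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the template shown above, for illustration only.
class CommitOutsideTryDemo {

    /** Simplified template: the callback runs inside try, commit runs after it. */
    static <T> T execute(Supplier<T> callback, Runnable commit) {
        T result;
        try {
            result = callback.get();
        } catch (RuntimeException ex) {
            // The real template would roll back here before rethrowing.
            throw ex;
        }
        commit.run(); // not guarded: a failure here goes straight to the caller
        return result;
    }

    /** Same shape as the original updateDbWithTransaction: catches only inside the callback. */
    static String unguarded(Runnable commit) {
        return execute(() -> {
            try {
                return "ok";
            } catch (Exception e) {
                return "failed in callback";
            }
        }, commit);
    }

    public static void main(String[] args) {
        // Simulated commit failure (assumption: any runtime exception will do).
        Runnable failingCommit = () -> {
            throw new IllegalStateException("Communications link failure during commit()");
        };
        try {
            unguarded(failingCommit);
            System.out.println("no exception");
        } catch (IllegalStateException e) {
            // prints "escaped: Communications link failure during commit()"
            System.out.println("escaped: " + e.getMessage());
        }
    }
}
```

The try/catch inside the callback has already completed by the time the commit step runs, so only a handler placed around the whole `execute` call can intercept the failure.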
Given this, we changed updateDbWithTransaction so that the whole execute call is also wrapped in a try/catch. The final code is:
DetailRes updateDbWithTransaction(final T dbUpdateModel, final TransactionTemplate transactionTemplate) {
    DetailRes detailRes;
    try {
        detailRes = transactionTemplate.execute(new TransactionCallback<DetailRes>() {
            @Override
            public DetailRes doInTransaction(TransactionStatus status) {
                try {
                    updateDb(dbUpdateModel);
                    return new DetailRes(true, "");
                } catch (DataIntegrityViolationException e) {
                    log.info("idempotent rollback transaction: " + status);
                    // duplicate instructionId: roll back and report success
                    status.setRollbackOnly();
                    return new DetailRes(true, "");
                } catch (Exception e) {
                    e.printStackTrace();
                    status.setRollbackOnly();
                    log.info("rollback transaction: " + status);
                    return new DetailRes(false, e.toString());
                }
            }
        });
    } catch (Exception e) {
        // catches failures raised outside the callback, e.g. during commit
        e.printStackTrace();
        detailRes = new DetailRes(false, "updateDbWithTransaction failed: " + e);
    }
    return detailRes;
}
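At the same time, to rule out any case where an exception thrown by the processing layer blocks a rabbitmq connection, we also wrapped the process call in rabbitmqAccess in a try/catch: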
DetailRes detailRes;
try {
    detailRes = messageProcess.process(messageBean);
} catch (Exception e) {
    detailRes = new DetailRes(false, "process exception: " + e);
}
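To show why this guard matters for the unacked count, here is a minimal sketch of a consume step that always reaches its acknowledgement decision. Everything in it is an assumption made for illustration: `SafeConsumeDemo`, the `MessageProcess` and `Acker` interfaces, the `RecordingAcker`, and the ack-on-success / nack-on-failure policy are hypothetical, and the `DetailRes` here is a simplified stand-in. The post does not show how MQAccessBuilder actually acknowledges messages. In a real consumer the two `Acker` methods would correspond to the RabbitMQ Java client's `Channel.basicAck` and `Channel.basicNack`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names and ack policy are assumptions, not the project's code.
class SafeConsumeDemo {

    /** Simplified stand-in for DetailRes: a success flag plus a message. */
    static final class DetailRes {
        final boolean success;
        final String errMsg;
        DetailRes(boolean success, String errMsg) {
            this.success = success;
            this.errMsg = errMsg;
        }
    }

    /** Plays the role of messageProcess. */
    interface MessageProcess {
        DetailRes process(String message);
    }

    /** Stands in for the channel's acknowledgement calls. */
    interface Acker {
        void ack(long deliveryTag);
        void nack(long deliveryTag);
    }

    /** Records acknowledgements so the demo can print them. */
    static final class RecordingAcker implements Acker {
        final List<String> log = new ArrayList<>();
        public void ack(long deliveryTag) { log.add("ack " + deliveryTag); }
        public void nack(long deliveryTag) { log.add("nack " + deliveryTag); }
    }

    /** Converts any handler exception into a failed result, then always acks or nacks. */
    static DetailRes consume(long deliveryTag, String message, MessageProcess process, Acker acker) {
        DetailRes detailRes;
        try {
            detailRes = process.process(message);
        } catch (Exception e) {
            detailRes = new DetailRes(false, "process exception: " + e);
        }
        if (detailRes.success) {
            acker.ack(deliveryTag);
        } else {
            acker.nack(deliveryTag);
        }
        return detailRes;
    }

    public static void main(String[] args) {
        RecordingAcker acker = new RecordingAcker();
        // A handler that throws, like the commit failure in the log above.
        consume(1L, "update-1", m -> { throw new IllegalStateException("commit failed"); }, acker);
        // A handler that succeeds.
        consume(2L, "update-2", m -> new DetailRes(true, ""), acker);
        System.out.println(acker.log); // prints [nack 1, ack 2]
    }
}
```

Without the try/catch, the exception from the first handler would skip the acknowledgement entirely, which would leave that delivery unacked in the same way as the 26 messages observed in the queue.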