1. RabbitMQ高级特性
1.1 消息的可靠性投递
RabbitMQ提供了两种方式用来控制消息的投递可靠性模式。
- confirm
判断消息是否到达交换机,消息从 producer 到 exchange 则会返回一个 confirmCallback 。
代码实现,需在配置文件中将connection-factory的属性 publisher-confirms设置为true
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"
publisher-confirms="true" publisher-returns="true"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
设置回调方法
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testComfirm() {
//设置回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm方法被执行了");
if(b){
//接受成功
System.out.println("方法接收成功:"+s);
}else {
//接收失败
System.out.println("方法接收失败:"+s);
}
}
});
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","msg confirm!");
}
}
通过confirm中的b判断交换机是否接收成功消息,接收成功即执行相应的方法
- return模式
判断消息是否成功被路由到某一个队列, 消息从 exchange-->queue 投递失败则会返回一个 returnCallback
代码实现,需在配置文件中将connection-factory的属性 publisher-returns设置为true
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"
publisher-confirms="true" publisher-returns="true"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
设置回调方法,需将托管开启rabbitTemplate.setMandatory(true),当路由失败时执行回调方法里头的操作。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testReturn() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message
* @param i replyCode
* @param s replyTest
* @param s1 exchange
* @param s2 routingKey
*/
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("return 执行了...");
System.out.println(message);
System.out.println(i);
System.out.println(s);
System.out.println(s1);
System.out.println(s2);
}
});
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","msg confirm!");
}
}
1.2 Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。 有三种确认方式:
- 自动确认:acknowledge="none"
- 手动确认:acknowledge="manual"
- 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
spring中xml文件配置, 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--创建connectionFactory-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual}"/>
<context:component-scan base-package="com.itheima.listener"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="5">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>-
</rabbit:listener-container>
</beans>
代码实现, 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息,如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑");
int i=3/0;
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
channel.basicNack(deliveryTag,true,true);
}
}
@Override
public void onMessage(Message message) {
}
}
1.3 消费端限流
消费端一次获取一批消息确认后在获取下一批
在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息,消费端的确认模式一定为手动确认。acknowledge="manual"
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--创建connectionFactory-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"/>
<context:component-scan base-package="com.itheima.listener"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"/>
</rabbit:listener-container>
</beans>
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
System.out.println(new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
@Override
public void onMessage(Message message) {
}
}
1.4 TTL
- TTL 全称 Time To Live(存活时间/过期时间)。
- 当消息到达存活时间后,还没有被消费,会被自动清除。
- RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"
publisher-confirms="true" publisher-returns="true"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--设置消息过期时间-->
<rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
<rabbit:queue-arguments>
<!--队列的过期时间-->
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl">
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
单独设置过期队列,需添加后置处理器messagePostProcessor(消息后置处理器)
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test4() {
for (int i = 0; i <10; i++) {
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.test","msg ttl!");
}
}
@Test
public void test5() {
//消息后处理对象
MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息的参数
message.getMessageProperties().setExpiration("5000");
return message;
}
};
for (int i = 0; i <10 ; i++) {
if(i==5){
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.test","msg ttl!",messagePostProcessor);
}else {
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.test","msg ttl!");
}
}
}
- 如果同时设置了队列和发送消息的过期时间以短的过期时间为准
- 消息过期之后如果没有在队列顶端不会立即被移除
1.5 死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
在spring配置文件中给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"
publisher-confirms="true" publisher-returns="true"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--死信队列的设置-->
<!--1.设置正常队列-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3.绑定-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="exchange_dlx"/>
<entry key="x-dead-letter-routing-key" value="dlx.hello"/>
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
<entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2.设置死信队列-->
<rabbit:queue name="queue_dlx" id="queue_dlx"/>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
死信队列中过期时间设置和长度限制设置
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*死信队列,过期时间设置
*/
@Test
public void test6() {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","hello dlx");
}
/**
*死信队列,长度限制设置
*/
@Test
public void test7() {
for (int i = 0; i <20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","hello dlx");
}
}
}
死信队列中访问拒绝设置,需在消费端channel.basicNack(deliveryTag,true,false)即requeue=false,此处省略在配置文件中配置监听器
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑");
int i=3/0;
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现问题,拒绝接收");
channel.basicNack(deliveryTag,true,false);
}
}
@Override
public void onMessage(Message message) {
}
}
小结
死信交换机和死信队列和普通的没有区别
当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队 列
1.6 延迟队列
在RabbitMQ中,TTL+死信队列 组合实现延迟队列的效果
- 生产端
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"
publisher-confirms="true" publisher-returns="true"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--延迟队列 死信队列+TTL-->
<!-- 1.定义正常交换机-->
<rabbit:queue id="order_queue" name="order_queue">
<!--3.绑定,设置正常队列过期时间为30分钟-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2.定义死信交换机和队列-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"/>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*延迟队列
*/
@Test
public void test8() throws InterruptedException {
rabbitTemplate.convertAndSend("order_exchange","order.msg","msg:hello,order");
for (int i = 10; i >0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
}
- 消费端
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--创建connectionFactory-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"/>
<context:component-scan base-package="com.itheima.listener"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>
</rabbit:listener-container>
</beans>
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑");
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
channel.basicNack(deliveryTag,true,false);
}
}
@Override
public void onMessage(Message message) {
}
}
1.7 日志与监控
- RabbitMQ默认日志存放路径:/var/log/rabbitmq/rabbit@xxx.log
- 日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、 RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等
查看队列
rabbitmqctl list_queues
查看exchanges
rabbitmqctl list_exchanges
查看用户
rabbitmqctl list_users
查看连接
rabbitmqctl list_connections
查看消费者信息
rabbitmqctl list_consumers
查看环境变量
rabbitmqctl environment
查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged
查看单个队列的内存使用
rabbitmqctl list_queues name memory
查看准备就绪的队列
rabbitmqctl list_queues name messages_ready
1.8消息追踪-Firehose
- rabbitmqctl trace_on:开启Firehose命令
- rabbitmqctl trace_off:关闭Firehose命令
- 打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmq_tracing和Firehose在实现上如出一辙,记录所有消息的入队(生产消息)以及出队(消费)的详细信息,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
2. 应用问题
2.1 消息补偿
-
三个队列:
生产者和消费者
消费者和状态检查服务
生产者和检查服务的
利用dubbo实现检查服务远程过程调用消息生产者(如果说延迟消息检测发现没有正常处理)
2.2. 幂等性问题(不会重复消费)
使用乐观锁的思想
{ uid: 123546, money: 100 version: 1 } { uid: 123546, money: 100 version: 1 } { uid: 123546, money: 100 version: 1 }
数据库: 123546, 50000, 1 第一次消费后: 123546, 49000, 2
update set money = money - ${money}, version = version + 1 where version = ${version}