笔者所有文章第一时间发布于:
hhbbz的个人博客
消息队列的使用场景
校验用户名等信息,如果没问题会在数据库中添加一个用户记录
如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信
分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他
发送给用户一个包含操作指南的系统通知
消息的重发补偿解决思路
可靠消息服务定时查询状态为已发送并超时的消息
可靠消息将消息重新投递到 MQ 组件中
下游应用监听消息,在满足幂等性的条件下,重新执行业务。
下游应用通知可靠消息服务该消息已经成功消费。
通过消息状态确认和消息重发两个功能,可以确保上游应用、可靠消息服务和下游应用数据的最终一致性。
消息的幂等性解决思路
- 查询操作
查询一次和查询多次,在数据不变的情况下,查询结果是一样的。select是天然的幂等操作 - 删除操作
删除操作也是幂等的,删除一次和多次删除都是把数据删除。(注意可能返回结果不一样,删除的数据不存在,返回0,删除的数据多条,返回结果多个)
3.唯一索引,防止新增脏数据
比如:支付宝的资金账户,支付宝也有用户账户,每个用户只能有一个资金账户,怎么防止给用户创建资金账户多个,那么给资金账户表中的用户ID加唯一索引,所以一个用户新增成功一个资金账户记录 - token机制,防止页面重复提交
- 悲观锁
获取数据的时候加锁获取
select * from table_xxx where id='xxx' for update;
注意:id字段一定是主键或者唯一索引,不然是锁表,会死人的
悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,根据实际情况选用 - 乐观锁
乐观锁只是在更新数据那一刻锁表,其他时间不锁表,所以相对于悲观锁,效率更高。 - 分布式锁
还是拿插入数据的例子,如果是分布是系统,构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁,这样其实是把多线程并发的锁的思路,引入多多个系统,也就是分布式系统中得解决思路。 - select + insert
并发不高的后台系统,或者一些任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询下一些关键数据,判断是否已经执行过,在进行业务处理,就可以了
注意:核心高并发流程不要用这种方法 - 状态机幂等
在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机(状态变更图),就是业务单据上面有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,这时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的变更,理论上是不能够变更的,这样的话,保证了有限状态机的幂等。 - 对外提供接口的api如何保证幂等
如银联提供的付款接口:需要接入商户提交付款请求时附带:source来源,seq序列号
source+seq在数据库里面做唯一索引,防止多次付款,(并发时,只能处理一个请求)
摘抄自: http://825635381.iteye.com/blog/2276077
消息的堆积解决思路
如果还没开始投入使用kafka,那应该在设计分区数的时候,尽量设置的多点,从而提升生产和消费的并行度,避免消费太慢导致消费堆积。
增大批次
瓶颈在消费吞吐量的时候,增加批次也可以改善性能
增加线程数
如果一些消费者组中的消费者线程还是有1个消费者线程消费多个分区的情况,建议增加消费者线程。尽量1个消费者线程对应1个分区,从而发挥现有分区数下的最大并行度。
摘抄自: https://kaimingwan.com/post/framworks/kafka/kafkaxiao-xi-dui-ji-chu-li
自己如何实现消息队列
大体上的设计是由一条线程1执行从等待列表中获取任务插入任务队列再由线程池中的线程从任务队列中取出任务去执行.
添加一条线程1主要是防止在执行耗时的任务时阻塞主线程.当执行耗时任务时,添加的任务的操作快于取出任务的操作,
当任务队列长度达到最大值时,线程1将被阻塞,等待线程2,3...从任务队列取出任务执行。
如何保证消息的有序性
通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。订单号相同的消息会被先后发送到同一个队列中,
在获取到路由信息以后,会根据算法来选择一个队列,同一个OrderId获取到的肯定是同一个队列。