聊聊并发与一致性问题

前言

列举日常开发中最容易犯的并发错误,并基于这些错误,跟大家聊聊并发与一致性,如果有哪里写得不对的话,欢迎大家指出。

并发与一致性概念

并发与并行有什么区别?

并发: 是指同一个时间段内多个任务同时都在执行,并且都没有执行结束.

并行: 是说在单位时间 内多个任务同时在执行。

并发任务强调在一个时间段内同时执行,而一个时间段由多个单位时间累积而成,所以说并发的多个任务在单位时间内不一定同时在执行。在这里,我举一个生活的例子,来比喻并发与并行。

现时生活中的并发与并行:
假设公路有四条道路,四辆汽车可以同一时刻通过同一位置,我把它看做并行。而一条单行道路,同一时刻只有一辆汽车可以通过同一个地方,但是其他的汽车可以陆续的通过,这就是并发的一个缩影。

image

一致性是什么?

一致性指的就是最终的结果是否和设定的规则保持一致,一般指数据保持一致,如果在分布式系统中,可以理解为多个节点中数据的值是一致的。

强一致性

这种一致性级别是最符合用户直觉的,它要求系统写入什么,读出来的也会是什么,用户体验好,但实现起来往往对系统的性能影响大

弱一致性

这种一致性级别约束了系统在写入成功后,不承诺立即可以读到写入的值,也不承诺多久之后数据能够达到一致,但会尽可能地保证到某个时间级别(比如秒级别)后,数据能够达到一致状态

最终一致性

最终一致性是弱一致性的一个特例,系统会保证在一定时间内,能够达到一个数据一致的状态。这里之所以将最终一致性单独提出来,是因为它是弱一致性中非常推崇的一种一致性模型,也是业界在大型分布式系统的数据一致性上比较推崇的模型

image

日常代码中的并发问题

下面列举大家平时在工作中最容易犯的并发错误,都是在实际项目代码中看到的鲜活例子。

First Blood

线上总是出现:ERROR 1062 (23000) Duplicate entry 'xxx' for key 'yyy',我们来看一下有问题的这段代码:

UserBindInfo info = selectFromDB(${userId});
if(info == null){
    info = new UserBindInfo(${userId},${deviceId});
    insertIntoDB(info);
}else{
    info.setDeviceId(${deviceId});
    updateDB(info);
    }

并发情况下,第二步判断都为空,就会有2个或者多个线程进入插入数据库操作,
这时候就出现了同一个ID插入多次,
正确处理姿势:

insert into UserBindInfo values(#{userId},#{deviceId}) on duplicate key update deviceId=#{deviceId}多次的情况,导致插入失败。

一般情况下,可以用insert...on duplicate key update... 解决这个问题。

注意: 如果UserBindInfo表存在主键以及一个以上的唯一索引,在并发情况下,使用insert...on duplicate key,可能会产生死锁,可以这样处理:

try{
   UserBindInfoMapper.insertIntoDB(userBindInfo);
}catch(DuplicateKeyException ex){
    UserBindInfoMapper.update(userBindInfo);
}

Double Kill

现在有如下业务:控制同一个用户访问某个接口的频率不能小于5秒。一般很容易想到使用redis的 setnx操作来控制并发访问,于是有以下代码:

if(RedisOperation.setnx(${userId}, 1)){
    RedisOperation.expire(${userId},5,TimeUnit.SECONDS));
    //执行正常业务逻辑
}else{
    return “访问过于频繁”;
}

假设执行完setnx操作,还没来得及设置expireTime,机器重启或者突然崩溃,将会发生死锁。该用户id,后面执行setnx永远将为false,这可能让你永远损失那个用户

那么怎么解决这个问题呢,可以考虑用SET key value NX EX max-lock-time ,它是一种在 Redis 中实现锁的方法,是原子性操作,不会像以上代码分两步执行,先set再expire,它是一步到位

客户端执行以上的命令:

  • 如果服务器返回 OK ,那么这个客户端获得锁。
  • 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
  • 设置的过期时间到达之后,锁将自动释放

Trible Kill

我们看一下有关ConcurrentHashMap的一段代码,如下:

//全局变量
Map<String, Integer> map = new ConcurrentHashMap(); 

Integer value = count.get(k);
if(value == null){
       map.put(k,1);
}else{
    map.put(k,value+1);
}

假设两条线程都进入 value==null,这一步,得出的结果是不是会变小?OK,客官先稍作休息,闭目养神一会,我们验证一下,请看一个demo:

  public static void main(String[] args)  {
        for (int i = 0; i < 1000; i++) {
            testConcurrentMap();
        }
    }
    private static void testConcurrentMap() {
        final Map<String, Integer> count = new ConcurrentHashMap<>();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        final CountDownLatch endLatch = new CountDownLatch(2);
        Runnable task = ()->  {
                for (int i = 0; i < 5; i++) {
                    Integer value = count.get("k");
                    if (null == value) {
                        System.out.println(Thread.currentThread().getName());
                        count.put("k", 1);
                    } else {
                        count.put("k", value + 1);
                    }
                }
                endLatch.countDown();
        };

        executorService.execute(task);
        executorService.execute(task);

        try {
            endLatch.await();
            if (count.get("k") < 10) {
                System.out.println(count);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

表面看,运行结果应该都是10对吧,好的,我们再看运行结果


image

运行结果出现了5,所以这样实现是有并发问题的,那么正确的实现姿势是啥呢?

Map<K,V> map = new ConcurrentHashMap(); 
V v = map.get(k);
if(v == null){
        V v = new V();
        V old = map. putIfAbsent(k,v);
        if(old != null){
                  v = old;
        }
}

可以考虑使用putIfAbsent解决这个问题

(1)如果key是新的记录,那么会向map中添加该键值对,并返回null。

(2)如果key已经存在,那么不会覆盖已有的值,返回已经存在的值

我们再来看看以下代码以及运行结果:

 public static void main(String[] args)  {
        for (int i = 0; i < 1000; i++) {
            testConcurrentMap();
        }
    }

    private static void testConcurrentMap() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        final Map<String, AtomicInteger> map = Maps.newConcurrentMap();
        final CountDownLatch countDownLatch = new CountDownLatch(2);

        Runnable task = ()->  {
                AtomicInteger oldValue;
                for (int i = 0; i < 5; i++) {
                    oldValue = map.get("k");
                    if (null == oldValue) {
                        AtomicInteger initValue = new AtomicInteger(0);
                        oldValue = map.putIfAbsent("k", initValue);
                        if (oldValue == null) {
                            oldValue = initValue;
                        }
                    }
                    oldValue.incrementAndGet();
                }
            countDownLatch.countDown();
        };

        executorService.execute(task);
        executorService.execute(task);

        try {
            countDownLatch.await();
            System.out.println(map);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
image

Quadra Kill

小心你的全局变量,如下面这段代码:

@Component
public class GlobalVariableConcurrentTest {

    public static List<String> desc = new ArrayList<>();

    public List<String> getDescByUserType(int userType) {
        if (userType == 1) {
            desc.add("普通会员不可以发送和查看邮件,请购买会员");
            return desc;
        } else if (userType == 2) {
            desc.add("恭喜你已经是VIP会员,尽情的发邮件吧");
            return desc;
        }else {
            desc.add("你的身份未知");
            return desc;
        }
    }
}

因为desc是全局变量,在并发情况下,请求getDescByUserType方法,得到的并不是你想要的结果。

Penta Kill

现有如下业务场景:用户手上有一张现金券,可以兑换相应的现金,

错误示范一

if(isAvailable(ticketId){
    1、给现金增加操作
    2、deleteTicketById(ticketId)
}else{
    return “没有可用现金券”
}

解析: 假设有两条线程A,B兑换现金,执行顺序如下:

image
  • 1.线程A加现金
  • 2.线程B加现金
  • 3.线程A删除票标志
  • 4.线程B删除票标志

显然,这样有问题了,已经给用户加了两次现金了

错误示范2

if(isAvailable(ticketId){
    1、deleteTicketById(ticketId)
    2、给现金增加操作
}else{
    return “没有可用现金券”
}

并发情况下,如果一条线程,第一步deleteTicketById删除失败了,也会多添加现金。

正确处理方案

if(deleteAvailableTicketById(ticketId) == 1){
    1、给现金增加操作
}else{
    return “没有可用现金券”
}

并发环境下数据库缓存一致性

在这里,我先问大家一个问题,有写操作的时候,先操作数据库还是先操作缓存呢? 你可以先思考一下,可能会存在哪些问题,再往下看。下面我分几种方案阐述:

缓存维护方案一

一写(线程A)一读(线程B)操作,先操作缓存,在操作数据库

image

1)线程A发起一个写操作,第一步del cache

2)线程A第二步写入新数据到DB

3)线程B发起一个读操作,cache miss,

4)线程B从DB获取最新数据

5)请求B同时set cache

这样看,没啥问题。我们再看第二个流程图,如下:

image

1)线程A发起一个写操作,第一步del cache

2)此时线程B发起一个读操作,cache miss

3)线程B继续读DB,读出来一个老数据

4)然后老数据入cache

5)线程A写入了最新的数据

OK,酱紫,就有问题了吧,老数据入到缓存了,每次读都是老数据啦,缓存与数据与数据库数据不一致

缓存维护方案二

双写操作,先操作缓存,在操作数据库

image

1)线程A发起一个写操作,第一步set cache

2)线程A第二步写入新数据到DB

3)线程B发起一个写操作,set cache,

4)线程B第二步写入新数据到DB

这样看,也没啥问题。我们再看第二个流程图,如下:

image

1)线程A发起一个写操作,第一步set cache

2)线程B发起一个写操作,第一步setcache

3)线程B写入数据库到DB

4)线程A写入数据库到DB

执行完后,缓存保存的是B操作后的数据,数据库是A操作后的数据,缓存和数据库数据不一致

缓存维护方案三

一写(线程A)一读(线程B)操作,先操作数据库,再操作缓存

image

1)线程A发起一个写操作,第一步write DB

2)线程A第二步del cache

3)线程B发起一个读操作,cache miss

4)线程B从DB获取最新数据

5)线程B同时set cache

这种方案没有明显的并发问题,但是有可能步骤二删除缓存失败,虽然概率比较小,优于方案一和方案二,平时工作中也是使用方案三。

综上对比,我们一般采用方案三,但是有没有完美全解决方案三的弊端的方法呢?

缓存维护方案四

这个是方案三的改进方案,我们来看一下流程图:

image

通过数据库的binlog异步淘汰key,以mysql为例
可以使用阿里的canal将binlog日志采集发送到MQ队列里面,然后通过ACK机制
确认处理
这条更新消息,删除缓存。

但是呢还有个问题,如果是主从数据库呢

缓存维护方案五

主从DB问题:因为主从DB同步存在同时延时时间如果删除缓存之后,数据同步到备库之前已经有请求过来时,会从备库中读到脏数据,如何解决呢?解决方案如下流程图:

image

缓存维护总结

(1)读取缓存中是否有相关数据

(2)如果缓存中有相关数据value,则返回

(3)如果缓存中没有相关数据,则从数据库读取相关数据放入缓存中key->value,再返回

(4)如果有更新数据,则先更新数据,再删除缓存

(5)为了保证第四步删除缓存成功,使用binlog异步删除

(6)如果是主从数据库,binglog取自于从库

(7)如果是一主多从,每个从库都要采集binlog,然后消费端收到最后一台binlog数据才删除缓存

更新缓存的Design Pattern

谈到数据库缓存一致性问题,我们再来看一下更新缓存的Design Pattern有四种:Cache aside, Read through, Write through, Write behind caching。

Cache Aside Pattern

最经典,同时也是最常用的缓存+数据库读写的模式,就是 Cache Aside Pattern

  • 查询: 先从缓存获取数据,如果缓存无数据,就去查数据库,将查询结果放入缓存,同时返回数据给用户。
  • 更新: 先把数据存到数据库中,成功后,再让缓存失效。

Read Through Pattern

在查询操作中更新缓存。也就是说,当缓存失效的时候,Cache Aside pattern是由调用方负责把数据加载入缓存,而Read Through则用缓存服务自己来加载,从而对应用方是透明的。

Write Through Pattern

当有数据更新的时候,如果没有命中缓存,直接更新数据库,然后返回。如果命中了缓存,则更新缓存,然后再由Cache自己更新数据库。

Write Behind Caching Pattern

在更新数据的时候,只更新缓存,不更新数据库,而我们的缓存会异步地批量更新数据库。

分布式系统数据一致性

现实中分布式一致性场景

我们来看一下几个典型的分布式一致性场景

1、银行转账

在跨行转账过程中,我们经常会遇到这种情况:我本行的money已经扣除成功,但是对方银行入账可能需要在N个工作日后到账!此时我们一般不担心钱丢失问题:只要在给定的期限内到账且钱不要少就好了!----这也成为了几乎所有用户对于现代银行系统最基本的需求

2、火车购票

K1314次列车,深圳-北京的卧铺仅剩下最后一张车票了,可能在同一时刻,有很多乘客在不同地点的不同售票窗口都想买这一张车票,但是这张票只会卖给一位用户,这就需要购票系统的每一个节点都要有强一致的剩余车票数据

3、网上购物

我们经常会看到某个秒杀物品会在页面上展示商品的剩余数量,其实大家都知道这个数量绝大多数是缓存数据,不是实时更新的,但是在某一段时间后会同步最终剩余数量。

CAP理论

image

一致性(C:Consistency):

一致性是指数据在多个副本之间能否保持一致的特性。例如一个数据在某个分区更新之后,在其他分区读出来的数据也是更新之后的数据

可用性(A:Availability):

可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。这里的重点是"有限时间内"和"返回结果"。

分区容错性(P:Partition tolerance):

分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务

CAP理论:
一个分布式系统不可能同时满足一致性C、可用性(A:Availability)和分区容错性(P:Partition tolerance),最多只能同时满足其中两项.

选择 说明
CA 放弃分区容错性,加强一致性和可用性,其实就是传统的单机数据库的选择
AP 放弃一致性,分区容错性和可用性,这是很多分布式系统设计时的选择
CP 放弃可用性,追求一致性和分区容错性,网络问题会直接让整个系统不可用

分布式系统一致性解决方案

BASE理论:

1、BA:Basically Available

基本可用:通过支持局部故障而不是系统全局故障来实现的。如将用户分区在 5 个数据库服务器上,一个用户数据库的故障只影响这台特定主机那 20% 的用户,其他用户不受影响

2、S:Soft State

软状态,状态可以有一段时间不同步

3、E:Eventually Consistent

最终一致,最终数据是一致的就可以了,而不是时时保持强一致

经典案例分析:银行跨行转账

image

1、汇丰银行账户A申请汇款100元到渣打银行账户B上

2、汇丰银行执行以下事务操作:

  a、记录操作流水,生成流水单号,此流水可以理解为A的转账凭证
  b、执行A账号扣除100元的操作

3、汇丰银行通知渣打银行A的转账请求

4、渣打银行收到通知后,执行以下事务操作:

  a、插入到日志流水(即判断个凭证是否处理过)
  b、执行B账号增加100元操作

5、两个银行中每日或者定时对账,处理异常的流水订单

一句话总结:
将分布式事务转换为多个本地事务,然后依靠重试等方式达到最终一致性

经典案例分析:两阶段提交2PC

举例分析:

第一阶段,张老师作为“协调者”,给小强和小明(参与者、节点)发微信,组织他们俩明天8点在学校门口集合,一起去爬山,然后开始等待小强和小明答复。

第二阶段,如果小强和小明都回答没问题,那么大家如约而至。如果小强或者小明其中一人回答说“明天没空,不行”,那么张老师会立即通知小强和小明“爬山活动取消”。

这个过程中可能有很多问题的。如果小强没看手机,那么张老师会一直等着答复,小明可能在家里把爬山装备都准备好了却一直等着张老师确认信息。更严重的是,如果到明天8点小强还没有答复,那么就算“超时”了,那小明到底去还是不去集合爬山呢?大家茶余饭后,思考以下这个问题吧。

结语

谢谢阅读,希望本文对你有帮助。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351