函数式内功心法-05: 无锁高并发技术之STM海市蜃楼

不知道大家听说过没有,函数式天生擅长高并发。

并发concurrency与并行parallel的区别相信大家都懂。并发是不同的逻辑相互打配合,并行则是相同的逻辑一起跑加快速度。当然不完全准确,思想上差不多是这个意思。

目前函数式语言高并发的技术有很多种,
比如以Erlang/Scala为代表的Actor,
还有Clojure/Go为代表的CSP,
以及目前要介绍的Haskell/Clojure为代表的STM.
其它技术将会在后文一一介绍.

让我们从此刻开始拥抱STM吧!

  1. STM是什么?
  2. 深入理解STM
    a. STM到底是怎么做到的呢?
    b. 进一步控制STM!
  3. STM的haskell源码解析
    a. STM世界
    b. TVar变量
    c. retry与orElse流程控制
    d. rts运行时接口清单
  4. STM的rts运行时分析
    a. PrimOps介绍
    b. rts运行时基本概念
    c. STM数据类型-StgTSO, StgTRecHeader , StgTVar
    d. Tvar操作-newTVar#, readTVar#, writeTVar#
    e. STM操作-atomically#, raiseIO#, catchSTM#, retry#, catchRetry#

一. STM是什么?

函数式值是不可变的,但是如果两个线程需要通讯,则需要特殊的变量来通讯。如果需要单个变量通讯,系统大部分提供了原子类型。如果涉及多个变量通讯,则问题比较复杂了。

在用锁的解决方案中,则需要保证锁的顺序一致性,不然则会出死锁。锁的资源管理也是非常头痛的,因为代码分散在各个地方,一处更改对其它均有影响。

STM则简单很多。我们把需要改变的事务变量读过来,在自己构建的环境里面处理它。处理完了之后进行乾坤大挪移,最终把所有更新过的虚拟事务变量一起写回去。如果处理失败,自然还是原来的值。其它线程也只能看到处理前或处理后的值。所以达到了all-or-nothing的效果。

对于用户接口来说,则只需要新建事务变量,读写事务变量,最终提交整个事务逻辑即可,其它的什么也不用做,实在是快乐得狠。。。

我们简单看一下代码:

newtype Gold = Gold Int
    deriving (Eq, Ord, Show, Num)
type Balance = TVar Gold

basicTransfer :: Gold -> Balance -> Balance -> STM ()
basicTransfer qty fromBal toBal = do
  fromQty <- readTVar fromBal
  toQty   <- readTVar toBal
  writeTVar fromBal (fromQty - qty)
  writeTVar toBal   (toQty + qty)

transferTest = do
  alice <- newTVar (12 :: Gold)
  bob   <- newTVar 4
  basicTransfer 3 alice bob
  liftM2 (,) (readTVar alice) (readTVar bob)

atomically transferTest

a. 我们创建了两个事务变量TVar,
b. 接着调用basicTransfer在STM世界里面构建转账事务逻辑
c. 最后调用atomically运行transferTest事务逻辑。

我看可以看到,只要在STM世界里进行逻辑处理,不管有多少个TVAR都是支持的,我们只需要newTVar后调用readTVar及writeTVar进行变量的修改,其它一切都不需要管。简单方便极了,好用好用好用!

二. 深入理解STM

官方的文档可以参考: https://ghc.haskell.org/trac/ghc/wiki/Commentary/Rts/STM
https://en.wikipedia.org/wiki/Concurrent_Haskell

这里请允许我用自己粗俗的再次转述一下:

1. STM到底是怎么做到的呢?

a. 每个线程创建自己的TRec事务记录


b. 每次对不同的Tvar操作时,会记录在TRec的TRecEntry里面。


c. 当提交STM事务时, 线程检查TRec里面TVar的值是否被其它线程提交改变。如果没有其它线程动过,则由系统底层全部锁住批量改写。如果其它线程动过这些TVar,则再次以这些新的tvar值重新运行事务。

2. 进一步控制STM!

前面的事务控制是自动发生的,当然有些时候我们需要stm的特殊服务。
比如我们想进行转账,可是转账的时候TVar的余额不足。我们能不能监控TVar,如果TVar的值发生改变了,我们再次重试本次事务呢? 或者失败了回滚后去运行另一个事务呢?
这里面就是STM的两大流程控制: retry与orElse

a. 我们来看一下retry的控制代码

transferBlocking :: Int -> TVar Int -> TVar Int -> STM ()
transferBlocking v a b = do
    x <- readTVar a
    y <- readTVar b
    if x < v
      then retry
      else do
              writeTVar a (x - v)
              writeTVar b (y + v)

我们可以看到如果x<v的时候,事务不能发生,调用了retry进行了阻塞。阻塞了什么时候唤醒呢?当阻塞的时候,线程会将自己加入到TVar的watchQueue队列里面,当其它线程修改了TVar,则会唤醒线程重新运行事务。
不用写代码,就有这么高级的功能,是不是很震憾!

b. orElse就更加高级了,直接作用在retry上面.

如果前面的retry没有成功,我们不让它阻塞等待唤醒,我们跑另一个事务!连retry也被控制了,可怕,太可怕!

transferChoice :: Int -> TVar Int -> TVar Int -> TVar Int -> STM ()
transferChoice v a a' b = do
    transferBlocking v a b `orElse` transferBlocking v a' b

orElse很简单,将前面retry事务作为一个嵌套事务,如果失败则回滚继续其它事务。当然如果是其它线程修改数据造成验证失败,则不算是前面的事务失败,还是要重新再提交事务的喽。

发生了这么多有趣的事情,是不是忍受不了看源码的诱惑了。
好的,遵命!

三. STM的haskell源码解析

STM已经作为haskell的核心部分,所以直接在运行时与base模块里面了。
在这节我们关注haskell接口部分。
前面讲的几个部分,我们再来回顾一下:
a. STM世界
b. retry与orElse流程控制
c. TVar变量
d. atomically事务提交

haskell stm的代码均在GHC.Conc.Sync模块里面:
https://github.com/ghc/ghc/blob/master/libraries/base/GHC/Conc/Sync.hs

1. STM世界

a. 先看STM的类型定义

newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))

unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
unSTM (STM a) = a

atomically :: STM a -> IO a
atomically (STM m) = IO (\s -> (atomically# m) s )

可以看到STM世界跟IO世界非常像,都是状态传递。
当atomically运行STM事务逻辑时,传送到了IO世界.

b. 接着看stm是如何传递复合的

instance Applicative STM where
  {-# INLINE pure #-}
  {-# INLINE (*>) #-}
  {-# INLINE liftA2 #-}
  pure x = returnSTM x
  (<*>) = ap
  liftA2 = liftM2
  m *> k = thenSTM m k

instance  Monad STM  where
    {-# INLINE (>>=)  #-}
    m >>= k     = bindSTM m k
    (>>) = (*>)

bindSTM :: STM a -> (a -> STM b) -> STM b
bindSTM (STM m) k = STM ( \s ->
  case m s of
    (# new_s, a #) -> unSTM (k a) new_s
  )

thenSTM :: STM a -> STM b -> STM b
thenSTM (STM m) k = STM ( \s ->
  case m s of
    (# new_s, _ #) -> unSTM k new_s
  )

instance Alternative STM where
  empty = retry
  (<|>) = orElse

这里的传递复合过程比较简单。
a. >>=为代表的bindSTM, 运行完后接着运行,
>>为其快捷方式,不绑定直接thenSTM,忽略绑定参数运行
b. <|>为代表的如果发生retry则运行另外一个
<|>的逻辑即为后文的orElse on retry逻辑

c. 最后看STM是如何构造的

returnSTM :: a -> STM a
returnSTM x = STM (\s -> (# s, x #))

newTVar :: a -> STM (TVar a)
newTVar val = STM $ \s1# ->
    case newTVar# val s1# of
         (# s2#, tvar# #) -> (# s2#, TVar tvar# #)

readTVar :: TVar a -> STM a
readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#

writeTVar :: TVar a -> a -> STM ()
writeTVar (TVar tvar#) val = STM $ \s1# ->
    case writeTVar# tvar# val s1# of
         s2# -> (# s2#, () #)

throwSTM :: Exception e => e -> STM a
throwSTM e = STM $ raiseIO# (toException e)

STM构造方法分为三种,一种常量构造,一种是通过TVar构建,还有一种是异常接入。
TVar是最常用的形式,所以我们接着进一步了解

2. TVar变量

data TVar a = TVar (TVar# RealWorld a)

instance Eq (TVar a) where
        (TVar tvar1#) == (TVar tvar2#) = isTrue# (sameTVar# tvar1# tvar2#)

newTVar :: a -> STM (TVar a)
newTVar val = STM $ \s1# ->
    case newTVar# val s1# of
         (# s2#, tvar# #) -> (# s2#, TVar tvar# #)

readTVar :: TVar a -> STM a
readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#

writeTVar :: TVar a -> a -> STM ()
writeTVar (TVar tvar#) val = STM $ \s1# ->
    case writeTVar# tvar# val s1# of
         s2# -> (# s2#, () #)

TVar数据类型本质上是TVar#原始类型的封装,TVar#由运行时提供。
TVar#包括新建,读取,写入三种方式调用.
分别用由运行时的newTVar#, readTVar#, writeTVar#提供实现

3. retry与orElse流程控制

throwSTM :: Exception e => e -> STM a
throwSTM e = STM $ raiseIO# (toException e)

catchSTM :: Exception e => STM a -> (e -> STM a) -> STM a
catchSTM (STM m) handler = STM $ catchSTM# m handler'
    where
      handler' e = case fromException e of
                     Just e' -> unSTM (handler e')
                     Nothing -> raiseIO# e

retry :: STM a
retry = STM $ \s# -> retry# s#

orElse :: STM a -> STM a -> STM a
orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s

这里的流程也包含了异常处理.
a. 首先throwSTM 调用运行时接口raiseIO#封装异常到STM世界
接着catchSTM调用运行时catchSTM#处理异常
b. 而retry则由运行时的retry# 方法提供
c. orElse则是对retry进行catch,由运行时的catchRetry#方法提供

4. 运行时接口清单

现在一切都指向运行时了,我们重新回顾一下。
TVar类型-TVar#
Tvar操作-newTVar#, readTVar#, writeTVar#
STM操作-atomically#, raiseIO#, catchSTM#, retry#, catchRetry#

到了最激动人心的时刻了,进军STM运行时!

四. STM的运行时分析

1. PrimOps介绍

rts基本的操作在PrimOps里面实现,
由genprimopcode工具动态生成GHC.Prim模块。

a. C语言层

b. cmm层

cmm实现GHC.Prim方法, 对接底层c接口
https://github.com/ghc/ghc/blob/master/rts/PrimOps.cmm
包含如下函数:
stg_newTVarzh -> newTVar#
stg_readTVarzh -> readTVar#
stg_writeTVarzh -> writeTVar#
stg_atomicallyzh -> atomically#
stg_catchSTMzh -> catchSTM#
stg_retryzh -> retry#
stg_catchRetryzh -> catchRetry#

c . 接口定义层

定义rts运行时底层调用方法列表:
https://github.com/ghc/ghc/blob/master/compiler/prelude/primops.txt.pp

d. Haskell层

动态生成GHC.Prim 模块供haskell调用,
接口映射过程包含一些字符转换将#转为zh

usage: genprimopcode command < primops.txt > ...\n"

https://github.com/ghc/ghc/blob/master/utils/genprimopcode/Main.hs

               -- Constructors
               encode_ch '('  = "ZL"    -- Needed for things like (,), and (->)
               encode_ch ')'  = "ZR"    -- For symmetry with (
               encode_ch '['  = "ZM"
               encode_ch ']'  = "ZN"
               encode_ch ':'  = "ZC"
               encode_ch 'Z'  = "ZZ"

               -- Variables
               encode_ch 'z'  = "zz"
               encode_ch '&'  = "za"
               encode_ch '|'  = "zb"
               encode_ch '^'  = "zc"
               encode_ch '$'  = "zd"
               encode_ch '='  = "ze"
               encode_ch '>'  = "zg"
               encode_ch '#'  = "zh"
               encode_ch '.'  = "zi"
               encode_ch '<'  = "zl"
               encode_ch '-'  = "zm"
               encode_ch '!'  = "zn"
               encode_ch '+'  = "zp"
               encode_ch '\'' = "zq"
               encode_ch '\\' = "zr"
               encode_ch '/'  = "zs"
               encode_ch '*'  = "zt"
               encode_ch '_'  = "zu"
               encode_ch '%'  = "zv"
               encode_ch c    = 'z' : shows (ord c) "U"

2. rts运行时基本概念

前面介绍了STM涉及c, cmm, haskell三种接口。现在就来详细说明一下。

a. Haskell源码编译过程

                                                                           +---------+
                                                         LLVM backend /--->| LLVM IR |--\
                                                                      |    +---------+  | LLVM
                                                                      |                 v
 +------------+ Desugar  +------+ STGify  +-----+ CodeGen  +-----+    |  NCG    +----------+
 | Parse tree |--------->| Core |-------->| STG |--------->| C-- |----+-------->| Assembly |
 +------------+          +------+         +-----+          +-----+    |         +----------+
                                                                      |            ^
                                                                      |     +---+  | GCC
                                                            C backend \---->| C |--/
                                                                            +---+

a. haskell代码首先desugar之后变成haskell core.
b. 接着经过stg转换生成cmm
c. 最后与不同的c语言接口进行链接(LLVM, GCC, Assembly)


b. RTS的线程状态保存在TSO(Thread State Object)里面

包含了StgStack_堆栈与StgTRecHeader事务记录。
其中StgStack_用于闭包调用,StgTRecHeader用于TVar事务处理。



TSO对象定义在TSO.h文件中:

typedef struct StgTSO_ {
    StgHeader               header;

    struct StgTSO_*         _link;

    struct StgTSO_*         global_link;    // Links threads on the
                                            // generation->threads lists

    struct StgStack_       *stackobj;

    StgWord16               what_next;      // Values defined in Constants.h
    StgWord16               why_blocked;    // Values defined in Constants.h
    StgWord32               flags;          // Values defined in Constants.h
    StgTSOBlockInfo         block_info;
    StgThreadID             id;
    StgWord32               saved_errno;
    StgWord32               dirty;          /* non-zero => dirty */
    struct InCall_*         bound;
    struct Capability_*     cap;

    struct StgTRecHeader_ * trec;       /* STM transaction record */

    struct MessageThrowTo_ * blocked_exceptions;

    struct StgBlockingQueue_ *bq;

    StgInt64  alloc_limit;     /* in bytes */

    StgWord32  tot_stack_size;

#if defined(TICKY_TICKY)
    /* TICKY-specific stuff would go here. */
#endif
#if defined(PROFILING)
    StgTSOProfInfo prof;
#endif
#if defined(mingw32_HOST_OS)
    StgWord32 saved_winerror;
#endif

} *StgTSOPtr; // StgTSO defined in rts/Types.h

https://github.com/ghc/ghc/blob/master/includes/rts/storage/TSO.h

c. STG寄存器指针

我们可以看到CurrentTSO寄存器指向了当前线程TSO地址
https://github.com/ghc/ghc/blob/master/compiler/cmm/CmmExpr.hs

d. haskell的闭包调用与非函数式不一样。

haskell的闭包调用是通过操纵Stack指针入口进行调用的。而不是采用c语言传统的后入先出的入栈方式。

更多的详情请参照如下文档:
https://www.cs.tufts.edu/~nr/cs257/archive/simon-peyton-jones/eval-apply-jfp.pdf
https://github.com/ghc/ghc/blob/master/compiler/cmm/CmmParse.y

3. STM数据类型-StgTSO, StgTRecHeader , StgTVar

我们首先来看STM相关的数据结构。
前面提到有两种数据结构,一种是事务变量 StgTVar,另一种是线程自己的事务记录StgTRecHeader,记录在StgTSO线程状态对象里面,StgTSO在上一节中有介绍。

STM相关的数据结构定义在如下文件中:
https://github.com/ghc/ghc/blob/master/includes/rts/storage/Closures.h

typedef struct {
  StgHeader                  header;
  StgClosure                *volatile current_value;
  StgTVarWatchQueue         *volatile first_watch_queue_entry;
  StgInt                     volatile num_updates;
} StgTVar;


struct StgTRecHeader_ {
  StgHeader                  header;
  struct StgTRecHeader_     *enclosing_trec;
  StgTRecChunk              *current_chunk;
  TRecState                  state;
};

typedef struct StgTRecChunk_ {
  StgHeader                  header;
  struct StgTRecChunk_      *prev_chunk;
  StgWord                    next_entry_idx;
  TRecEntry                  entries[TREC_CHUNK_NUM_ENTRIES];
} StgTRecChunk;

typedef struct {
  StgTVar                   *tvar;
  StgClosure                *expected_value;
  StgClosure                *new_value;
#if defined(THREADED_RTS)
  StgInt                     num_updates;
#endif
} TRecEntry;

我们可以看:
a. StgTVar包含了一个StgTVarWatchQueue队列,用于阻塞唤醒操作,还有一个current_value用于存储当前值。

b. StgTRecHeader_包含了StgTRecChunk,每个StgTRecChunk包含TREC_CHUNK_NUM_ENTRIES个TRecEntry。StgTRecHeader_可以通过enclosing_trec指向下一个StgTRecHeader_完成容量的扩充。

c. TRecEntry里面包含了StgTVar,以及expected_value与new_value值用于对比及记录修改状态。详情请见后文

有了基础的数据结构,我们接着看具体的操作过程。

4. Tvar操作-newTVar#, readTVar#, writeTVar#

a. 首先看新建事务变量的过程: newTVar#

stg_newTVarzh (P_ init)
{
    W_ tv;

    ALLOC_PRIM_P (SIZEOF_StgTVar, stg_newTVarzh, init);

    tv = Hp - SIZEOF_StgTVar + WDS(1);
    SET_HDR (tv, stg_TVAR_DIRTY_info, CCCS);

    StgTVar_current_value(tv) = init;
    StgTVar_first_watch_queue_entry(tv) = stg_END_STM_WATCH_QUEUE_closure;
    StgTVar_num_updates(tv) = 0;

    return (tv);
}

stg_newTVarzh接受一个init值。
调用ALLOC_PRIM_P分配空间后,操作堆指针赋值给tv。
接着初始化StgTVar事务变量tv:

  • 将current_value设为 init值
  • 将first_watch_queue_entry置为stg_END_STM_WATCH_QUEUE_closure
  • 将StgTVar_num_updates置为0

b. 接着看变量读取函数readTVar#

stg_readTVarzh (P_ tvar)
{
    P_ trec;
    P_ result;

    // Call to stmReadTVar may allocate
    MAYBE_GC_P (stg_readTVarzh, tvar);

    trec = StgTSO_trec(CurrentTSO);
    ("ptr" result) = ccall stmReadTVar(MyCapability() "ptr", trec "ptr",
                                       tvar "ptr");
    return (result);
}

StgClosure *stmReadTVar(Capability *cap,
                        StgTRecHeader *trec,
                        StgTVar *tvar) {
  StgTRecHeader *entry_in = NULL;
  StgClosure *result = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmReadTVar(%p)", trec, tvar);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      result = entry -> new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = entry -> new_value;
      result = new_entry -> new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = current_value;
    result = current_value;
  }

  TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
  return result;
}


这里使用ccall调用stmReadTVar底层c函数

c. 最后不得不提到的写入函数writeTVar#

stg_writeTVarzh (P_ tvar,     /* :: TVar a */
                 P_ new_value /* :: a      */)
{
    W_ trec;

    // Call to stmWriteTVar may allocate
    MAYBE_GC_PP (stg_writeTVarzh, tvar, new_value);

    trec = StgTSO_trec(CurrentTSO);
    ccall stmWriteTVar(MyCapability() "ptr", trec "ptr", tvar "ptr",
                       new_value "ptr");
    return ();
}
void stmWriteTVar(Capability *cap,
                  StgTRecHeader *trec,
                  StgTVar *tvar,
                  StgClosure *new_value) {

  StgTRecHeader *entry_in = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      entry -> new_value = new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = new_value;
  }

  TRACE("%p : stmWriteTVar done", trec);
}

5. STM操作-atomically#, raiseIO#, catchSTM#, retry#, catchRetry#

a. 提交事务函数之atomically#

stg_atomicallyzh (P_ stm)
{
    P_ old_trec;
    P_ new_trec;
    P_ code, frame_result;

    // stmStartTransaction may allocate
    MAYBE_GC_P(stg_atomicallyzh, stm);

    STK_CHK_GEN();

    old_trec = StgTSO_trec(CurrentTSO);

    /* Nested transactions are not allowed; raise an exception */
    if (old_trec != NO_TREC) {
        jump stg_raisezh(base_ControlziExceptionziBase_nestedAtomically_closure);
    }

    code = stm;
    frame_result = NO_TREC;

    /* Start the memory transcation */
    ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", old_trec "ptr");
    StgTSO_trec(CurrentTSO) = new_trec;

    jump stg_ap_v_fast
        (ATOMICALLY_FRAME_FIELDS(,,stg_atomically_frame_info, CCCS, 0,
                                 code,frame_result))
        (stm);
}

INFO_TABLE_RET(stg_atomically_frame, ATOMICALLY_FRAME,
               // layout of the frame, and bind the field names
               ATOMICALLY_FRAME_FIELDS(W_,P_,
                                       info_ptr, p1, p2,
                                       code,
                                       frame_result))
    return (P_ result) // value returned to the frame
{
    W_ valid;
    gcptr trec, outer, q;

    trec   = StgTSO_trec(CurrentTSO);
    outer  = StgTRecHeader_enclosing_trec(trec);

    /* Back at the atomically frame */
    frame_result = result;

    /* try to commit */
    (valid) = ccall stmCommitTransaction(MyCapability() "ptr", trec "ptr");
    if (valid != 0) {
        /* Transaction was valid: commit succeeded */
        StgTSO_trec(CurrentTSO) = NO_TREC;
        return (frame_result);
    } else {
        /* Transaction was not valid: try again */
        ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                 NO_TREC "ptr");
        StgTSO_trec(CurrentTSO) = trec;

        jump stg_ap_v_fast
            // push the StgAtomicallyFrame again: the code generator is
            // clever enough to only assign the fields that have changed.
            (ATOMICALLY_FRAME_FIELDS(,,info_ptr,p1,p2,
                                     code,frame_result))
            (code);
    }
}

StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
  StgInt64 max_commits_at_start = max_commits;

  TRACE("%p : stmCommitTransaction()", trec);
  ASSERT(trec != NO_TREC);

  lock_stm(trec);

  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
  // the configuration lets us use a read phase.

  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.
    ASSERT(trec -> state == TREC_ACTIVE);

    if (config_use_read_phase) {
      StgInt64 max_commits_at_end;
      StgInt64 max_concurrent_commits;
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
      TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");

      max_commits_at_end = max_commits;
      max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
                                (n_capabilities * TOKEN_BATCH_SIZE));
      if (((max_concurrent_commits >> 32) > 0) || shake()) {
        result = false;
      }
    }

    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership.  This forms the
      // linearization point of the commit.

      // Make the updates required by the transaction.
      FOR_EACH_ENTRY(trec, e, {
        StgTVar *s;
        s = e -> tvar;
        if ((!config_use_read_phase) || (e -> new_value != e -> expected_value)) {
          // Either the entry is an update or we're not using a read phase:
          // write the value back to the TVar, unlocking it if necessary.

          ACQ_ASSERT(tvar_is_locked(s, trec));
          TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
          unpark_waiters_on(cap,s);
          IF_STM_FG_LOCKS({
            s -> num_updates ++;
          });
          unlock_tvar(cap, trec, s, e -> new_value, true);
        }
        ACQ_ASSERT(!tvar_is_locked(s, trec));
      });
    } else {
        revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitTransaction()=%d", trec, result);

  return result;
}

static StgBool validate_and_acquire_ownership (Capability *cap,
                                               StgTRecHeader *trec,
                                               int acquire_all,
                                               int retain_ownership) {
  StgBool result;

  if (shake()) {
    TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
    return false;
  }

  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));
  result = !((trec -> state) == TREC_CONDEMNED);
  if (result) {
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (acquire_all || entry_is_update(e)) {
        TRACE("%p : trying to acquire %p", trec, s);
        if (!cond_lock_tvar(trec, s, e -> expected_value)) {
          TRACE("%p : failed to acquire %p", trec, s);
          result = false;
          BREAK_FOR_EACH;
        }
      } else {
        ASSERT(config_use_read_phase);
        IF_STM_FG_LOCKS({
          TRACE("%p : will need to check %p", trec, s);
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match", trec);
            result = false;
            BREAK_FOR_EACH;
          }
          e -> num_updates = s -> num_updates;
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match (race)", trec);
            result = false;
            BREAK_FOR_EACH;
          } else {
            TRACE("%p : need to check version %ld", trec, e -> num_updates);
          }
        });
      }
    });
  }

  if ((!result) || (!retain_ownership)) {
      revert_ownership(cap, trec, acquire_all);
  }

  return result;
}

static StgBool cond_lock_tvar(StgTRecHeader *trec,
                              StgTVar *s,
                              StgClosure *expected) {
  StgClosure *result;
  StgWord w;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
  result = (StgClosure *)w;
  TRACE("%p : %s", trec, result ? "success" : "failure");
  return (result == expected);
}

EXTERN_INLINE StgWord
cas(StgVolatilePtr p, StgWord o, StgWord n)
{
    return __sync_val_compare_and_swap(p, o, n);
}

void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#if defined(THREADED_RTS)
    if (tso->cap != cap)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (W_)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    case BlockedOnMVarRead:
    {
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;

        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (W_)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->stackobj->sp += 3;
        goto unblock;
    }

    case BlockedOnSTM:
        tso->block_info.closure = &stg_STM_AWOKEN_closure;
        goto unblock;

    case BlockedOnBlackHole:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now, if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);

    // We used to set the context switch flag here, which would
    // trigger a context switch a short time in the future (at the end
    // of the current nursery block).  The idea is that we have just
    // woken up a thread, so we may need to load-balance and migrate
    // threads to other CPUs.  On the other hand, setting the context
    // switch flag here unfairly penalises the current thread by
    // yielding its time slice too early.
    //
    // The synthetic benchmark nofib/smp/chan can be used to show the
    // difference quite clearly.

    // cap->context_switch = 1;
}

b. 捕获异常函数之 catchSTM#

stg_catchSTMzh (P_ code    /* :: STM a */,
                P_ handler /* :: Exception -> STM a */)
{
    STK_CHK_GEN();

    /* Start a nested transaction to run the body of the try block in */
    W_ cur_trec;
    W_ new_trec;
    cur_trec = StgTSO_trec(CurrentTSO);
    ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                 cur_trec "ptr");
    StgTSO_trec(CurrentTSO) = new_trec;

    jump stg_ap_v_fast
        (CATCH_STM_FRAME_FIELDS(,,stg_catch_stm_frame_info, CCCS, 0,
                                code, handler))
        (code);
}

INFO_TABLE_RET(stg_catch_stm_frame, CATCH_STM_FRAME,
               // layout of the frame, and bind the field names
               CATCH_STM_FRAME_FIELDS(W_,P_,info_ptr,p1,p2,code,handler))
    return (P_ ret)
{
    W_ r, trec, outer;

    trec = StgTSO_trec(CurrentTSO);
    outer  = StgTRecHeader_enclosing_trec(trec);
    (r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
    if (r != 0) {
        /* Commit succeeded */
        StgTSO_trec(CurrentTSO) = outer;
        return (ret);
    } else {
        /* Commit failed */
        W_ new_trec;
        ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
        StgTSO_trec(CurrentTSO) = new_trec;

        jump stg_ap_v_fast
            (CATCH_STM_FRAME_FIELDS(,,info_ptr,p1,p2,code,handler))
            (code);
    }
}

StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
  StgTRecHeader *et;
  ASSERT(trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
  TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
  ASSERT((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.

    if (config_use_read_phase) {
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
    }
    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership.  This forms the
      // linearization point of the commit.

      TRACE("%p : read-check succeeded", trec);
      FOR_EACH_ENTRY(trec, e, {
        // Merge each entry into the enclosing transaction record, release all
        // locks.

        StgTVar *s;
        s = e -> tvar;
        if (entry_is_update(e)) {
            unlock_tvar(cap, trec, s, e -> expected_value, false);
        }
        merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
        ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
      });
    } else {
        revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);

  return result;
}

c. 休眠函数之retry#

stg_retryzh /* no arg list: explicit stack layout */
{
    W_ frame_type;
    W_ frame;
    W_ trec;
    W_ outer;
    W_ r;

    // STM operations may allocate
    MAYBE_GC_ (stg_retryzh); // NB. not MAYBE_GC(), we cannot make a
                             // function call in an explicit-stack proc

    // Find the enclosing ATOMICALLY_FRAME or CATCH_RETRY_FRAME
retry_pop_stack:
    SAVE_THREAD_STATE();
    (frame_type) = ccall findRetryFrameHelper(MyCapability(), CurrentTSO "ptr");
    LOAD_THREAD_STATE();
    frame = Sp;
    trec = StgTSO_trec(CurrentTSO);
    outer  = StgTRecHeader_enclosing_trec(trec);

    if (frame_type == CATCH_RETRY_FRAME) {
        // The retry reaches a CATCH_RETRY_FRAME before the atomic frame
        ASSERT(outer != NO_TREC);
        // Abort the transaction attempting the current branch
        ccall stmAbortTransaction(MyCapability() "ptr", trec "ptr");
        ccall stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr");
        if (!StgCatchRetryFrame_running_alt_code(frame) != 0) {
            // Retry in the first branch: try the alternative
            ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
            StgTSO_trec(CurrentTSO) = trec;
            StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true;
            R1 = StgCatchRetryFrame_alt_code(frame);
            jump stg_ap_v_fast [R1];
        } else {
            // Retry in the alternative code: propagate the retry
            StgTSO_trec(CurrentTSO) = outer;
            Sp = Sp + SIZEOF_StgCatchRetryFrame;
            goto retry_pop_stack;
        }
    }

    // We've reached the ATOMICALLY_FRAME: attempt to wait
    ASSERT(frame_type == ATOMICALLY_FRAME);
    ASSERT(outer == NO_TREC);

    (r) = ccall stmWait(MyCapability() "ptr", CurrentTSO "ptr", trec "ptr");
    if (r != 0) {
        // Transaction was valid: stmWait put us on the TVars' queues, we now block
        StgHeader_info(frame) = stg_atomically_waiting_frame_info;
        Sp = frame;
        R3 = trec; // passing to stmWaitUnblock()
        jump stg_block_stmwait [R3];
    } else {
        // Transaction was not valid: retry immediately
        ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
        StgTSO_trec(CurrentTSO) = trec;
        Sp = frame;
        R1 = StgAtomicallyFrame_code(frame);
        jump stg_ap_v_fast [R1];
    }
}

StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
  TRACE("%p : stmWait(%p)", trec, tso);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  bool result = validate_and_acquire_ownership(cap, trec, true, true);
  if (result) {
    // The transaction is valid so far so we can actually start waiting.
    // (Otherwise the transaction was not valid and the thread will have to
    // retry it).

    // Put ourselves to sleep.  We retain locks on all the TVars involved
    // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
    // in the TSO, (c) TREC_WAITING in the Trec.
    build_watch_queue_entries_for_trec(cap, tso, trec);
    park_tso(tso);
    trec -> state = TREC_WAITING;

    // We haven't released ownership of the transaction yet.  The TSO
    // has been put on the wait queue for the TVars it is waiting for,
    // but we haven't yet tidied up the TSO's stack and made it safe
    // to wake up the TSO.  Therefore, we must wait until the TSO is
    // safe to wake up before we release ownership - when all is well,
    // the runtime will call stmWaitUnlock() below, with the same
    // TRec.

  } else {
    unlock_stm(trec);
    free_stg_trec_header(cap, trec);
  }

  TRACE("%p : stmWait(%p)=%d", trec, tso, result);
  return result;
}

分支跳转函数之catchRetry#

stg_catchRetryzh (P_ first_code, /* :: STM a */
                  P_ alt_code    /* :: STM a */)
{
    W_ new_trec;

    // stmStartTransaction may allocate
    MAYBE_GC_PP (stg_catchRetryzh, first_code, alt_code);

    STK_CHK_GEN();

    /* Start a nested transaction within which to run the first code */
    ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                 StgTSO_trec(CurrentTSO) "ptr");
    StgTSO_trec(CurrentTSO) = new_trec;

    // push the CATCH_RETRY stack frame, and apply first_code to realWorld#
    jump stg_ap_v_fast
        (CATCH_RETRY_FRAME_FIELDS(,, stg_catch_retry_frame_info, CCCS, 0,
                                  0, /* not running_alt_code */
                                  first_code,
                                  alt_code))
        (first_code);
}

INFO_TABLE_RET(stg_catch_retry_frame, CATCH_RETRY_FRAME,
               CATCH_RETRY_FRAME_FIELDS(W_,P_,
                                        info_ptr, p1, p2,
                                        running_alt_code,
                                        first_code,
                                        alt_code))
    return (P_ ret)
{
    unwind Sp = Sp + SIZEOF_StgCatchRetryFrame;
    W_ r;
    gcptr trec, outer, arg;

    trec = StgTSO_trec(CurrentTSO);
    outer  = StgTRecHeader_enclosing_trec(trec);
    (r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
    if (r != 0) {
        // Succeeded (either first branch or second branch)
        StgTSO_trec(CurrentTSO) = outer;
        return (ret);
    } else {
        // Did not commit: re-execute
        P_ new_trec;
        ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                           outer "ptr");
        StgTSO_trec(CurrentTSO) = new_trec;
        if (running_alt_code != 0) {
            jump stg_ap_v_fast
                (CATCH_RETRY_FRAME_FIELDS(,,info_ptr, p1, p2,
                                          running_alt_code,
                                          first_code,
                                          alt_code))
                (alt_code);
        } else {
            jump stg_ap_v_fast
                (CATCH_RETRY_FRAME_FIELDS(,,info_ptr, p1, p2,
                                          running_alt_code,
                                          first_code,
                                          alt_code))
                (first_code);
        }
    }
}

由于时间关系,代码不作细一步讲述,后面将会补充进来。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容