一、线程相关概念
1.1 原子操作
原子和原子操作
原子操作:不可分割的操作。该操作一定是在同一个CPU时间片
中完成,这样即使线程被切换,在多个线程也不会看到同一个快内存中有不完整的数据。
原子:不可分割的最小单元。计算机执行的最小单元是单条指令
。可以通过参考各种CPU的指令操作手册,使用汇编指令编写原子操作,但这种方式非常低效。
某些简单的表达式可以被当做现代编程语言的最小执行单元,但其编译之后得到的汇编指令,不止一条,所以并不能算真正意义上的原子。例如常见的加法操作:sum += i
,gcc编译出来的汇编形式如下:
...
movl 0xc(%ebp), %eax
addl $n, %eax
movl %eax, 0xc(%ebp)
...
而将这段代码放到多线程环境下,显然是不安全的。再看看下面的例子:
dispatch_group_t group = dispatch_group_create();
__block int i = 1;
for (int k = 0; k < 300; k++) {
dispatch_group_enter(group);
dispatch_async(dispatch_get_global_queue(0, 0), ^{
++i;
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(dispatch_get_global_queue(0, 0), ^{
i;
dispatch_group_leave(group);
});
}
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"----result=%d i=%d",self.pro1,i);
});
上述例子中,全局变量i
理论上最后得到1,而实际上会有以下结果:0, -1, 2, -2, 1。
为了避免错误,操作系统或编译器提供了原子化操作的内建函数或API。例如:将I++/I
替换为:OSAtomicIncrement32(&i) / OSAtomicDecrement32(&i)
,将得到预期结果。
atomic
在OC中,atomic用来修饰@property
属性,且默认为atomic,而atomic仅仅是对setter/getter
方法加锁,只保证了这两个方法的安全。实际上,线程安全不仅仅只关注这两个地方。譬如:对象在一个线程正在执行写操作,而在另一个线程对象被释放了,自然就不安全了。而且在oc中,我们也可以对成员变量直接赋值,同时操作一块内存,自然就不安全了。
1.2 可重入
函数被重入
一个程序被重入,表示这个函数没有执行完成,由于外部因素或内部调用,有一次进入函数执行。函数被重入分两种情况:
- 多个线程同时执行这个函数
- 函数自身(可能经过多层调用之后)调用自身
函数可重入
一个函数称为可重入
的,表明该函数被重入之后没有产生任何不良后果。可重入函数具备以下特点:
- 不使用任何局部/静态的非const变量
- 不使用任何局部/静态/全局的非const变量的指针
- 仅依赖调用方法提供的参数
- 不依赖任何单个资源提供的锁(互斥锁等)
不调用任何不可重入的函数
可重入是并发的强力保障一个可重入函数可以在多线程环境下放心使用。也就是说在处理多线程问题时,我们尽量将程序拆分成若干个可重入的函数,而把注意的焦点放在可重入函数之外的地方。
在函数式编程
范式中,由于整个系统不需要维护多余数据变量,而是状态流方式。所以可以认为全是由一些可重入的函数组成,所以函数式编程在高并发编程中有其先天优势。
1.3 乱序优化与内存栅栏
CPU有动态调度机制,在执行过程中可能因为执行效率交换指令的顺序。而一些看似独立的比爱你量实际上是相互影响,这种编译器优化会导致潜在的不确定结果。
面对这种情况我们一般采用内存屏障(memory barrier)
。其作用相当于一个栅栏,迫使处理器来完成位于障碍前面的任何加载和存储操作之后,才允许它执行位于屏障之后的加载和内存操作。确保一个线程的内存操作总是按照预定的顺序完成。为了使用一个内存屏障,可以在代码中需要的地方调用 OSMemoryBarrier()
函数。
class A {
let lock = NSRecursiveLock()
var _a : A? = nil
var a : A? {
lock.lock()
if _a == nil {
let temp = A()
OSMemoryBarrier()
_a = temp
}
lock.unlock()
return _a
}
}
大部分锁类型都合并了内存屏障,来确保在进入临界区之前,它前面的加载和存储指令都已经完成
1.4 寄存器优化与volatile变量
在某些情况下编译器会把某些变量加载进入寄存器,而如果这些变量对多个线程可见,那么这种优化可能会阻止其他线程发现变量的任何变化,从而带来线程同步问题。
在变量之前加上关键字volatile可以强制编译器每次使用变量的时候都从内存里面加载。如果一个变量的值随时可能给编译器无法检测的外部源更改,那么你可以把该变量声明为volatile变量。在许多原子性操作API中,大量使用了volatile 标识符修饰。譬如 在系统库中,所有原子性变量都使用了
<libkern/OSAtomic.h>
int32_t OSAtomicIncrement32( volatile int32_t *__theValue )
二、锁分类
线程同步的主要方式:线程锁
。线程同步最常用的方法是使用锁(Lock)
。锁是一种非强制机制,每一个线程访问数据或资源之前,首先试图获取(Acquireuytreewq)锁,并在访问结束之后释放(release)。在锁已经被占用时获取锁,线程会等待,直到该锁被释放。
2.1 互斥锁:切换耗性能
基本概念
互斥锁是在很多平台上都比较常用的一种锁。它属于sleep-waiting
类型的锁。即当锁处于占用状态时,其他线程会挂起,当锁被释放时,所有等待的线程都将被唤醒,再次对锁进行竞争。在挂起与释放过程中,涉及用户态与内核态之间的context切换,而这种切换是比较消耗性能的
。
pthread_mutex
pthread_mutex 是pthread中的互斥锁,具有跨平台性质。pthread是POSIX线程(POSIX threads)的简称,是线程的POSIX标准(可移植操作系统接口 Portable Operation System Interface)。POSIX是unix的api设计标准,兼容各大主流平台。所以pthread_mutex是比较低层的,可以跨平台的互斥锁实现。
初始化方法:
int pthread_mutex_init(pthread_mutex_t * __restrict, const pthread_mutexattr_t * __restrict);
pthread_mutex_t * __restrict
代表互斥锁的类型,有以下四种:
-
PTHREAD_MUTEX_NORMAL 缺省类型
,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后先进先出原则获得锁。 -
PTHREAD_MUTEX_ERRORCHECK 检错锁
,如果同一个线程请求同一个锁,则返回 EDEADLK,否则与普通锁类型动作相同。这样就保证当不允许多次加锁时不会出现嵌套情况下的死锁。 -
PTHREAD_MUTEX_RECURSIVE 递归锁
,允许同一个线程对同一个锁成功获得多次,并通过多次 unlock 解锁。线程首次成功获取互斥锁时,锁定计数会设置为 1。线程每重新锁定该互斥锁一次,锁定计数就增加 1。线程每解除锁定该互斥锁一次,锁定计数就减小 1。 锁定计数达到 0 时,该互斥锁即可供其他线程获取。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或者未锁定,则将返回错误。 -
PTHREAD_MUTEX_DEFAULT 适应锁
,动作最简单的锁类型,仅等待解锁后重新竞争,没有等待队列。
pthread_mutex_t mutex;
void MyInitFunction()
{
pthread_mutex_init(&mutex, NULL);
}
void MyLockingFunction()
{
pthread_mutex_lock(&mutex);
// Do work.
pthread_mutex_unlock(&mutex);
}
//释放锁
pthread_mutex_destroy(&mutex);
pthread_mutex还有一种简便的调用方式,使用的是全局唯一互斥锁
。实验表明,该锁是所有属性都是默认的,进程内可见,类型是普通锁
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&mutex);
block();
pthread_mutex_unlock(&mutex);
同时它还提供了一种非阻塞版本pthread_mutex_trylock
。若尝试获取锁时发现互斥锁已经被锁定,或者超出了递归锁定的最大次数,则立即返回,不会挂起。只有在锁未被占用时才能成功加锁。
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int res = pthread_mutex_trylock(&mutex);
if(res == 0){
block();
pthread_mutex_unlock(&mutex);
}else if(res == EBUSY){
printf("由于 mutex 所指向的互斥锁已锁定,因此无法获取该互斥锁。");
}else if (res == EAGAIN){
printf("由于已超出了 mutex 的递归锁定最大次数,因此无法获取该互斥锁。");
}
NSLock、NSRecursiveLock
官方文档:
Warning
The NSLock class uses POSIX threads to implement its locking behavior. When sending an unlock message to an NSLock object, you must be sure that message is sent from the same thread that sent the initial lock message. Unlocking a lock from a different thread can result in undefined behavior.
You should not use this class to implement a recursive lock. Calling the lock method twice on the same thread will lock up your thread permanently. Use the NSRecursiveLock class to implement recursive locks instead.
Unlocking a lock that is not locked is considered a programmer error and should be fixed in your code. The NSLock class reports such errors by printing an error message to the console when they occur.
-
实现是基于pthread
的。 -
谁持有谁释放
,试图释放由其他线程持有的锁是不合法的。 -
lock与unlock是一一对应的
,如果试图释放一个没有加锁的锁,会发生异常崩溃。而lock始终等不到对应的unlock会进入饥饿状态,让当前线程一直挂起。 - 如果用在需要递归嵌套加锁的场景时,需要使用其子类NSRecursiveLock。不是所有情况下都会引发递归调用,而NSLock在性能上要优于NSRecursiveLock。而当我们使用NSLock不小心造成死锁时,可以尝试将其替换为NSRecursiveLock。
NSLock
使用方式:
BOOL moreToDo = YES;
NSLock *theLock = [[NSLock alloc] init];
//...
while (moreToDo) {
/* Do another increment of calculation */
/* until there’s no more to do. */
if ([theLock tryLock]) {
/* Update display used by all threads. */
[theLock unlock];
}
}
NSRecursiveLock
使用方式:
NSRecursiveLock *theLock = [[NSRecursiveLock alloc] init];
void MyRecursiveFunction(int value)
{
[theLock lock];
if (value != 0)
{
--value;
MyRecursiveFunction(value);
}
[theLock unlock];
}
MyRecursiveFunction(5);
@synchronized
- (void)myMethod:(id)anObj
{
@synchronized(anObj)
{
// Everything between the braces is protected by the @synchronized directive.
}
}
anObj 是一个唯一标识符,如果在两个不同线程中执行上述方法,并为anObj参数传递了不同的对象,则每个线程都会获得一个锁继续处理而不会被另一个阻塞,但如果传递相同对象,则其中一个线程会被阻塞,直到第一个线程完成。
@synchronized块
会在受保护的代码中隐式添加一个异常处理程序
,如果抛出异常,将自动释放互斥量。这意味着为了使用该指令,还须在代码中启用OC异常处理
。
隐式添加的代码如下:
@try {
objc_sync_enter(obj);
// do work
} @finally {
objc_sync_exit(obj);
}
以上两个方法的声明:
/**
* Begin synchronizing on 'obj'.
* Allocates recursive pthread_mutex associated with 'obj' if needed.
* 为传入的对象分配了一个递归锁,递归锁在同一线程不会引发死锁
* @param obj The object to begin synchronizing on.
*
* @return OBJC_SYNC_SUCCESS once lock is acquired.
*/
OBJC_EXPORT int
objc_sync_enter(id _Nonnull obj)
OBJC_AVAILABLE(10.3, 2.0, 9.0, 1.0, 2.0);
/**
* End synchronizing on 'obj'.
*
* @param obj The object to end synchronizing on.
*
* @return OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
*/
OBJC_EXPORT int
objc_sync_exit(id _Nonnull obj)
OBJC_AVAILABLE(10.3, 2.0, 9.0, 1.0, 2.0);
enum {
OBJC_SYNC_SUCCESS = 0,
OBJC_SYNC_NOT_OWNING_THREAD_ERROR = -1
};
两个方法源码如下:
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
// 可以看做是链表中的一个节点 关联了object与锁
SyncData* data = id2data(obj, ACQUIRE);
assert(data);
data->mutex.lock();
} else {
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}
return result;
}
// End synchronizing on 'obj'.
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {
// @synchronized(nil) does nothing
}
return result;
}
其中的数据结构以及宏定义说明:
//链表中的一个节点,关联object与lock,并且有一个nextdata指向下一个节点
typedef struct SyncData {
id object;
recursive_mutex_t mutex;
struct SyncData* nextData;
int threadCount; //此时使用这个锁的线程数量,因为 SyncData 结构体会被缓存,如果threadCount==0 说明这个SyncData实例可以被复用了
} SyncData;
typedef struct SyncList {
SyncData *data;
spinlock_t lock;
} SyncList;
// Use multiple parallel lists to decrease contention among unrelated objects.
#define COUNT 16
#define HASH(obj) ((((uintptr_t)(obj)) >> 5) & (COUNT - 1)) //哈希算法将对象所在的内存地址转化为无符号整型并右移五位,再跟 0xF 做按位与运算,这样结果不会超出数组大小。
#define LOCK_FOR_OBJ(obj) sDataLists[HASH(obj)].lock
#define LIST_FOR_OBJ(obj) sDataLists[HASH(obj)].data
static SyncList sDataLists[COUNT]; //声明一个SyncList 结构体数组大小为16
objc_sync_enter
里没有持有传入的对象,假如对象在 synchronized block
中被设成 nil时 其他线程使用这个对象会一直阻塞吗?
NSNumber *number = @(1);
NSNumber *thisPtrWillGoToNil = number;
@synchronized (thisPtrWillGoToNil) {
thisPtrWillGoToNil = nil;
}
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^ {
NSCAssert(![NSThread isMainThread], @"Must be run on background thread");
@synchronized (number) {
NSLog(@"This line does indeed get printed to stdout");
}
});
这行代码还是会打印。OC处理了这种情形,可能是编译器做了如下处理:
NSString *test = @"test";
id synchronizeTarget = (id)test;
@try {
objc_sync_enter(synchronizeTarget);
test = nil; //空操作
} @finally {
objc_sync_exit(synchronizeTarget);
}
2.2 自旋锁:空等耗CPU
自旋锁 与互斥锁有点类似,只是自旋锁被某线程占用时,其他线程不会进入睡眠(挂起)状态,而是一直运行(自旋/空转)直到锁被释放。由于不涉及用户态与内核态之间的切换,它的效率远远高于互斥锁。
虽然它的效率比互斥锁高,但是它也有些不足之处:
- 自旋锁一直占用CPU,会降低CPU效率。在高并发执行的时候,或代码片段比较耗时,容易引发CPU占用率暴涨的风险
- 使用自旋锁可能造成死锁,如递归调用时可能会造成死锁
- 自旋锁可能引起优先级反转的问题。如果一个低优先级的线程获得锁并访问共享资源,这时一个高优先级的线程也尝试获得这个锁,自旋锁会处于忙等状态从而占用大量 CPU。此时低优先级线程无法与高优先级线程争夺 CPU 时间,从而导致任务迟迟完不成、无法释放 lock。
在iOS10中建议替换OSSPinLock
为os_unfair_lock
。
// iOS 10以后使用
os_unfair_lock_t unfairLock = &(OS_UNFAIR_LOCK_INIT);
NSLog(@"线程1 准备上锁");
os_unfair_lock_lock(unfairLock);
sleep(4);
NSLog(@"线程1");
os_unfair_lock_unlock(unfairLock);
NSLog(@"线程1 解锁成功");
NSLog(@"---------------------------------------");
解决优先级反转有两种方法:调整优先级
-
优先级天花板
是当任务申请锁时,把该任务优先级提升到可访问这个资源的所有任务中的最高优先级。 -
优先级继承
是当任务A申请共享资源S时,如果S正在被任务C使用,通过比较任务C与自身的优先级,如发现任务C优先级小于自身优先级,则将任务C的优先级提升到自身优先级。C释放资源后,在恢复C的优先级。
2.3 信号量
dispatch_semaphore是GCD用于控制多线程并发的信号量,通过wait/signal的信号事件控制并发执行的最大线程数,信号量不支持递归。
当信号量为0时,dispatch_wait 会阻塞线程,可以利用这点特性实现控制代码块最大并发数,或将异步线程转为同步。
dispatch_semaphore_signal
源码:
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
//对信号量执行+1操作
long value = os_atomic_inc2o(dsema, dsema_value, release);
// 如果值大于0 直接返回
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
DISPATCH_NOINLINE
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}
dispatch_semaphore_wait
源码:
DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
// 信号量-1
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
// 如果值大于等于0 直接返回
if (likely(value >= 0)) {
return 0;
}
// 否则开始阻塞当前线程
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
2.4 条件锁
NSCondition
- lock/unlock:加锁/解锁;
- wait:在锁中间等待;
- signal:唤醒一个等待的线程,如果有多个,只能唤醒第一个;
- broadcast:唤醒所有等待的线程;
NSCondition *cLock = [NSCondition new];
//线程1
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lock];
NSLog(@"线程1加锁成功");
[cLock wait];
NSLog(@"线程1");
[cLock unlock];
});
//线程2
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lock];
NSLog(@"线程2加锁成功");
[cLock wait];
NSLog(@"线程2");
[cLock unlock];
});
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(2);
NSLog(@"唤醒一个等待的线程");
[cLock signal];
//[cLock broadcast] 唤醒所有等待的线程
});
输出
线程1加锁成功
线程2加锁成功
唤醒一个等待的线程
线程1
NSConditionLock
条件锁,可以用于实现任务间的依赖
- initWithCondition:设置condition初始值;
- tryLockWhenCondition:满足- condition值时上锁,并返回上锁成功与否;
- lockWhenCondition:满足- condition值时上锁;
- unlockWithCondition:解锁,并设置condition为参数值;
NSConditionLock *cLock = [[NSConditionLock alloc] initWithCondition:0];
//线程1
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
if([cLock tryLockWhenCondition:0]){
NSLog(@"线程1");
[cLock unlockWithCondition:1];
}else{
NSLog(@"失败");
}
});
//线程2
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lockWhenCondition:3];
NSLog(@"线程2");
[cLock unlockWithCondition:2];
});
//线程3
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lockWhenCondition:1];
NSLog(@"线程3");
[cLock unlockWithCondition:3];
});
输出:
线程1
线程3
线程2
2.5 读写锁
读写锁 从广义的逻辑上讲,也可以认为是一种共享版的互斥锁。如果对一个临界区大部分是读操作而只有少量的写操作,读写锁在一定程度上能够降低线程互斥产生的代价。
对于同一个锁,读写锁有两种获取锁的方式:共享(share)方式,独占(Exclusive)方式。写操作独占,读操作共享
读写锁状态 | 以共享方式获取(读操作) | 以独占方式获取(写操作) |
---|---|---|
自由 | 成功 | 成功 |
共享 | 成功 | 等待 |
独占 | 等待 | 等待 |
//读
pthread_rwlock_rdlock(&rwLock);
pthread_rwlock_unlock(&rwLock);
//写
pthread_rwlock_wrlock(&rwLock);
pthread_rwlock_unlock(&rwLock);