基本概念
并行:多个任务同时执行,在同一时刻有多个任务在同时执行。
并发:多个任务分时交替执行,在同一时刻仅有1个任务在执行,但在宏观上看着像一起执行。一般指的是抗QPS的能力。
进程:系统资源分配的最小单元,有独立的内存空间,开销大
线程:CPU调度的最小单元,空间内存共享,需要依赖进程存活。并发编程时要注意线程安全的问题。每个线程大约占用30K左右开销
协程:不被操作系统控制,完全由用户程序控制。开销大约几K空间,协程间的切换只发生在用户态,不需要像线程切换那样再进入内核态;协程切换次数一般情况比线切换程少(这里怎么理解?),产生IO时让出CPU,asyncio.sleep(0)时会主动让出CPU,但如果一个协程是x+=1操作,会一直霸占着CPU。
性能对比:https://www.jianshu.com/p/6c63dafa70bf
- 同步阻塞:A调用B,等待B执行完任务后返回。
- 同步非阻塞:A调用B,B直接返回成功或者什么都不返回或者某个状态,然后B去执行任务。过一段时间后A再去调用B。一般情况下A还会去轮询B执行任务的结果。
- 异步阻塞:A调用B1,B2,B3...Bx,需要等待B1,B2,B3...Bx的所有返回结果。掉B1,B2,B3...Bx时不是串行的,是并发的。
- 异步非阻塞:A调用B,B直接返回成功或者什么都不返回或者某个状态,然后B再去执行任务,任务执行完后,B再回调A,或者把结果推送给某个队列让A去消费。
相关函数:
Thread.setDaemon() 设置为后台线程,默认为False,设置为True后,主线程退出,则整个程序退出
Thread.join() 阻塞线程,join后面的主线程语句,会等子线程执行完成后再执行。多个join()语句不分前后次序
继承方式:平时常用继承的方式写线程方法,控制起来更像面向对象编程
class MyThread(threading.Thread):
def __init__(self):
pass
def run(self):
pass
# Method representing the thread's activity
mythread = MyThread()
mythread.start()
锁机制
GIL锁简介:https://www.jianshu.com/p/633b7aacf722
GIL的简单结论:
对于IO密集型场景,更适合使用多线程。比如WEB,磁盘
对于CPU密集型场景,更适合使用多进程。比如模型的运算
threading模块下的锁们,multiprocessing模块下的锁类似
锁 | 简介 |
---|---|
非递归锁(互斥锁) Lock() | 多个线程访问共享变量时需要互斥锁;互斥锁的操作acquire()和release()需要成对出现;如果某个线程方法里acquire了多个锁时,要注意死锁的问题。 |
递归锁 RLock() | RLock内部维护了一个Lock和一个计数器counter,一个线程能够同时获取N次Lock,只有当一个线程所有acquire的锁都release,counter变为0后,其他线程才能抢到这把锁。写代码时,如果底层方法封装了lock.acquire()和release(),上次方法又想封装lock时,最好使用RLock() |
条件锁 Condition() | 使用条件锁更像使用协程,由程序控制自己什么时候释放锁 acquire():获取锁 release():释放锁 wait():线程进入blocking状态,直到收到notify通知或超时才继续运行 notify():通知其他await的线程,可以加参数n=1或者k,需要注意notify()和await()的次序,由程序决定 notifyAll():notify所有await的线程 |
事件 Event() | await():挂起当前线程,直到收到event为True时才继续执行当前线程。 set():设置event为True。 clear():设置event为False。 isSet():获取event状态 |
Timer() | 几秒钟后执行任务,multiprocessing下没看到Timer() |
信号量 Semaphore() | semaphore是个内部数据,它的内部有个计数器,表明当前共享资源最多有多少个线程可以同时使用。 |
有界信号量 BoundedSemaphore() | 和Semaphore相似,这个更严格一些,超过信号量限制时会return ValueError |
栅栏(障碍) Barrier() | threading.Barrier(3, action=xxx_action, timeout=None),等barrier.wait()的数量到达3后,优先执行xxx_action方法。 wait():方法表示想要通过栅栏,如果没跨过栅栏就阻塞,跨过栅栏后优先执行xxx_action方法 wait(N):N表示秒,如果到达N秒后,还未跨过栅栏,则引发BrokenBarrierError错误。 reset():重置栅栏 |
队列机制
队列 | 简介 |
---|---|
先进先出队列 | FIFO--queue.Queue(5) |
先进后出队列 | LIFO--queue.LifoQueue(5) |
优先级队列 | q = queue.PriorityQueue() q.put([1, 'aaa']) q.put([20, 'bbb'] 数值越小优先级越高,在q.get()是会被优先去出来,底层是通过heapq实现的 |
阻塞队列 | python的queue.Queue支持阻塞方式,也有get_nowait和put_nowait方法 |
延迟队列 | python本身不支持延迟队列,需要通过其他手段实现。 利用优先级队列实现 利用redis的zset结构实现 利用rabbitmq实现 如果只是为了定时执行任务,用Timer()多线程也可以 https://www.jianshu.com/p/a663e52e6488 |
进程间通信IPC(Inter-process communication)
IPC机制 | 简介 |
---|---|
队列 | 进程间通信都可能用到队列 |
共享内存 | multiprocessing模块里提供了共享Value,共享List等对象 |
管道 | 用于父进程和子进程间通信 |
信号 | 只用于进程间的通信,信号是个软中断,捕捉信号的过程:1、主程序控制流收到信号后中断,由用户态进入内核态。2、内核捕捉中断信号,进行中断处理。3、如果中断函数是用户自定义函数,则跳回用户态执行中断函数。4、中断函数处理完成后,再次进入内核态,准备恢复主程序流程。5、返回用户态从主控制流程中上次被中断的地方继续向下执行。 由于中断可以出发中断函数,中断函数中能能够在主程序blocking时处理大量的业务逻辑,因此可用做平滑重启和热加载。 |
信号量 | 和锁里的信号量类似,能够获取信号量时,就能操作资源。列在这为了和信号区分是两个概念 |
socket/zmq | zeromq用起来更像是一个封装好的socket接口 |
zookeeper | 分布式协调服务 |
池
线程池from concurrent.futures import ThreadPoolExecutor
进程池from concurrent.futures import ProcessPoolExecutor
注意和multiprocessing的Pool用法不一样
常用方法:
map:阻塞直到返回,result并不是你map_fun返回的结果,而是一个生成器,如果要从中遍历去结果。map能够保证线程任务的顺序性
submit:提交执行的函数到线/进程池中,submit函数立即返回,不阻塞
task.cancel():取消某个任务,该任务没有放入线程池中才能取消成功
task.done():判断任务是否已完成,没啥用。用as_completed()
task.result():获取task的结果,如果获取了结果,就会造成阻塞
as_completed():此方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,就能继续执行for循环后面的语句,然后继续阻塞住,循环到所有的任务结束。
for job in as_completed(all_jobs):
res = job.result()
asyncio(python3.6+)
async和await语法需要成对出现
在工程中注意一协全协
如果代码需要主动让出CPU,注意让出CPU的位置
TODO是否有必要实现协程池,为什么?
TODOjava中在并发编程里定义了许多概念
eg:
可见性,原子性,有序性