@[toc]
内存共享
-
通过Value,Array实现内存共享
- 返回一个从共享内存上创建的ctypes对象
- 从共享内存中申请并返回一个具有ctypes类型的数组对象
-
通过Manager实现内存共享
- Manager返回的管理器对象控制一个服务进程,且由该进程保存Python对象并允许其他进程通过代理操作对象
- 返回的管理器支持类型支持list、dict等
- 注意同步:可能需要加锁,尤其碰到+=更新时
from multiprocessing import Process
from multiprocessing import Manager, Lock
import time
import random
def register(d,name):
if name in d:
print('duplicated name found...register anyway...')
d[name]+=1
else:
d[name]=1
time.sleep(random.random())
def register_with_loc(d,name,lock):
with lock:
if name in d:
print('duplicated name found...register anyway...')
d[name]+=1
else:
d[name]=1
time.sleep(random.random())
if __name__=='__main__':
#Manager
names=['Amy','Lily','Dirk','Lily', 'Denial','Amy','Amy','Amy']
manager=Manager()
dic=manager.dict()
lock=Lock()
#manager.list()
students=[]
for i in range(len(names)):
#s=Process(target=register,args=(dic,names[i]))
s=Process(target=register_with_loc,args=(dic,names[i],lock))
students.append(s)
for s in students:
s.start()
for s in students:
s.join()
print('all processes ended...')
for k,v in dic.items():
print("{}\t{}".format(k,v))
可以共享字典
duplicated name found...register anyway...
duplicated name found...register anyway...
duplicated name found...register anyway...
duplicated name found...register anyway...
all processes ended...
Amy 4
Dirk 1
Denial 1
Lily 2
- 进程池
- 进程开启过多导致效率下降(同步、切换成本)
- 应固定工作进程的数目
- 由这些进程执行所有任务,而非开启更多的进程
- 与CPU的核数相关
创建进程池
-
Pooll([numprocess [,initializer [, initargs]]])
- numprocess:要创建的进程数,默认使用os.cpu_count()的值
- initializer:每个工作进程启动时要执行的可调用对象,默认为None
- initargs:initializer的参数
-
p.apply()
- 同步调用
- 只有一个进程执行(不并行)
- 但可以直接得到返回结果(阻塞至返回结果)
-
p.apply_async()
- 异步调用
- 并行执行,结果不一定马上返回 (AsyncResult)
- 可以有回调函数,进程池中任意任务完成后会立即通知主进程,主进程将调用另一个函数去处理该结果,该函数即回调函数,其参数为返回结果
p.close()
-
p.join()
- 等待所有工作进程退出,只能在close()或teminate()之后调用
补充
回调函数的参数只有一个,即结果
回调函数是由主进程调用的
回调函数应该迅速结束
回调的顺序跟子进程启动的顺序无关p.map()
并行,主进程会等待所有子进程结束p.map_async()
from multiprocessing import Process
from multiprocessing import Pool
from multiprocessing import current_process
import matplotlib.pyplot as plt
import os
import time
global_result=[]
def fib(max):
n,a,b=0,0,1
while n<max:
a,b=b,a+b
n+=1
return b
def job(n):
print('{} is working on {}...'.format(os.getpid(),n))
time.sleep(2)
return fib(n)
def add_result(res):#callback func
global global_result
print("called by {}, result is {}".format(current_process().pid,res))
#也可以返回进程标识信息,用以识别结果(比如进程的参数)
#可以用字典存储
global_result.append(res)
def add_result_map(res):
global global_result
print("called by {}, result is {}".format(current_process().pid,res))
for r in res:
global_result.append(r)
if __name__=='__main__':
p=Pool()#cpu determines
ms=range(1,20)
results=[]
#同步调用
#创建多个进程,但是只有一个执行,需要等到执行结束后再可以返结果
#并不并行
for m in ms:
print('{} will be applied in main'.format(m))
res=p.apply(job,args=(m,))#会等待执行结束后再执行下一个
print(res)
print('{} is applied in main'.format(m))
results.append(res)
p.close()#!!!
print(results)
plt.figure()
plt.plot(ms,results)
plt.show()
plt.close()
#异步调用
#可以并行
'''for m in ms:
res=p.apply_async(job,args=(m,))#注意这里res只是一个引用
results.append(res)
#如果马上打印,可能并没有结果
print(res)
p.close()
p.join()
results2=[]
for res in results:
results2.append(res.get())
print(results2)
plt.figure()
plt.plot(ms,results2)
plt.show()
plt.close()'''
#callback
'''for m in ms:
p.apply_async(job,args=(m,),callback=add_result)
#callback函数只有一个参数
#callback函数是由主进程执行的
p.close()
p.join()
plt.figure()
plt.plot(ms,sorted(global_result))#顺序可能是乱的,这里可以排序解决,但其他问题不一定
plt.show()
plt.close()'''
#使用map
'''results3=p.map(job,ms)
print(type(results3))#list
p.close()
plt.figure()
plt.plot(ms,results3)
plt.show()
plt.close()'''
#使用map_async
'''p.map_async(job,ms,callback=add_result_map)
p.close()
p.join()
plt.figure()
print(len(ms))
print(len(global_result))
plt.plot(ms,global_result)
plt.show()
plt.close()'''
同步调用时,res=p.apply(job,args=(m,))
会等待执行结束后再执行下一个,所以会消耗很多时间,运行时间长
1 will be applied in main
9028 is working on 1...
1
1 is applied in main
2 will be applied in main
3396 is working on 2...
2
2 is applied in main
3 will be applied in main
17520 is working on 3...
3
3 is applied in main
4 will be applied in main
12984 is working on 4...
5
4 is applied in main
5 will be applied in main
10720 is working on 5...
8
5 is applied in main
6 will be applied in main
18792 is working on 6...
13
6 is applied in main
7 will be applied in main
14768 is working on 7...
21
7 is applied in main
8 will be applied in main
19524 is working on 8...
34
8 is applied in main
9 will be applied in main
9028 is working on 9...
55
9 is applied in main
10 will be applied in main
3396 is working on 10...
89
10 is applied in main
11 will be applied in main
17520 is working on 11...
144
11 is applied in main
12 will be applied in main
12984 is working on 12...
233
12 is applied in main
13 will be applied in main
10720 is working on 13...
377
13 is applied in main
14 will be applied in main
18792 is working on 14...
610
14 is applied in main
15 will be applied in main
14768 is working on 15...
987
15 is applied in main
16 will be applied in main
19524 is working on 16...
1597
16 is applied in main
17 will be applied in main
9028 is working on 17...
2584
17 is applied in main
18 will be applied in main
3396 is working on 18...
4181
18 is applied in main
19 will be applied in main
17520 is working on 19...
6765
19 is applied in main
[1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765]
异步调用时,中间打印不出结果,只有在全部执行结束之后才会出结果,但是运行速度较快。
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C16A0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1780>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1828>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C18D0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1978>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1A20>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1AC8>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1B70>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1C18>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1CC0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1D68>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1E10>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1EB8>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1F60>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1FD0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C80F0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C8198>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C8240>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C82E8>
21056 is working on 1...
16880 is working on 2...
13832 is working on 3...
17564 is working on 4...
21056 is working on 5...
16940 is working on 6...
12728 is working on 7...
6592 is working on 8...
18296 is working on 9...
16880 is working on 10...
13832 is working on 11...
17564 is working on 12...
21056 is working on 13...
16940 is working on 14...
12728 is working on 15...
6592 is working on 16...
18296 is working on 17...
16880 is working on 18...
13832 is working on 19...
[1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765]
回调函数的结果
4600 is working on 1...
4584 is working on 2...
2892 is working on 3...
10088 is working on 4...
14404 is working on 5...
4600 is working on 6...
called by 14812, result is 116212 is working on 7...
4608 is working on 8...
called by 14812, result is 2
4584 is working on 9...
2632 is working on 10...
called by 14812, result is 3
2892 is working on 11...
called by 14812, result is 5
10088 is working on 12...
called by 14812, result is 8
14404 is working on 13...
called by 14812, result is 13
4600 is working on 14...
called by 14812, result is 21
16212 is working on 15...
called by 14812, result is 34
4608 is working on 16...
called by 14812, result is 55
4584 is working on 17...
called by 14812, result is 89
2632 is working on 18...
called by 14812, result is 144
2892 is working on 19...
called by 14812, result is 233
called by 14812, result is 377
called by 14812, result is 610
called by 14812, result is 987
called by 14812, result is 1597
called by 14812, result is 2584
called by 14812, result is 4181
called by 14812, result is 6765
ProcessPoolExecutor
- 对multiprocessing进一步抽象
- 提供更简单、统一的接口
- submit(fn, *args, **kwargs)
- returns a Future object representing the execution of the callable
- 补充:马上调用Future的result()会阻塞
- map(func, *iterables, timeout=None)
• func is executed asynchronously, i.e., several calls to func may be made concurrently and returns an iterator of results
import concurrent.futures
from multiprocessing import current_process
import math
PRIMES = [
1112272535095293,
1112582705942171,
1112272535095291,
1115280095190773,
1115797848077099,
11099726899285419]
def is_prime(n):
print(f"{current_process().pid}")
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main_submit():
results=[]
with concurrent.futures.ProcessPoolExecutor() as executor:
for number in PRIMES:
n_future=executor.submit(is_prime,number)
#print(n_future.result())#block
results.append(n_future)
#这里要注意,如果马上在主进程里获取结果,即n_future.result(),即主进程会阻塞,无法并行
#因此建议先将n_future搁进list,等启动所有进程后再获取结果。
for number, res in zip(PRIMES,results):
print("%d is prime: %s" % (number,res.result()))
def main_map():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime_or_not in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime_or_not))
if __name__ == '__main__':
main_submit()
#main_map()
8004
8004
1112272535095293 is prime: False
4332
18212
18212
18212
1112582705942171 is prime: True
1112272535095291 is prime: True
1115280095190773 is prime: False
1115797848077099 is prime: False
11099726899285419 is prime: False
多进程
- 分布式多进程
- 多机环境
- 跨设备数据交换
- master-worker模型
- 通过manager暴露Queue
- GIL(Global Interpreter Lock)
- GIL非Python特性,而是实现Python解释器(Cpython)时引入的概念
- GIL本质上是互斥锁,控制同一时间共享数据只能被一个任务修改,以保证数据安全
- GIL在解释器级保护共享数据,在用户编程层面保护数据则需要自行加锁处理
- Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势
- 可能需要先获取GIL
多线程编程
多进程Process | 多线程thread |
---|---|
可以利用多核 | 无法利用多核 |
开销大 | 开销小 |
计算密集型 | IO密集型 |
金融分析 | socket,爬虫,web |
threating
模块multiprocessing
模块和threating
模块在使用层面十分类似threading.currentThread()
:返回当前的线程实例threading.enumerate()
:返回所有正在运行线程的listthreading.activeCount()
:返回正在运行的线程数量,与len(threading.enumerate())
结果相同-
创建多线程
- 通过指定target参数
- 通过继承Thread类
- 设置守护线程
- setDaemon(True)
- 应在start()之前
from threading import Thread,currentThread
import time
def task(name):
time.sleep(2)
print('%s print name: %s' %(currentThread().name,name))
class Task(Thread):
def __init__(self,name):
super().__init__()
self._name=name
def run(self):
time.sleep(2)
print('%s print name: %s' % (currentThread().name,self._name))
if __name__ == '__main__':
n=100
var='test'
t=Thread(target=task,args=('thread_task_func',))
t.start()
t.join()
t=Task('thread_task_class')
t.start()
t.join()
print('main')
Thread-1 print name: thread_task_func
thread_task_class print name: thread_task_class
main
- 线程同步
- 锁(threading.Lock,threading.RLock,可重入锁)
- 一旦线程获得重入锁,再次获取时将不阻塞
- 线程必须在每次获取后释放一次
- 区别:递归调用
- 信号量 threading.Semaphore
- 事件 threading.Event
- 条件 threading.Condition
- 定时器 threading.Timer
- Barrier
- 锁(threading.Lock,threading.RLock,可重入锁)
- 线程局部变量
- 队列
- queue.Queue
- queue.LifoQueue
- queue.PriorityQueue
- 线程池 ThreadPoolExecutor
线程和进程的比较
from multiprocessing import Process
from threading import Thread
import os,time,random
def dense_cal():
res=0
for i in range(100000000):
res*=i
def dense_io():
time.sleep(2)#simulate the io delay
def diff_pt(P=True,pn=4,target=dense_cal):
tlist=[]
start=time.time()
for i in range(pn):
if P:
p=Process(target=target)
else:
p=Thread(target=target)
tlist.append(p)
p.start()
for p in tlist:
p.join()
stop=time.time()
if P:
name='multi-process'
else:
name='multi-thread'
print('%s run time is %s' %(name,stop-start))
if __name__=='__main__':
diff_pt(P=True)
diff_pt(P=False)
diff_pt(P=True,pn=100,target=dense_io)
diff_pt(P=False,pn=100,target=dense_io)
multi-process run time is 28.328214168548584
multi-thread run time is 64.33887600898743
multi-process run time is 13.335324048995972
multi-thread run time is 5.584061861038208
信号 signal
- 信号操作系统中进程间通讯的一种有限制的方式
- 一种异步的通知机制,提醒进程一个事件已经发生
- 当信号发送至进程时,操作系统将中断其执行
• 任何非原子操作都将被中断
• 如果进程定义了该信号的处理函数,将执行该函数,否
则执行默认处理函数
• signal.signal(signal.SIGTSTP, handler) - 可用于进程的中止
- Python信号处理只在主线程中执行
• 即使信号在另一个线程中接收
• 信号不能被用作线程间通信的手段
• 只有主线程才被允许设置新的信号处理程序