参考的文章为: [笔记]python multiprocessing模块
进程是"懒惰"的,本身不是可执行单元,主要是线程的容器。要使进程中的代码能够运行,需要至少拥有一个能够在这个环境中运行代码的执行单元,也就是线程。线程是操作系统调度和分配处理器时间的基本单位,负责执行包括在进程地址空间中的代码。当一个进程被创建时,操作系统会自动为之创建一个进程,成为主线程。一个进程可以包含多个线程。主线程可以根据需求在动态创建其他子线程。操作系统为每一个线程保存单独的寄存器环境和单独的堆栈,但是他们共享进程的地址空间,对象句柄,代码,数据和其它资源。线程总是在某个进程的上下文中被创建。
常用的threading方法:
函数名 | 作用 |
---|---|
stack_size() | 查看当前线程栈的大小 |
stack_size(int) | 设置栈的大小 |
active_count() 或者activeCount() | 查看活动线程数量 |
current_thread() 或者currentThread() | 返回当前线程对象 |
enumerate() | 枚举所有线程 |
Thread | 线程类,用于创建和管理线程 |
Event | 事件类,用于线程同步 |
Condition | 条件类,用于线程同步 |
Lock,RLock | 锁类,用于线程同步 |
Semaphore | 信号量类,用于线程同步 |
Timer | 用于在制定时间后调用一个函数的情况 |
Thread类
Thread类用于创建和管理线程对象,支持使用两种方法创建线程:
- 直接使用Thread类来示例化一个线程对象
- 继承Thread类并在派生类中重写_ init _()和run()
创建了线程对象后,可以调用其start()方法来启动,该方法自动调用该类对象的run()方法,此时该线程出于alinve状态,直到线程的run()方法运行结束。
thead对象的函数
函数名 | 作用 |
---|---|
start() | 自动调用run()方法,启动线程,执行线程代码 |
run() | 线程代码,用于实现线程的功能与业务逻辑,可以在子类中重写该方法来自定义行为 |
_ init() _() | 构造函数 |
name | 用来读取或者设置线程的名字 |
ident | 线程标识,非0或者None(线程未被启动) |
is_aline(),isAlive() | 测试线程是否处于alive状态 |
daemon | 布尔值,表示线程是否为守护线程 |
join(tineout=None) | 等待线程结束或者超时结束 |
join([timeout])
阻塞当前线程,等待被调用线程结束或超时后在继续执行当前线程的后续代码。参数timeout用来制定最长等待时间,单位是秒。
from threading import Thread
import time
def func1(x,y):
for i in range(x,y):
print(i,end=' ')
print()
time.sleep(10)
t1 = Thread(target=func1,args=(15, 110))
t1.start()
t1.join(2)#删除该句则直接执行下一句,如果不删除会等待2秒,没有参数会等待10秒
t2 = Thread(target=func1, args=(5, 10))
t2.start()
isAlive()
用于测试线程是否处于运行状态,如果仍在运行则返回True,否则返回False。
from threading import Thread
import time
def func1():
time.sleep(10)
t1 = Thread(target=func1)
print('t1:', t1.is_alive())#程序没有运行,false
t1.start()
print('t1:', t1.is_alive())#程序还在运行,true
t1.join(5)#join因为超时而结束
print('t1:', t1.is_alive())#程序还在运行
t1.join()#等待程序结束
print('t1:', t1.is_alive())#程序运行已经结束
daemon属性
在程序运行中有一个主线程,如果在主线程中创建了子线程,当主线程结束时会根据子线程的daemon属性进行如下选择:
- 如果某个子线程的daemon属性为False主线程在结束时会检测该子线程是否结束,如果该子线程还在运行,主线程会等待该子线程结束后再退出。
- 如果某个子线程的daemon属性为True,主线程在结束时不会带这个子线程进行检查而是直接退出。
daemon属性默认为False,如果需要修改,必须在调用start()方法启动线程之前设置。```
from threading import Thread
import time
class myThread(Thread):
def __init__(self,num,threadname):
Thread.__init__(self,name=threadname)
self.num = num
def run(self):
time.sleep(self.num)
print(self.num)
t1 = myThread(1,'t1')
t2 = myThread(5,'t2')
t2.daemon = True#不会输出5,而会直接停止程序
print(t1.daemon)
print(t2.daemon)
t1.start()
t2.start()
线程同步
Lock和RLock对象
lock就是锁,锁定后面的代码,在多处加一个锁,按照顺序执行,一旦一个锁被锁定,在之后的程序中没有进行解锁,则在其他部分这个锁锁定的后续代码无法执行。
import threading
import time
import random
class mythread1(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global x #使用全局的变量
print("int 1 befor lock1")
lock1.acquire() #锁定lock1
print("int 1 befor lock2")
lock2.acquire() #锁定lock2
x += 3
print('before lock1',x)
sleepTime = random.randint(0, 3)
time.sleep(sleepTime)
print(x)
# lock2.release()
lock1.release() #lock1开锁,但是lock2没有开锁
class mythread2(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global x
print("in 2 befor lock1")
lock1.acquire()
print("in 2 befor lock2")
lock2.acquire()
x += 3
print('before lock2', x)
sleepTime = random.randint(0, 3)
time.sleep(sleepTime)
print(x)
lock2.release()
lock1.release()
lock1 = threading.Lock()
lock2 = threading.Lock()
t1 = mythread1()
t2 = mythread2()
x = 1
t1.start()
t2.start()
//结果:
# int 1 befor lock1
# int 1 befor lock2
# before lock1 4
# in 2 befor lock1
# 4
# in 2 befor lock2
通过观察结果可知,没有执行的语句是print('before lock2', x),因此程序是卡在了lock2.acquire(),因为之前的的mythread1中的lock2没有释放。而Lock与RLock的区别在于RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
Condition对象
该对象可以在某些事件触发后才处理数据或者执行特定的代码,可以用于不同线程之间的通信或者通知,以及更高级的同步。Condition对象除了具有acquire()和release()方法外,还有wait(),notify()/notify_all()等方法。
- acquire([timeout])/release(): 调用关联的锁的相应方法。
- wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
- notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
- notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
#coding=utf-8
__author__ = 'Bruce_Zhou'
import threading
import time
import datetime
num = 0
con = threading.Condition()
class Gov(threading.Thread):
def __init__(self):
super(Gov, self).__init__()
def run(self):
global num
con.acquire() #获取一个锁
while True:
print("开始拉升股市")
num += 1
print("拉升了" + str(num) + "个点")
time.sleep(2)
if num == 5:
print("暂时安全!")
con.notify() #通知其他线程
print('gov wait before')
con.wait() #让本线程进入等待状态
print('gov wait after')
con.release()# 释放锁
class Consumers(threading.Thread):
def __init__(self):
super(Consumers, self).__init__()
def run(self):
global num
con.acquire()
while True:
if num > 0:
print("开始打压股市")
num -= 1
print("打压了" + str(num) + "个点")
time.sleep(2)
if num == 0:
print("你妹的!天台在哪里!")
con.notify()
print('consumer wait before')
con.wait()
print('consumer wait after')
con.release()
if __name__ == '__main__':
p = Gov()
c = Consumers()
p.start()
c.start()
Event对象
Event对象是一种简单的线程通信技术,一个线程设置Event对象,另一个额线程等待Event对象。Event对象的set()方法可以设置Event对象内部的信号标志为真,clear()方卡可以清楚Event对象内部的信号标志,将其设置为假;isSet()方法用来判断其内部信号的状态;wait()方法在其内部信号状态为真时会立即执行并返回,若Event对象的内部信号标志为假,wait()方法就会一直等待至超时或者内部信号状态为真。
import threading
class mythread(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)
def run(self):
global myevent
if myevent.isSet(): #获取内部状态
myevent.clear() #清除Event对象内部的信号符号
print(self.getName() +' before wait')
myevent.wait() #在其内部信号状态为真时立即执行并返回,若Event对象的内部标志为假,wait()方法就一直等待至超时或者内部信号状态为真
print(self.getName() + 'set')
else:
print(self.getName() + 'not set')
myevent.set() #设置Event对象内部的信号标志为真
print(self.getName() + ' after set')
myevent = threading.Event()
myevent.set()
for i in range(10):
t = mythread(str(i))
t.start()
## 输出结果:
# 0 before wait
# 1not set
# 1 after set
# 0set
# 2 before wait
# 3not set
# 4not set
# 4 after set
# 3 after set
# 2set
# 5 before wait
# 6not set
# 7not set
# 7 after set
# 6 after set
# 8 before wait
# 9not set
# 9 after set
# 5set
# 8set
# 0 before wait
# 1not set
# 1 after set
# 0set
# 2 before wait
# 3not set
# 4not set
# 4 after set
# 3 after set
# 2set
# 5 before wait
# 6not set
# 6 after set
# 7 before wait
# 5set
# 8not set
# 8 after set
# 7set
# 9 before wait
从程序中可以看出一次设置为False,一次设置为True。那次时设置为true那次设置为false并不一定,可能同时几个线程在设置为true时进入循环。将flag设置为true。因此在使用时要额外注意。
多进程编程
一个进程是一个执行中的文件使用资源的总和,包括虚拟地址空间,代码,数据,对象句柄,环境变量和执行单元等。一个应用程序同时打开并执行多次,就会创建多个进程。
python标准库multiprocessing用来实现进程的创建和管理以及进程间的同步与数据的交换。用法和threading类似,时支持并行处理的重要模块。标准库multiprocessing同时支持本地兵法与远程兵法,有效地避免全局解释锁(Global Interpreter Lock ,GIL),可以有效的利用CPU环境,尤其时和多核或者多CPU环境。
常用函数
函数名 | 作用 |
---|---|
Process(target,args) | 创建进程,target为调用的进程,args为该函数需要的参数 |
start() | 启动进程 |
join() | 等待进程运行结束 |
进程间数据交换
multiprocessing中有多种共享的数据类型Queue等,也可以通过共享内存实现变量传递。
Queue
import threading
import time
import queue
class Producer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self,name=threadname)
def run(self):
global myqueue
myqueue.put(self.getName())
print(self.getName(), 'put ', self.getName(), 'to queue.')
class Consumer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)
def run(self):
global myqueue
print(self.getName(), 'get ', myqueue.get(), 'from queue')
myqueue = queue.Queue()
plist = []
clist = []
for i in range(10):
p = Producer('Producer' + str(i))
plist.append(p)
c = Consumer('Consumer' + str(i))
clist.append(c)
for p, c in zip(plist,clist):
p.start()
p.join()
c.start()
c.join()
共享内存
from multiprocessing import Process,Value,Array
def f(n,a):
n.value = 3.1415926
for i in range(len(a)):
a[i] = a[i] * a[i]
if __name__=='__main__':
num = Value('d',0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
Manager
Manger对象控制一个拥有list,dict,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value,Array,Namespace等对象的服务端进程,并且允许其他进程访问这些对象。
from multiprocessing import Process, Manager
import time
def f(d, l, t):
d['name'] = 'Dong Fuguo'
d['age'] = 38
d['sex'] = 'Male'
d['affiliation'] = 'SDIBT'
l.reverse()
t.value = 3
if __name__=='__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
t = manager.Value('i', 0)
p = Process(target=f, args=(d, l, t))
p.start()
# time.sleep(0.1)
p.join()
for item in d.items():
print(item)
print(l)
print(t.value)
线程同步技术
在需要协同工作完成大型任务时,多个进程间的同步非常重要。进程同步方法与线程同步类似。
使用Event对象进行进程同步
from multiprocessing import Process, Event
def f(e, i):
if e.is_set():
e.wait()
print('hello world', i)
e.clear()
else:
e.set()
if __name__ == '__main__':
e = Event()
for num in range(10):
Process(target=f, args=(e,num)).start()
#输出:
# hello world 1
# hello world 3
# hello world 5
# hello world 7
# hello world 9
使用Lock进行进程同步
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
#输出
# hello world 1
# hello world 2
# hello world 3
# hello world 4
# hello world 5
# hello world 6
# hello world 7
# hello world 8
# hello world 9