Python 中多进程是由 multiprocessing
模块提供的
import time
from multiprocessing import Process
def test(i):
print("子进程开始")
print(i)
time.sleep(2)
print("子进程结束")
if __name__ == "__main__":
p_ls = []
for i in range(3):
p = Process(target=test, args=(i, ))
p.start()
p_ls.append(p)
for i in range(3):
p_ls[i].join(()
还有一种方法,使用类:
from multiprocessing import Process
class MyProcess(Process):
def __init__(self):
super().__init__()
def run():
pass
p = MyProcess()
p.start()
p.join()
终止一个进程:p.terminate()
判断一个进程是否存活: p.is_alive
查看一个进程的 id:p.pid
查看进程对象名称:p.name
守护进程会随着主进程的代码执行完毕而结束
在 p.start()
之前设置 p.daemon = True
加锁。涉及到数据改动的时候,目的是保证数据安全。
from multiprocessing import Lock
lock = Lock()
lock.acquire()
lock.release()
进程同步控制 ---- 锁,信号量,事件
- 信号量 相当于多个锁,可以规定获取锁个数的大小。其内部相当于加了一个计数器,acquire() 的时候计数器 -1,release() +1
from multiprocessing import Process, Semaphore
def test(i, sem):
sem.acquire()
pass
sem.release()
if __name__ =="__main__":
sem = Semaphore(4)
for i in range(4):
p = Process(target=test, args=(i, sem,))
p.start()
- 事件
一个信号可以使所有的进程都进入阻塞状态,也可以解除
创建之初默认是阻塞的
from multiprocessing import Event
e = Event()
e.is_set() --> False
e.set() --> 将事件的状态设置为 True
e.wait() --> 依据 is_set 的值来决定是否阻塞,相当于监视 is_set 的状态,然后根据其状态来做某些事情
e.clear() --> 将事件的状态设置为 False 阻塞
例如一个简单的红绿灯事件
import time
import random
from multiprocessing import Process, Event
def car(e, i):
if not e.is_set():
print('car %s 在等待'%(i) )
e.wait() --> 阻塞,直到得到事件状态变为 True
print('car %s 通过'%i)
def light(e):
while True:
if e.is_set(): --> 开始时是 False,所以绿灯亮
e.clear()
print("红灯亮了")
else:
e.set() --> 设置为 True,红灯亮
print("绿灯亮了")
time.sleep(2)
if __name__ == "__main__":
e = Event()
traffic = Process(target=light, args=(e, ))
traffic.start()
for i in range(10):
c = Process(target=cars, args=(e, i))
c.start()
time.sleep(random.randint())
进程间通信 IPC ----- 队列和管道
- 队列
# 先进先出
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.get()
q.full() -->> 队列是否满了
q.empty() -->> 判断队列是否空
q.get_nowait() ->> 如果队列空了,那么它会报错
消费者与生产者
import time
import random
from multiprocessing import Process, Queue
def producer(q):
for i in range(10):
time.sleep(random.randint(1, 2))
food = "产品%s"%i
print('生产者生产了一个东西')
q.put(food)
def consumer(q):
while True:
food = q.get()
if food is None:
break
print(food)
time.sleep(random.random())
if __name__ == "__main__":
q = Queue()
p = Process(target=produce, args=(q,))
p.start()
c = Process(target=consume, args=(q, ))
c.start()
p.join()
q.put(None)
但是上面的代码有一个问题,就是 q.put(None) 的频繁,有几个消费者,那么就需要向队列中 put 几个值,这种情况可以使用 JoinableQueue
from multiprocessing import JoinableQueue
def producer(q):
xxxx
q.join() -->> 阻塞,直到队列中的所有数据全被处理完毕
def consumer(q):
xxxx
q.task_done()
if __name__ == "__main__":
q = JoinableQueue()
c.daemon = True
- 管道
from multiprocessing import Pipe
conn1, conn2 = Pipe()
conn1.send('12345')
conn2.recv()
进程池
主要是为了效率问题,开启单个进程会耗费很多资源。进程池就是先创建好多个进程,然后从进程池中取到进程,提高操作系统调度进程的利用率。一般 CPU 个数 + 1 是最大开启进程的数量。
from multiprocessing import Pool
def func(n):
for i in range(10):
n += 1
print(n)
if __name__ == "__main__":
pool = Pool(5)
pool.map(func, <可迭代对象>range(100))
# map 是自带 close 和 join 的,最后一次返回结果
res_lst = []
# 还有一个 apply 方法
for i in range(20):
p.apply(func, args=(i, )) --> 同步提交的方式
res = p.apply_sync(func, args=(i, )) --> 异步提交方式
# res.get() --> 阻塞等待结果
# 如果需要不阻塞,那么可以这样做
res_lst.append(res)
# 最后可以得到结果
for res in res_lst:
print(res.get())
# 与 apply_async 配合使用
p.close() --> 结束进程池接受任务
p.join() --> 感知进程池中的任务结束
进程池实现 socket 服务端
import socket
from multiprocessing import Pool
def func(conn):
conn.send(b'hello')
msg = conn.recv(1024).decode('utf-8')
print(msg)
con.close()
if __name__ == "__main__":
p = Pool(5)
sk = socket.socket()
sk,bind(('127.0.0.1', 8000))
sk.listen()
while True:
conn, addr = sk.accept()
p.apply_async(func, args=(conn, ))
sk.close()
另外进程池还有回掉函数,意思就是在 任务执行完毕后执行指定的回调函数,任务返回的数据作为回调函数的参数。回调函数的参数来源只有进程返回的数据。回调函数是在主进程中执行的
def func1():
pass
return x
def func2(x):
pass
return
if __name__ == "__main__":
pool = Pool(5)
p.apply_async(func, args=(), callback=func2)
回调函数经常用于爬虫,因为网络延迟和下载很耗时。
例如
import requests
from multiprocessing import Pool
def get(url):
res = requests.get(url)
if res.status_code == 200:
return url, res.content
def call_back(args):
url, content = args
print(url, len(content))
if __name__ == "__main__":
url_lst = []
p = Pool(5)
for url in url_lst:
p.apply_async(get, args=(url,), callback=call_back)
p.close()
p.join()