学习Python也有很长一段时间了,进程和线程这块一直没作记录。直到最近在写一个爬虫,如果不搞多进程并发执行,爬虫时间令人发指。
说到进程,必然少不了线程。系统可以有多个进程,其中一个进程中可以有多个线程。
每个进程都有一个独立的GIL(全局解释器锁),多进程可以有效的利用多核CPU,而多线程只能占用CPU的一个核,因为线程执行时,必须先获取GIL才能执行,等到释放GIL,其他线程才有执行的机会,即一个进程中,不可能存在多个线程同时执行,现在系统均是多核CPU,所以都不推荐使用多线程。
先说说进程的事。Python提供了进程包multiprocessing
。
1. 进程,创建进程,定义一个target即可,如下:
from multiprocessing import Process
# 定义任务
def p_task(a):
print('process args is %s' % a)
# 定义一个进程,执行一个任务
p = Process(target=p_task, args=(1,))
# 启动进程
p.start()
# 等待进程执行完毕
p.join()
print('over...')
输出结果:
process args is 1
over...
2. 进程池
如果需要创建多个进程并复用,一般采取进程池的方式,因为创建进程和销毁进程的开销很大。
创建进程池,池的大小一般等于系统CPU核数。cpu核数可以通过multiprocessing.cpu_count()
查看。
Python提供了两种创建方式:Pool()
和ProcessPoolExecutor()
两种执行效率如何呢?直接给结果:Pool的效率高于ProcessPoolExecutor,测试代码很容易些,这里就不贴多余的代码了。
第一种:Pool(),进程池有两个方法:
apply(self, func, args=(), kwds={})
:同步执行func,源码如下:
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
'''
assert self._state == RUN
return self.apply_async(func, args, kwds).get()
可以看出,还是调用的异步执行方法 self.apply_async(func, args, kwds),只不过apply_async后又调用了get()方法等待结果返回。
apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)
:异步执行func
所以异步执行任务需要返回结果时,注意get()方法的使用,避免把异步变成同步。看示例:
import time
from multiprocessing import Pool
def task(args):
time.sleep(2)
return args + 1
pool = Pool(3)
l = []
start = time.time()
for i in range(5):
# 循环里面千万别调用r.get()方法,否则会等待结果返回,异步变同步
r = pool.apply_async(task, args=(i,))
l.append(r)
# close方法表示不再接收新任务,之前的任务依旧执行
pool.close()
pool.join()
# pool.terminate()会立即终止进程池,包括正在执行的子进程
print('over... ', time.time() - start)
第二种:ProcessPoolExecutor(),说说它的map方法和submit方法。
map
方法:按任务顺序返回进程return的值
submit
方法:无序执行,返回future对象,但返回的future对象是有序的,等进程执行结束,可以通过future.result()获取进程return的值
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from concurrent.futures import ProcessPoolExecutor
def task(i):
time.sleep(0.5)
return i
if __name__ == '__main__':
count = 100
list = [i for i in range(count)]
start = time.time()
with ProcessPoolExecutor(max_workers=8) as pool:
# 有序输出,直接返回结果
print([data for data in pool.map(task, list)])
print('map time: ', time.time() - start)
start = time.time()
future_list = []
with ProcessPoolExecutor(max_workers=8) as pool:
# 返回future对象
future = pool.submit(task, list)
future_list.append(future)
print([future.result() for future in future_list][0])
print('submit time: ', time.time() - start)
输出结果:
map time: 6.669000148773193
submit time: 0.623999834060669
很显然,有序输出效率很低。一般情况下,推荐用submit方法。
3. 进程之间通信
进程都拥有自己独立的数据,它们之间默认无法共享数据。下面介绍几种进程间通信的方式。
方法一:使用Array
from multiprocessing import Process, Array
g = []
def share_task(a):
g.extend(a)
for i in range(len(a)):
a[i] = a[i] * 2
if __name__ == '__main__':
arr = Array('i', range(10))
p = Process(target=share_task, args=(arr,))
p.start()
p.join()
print('arr: ', arr[:])
print('g: ', g[:])
输出结果:
arr: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
g: []
g还是为空列表,表明默认数据不共享,然而通过Array,子进程改变了主进程的数组中的元素。
方法二:使用Manager
from multiprocessing import Process, Manager
def m_task(d, l):
d[1] = '1'
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as m:
m_dict = m.dict()
m_list = m.list(range(10))
p = Process(target=m_task, args=(m_dict, m_list))
p.start()
p.join()
print('m_dict: ', m_dict)
print('m_list: ', m_list)
输出结果:
m_dict: {1: '1', 0.25: None}
m_list: [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
方法三: 使用Queue
from multiprocessing import Process, Queue
def producer_task(q):
for i in range(10):
q.put(i)
print('producer: ', i)
def consumer_task(q):
for i in range(10):
v = q.get()
print('consumer: ', v)
if __name__ == '__main__':
q = Queue()
p_producer = Process(target=producer_task, args=(q,))
p_consumer = Process(target=consumer_task, args=(q,))
p_producer.start()
p_consumer.start()
p_producer.join()
p_consumer.join()
输出结果:
producer: 0
producer: 1
producer: 2
producer: 3
consumer: 0
producer: 4
consumer: 1
producer: 5
producer: 6
consumer: 2
consumer: 3
producer: 7
producer: 8
consumer: 4
producer: 9
consumer: 5
consumer: 6
consumer: 7
consumer: 8
consumer: 9
还有Pipes等通信方式,这里就不多说了。
说完进程,就来说说线程,虽说都不推荐使用多线程,但是本人测试过多进程和多线程执行效率的差距,并没感觉到有多大差别。等后面写爬虫的时候会比较它们的执行效率。
1. 线程:
多线程共享一个变量的时候,记得加锁threading.Lock()
,防止多个线程同时修改变量,造成数据错误。
threading.local()
能记录每个线程独有的变量。
看代码和注释:
#!usr/bin/env python
# -*- coding:utf-8 _*-
import multiprocessing
import threading
g = 0
local = threading.local()
def task():
lock = threading.Lock()
lock.acquire()
try:
global g
print('thread %s is running...' % threading.current_thread().name)
for i in range(100000):
g = g + 1
g = g - 1
finally:
lock.release()
def local_task(name):
local.name = name
print(name)
print_local()
def print_local():
print('thread name: %s, local var is %s' % (threading.current_thread().name, local.name))
if __name__ == '__main__':
print('thread %s is running...' % threading.current_thread().name)
# 创建线程
thread1 = threading.Thread(target=task, name='SonThread1')
thread2 = threading.Thread(target=task, name='SonThread2')
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(g)
thread3 = threading.Thread(target=local_task, name='SonThread3', args=('Tom',))
thread4 = threading.Thread(target=local_task, name='SonThread4', args=('Lucy',))
thread3.start()
thread4.start()
print('thread %s is ended...' % threading.current_thread().name)
输出结果:
thread MainThread is running...
thread SonThread1 is running...
thread SonThread2 is running...
0
Tom
thread name: SonThread3, local var is Tom
Lucy
thread name: SonThread4, local var is Lucy
thread MainThread is ended...
2. 线程池ThreadPoolExecutor:
其使用方式和进程池ProcessPoolExecutor
一模一样。示例代码如下:
def task(i):
time.sleep(0.5)
return i
list = [i for i in range(count)]
with ThreadPoolExecutor(max_workers=8) as pool:
# 有序输出,直接返回结果
print([data for data in pool.map(task, list)])