Python 多进程和多线程

一、多进程

说到 Python 多进程必然会有 multiprocessing 模块。但是在说 multiprocessing 模块前,先看一下子进程是如何创建的。

1. 创建子进程 fork

再 Linux 或 Unix 系统中 提供了一个叫做 fork 的函数。
系统调用 fork 用于从已存在进程中创建一个新进程,新进程称为子进程,而原进程称为父进程。

  • fork 调用一次,返回两次,两次返回值分别是:

(1)在父进程中的返回值是子进程的进程号
(2)在子进程中的返回值则返回 0

fork 返回值:
< 0 子进程创建失败
= 0 在子进程中的返回值
> 0 在父进程中的返回值

  • fork 创建的子进程特性如下

(1)子进程会继承父进程几乎全部代码段(包括 fork 前所定义的所有内容)
(2)子进程拥有自己独立的信息标识,如PID
(3)父、子进程独立存在,在各自存储空间上运行,互不影响
(4)创建父子进程执行不同的内容是多任务中固定方法

  • 示例

再 Python 的 os 模块中提供了 fork 函数。下面来看一个创建子进程的例子:

#!coding=utf-8
import os
from time import sleep

pid = os.fork()

if pid < 0:
    print("create process failed")
elif pid == 0:
    print "子进程PID %s" % os.getpid()  # 子进程PID
    print "父进程PID %s" % os.getppid()  # 父进程PID
    print "子进程执行过程"
else:
    sleep(1)
    print "=" * 30
    print "子进程PID %s" % pid  # 父进程中fork()返回值 = 子进程PID
    print "父进程PID %s" % os.getpid()  # 父进程PID
    print "父进程执行过程"

结果如下:

子进程PID 26569
父进程PID 26528
子进程执行过程
==============================
子进程PID 26569
父进程PID 26528
父进程执行过程
[Finished in 1.4s]

代码中 sleep(1) 是为了等待子进程结束后,再结束父进程。

下面示例中,先结束父进程再结束子进程,看看 pid 的变化

#!coding=utf-8
import os
from time import sleep

pid = os.fork()

if pid < 0:
    print("创建失败")
elif pid == 0:
    sleep(5)
    print "子进程PID %s" % os.getpid()  # 子进程PID
    print "父进程PID %s" % os.getppid()  # 父进程PID
    print "子进程执行过程"
else:
    sleep(1)
    print "子进程PID %s" % pid  # 父进程中fork()返回值 = 子进程PID
    print "父进程PID %s" % os.getpid()  # 父进程PID
    print "父进程执行过程"
    print "=" * 30

执行结果如下:

子进程PID 26885
父进程PID 26844
父进程执行过程
==============================
子进程PID 26885
父进程PID 1
子进程执行过程
[Finished in 5.4s]

会发现子进程的父进程 pid 发生了改变。因为子进程原来的父进程已经结束,子进程变成了孤儿,所以由系统的 init 进程收留。

下面是使用 ps 查看 ppid 的变化

PPID 变化

2. Python 多进程 multiprocessing 模块

  • multiprocessing.Process 创建一个进程

示例

from multiprocessing import Process

def hello(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=hello, args=('Fat',))
    p.start()
    p.join()
方法 说明 参数
Process(target=function, args=(param, ), name='name') 初始化进程 参数:
target: 要执行的函数
args: 参数元祖的形式
name: 进程名字
start() 启动进程 没有参数
join(timeout=None) 主进程阻塞等待子进程的退出 参数:time_out: 超时时间
  • multiprocessing.Pool 进程池批量创建子进程

一般分为两种堵塞和非堵塞

非堵塞进程池

示例

#!coding=utf-8
from multiprocessing import Pool
import os
import time
import random


def task(name):
    print('运行任务 %s (PID %s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 5)
    end = time.time()
    print('任务 %s 运行时间 %0.2f' % (name, (end - start)))


if __name__ == '__main__':
    print('父进程PID %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(task, args=(i,))
    print('等待所有子进程结束')
    p.close()
    p.join()
    print('结束')

结果如下

父进程PID 27851.
等待所有子进程结束
运行任务 0 (PID 27893)...
运行任务 1 (PID 27894)...
运行任务 2 (PID 27895)...
运行任务 3 (PID 27896)...
任务 2 运行时间 1.82
运行任务 4 (PID 27895)...
任务 1 运行时间 2.33
任务 4 运行时间 1.04
任务 3 运行时间 3.24
任务 0 运行时间 3.86
结束
[Finished in 4.4s]

程序中进程池数量上线为4,执行次数为5,所以在结果中看到“运行任务 4 (PID 27895)...”出现在“任务 2 运行时间 1.82”和“任务 1 运行时间 2.33”中间。
因为要等待进程池中有空闲了,才会继续执行。

方法 说明 参数
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None) 创建进程池 参数:
processes: 创建子进程数量
initializer:初始化pool中的worker的时候调用的初始化函数,例如你每一个worker需要连接数据库
maxtasksperchild: 每个子进程最大的任务量,每干完几次任务后会销毁重建
`close 防止将更多任务提交到池中。一旦完成所有任务,工作进程将退出。
apply_async(func, args=(), kwds={}, callback=None) 任务加入到子进程,非堵塞模式 参数:
func: 需要执行的函数
'args': 出入要执行函数的参数

堵塞进程池
示例:

#!coding=utf-8
from multiprocessing import Pool
import os
import time
import random


def task(name):
    print('运行任务 %s (PID %s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 5)
    end = time.time()
    print('任务 %s 运行时间 %0.2f' % (name, (end - start)))


if __name__ == '__main__':
    print('父进程PID %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply(task, args=(i,))
    print('等待所有子进程结束')
    p.close()
    p.join()
    print('结束')

运行结果如下:

父进程PID 28050.
运行任务 0 (PID 28091)...
任务 0 运行时间 2.25
运行任务 1 (PID 28092)...
任务 1 运行时间 3.76
运行任务 2 (PID 28093)...
任务 2 运行时间 0.45
运行任务 3 (PID 28094)...
任务 3 运行时间 1.36
运行任务 4 (PID 28091)...
任务 4 运行时间 1.00
等待所有子进程结束
结束
[Finished in 9.3s]

可以看到一个一个的运行。
堵塞使用 apply() 方法

3. 进程间通讯 QueuePipes

Queue 示例:
#!coding=utf-8
from multiprocessing import Process, Queue
import time
import os


def put(q):
    print "PUT Queue 值的进程 PID %s" % os.getpid()
    for i in range(1, 6):
        print "PUT 值为:%s" % i
        q.put(i)
        time.sleep(i)


def get(q):
    print "获取 Queue 值的进程 PID %s" % os.getpid()
    while True:
        value = q.get(True)
        print('获取的值为:%s' % value)

if __name__ == '__main__':
    q = Queue()
    p_put = Process(target=put, args=(q,))
    p_get = Process(target=get, args=(q,))
    # 启动 PUT 进程
    p_put.start()
    # 启动 GET 进程
    p_get.start()
    # 等待 PUT 结束:
    p_put.join()
    # 强行终止 get 这个死循环进程
    p_get.terminate()

运行结果

PUT Queue 值的进程 PID 59241
PUT 值为:1
获取 Queue 值的进程 PID 59242
获取的值为:1
PUT 值为:2
获取的值为:2
PUT 值为:3
获取的值为:3
PUT 值为:4
获取的值为:4
[Finished in 11.1s]

这里需要注意的是,multiprocessing.Queue 不支持 multiprocessing.Pool 。如果要在进程池中使用 Queue 使用 multiprocessing.Manager.Queue

Queue 其他方法

方法 说明
q.qsize() 返回当前队列的空间
q.empty() 判断当前队列是否为空
q.full() 判断当前队列是否满
q.put() 放消息
q.get() 获取消息
q.task_done() 接受消息的线程调用该函数来说明消息对应的任务是否已经完成
q.join() 等待队列为空,在执行别的操作
Pipe 示例:

Pipe 管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。 Pipe() 方法返回一个元组 (conn1, conn2)

参数 duplex

  • 模式是 True,双工模式;conn1 和 conn2 是表示管道两端的 Connection 对象。
  • False 单工模式,conn1只能接受,conn2只能用于发送。
#!coding=utf-8
from multiprocessing import Process, Pipe
import time
import os


def write(p):
    print "PUT Queue 值的进程 PID %s" % os.getpid()
    for i in range(1, 6):
        print "PUT 值为:%s" % i
        p.send(i)
        time.sleep(i)


def read(p):
    print "获取 Queue 值的进程 PID %s" % os.getpid()
    while True:
        value = p.recv()
        print('获取的值为:%s' % value)

if __name__ == '__main__':
    p_read, p_write = Pipe(False)
    pw = Process(target=write, args=(p_write,))
    pr = Process(target=read, args=(p_read,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()

运行结果

PUT Queue 值的进程 PID 58896
PUT 值为:1
获取 Queue 值的进程 PID 58897
获取的值为:1
PUT 值为:2
获取的值为:2
PUT 值为:3
获取的值为:3
PUT 值为:4
获取的值为:4
PUT 值为:5
获取的值为:5
[Finished in 16.1s]

二、多线程

Python 再创建线程时是启动了一个真正的线程 Posix Thread。但是因为有一个GIL锁(Global Interpreter Lock)。所有线程再执行时都必须获取一个 GIL 锁。

因为 GIL 锁的存在,每个线程再执行一个定的数据量后,解释器会自动释放 GIL 锁,让下一个线程执行。

再 Python 多线程中,GIL 锁实际上把所有线程的执行代码都给上了锁, 每个线程是交替执行的。

所以,多线程在 Python 中只能交替执行,就算有 100 个线程跑在 100 核的 CPU 上,也只能用到1个核。

1. threading.Thread 创建线程

常用函数说明:

函数 说明 参数
ThreadObject=threading.Thread(target, args=None, name=None) 创建一个线程 参数:
target: 需要线程执行的函数
args: 给线程执行的函数传递参数
name: 线程名
threading.active_count() 获取当前线程数量
threading.current_thread() 获取当前线程信息 返回当前 Thread 对象
threading.enumerate() 返回正在运行的线程是一个 list
ThreadObject.start() 启动线程
ThreadObject.isAlive() 查看线程是否活跃
ThreadObject.getName() 获取线程名
ThreadObject.setName() 设置线程名
ThreadObject.join(time) 等待线程终止,这阻塞调用线程直至线程的 join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。

示例:

#!coding=utf-8
import threading
import random
import time

def job(name):
    start = time.time()
    time.sleep(random.random() * 5)
    print "当前线程的个数: %s" % threading.active_count()
    print "线程的名: %s" % threading.current_thread().name
    end = time.time()
    print "执行时间: %s" % (end - start)
    print name

if __name__ == "__main__":
    print threading.active_count()
    print threading.current_thread().name

    # 创建线程 并开始执行线程
    t1 = threading.Thread(target=job, name="Job1", args=('Thread1',))
    t2 = threading.Thread(target=job, name="Job2", args=('Thread2',))
    # # 使用start方法开始进程
    t1.start()
    t2.start()

结果为:

1
MainThread
当前线程的个数: 3
线程的名: Job2
执行时间: 1.19193983078
Thread2
当前线程的个数: 2
线程的名: Job1
执行时间: 3.69679808617
Thread1
[Finished in 4.1s]

2. multiprocessing.dummy 线程池

用法和 Pool 相同。from multiprocessing.dummy import Pool 这样加载池就可以了。

3. 数据同步

多线程中,所有数据都是共享的,如实多个线程同时修改某个数据,就恨尴尬了。所以在这里需要线程同步。

threading 中提供了 LockRlock 可以实现简单的线程同步。

  • Lock

Lock 中有 acquirerelease 方法。且这两个方法要同时出现。可以将其操作放到acquire和release方法之间。

from threading import Lock
 
lock = Lock()
lock.acquire()
lock.release()
  • Rlock

RLock 对象的 acquire() / release() 是可以嵌套的,当最后一个或者最外层 release() 执行结束后,锁才被设置为 unlocked。

  • 方法说明
方法 说明
acquire() 获得锁。该方法等待锁被解锁,将其设置为 locked 并返回 True。
release() 释放锁。当锁被锁定时,将其重置为解锁并返回。如果锁未锁定,则会引发 RuntimeError。
locked() 如果锁被锁定,返回True。

示例:

#!coding=utf-8
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        global x
        # 获得锁
        lock.acquire()
        x += self.num
        time.sleep(1)
        print(x)
        # 释放锁
        lock.release()

# 创建锁
lock = threading.RLock()
# lock = threading.Lock()

x = 0

thread_list = []
for i in range(1, 5):
    # 创建线程
    t = MyThread(i)
    thread_list.append(t)

for i in thread_list:
    i.start()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,492评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,048评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,927评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,293评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,309评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,024评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,638评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,546评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,073评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,188评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,321评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,998评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,678评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,186评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,303评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,663评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,330评论 2 358