Python 中的进程、线程、协程详解

目录

[toc]


概念

  1. CPU在同一时刻只能处理一个任务,cpu在各个任务之间来回的进行切换,只是因为cpu执行速度很快,误认为是同时执行,但是python的线程是伪线程,即使是多核cpu也只是同时执行一个进程
  2. 一个进程占一块内存,每一个程序的内存是独立的
  3. 进程需要执行必须要创建一个线程
  4. 同一个进程的线程共享同一块资源

进程:
在进行的一个任务(一些资源的集合:),由cpu执行

线程:
是操作系统最小的调度单位,是一串指令的集合

区别:

1.进程快还是线程快?:一样快 进程是通过线程执行所以是线程同线程比较
2.启动线程快还是进程快? 启动线程快,启动线程:要申请内存空间等 ,启动进程:直接执行指令
3.线程共享内存空间,进程内存是独立的
4.创建新线程更简单,创建新进程要拷贝父进程
5.一个线程可以操作同一个进程里的其他线程,进程只能操作子进程

线程

使用场景

  1. 不适合cpu()密集型任务,适合io(数据读取写入)操作密集型任务

创建进程

使用方法的方式使用线程

def run(name):
    print(name)
    time.sleep(2)
threads=[ threading.Thread(target=run,args=(i,)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
    t.start()

使用类的方式使用线程


class MyThread(threading.Thread):
    def __init__(self,n):
        super(MyThread,self).__init__()
        self.n=n
    def run(self):
        print(self.n)

threads=[ MyThread(str(i)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
    t.start()

join

主线程创建子线程后,两者并不影响,所以是并行执行,造成主线程结束后,子线程还在运行。那么我们需要主线程要等待子线程运行完后,再退出,就要使用join

def run(name):
    for i in range(3):
        print(name,i)
        time.sleep(2)

threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(3)] #用列表生成式 生成10个线程

for t in threads: #启动刚刚创建的10个线程
    t.start()
for t in threads: #join创建的10个线程
    t.join()
print('main finished...') #join后,所有子线程执行完才会执行,join前,子线程没有执行完就执行了这句

print('-----')
print('进程个数:',threading.active_count())
print('进程个数:',threading.activeCount())
print('当前进程:',threading.current_thread())

守护线程(后台线程)

默认情况下,主线程退出之后,那么主线程结束后,子线程也依然会继续执行。如果希望主线程退出后,其子线程也退出而不再执行,则需要设置子线程为守护线程。用setDeamon 方法设置线程为守护线程。

print('-------守护线程-------')

def run(name):
    for i in range(3):
        print(name,i)
        time.sleep(2)

threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(3)] #用列表生成式 生成10个线程

for t in threads: #启动刚刚创建的10个线程
    t.setDaemon(True)
    t.start()

print('main finished...')

线程同步与互斥锁 LOCK

同一时间 只有一个线程运行,即使cpu是多核的
python 线程执行的是c语言写的原生线程,多线程对数据操作时,a线程更改数据后,b线程又再次更改了数据,两次个线程执行的结果不会叠加,只是修改,为了实现数据同步,所以加入全局解释器锁(GIL)
不添加锁时在乌班图上运行此段代码,num的计数是会不准确的。

线程同步

print('-------线程同步与互斥锁-------')
#添加锁
lock=threading.Lock()
num=0
def run(name):
    lock.acquire() #获取锁
    global num
    num+=1
    lock.release() #释放锁
threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
    t.start()
for t in threads: #join创建的10个线程
    t.join()
print('num',num)

互斥锁(递归锁)RLock

多重锁的时候,也就是两层锁,程序会锁死,这时候要用RLock,程序才能正常执行

lock=threading.RLock()
num1,num2=0,0

def run1():
    lock.acquire()  # 获取锁
    global  num1
    num1+=1
    lock.release()  # 释放锁
    return num1

def run2():
    lock.acquire()  # 获取锁
    global  num2
    num2+=1
    lock.release()  # 释放锁
    return num2

def run():
    lock.acquire() #获取锁

    run1()
    print('between run1 run2')
    run2()

    lock.release() #释放锁
threads=[ threading.Thread(target=run) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
    t.start()
for t in threads: #join创建的10个线程
    t.join()

print('num1:',num1,'num2:',num2)

信号量

指定固定数量的线程一起执行(其实还是执行完一个 输出一个)

print('-------信号量-------')
lock=threading.BoundedSemaphore(3) #只允许三个线程同时一起执行
def run(tag):
    lock.acquire() #获取锁
    print(tag)
    time.sleep(2)  #执行后等待两秒
    lock.release() #释放锁
threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
    t.start()
for t in threads: #join创建的10个线程
    t.join()

线程的事件Events

线程控制其他线程的执行,线程间交互
四个方法:

1.set 设置标志位 true
2.clear 清空标志位 flase
3.等待标志位被设置
4.is_set 是否设置了标志位

print('-------红绿灯-------')
event=threading.Event()
def light():   # 设置标志位时 是绿灯
    count=0
    while True:
        if count<5:
            event.set()  # 设置标志位时
            print('\033[42;1m绿灯...%s\033[0m'%count)
        elif count>4:
            event.clear()#清空标志位
            print('\033[41;1m红灯...%s\033[0m'%count)
        time.sleep(1)
        count += 1
        if count==10:
            count=0

thrad_light=threading.Thread(target=light,)
thrad_light.start()

def car():
    while True:
        if event.is_set(): #判断是否设置了标志位
            print('绿灯了,我过马路咯')
            time.sleep(1)
        else:
            print('红灯了,等着过马路') #标志位flase时,wait 可阻塞当前事件
            event.wait()
thrad_car=threading.Thread(target=car,)
thrad_car.start()

queue 队列

为什么要用队列?

1. 提高效率 :任务放进队列执行就OK
2. 完成程序之间的解耦(程序间的依赖关系):执行任务 和 队列 没有关联

方法:

put:放入队列
get:获取队列中的数据
get_nowait:获取为空时,抛出异常
qsize:获取队列大小

放入取出

放入

q=queue.Queue(maxsize=2) #生成队列 并设置最大size
q.put(1)
q.put('domain')
try:
    q.put('alex', block=False) #添加时 已经占满最大size了,会阻塞卡住,需要吧block 设置成true,则不会阻塞,直接抛出异常
except Exception as e:
    print(e)
print(q.qsize()) #查看当前队列 size

取出

print(q.get())
print(q.get())
try:
    print(q.get(block=False)) #取出时 已经占满最大size了,会阻塞卡住,需要吧block 设置成true,则不会阻塞,直接抛出异常,用get_nowait 效果一样
except Exception as e:
    print(e)

try:
    print(q.get_nowait())
except Exception as e:
   pass

出入顺序

默认的读取顺序是先入先出也就是queue.Queue(),后入先出使用:queue.LifoQueue()

后入先出

print('----后入先出----')
q=queue.LifoQueue() #生成队列
q.put('alex')
q.put('domain')
print(q.get())
print(q.get())

自定义优先级

print('----自定义优先级----')
q=queue.PriorityQueue()
q.put((1,'domain'))
q.put((-1,'alex'))
q.put((3,'searse'))
print((q.get()))
print((q.get()))
print((q.get()))

生产者 消费者

某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责产生数据送完仓库,而消费者负责从仓库里取出数据处理消费,这就构成了生产者消费者模式。

为什么要使用生产者消费者 模型

1.解耦:两个模块间通过仓库交互,两个模块间的代码互不依赖
2.并发:两个模块并发,生产者产生数据,仓库中有数据消费者就处理
print('----生产者 消费者----')
q=queue.Queue(maxsize=10) #生成队列 并设置最大size10

def produce():
    count=0
    while True:
        q.put(count)
        print('产生%s'%count)
        count+=1
        time.sleep(0.5) #0.1秒钟 产生一个
def consume(name):
    while True:
        data=q.get()
        time.sleep(1)  #1秒钟 消费一个
        print('%s消费了%s'%(name,data))

thread_produce=threading.Thread(target=produce)
thread_produce=threading.Thread(target=produce)
thread_consume1=threading.Thread(target=consume,args=('domain',))
thread_consume2=threading.Thread(target=consume,args=('alex',))
thread_produce.start()
thread_consume1.start()
thread_consume2.start()

进程

介绍

  1. 语法基本和线程一样
  2. 每一个进程都是由进程启动的,都会有一个父进程

创建进程,进程中创建线程

def info():
    print('parent process id:',os.getppid())
    print('current process id:',os.getpid())

def run():
    threads = [threading.Thread(target=info,) for i in range(2)]  # 用列表生成式 生成10个线程
    for t in threads:  # 启动刚刚创建的2个线程
        t.start()      #在当前进程 启动两个线程
        time.sleep(3)

if __name__ == '__main__':
    info()
    processes=[multiprocessing.Process(target=run,) for i in range(1)] #用列表生成式 生成10个线程
    for p in processes:
        p.start()#启动刚刚创建的10个进程
        p.join()#进程结束 主进程也就是当前的程序 才结束
    print('master process finished...')

进程间的通信

创建子进程时,克隆了一份父进程,子进程更改数据后,在通过中间件返回父进程,实现通信

使用multiprocessing.Queue 传递数据

from multiprocessing import Queue,Process,Pipe #进程间队列通信 要import 这个Queue

def fun(arg):
     arg.put('domain')
     arg.put('alex')

 if __name__ == '__main__':
     q=Queue()
     p=Process(target=fun,args=(q,))
    p.start()
     print(q.get())

使用multiprocessing.Pipe 传递数据

from multiprocessing import  Process,Pipe
print('---Pipe---')

def fun(child):
    child.send(['domain','alex'])
    child.send(['12','33'])
    print(child.recv())
    child.close()

if __name__ == '__main__':
    parent,child=Pipe()
    p=Process(target=fun,args=(child,))
    p.start()
    print(parent.recv())
    print(parent.recv())
    parent.send("from parent:hello")
    p.join() # 需写在send 之后

使用Managers 传递数据

from multiprocessing import  Process,Manager
import  os
def run(list,dic):
    list.append(os.getpid())  #给共享的list添加元素
    dic[os.getpid()] = os.getppid()  #给共享的dict添加元素

if __name__ == '__main__':
    with Manager() as manager:
        list=manager.list() #生成一个用于共享的list
        dic=manager.dict()  #生成一个用于共享的dict
        processes = [Process(target=run,args=(list,dic)) for i in range(10)]  # 用列表生成式 生成10个线程
        for p in processes:
            p.start()  # 启动刚刚创建的10个进程
            p.join()  # 进程结束 主进程也就是当前的程序 才结束
        print(list)
        print(dic)

进程中的锁

进程中不能共享数据,那么这个锁存在的意义的是什么?虽然数据没有共享,但是在打印时共享同一块屏幕,加上锁 确保打印不乱

from multiprocessing import  Process,Lock
import  os
def run(lock,num):
    lock.acquire()
    print(num)
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    processes = [Process(target=run, args=(lock, i)) for i in range(10)]  # 用列表生成式 生成10个线程
    for p in processes:
        p.start()  # 启动刚刚创建的10个进程
        p.join()  # 进程结束 主进程也就是当前的程序 才结束

进程池

设置固定数量的进程同时执行
python线程,因为线程池占用资源很小,所有没有线程池。但是进程不一样,前面说过 创建一个进程 要拷贝一份父进程,所以占用资源极其大,需要使用进程池。

from multiprocessing import Pool
import os,time
def run(arg):
    print('子进程:',os.getpid())
    time.sleep(2)
    return os.getpid() # 返回参数给回调方法end
def end(arg):
    print(arg,'的回调','---',)
if __name__ == '__main__':
    print('主进程id:',os.getpid())
    pool=Pool(5)
    for i in range(20):
        # pool.apply(fun,) #串行执行
        if i%2==0:
         pool.apply_async(func=run,args=(i,),callback=end) #并行执行,callback,是进程结束后的回调,是主进程调用的回调。
    pool.close() #需先close,再join
    pool.join()  #join: 等待子进程,主线程再结束
    print('main finished...'

协程

介绍

  1. 用户态(存在用户空间的数据)的轻量级线程(微线程)
  2. 协程能保留上一次状态
  3. 在单线程实现并发,串行操作,函数间来回切换执行
    4.遇到IO操作就切换,IO 操作完了再回来

通过greenlet 使用协程(手动切换)

from greenlet import greenlet
def func1():
    print(1)
    g2.switch() # 输出1后 后切换到协程2执行
    print(2)    # 输出1后 后切换到协程2执行
    g2.switch()
def func2():
    print(3)
    g1.switch() # 输出3后 后切换到协程1执行
    print(4)
g1=greenlet(func1) #启动协程1 传入要执行的方法
g2=greenlet(func2) #启动协程2 传入要执行的方法
g1.switch()        #切换到协程1,启动协程1,执行func1方法

通过gevent使用协程(自动切换)

import gevent,time
def func1():
    print('in the func1 1')
    gevent.sleep(2)  #模拟io 操作需要2s,真实io 操作不需要 写这句,模块会自动判断
    print('in the func1 2')   # 因为这里io 需要时间最久,所以最后才会执行这句,先跳去执行其他的方法了。最后回来执行。也就是整个协程执行的时间 :2s
def func2():
    print('in the func2 1')
    gevent.sleep(1)
    print('in the func2 2')
def func3():
    print('in the func3 1')
    gevent.sleep(1)
    print('in the func3 2')
start_time=time.time()
gevent.joinall([          # 加入所有协程
    gevent.spawn(func1),
    gevent.spawn(func2),
    gevent.spawn(func3),
])
end_time=time.time()
expend_time=end_time-start_time
print(expend_time)


协程爬取网页

from urllib import request
import gevent,time
from gevent import  monkey

monkey.patch_all()
def spider(url,save_name):
     respones=request.urlopen(url)
     data=respones.read()
     with open(save_name,'wb') as f:
         f.write(data)
start_time=time.time()
gevent.joinall([          # 加入所有协程
    gevent.spawn(spider, 'https://www.baidu.com','baidu.html'),
    gevent.spawn(spider,'https://www.iqiyi.com','iqiyi.html'),
    gevent.spawn(spider,'https://weibo.com/','weibo.html')
])
end_time=time.time()
expend_time=end_time-start_time
print('花费时间:',expend_time)

协程并发socket

服务端

import sys
import socket
import time
import gevent

from gevent import socket, monkey
monkey.patch_all()
def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)
def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
    except Exception as  e:
        print(e)
    finally:
        conn.close()
if __name__ == '__main__':
    server(8001)

客户端

import socket

HOST = 'localhost'  # The remote host
PORT = 8001  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)

    print('Received', repr(data))
s.close()

100个线程并发


import socket
import threading
def sock_conn():
    client = socket.socket()
    client.connect(("localhost",8001))
    count = 0
    while True:
        client.send( ("hello %s" %count).encode("utf-8"))
        data = client.recv(1024)
        print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
        count +=1
    client.close()


for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,339评论 8 265
  • python之进程、线程与协程 有这么个例子说他们的区别,帮助理解很有用。 有一个老板想开一个工厂生产手机。 他需...
    道无虚阅读 3,173评论 0 3
  • 又来到了一个老生常谈的问题,应用层软件开发的程序员要不要了解和深入学习操作系统呢? 今天就这个问题开始,来谈谈操...
    tangsl阅读 4,100评论 0 23
  • 有一句话很经典,叫先生存后生活。那怎么才能生存了,很简单――赚钱。 而赚钱只是第一步,让我们赚到的有限的钱得到最大...
    你我world阅读 333评论 0 1
  • 我们很多人内在都有着深沉的恐惧,主要是害怕无法生存和害怕得不到爱。所有的恐惧都缘于成长过程中爱的匮乏。儿时的缺憾耗...
    朝雨下阅读 365评论 0 0