目录
[toc]
概念
- CPU在同一时刻只能处理一个任务,cpu在各个任务之间来回的进行切换,只是因为cpu执行速度很快,误认为是同时执行,但是python的线程是伪线程,即使是多核cpu也只是同时执行一个进程
- 一个进程占一块内存,每一个程序的内存是独立的
- 进程需要执行必须要创建一个线程
- 同一个进程的线程共享同一块资源
进程:
在进行的一个任务(一些资源的集合:),由cpu执行
线程:
是操作系统最小的调度单位,是一串指令的集合
区别:
1.进程快还是线程快?:一样快 进程是通过线程执行所以是线程同线程比较
2.启动线程快还是进程快? 启动线程快,启动线程:要申请内存空间等 ,启动进程:直接执行指令
3.线程共享内存空间,进程内存是独立的
4.创建新线程更简单,创建新进程要拷贝父进程
5.一个线程可以操作同一个进程里的其他线程,进程只能操作子进程
线程
使用场景
- 不适合cpu()密集型任务,适合io(数据读取写入)操作密集型任务
创建进程
使用方法的方式使用线程
def run(name):
print(name)
time.sleep(2)
threads=[ threading.Thread(target=run,args=(i,)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
t.start()
使用类的方式使用线程
class MyThread(threading.Thread):
def __init__(self,n):
super(MyThread,self).__init__()
self.n=n
def run(self):
print(self.n)
threads=[ MyThread(str(i)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
t.start()
join
主线程创建子线程后,两者并不影响,所以是并行执行,造成主线程结束后,子线程还在运行。那么我们需要主线程要等待子线程运行完后,再退出,就要使用join
def run(name):
for i in range(3):
print(name,i)
time.sleep(2)
threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(3)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
t.start()
for t in threads: #join创建的10个线程
t.join()
print('main finished...') #join后,所有子线程执行完才会执行,join前,子线程没有执行完就执行了这句
print('-----')
print('进程个数:',threading.active_count())
print('进程个数:',threading.activeCount())
print('当前进程:',threading.current_thread())
守护线程(后台线程)
默认情况下,主线程退出之后,那么主线程结束后,子线程也依然会继续执行。如果希望主线程退出后,其子线程也退出而不再执行,则需要设置子线程为守护线程。用setDeamon 方法设置线程为守护线程。
print('-------守护线程-------')
def run(name):
for i in range(3):
print(name,i)
time.sleep(2)
threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(3)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
t.setDaemon(True)
t.start()
print('main finished...')
线程同步与互斥锁 LOCK
同一时间 只有一个线程运行,即使cpu是多核的
python 线程执行的是c语言写的原生线程,多线程对数据操作时,a线程更改数据后,b线程又再次更改了数据,两次个线程执行的结果不会叠加,只是修改,为了实现数据同步,所以加入全局解释器锁(GIL)
不添加锁时在乌班图上运行此段代码,num的计数是会不准确的。
线程同步
print('-------线程同步与互斥锁-------')
#添加锁
lock=threading.Lock()
num=0
def run(name):
lock.acquire() #获取锁
global num
num+=1
lock.release() #释放锁
threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
t.start()
for t in threads: #join创建的10个线程
t.join()
print('num',num)
互斥锁(递归锁)RLock
多重锁的时候,也就是两层锁,程序会锁死,这时候要用RLock,程序才能正常执行
lock=threading.RLock()
num1,num2=0,0
def run1():
lock.acquire() # 获取锁
global num1
num1+=1
lock.release() # 释放锁
return num1
def run2():
lock.acquire() # 获取锁
global num2
num2+=1
lock.release() # 释放锁
return num2
def run():
lock.acquire() #获取锁
run1()
print('between run1 run2')
run2()
lock.release() #释放锁
threads=[ threading.Thread(target=run) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
t.start()
for t in threads: #join创建的10个线程
t.join()
print('num1:',num1,'num2:',num2)
信号量
指定固定数量的线程一起执行(其实还是执行完一个 输出一个)
print('-------信号量-------')
lock=threading.BoundedSemaphore(3) #只允许三个线程同时一起执行
def run(tag):
lock.acquire() #获取锁
print(tag)
time.sleep(2) #执行后等待两秒
lock.release() #释放锁
threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(10)] #用列表生成式 生成10个线程
for t in threads: #启动刚刚创建的10个线程
t.start()
for t in threads: #join创建的10个线程
t.join()
线程的事件Events
线程控制其他线程的执行,线程间交互
四个方法:
1.set 设置标志位 true
2.clear 清空标志位 flase
3.等待标志位被设置
4.is_set 是否设置了标志位
print('-------红绿灯-------')
event=threading.Event()
def light(): # 设置标志位时 是绿灯
count=0
while True:
if count<5:
event.set() # 设置标志位时
print('\033[42;1m绿灯...%s\033[0m'%count)
elif count>4:
event.clear()#清空标志位
print('\033[41;1m红灯...%s\033[0m'%count)
time.sleep(1)
count += 1
if count==10:
count=0
thrad_light=threading.Thread(target=light,)
thrad_light.start()
def car():
while True:
if event.is_set(): #判断是否设置了标志位
print('绿灯了,我过马路咯')
time.sleep(1)
else:
print('红灯了,等着过马路') #标志位flase时,wait 可阻塞当前事件
event.wait()
thrad_car=threading.Thread(target=car,)
thrad_car.start()
queue 队列
为什么要用队列?
1. 提高效率 :任务放进队列执行就OK
2. 完成程序之间的解耦(程序间的依赖关系):执行任务 和 队列 没有关联
方法:
put:放入队列
get:获取队列中的数据
get_nowait:获取为空时,抛出异常
qsize:获取队列大小
放入取出
放入
q=queue.Queue(maxsize=2) #生成队列 并设置最大size
q.put(1)
q.put('domain')
try:
q.put('alex', block=False) #添加时 已经占满最大size了,会阻塞卡住,需要吧block 设置成true,则不会阻塞,直接抛出异常
except Exception as e:
print(e)
print(q.qsize()) #查看当前队列 size
取出
print(q.get())
print(q.get())
try:
print(q.get(block=False)) #取出时 已经占满最大size了,会阻塞卡住,需要吧block 设置成true,则不会阻塞,直接抛出异常,用get_nowait 效果一样
except Exception as e:
print(e)
try:
print(q.get_nowait())
except Exception as e:
pass
出入顺序
默认的读取顺序是先入先出也就是queue.Queue(),后入先出使用:queue.LifoQueue()
后入先出
print('----后入先出----')
q=queue.LifoQueue() #生成队列
q.put('alex')
q.put('domain')
print(q.get())
print(q.get())
自定义优先级
print('----自定义优先级----')
q=queue.PriorityQueue()
q.put((1,'domain'))
q.put((-1,'alex'))
q.put((3,'searse'))
print((q.get()))
print((q.get()))
print((q.get()))
生产者 消费者
某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责产生数据送完仓库,而消费者负责从仓库里取出数据处理消费,这就构成了生产者消费者模式。
为什么要使用生产者消费者 模型
1.解耦:两个模块间通过仓库交互,两个模块间的代码互不依赖
2.并发:两个模块并发,生产者产生数据,仓库中有数据消费者就处理
print('----生产者 消费者----')
q=queue.Queue(maxsize=10) #生成队列 并设置最大size10
def produce():
count=0
while True:
q.put(count)
print('产生%s'%count)
count+=1
time.sleep(0.5) #0.1秒钟 产生一个
def consume(name):
while True:
data=q.get()
time.sleep(1) #1秒钟 消费一个
print('%s消费了%s'%(name,data))
thread_produce=threading.Thread(target=produce)
thread_produce=threading.Thread(target=produce)
thread_consume1=threading.Thread(target=consume,args=('domain',))
thread_consume2=threading.Thread(target=consume,args=('alex',))
thread_produce.start()
thread_consume1.start()
thread_consume2.start()
进程
介绍
- 语法基本和线程一样
- 每一个进程都是由进程启动的,都会有一个父进程
创建进程,进程中创建线程
def info():
print('parent process id:',os.getppid())
print('current process id:',os.getpid())
def run():
threads = [threading.Thread(target=info,) for i in range(2)] # 用列表生成式 生成10个线程
for t in threads: # 启动刚刚创建的2个线程
t.start() #在当前进程 启动两个线程
time.sleep(3)
if __name__ == '__main__':
info()
processes=[multiprocessing.Process(target=run,) for i in range(1)] #用列表生成式 生成10个线程
for p in processes:
p.start()#启动刚刚创建的10个进程
p.join()#进程结束 主进程也就是当前的程序 才结束
print('master process finished...')
进程间的通信
创建子进程时,克隆了一份父进程,子进程更改数据后,在通过中间件返回父进程,实现通信
使用multiprocessing.Queue 传递数据
from multiprocessing import Queue,Process,Pipe #进程间队列通信 要import 这个Queue
def fun(arg):
arg.put('domain')
arg.put('alex')
if __name__ == '__main__':
q=Queue()
p=Process(target=fun,args=(q,))
p.start()
print(q.get())
使用multiprocessing.Pipe 传递数据
from multiprocessing import Process,Pipe
print('---Pipe---')
def fun(child):
child.send(['domain','alex'])
child.send(['12','33'])
print(child.recv())
child.close()
if __name__ == '__main__':
parent,child=Pipe()
p=Process(target=fun,args=(child,))
p.start()
print(parent.recv())
print(parent.recv())
parent.send("from parent:hello")
p.join() # 需写在send 之后
使用Managers 传递数据
from multiprocessing import Process,Manager
import os
def run(list,dic):
list.append(os.getpid()) #给共享的list添加元素
dic[os.getpid()] = os.getppid() #给共享的dict添加元素
if __name__ == '__main__':
with Manager() as manager:
list=manager.list() #生成一个用于共享的list
dic=manager.dict() #生成一个用于共享的dict
processes = [Process(target=run,args=(list,dic)) for i in range(10)] # 用列表生成式 生成10个线程
for p in processes:
p.start() # 启动刚刚创建的10个进程
p.join() # 进程结束 主进程也就是当前的程序 才结束
print(list)
print(dic)
进程中的锁
进程中不能共享数据,那么这个锁存在的意义的是什么?虽然数据没有共享,但是在打印时共享同一块屏幕,加上锁 确保打印不乱
from multiprocessing import Process,Lock
import os
def run(lock,num):
lock.acquire()
print(num)
lock.release()
if __name__ == '__main__':
lock=Lock()
processes = [Process(target=run, args=(lock, i)) for i in range(10)] # 用列表生成式 生成10个线程
for p in processes:
p.start() # 启动刚刚创建的10个进程
p.join() # 进程结束 主进程也就是当前的程序 才结束
进程池
设置固定数量的进程同时执行
python线程,因为线程池占用资源很小,所有没有线程池。但是进程不一样,前面说过 创建一个进程 要拷贝一份父进程,所以占用资源极其大,需要使用进程池。
from multiprocessing import Pool
import os,time
def run(arg):
print('子进程:',os.getpid())
time.sleep(2)
return os.getpid() # 返回参数给回调方法end
def end(arg):
print(arg,'的回调','---',)
if __name__ == '__main__':
print('主进程id:',os.getpid())
pool=Pool(5)
for i in range(20):
# pool.apply(fun,) #串行执行
if i%2==0:
pool.apply_async(func=run,args=(i,),callback=end) #并行执行,callback,是进程结束后的回调,是主进程调用的回调。
pool.close() #需先close,再join
pool.join() #join: 等待子进程,主线程再结束
print('main finished...'
协程
介绍
- 用户态(存在用户空间的数据)的轻量级线程(微线程)
- 协程能保留上一次状态
- 在单线程实现并发,串行操作,函数间来回切换执行
4.遇到IO操作就切换,IO 操作完了再回来
通过greenlet 使用协程(手动切换)
from greenlet import greenlet
def func1():
print(1)
g2.switch() # 输出1后 后切换到协程2执行
print(2) # 输出1后 后切换到协程2执行
g2.switch()
def func2():
print(3)
g1.switch() # 输出3后 后切换到协程1执行
print(4)
g1=greenlet(func1) #启动协程1 传入要执行的方法
g2=greenlet(func2) #启动协程2 传入要执行的方法
g1.switch() #切换到协程1,启动协程1,执行func1方法
通过gevent使用协程(自动切换)
import gevent,time
def func1():
print('in the func1 1')
gevent.sleep(2) #模拟io 操作需要2s,真实io 操作不需要 写这句,模块会自动判断
print('in the func1 2') # 因为这里io 需要时间最久,所以最后才会执行这句,先跳去执行其他的方法了。最后回来执行。也就是整个协程执行的时间 :2s
def func2():
print('in the func2 1')
gevent.sleep(1)
print('in the func2 2')
def func3():
print('in the func3 1')
gevent.sleep(1)
print('in the func3 2')
start_time=time.time()
gevent.joinall([ # 加入所有协程
gevent.spawn(func1),
gevent.spawn(func2),
gevent.spawn(func3),
])
end_time=time.time()
expend_time=end_time-start_time
print(expend_time)
协程爬取网页
from urllib import request
import gevent,time
from gevent import monkey
monkey.patch_all()
def spider(url,save_name):
respones=request.urlopen(url)
data=respones.read()
with open(save_name,'wb') as f:
f.write(data)
start_time=time.time()
gevent.joinall([ # 加入所有协程
gevent.spawn(spider, 'https://www.baidu.com','baidu.html'),
gevent.spawn(spider,'https://www.iqiyi.com','iqiyi.html'),
gevent.spawn(spider,'https://weibo.com/','weibo.html')
])
end_time=time.time()
expend_time=end_time-start_time
print('花费时间:',expend_time)
协程并发socket
服务端
import sys
import socket
import time
import gevent
from gevent import socket, monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server(8001)
客户端
import socket
HOST = 'localhost' # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"), encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
# print(data)
print('Received', repr(data))
s.close()
100个线程并发
import socket
import threading
def sock_conn():
client = socket.socket()
client.connect(("localhost",8001))
count = 0
while True:
client.send( ("hello %s" %count).encode("utf-8"))
data = client.recv(1024)
print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
count +=1
client.close()
for i in range(100):
t = threading.Thread(target=sock_conn)
t.start()