目录
1. 进程池与线程池
2. 协程
3. gevent
4. 单线程下实现并发的套接字通信
首先写一个基于多线程的套接字
服务端:
from socket import *
from threading import Thread
def comunicate(conn):
while True: # 通信循环
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port, backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)
while True: # 链接循环
conn, client_addr = server.accept()
print(client_addr)
# 通信
t=Thread(target=comunicate,args=(conn,))
t.start()
if __name__ == '__main__':
s=Thread(target=server,args=('127.0.0.1',8081))
s.start()
每连接上一个客户端便会创造一个线程 , 那么如果有一万个客户端的话服务端会产生一万个线程 , 然后服务端就炸了 , 所以要想个办法限制连接个数 , 即限制
1. 进程池\线程池
开启一个进程池 , 会开启一定个数的进程 , 然后将任务提交给进程就可以了
1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.
2 基本方法
submit(fn, *args, **kwargs)
异步提交任务
map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作
shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
result(timeout=None)
取得结果
add_done_callback(fn)
回调函数
- 导入模块
from concurrent.futures import ProcessPoolExecutor
- 创建一个进程池
p=ProcessPoolExecutor(4)
#进程数为4 - 提交任务, 有两种方式
- a.同步调用:同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
res=p.submit(function,参数一...).result()
- b. 异步调用:提交完一个任务之后,不在原地等待,而是直接执行下一行代码,会导致任务是并发执行的,,结果futrue对象会在任务运行完毕后自动传给回调函数
res=p.submit(function,参数一...)
回调函数(用于异步调用)
每提交一个任务 , 会产生一个对象 , 给这个任务绑定了一个函数 , 这个函数会在你提交的任务完成后自动触发 , 且会将这个对象当作参数传给这个函数
这个函数用于处理子进程运行完之后产生的结果
多进程下的回调函数
from concurrent.futures import ProcessPoolExecutor
import time,os
import requests
def get(url):
print('%s GET %s' %(os.getpid(),url))
time.sleep(3)#处理的太快看不出效果 , 模拟多处理3秒
response=requests.get(url)#爬取网站内容
if response.status_code == 200:
res=response.text
else:
res='下载失败'
return res#返回爬取数据
# 到这里任务运行完了之后自动调用parse函数 ,
# 回调函数 , 处理任务的结果用
def parse(future):
time.sleep(1)
res=future.result()#future对象下的result为任务的返回值
print('%s 解析结果为%s' %(os.getpid(),len(res)))
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.sina.com.cn',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.python.org',
'https://www.openstack.org',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
#开启进程数为9的进程池
p=ProcessPoolExecutor(9)
start=time.time()
for url in urls:
# 异步调用:提交完一个任务之后,不在原地等待,而是直接执行下一行代码,会导致任务是并发执行的,,结果futrue对象会在任务运行完毕后自动传给回调函数
future=p.submit(get,url)
#将parse设为回调函数
future.add_done_callback(parse) #parse会在任务运行完毕后自动触发,然后接收一个参数future对象
p.shutdown(wait=True)
print('主',time.time()-start)
print('主',os.getpid())
多线程与多进程相同,只需将p=ProcessPoolExecutor(9)
改为p=ThreadPoolExecutor(9)
就可以了
2. 单线程下实现并发-------协程
目标:
在单线程下实现并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
并发(多个任务看起来是同时执行就是并发):切换+保存状态协程:
协程是单线程实现并发
注意:协程是程序员意淫出来的东西,操作系统里只有进程和线程的概念(操作系统调度的是线程)
在单线程下实现多个任务间遇到IO就切换就可以降低单线程的IO时间,从而最大限度地提升单线程的效率
强调
- python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
- 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
- 优点如下
- 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
- 单线程内就可以实现并发的效果,最大限度地利用cpu
- 缺点如下
- 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
- 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
总结
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
3. gevent
他是一个第三方的库 , 可以实现在单线程内遇到 IO 任务自动切换
geven是不能直接识别的 需要在整个文件最前面加上一行代码
from gevent import monkey;monkey.patch_all()
步骤 :
- 打补丁
- 导入
from gevent import spawn
- 定义有 IO 操作的任务(函数)
- 将多个任务分别提交给协程
g1=spawn(函数名,函数的参数)
g2=spawn(函数名,函数的参数)
- 等待两个协程运行完 ,
g1.join()
g2.join()
:因为这里是异步调用 , 主线程代码运行完了 , 主线程就会死掉 , 协程里面的任务也不会运行完 就跟着死了, 所以要加上join方法 ,如果主线程要运行很久 , 或者是一个死循环 , 就不用加join方法 ,即上面的的第五步就可以忽略
from gevent import monkey;monkey.patch_all()
from gevent import spawn,joinall #pip3 install gevent
import time
def play(name):
print('%s play 1' %name)
time.sleep(5)
print('%s play 2' %name)
def eat(name):
print('%s eat 1' %name)
time.sleep(3)
print('%s eat 2' %name)
start=time.time()
g1=spawn(play,'王昭锦')
g2=spawn(eat,'王昭锦')
g1.join()
g2.join()
# joinall([g1,g2]) #上面两步可以并成这一步
print('主',time.time()-start)
运行结果如下:
'''
王昭锦 play 1
王昭锦 eat 1
王昭锦 eat 2
王昭锦 play 2
主 5.009259223937988
'''
4. 单线程下实现并发的套接字通信
服务端:
from gevent import monkey;monkey.patch_all()
from socket import *
from gevent import spawn
def comunicate(conn):
while True: # 通信循环
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port, backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)
while True: # 链接循环
conn, client_addr = server.accept()
print(client_addr)
# 通信
spawn(comunicate,conn)
if __name__ == '__main__':
g1=spawn(server,'127.0.0.1',8080)
g1.join()