Coroutine Concepts
Subroutines/functions: in every language they are called hierarchically. For example, A calls B; while B executes it can call C; C finishes and returns, then B finishes and returns, and finally A finishes. This is implemented with a stack, and a thread executes one subroutine at a time. A subroutine call always has one entry point and one return, and the call order is unambiguous.
Overview: a coroutine also looks like a subroutine, but during execution it can be suspended inside the subroutine body, and control can switch to another subroutine. This is not a function call; it is somewhat like a CPU interrupt.
'''
def C():
    print("C--start")
    print("C--end")

def B():
    print("B--start")
    C()
    print("B--end")

def A():
    print("A--start")
    B()
    print("A--end")

A()
'''
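For contrast, running the nested-call version above prints the following, showing the strict call/return (stack) order:
'''
A--start
B--start
C--start
C--end
B--end
A--end
'''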
def A():
    print(1)
    print(2)
    print(3)

def B():
    print("x")
    print("y")
    print("z")
'''
1
2
x
y
z
3
Suppose we want to produce this output,
even though A contains no call to B.
A and B then look a bit like two threads, but the defining feature of
coroutines is that a single thread does all the execution.
Compared with threads, coroutines are extremely efficient: there is only one
thread, so there are no conflicts from writing shared variables concurrently.
Shared resources need no locks inside coroutines; checking state is enough.
'''
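As defined above, A and B are ordinary functions and can never interleave like this. A minimal sketch of how generators achieve exactly that output (the yield positions and the driver calls below are additions for illustration, not part of the original):

def A():
    print(1)
    print(2)
    yield          # suspend A here; control returns to the driver
    print(3)

def B():
    print("x")
    print("y")
    print("z")
    yield

ga, gb = A(), B()
next(ga)   # runs A up to its yield: prints 1, 2
next(gb)   # runs B up to its yield: prints x, y, z
try:
    next(ga)   # resumes A: prints 3, then A finishes
except StopIteration:
    pass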
Writing Your Own Generator
To understand coroutines, first understand generators.
class My_gen(object):
    """A hand-written iterator: each __next__ call produces the next value."""
    def __init__(self):
        self.number = 0

    def __next__(self):
        self.number += 1
        if self.number < 10:
            return self.number
        else:
            raise StopIteration()

# a generator-function version: yield does the bookkeeping for us
def f():
    number = 0
    while number < 10:
        number += 1
        yield number
g = My_gen()
print(type(g))
print(g.__next__())
print(g.__next__())
print(g.__next__())
print(g.__next__())
print(next(g))
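My_gen can be stepped with next(), but it cannot be used in a for loop because it lacks __iter__. A minimal sketch of the addition (My_gen2 is an illustrative name, not in the original):

class My_gen2(My_gen):
    def __iter__(self):
        # an iterator must return itself so that `for` can drive it
        return self

for n in My_gen2():
    print(n)   # prints 1 through 9, then the loop ends on StopIteration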
How Coroutines Work
'''
Python's support for coroutines is implemented through generators.
'''
def run():
    print(1)
    yield 10
    print(2)
    yield 20
    print(3)
    yield 30

# The simplest coroutine style: it controls staged execution of a function
# and saves the cost of thread or process switches.
# Calling run() does not execute the body; the return value is a generator.
m = run()
print(next(m))
print(next(m))
print(next(m))
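A fourth next(m) resumes after yield 30; the body then ends, so StopIteration is raised. A sketch of catching it (added here for illustration):

try:
    next(m)          # the body has no more yields, so it finishes
except StopIteration:
    print("run() is exhausted")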
Sending Messages to yield
import threading

def f():
    while True:
        x = yield
        print(x)

def func(g):
    next(g)      # prime the generator: run to the first yield
    g.send(10)   # resumes f: x = 10, prints 10, pauses at yield again
    next(g)      # equivalent to g.send(None): prints None
    g.send(12)

if __name__ == '__main__':
    g = f()
    print(threading.enumerate())   # shows that only one thread is running
    func(g)
Output:
[<_MainThread(MainThread, started 1856)>]
10
None
12
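The first next(g) is mandatory: sending a non-None value into a generator that has not started yet raises TypeError. A quick sketch (g2 is a fresh generator, added for illustration):

g2 = f()
try:
    g2.send(10)   # the generator hasn't reached its first yield yet
except TypeError as e:
    print(e)      # "can't send non-None value to a just-started generator"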
Data Transfer
def run():
    # data is only a placeholder to hand out; it always stays ""
    data = ""
    r = yield data
    # r == "a"
    print(1, r, data)
    r = yield "aa"
    # r == "b"
    print(2, r, data)
    r = yield "bb"
    # r == "c"
    print(3, r, data)
    r = yield "cc"

m = run()
# start m: run to the first yield, which hands back ''
m.send(None)  # ''
print(m.send("a"))  # 'aa'
print(m.send("b"))  # 'bb'
print(m.send("c"))  # 'cc'
print("******")
Output:
1 a
aa
2 b
bb
3 c
cc
******
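Besides send(), the caller can also inject an exception at the paused yield with throw(). A short sketch (m2 and the ValueError are illustration only, not part of the original example):

m2 = run()
m2.send(None)    # pause at the first yield
m2.send("a")     # prints "1 a", pauses at yield "aa"
try:
    m2.throw(ValueError("stop"))   # raised inside run() at the paused yield
except ValueError as e:
    print(e)     # run() doesn't catch it, so it propagates back to the caller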
Producer and Consumer
def product(c):
    c.send(None)   # prime the consumer: run to its first yield
    for i in range(5):
        print("Producer produced data %d" % i)
        r = c.send(str(i))
        print("Consumer replied: %s" % r)
    c.close()

def customer():
    data = ""
    while True:
        n = yield data
        if not n:
            return
        print("Consumer consumed %s" % n)
        data = "200"

c = customer()
product(c)
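For reference, with the messages translated as above, the script prints:

Producer produced data 0
Consumer consumed 0
Consumer replied: 200
Producer produced data 1
Consumer consumed 1
Consumer replied: 200
...
Producer produced data 4
Consumer consumed 4
Consumer replied: 200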
Buying Hamburgers
import time

def buy():
    """
    Buy a hamburger.
    :return: a status string for each purchase
    """
    result = '666'
    while True:
        n = yield result
        if not n:
            return None
        time.sleep(1)
        result = '{n} purchased successfully'.format(n=n)

def make(buy):
    next(buy)   # prime: run to the first yield, discarding '666'
    n = 0
    while n < 5:
        n = n + 1
        r = buy.send(n)
        print(r)
    buy.close()

if __name__ == '__main__':
    buy = buy()
    make(buy)
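buy.close() works by raising GeneratorExit at the paused yield. A generator can release resources with try/finally, as in this sketch (buy_with_cleanup is an illustrative variation, not the original function):

def buy_with_cleanup():
    result = '666'
    try:
        while True:
            n = yield result
            result = '{n} purchased successfully'.format(n=n)
    finally:
        # also runs when the caller calls close() (GeneratorExit)
        print('shop closed')

b = buy_with_cleanup()
next(b)           # prime; yields '666'
print(b.send(1))  # '1 purchased successfully'
b.close()         # prints 'shop closed'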
Asynchronous Coroutines
import asyncio

# A class representing hamburgers; one instance is one hamburger.
class Hamburger:
    @classmethod
    def make(cls, num, *args, **kwargs):
        """
        Create the specified number of instances.
        """
        hamburgers = []
        for i in range(num):
            hamburgers.append(cls.__new__(cls))
        return hamburgers

# Create instances: 5 hamburgers are ready before we start.
hamburgers = Hamburger.make(5)
print(hamburgers)

async def make_hamburger(n):
    # count how many hamburgers have been handed out
    count = 0
    while True:
        if len(hamburgers) == 0:
            # no hamburgers left: make more on demand
            await ask_for_hamburger()
        # hand one hamburger to the customer
        hamburger = hamburgers.pop()
        yield hamburger
        count += 1
        if count == n:
            break

async def ask_for_hamburger():
    await asyncio.sleep(4)
    hamburgers.extend(Hamburger.make(3))

async def buy_hamburgers():
    bucket = []
    async for hamburger in make_hamburger(12):
        bucket.append(hamburger)
        print('Bought hamburger no. {0}'.format(len(bucket)))

# event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(buy_hamburgers())
loop.close()
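On Python 3.7+, the three event-loop lines are normally written as a single call; a sketch of the equivalent entry point for this script:

asyncio.run(buy_hamburgers())   # creates, runs, and closes the loop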
Batch Downloading with Asynchronous Coroutines
"""
- aiohttp : 异步请求库
- asuyncio : 异步协成库
- re : 正则表达式
"""
import re
import asyncio
import aiohttp
import async_timeout

start_url = "http://www.geyanw.com"
crawled_urls = []   # URLs that have already been crawled
ALLOW_HOST = "geyanw.com"

async def start_request():
    """
    Fetch the page at start_url, extract new URLs from it,
    and put them on the queue.
    :return:
    """
    # fetch the page content asynchronously
    content = await get_content(start_url)
    if content is not None:
        urls = parse_html(content)
        for url in urls:
            # q is the asyncio.Queue created in __main__ below
            q.put_nowait(url)
async def get_content(url):
    """
    Fetch the page content for a URL.
    :param url: page address
    :return: raw HTML bytes, or None on failure
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with async_timeout.timeout(5):
                async with session.get(url) as response:
                    print(response)
                    if response.status == 200:
                        print("{url} downloaded successfully".format(url=url))
                        html = await response.read()
                        return html
                    else:
                        return None
        except Exception as e:
            print(e)
            return None
def parse_html(html):
    """
    Extract new URLs from the HTML.
    :param html: page source
    :return: all URLs found in the page source --> list
    """
    pattern = re.compile(r'href="(/[a-z0-9-/]+(\.html)?)"')
    urls = pattern.findall(html.decode('GBK'))
    return [start_url + url[0] for url in urls]
async def work(q):
    """
    Take a URL off queue q, send an asynchronous GET request,
    extract URLs from the returned page source, and put every
    valid URL that has not been crawled yet back on the queue.
    :param q:
    :return:
    """
    while not q.empty():
        # take a URL off the queue
        url = await q.get()
        if url not in crawled_urls:
            crawled_urls.append(url)   # mark it as crawled
            # send an asynchronous GET request
            content = await get_content(url)
            if content is not None:
                urls = parse_html(content)
                for url in urls:
                    # only queue new, not-yet-crawled URLs on the allowed host
                    if url not in crawled_urls and ALLOW_HOST in url:
                        q.put_nowait(url)
            else:
                print("{url} request failed!".format(url=url))
if __name__ == '__main__':
    q = asyncio.Queue()
    loop = asyncio.get_event_loop()
    init_req = start_request()
    task = asyncio.ensure_future(init_req)
    loop.run_until_complete(task)
    works = []
    # create multiple worker coroutines
    for i in range(50):
        # create one coroutine
        c = asyncio.ensure_future(work(q))
        # add the coroutine object to the list
        works.append(c)
    loop.run_until_complete(asyncio.wait(works))
    loop.close()
    print(crawled_urls)
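One caveat: because each worker loops on while not q.empty(), a worker exits as soon as the queue is momentarily empty, even if other workers are about to enqueue more URLs. A more robust sketch uses Queue.join() and task_done() (worker and main below are illustrative variations, not the original design):

async def worker(q):
    while True:
        url = await q.get()   # waits until an item is available
        try:
            pass              # fetch and parse here, as in work() above
        finally:
            q.task_done()     # lets q.join() track completion

async def main(q):
    tasks = [asyncio.ensure_future(worker(q)) for _ in range(50)]
    await q.join()            # returns once every queued URL is processed
    for t in tasks:
        t.cancel()            # the workers loop forever; cancel them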