简介:
Puppeteer(中文翻译“木偶”)是 Google Chrome 团队官方推出的无界面(Headless)Chrome 工具。它是一个 Node 库,提供了一套高级 API,通过 DevTools 协议来控制无头版 Chrome,也可以配置为使用完整(非无头)的 Chrome。Chrome 素来在浏览器界稳执牛耳,因此,Chrome Headless 必将成为 web 应用自动化测试的行业标杆。使用 Puppeteer,相当于同时具有 Linux 和 Chrome 双端的操作能力,应用场景可谓非常之多。此仓库的建立,即是尝试各种折腾使用 GoogleChrome Puppeteer;以期在好玩的同时,学到更多有意思的操作。
Pyppeteer 是 Puppeteer 的非官方 Python 移植版,基于 asyncio 实现异步操作。本人在此基础上,做了简易封装。
核心代码:
ExtendPageOrFrame类定义了更方便的方法:
class ExtendPageOrFrame(pyppeteer.page.Page):
    """Wrap a pyppeteer page or frame object and add convenience helpers.

    The wrapped object is kept in ``self.page``. ``Page.__init__`` is not
    called, so every attribute that normal lookup cannot find on the wrapper
    is forwarded to the wrapped object by ``__getattr__``.

    Selectors passed to the helpers are treated as XPath when they start
    with ``/`` and as CSS selectors otherwise.
    """

    def __init__(self, page_frame_obj):
        """Store the wrapped page/frame and reset the XHR bookkeeping lists."""
        self.page = page_frame_obj
        self.all_pending_requests = []
        self.all_finished_requests = []

    def __getattr__(self, name):
        """Forward attributes missing on the wrapper to the wrapped object.

        Python calls ``__getattr__`` only after normal lookup has failed, so
        the wrapper's own methods never reach this point. The wrapper's own
        instance attributes reach it only when they have not been set yet;
        looking them up on ``self`` again would recurse without end, so an
        ``AttributeError`` is raised for them instead.
        """
        if name in ('page', 'all_pending_requests', 'all_finished_requests'):
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(type(self).__name__, name))
        return getattr(self.page, name)

    async def _find_element(self, selector_or_xpath, idx=0):
        """Return the element handle addressed by an XPath or CSS selector.

        ``idx`` selects among the XPath matches; for a CSS selector the
        result of ``querySelector`` is returned and ``idx`` is not used.
        """
        if selector_or_xpath.startswith('/'):
            matches = await self.page.xpath(selector_or_xpath)
            return matches[idx]
        return await self.page.querySelector(selector_or_xpath)

    async def click(self, selector_or_xpath, idx=0, wait_after=1):
        """Click an element, then pause.

        :param selector_or_xpath: XPath (leading ``/``) or CSS selector;
            ``None`` makes the call a no-op.
        :param idx: index into the XPath matches (not used for CSS selectors).
        :param wait_after: pause after the click; the value is multiplied by
            1000 before being handed to ``waitFor``.
        """
        if selector_or_xpath is None:
            return
        if selector_or_xpath.startswith('/'):
            matches = await self.page.xpath(selector_or_xpath)
            await matches[idx].click()
        else:
            await self.page.click(selector_or_xpath)
        await self.page.waitFor(wait_after * 1000)

    async def get_value(self, selector_or_xpath, idx=0):
        """Read the current value of an element.

        Returns the ``checked`` flag for checkbox inputs, ``value`` for other
        inputs, the text of the selected option for ``<select>`` (``None``
        when nothing is selected) and ``textContent`` for any other tag.
        Returns ``None`` when ``selector_or_xpath`` is ``None``.
        """
        if selector_or_xpath is None:
            return None
        element = await self._find_element(selector_or_xpath, idx)
        evaluate = self.page.evaluate
        tag_name = await evaluate('(element) => element.tagName', element)
        if tag_name == 'INPUT':
            ele_type = await evaluate('(element) => element.getAttribute("type")', element)
            if ele_type == 'checkbox':
                return await evaluate('(element) => element.checked', element)
            return await evaluate('(element) => element.value', element)
        if tag_name == 'SELECT':
            # selectedIndex is -1 when no option is selected; indexing the
            # options with it would throw inside the browser.
            return await evaluate('''(element) => {
                var index = element.selectedIndex;
                return index < 0 ? null : element.options[index].text;
            }''', element)
        return await evaluate('(element) => element.textContent', element)

    async def set_value(self, selector_or_xpath, value, idx=0):
        """Write a value into an element.

        Checkbox inputs are checked only when ``value is True`` and unchecked
        otherwise; other inputs receive ``value`` through ``element.type``.
        For ``<select>`` the first option whose ``innerHTML`` equals ``value``
        is selected; if there is none, the first option whose ``text`` equals
        ``value`` is selected, so a string returned by ``get_value`` can be
        written back. Other tags are left untouched, as is everything when
        ``selector_or_xpath`` is ``None``.
        """
        if selector_or_xpath is None:
            return
        element = await self._find_element(selector_or_xpath, idx)
        evaluate = self.page.evaluate
        tag_name = await evaluate('(element) => element.tagName', element)
        if tag_name == 'INPUT':
            ele_type = await evaluate('(element) => element.getAttribute("type")', element)
            if ele_type == 'checkbox':
                if value is True:
                    await evaluate('(element) => {element.checked=true;}', element)
                else:
                    await evaluate('(element) => {element.checked=false;}', element)
            else:
                await element.type(value)
        elif tag_name == 'SELECT':
            # The innerHTML pass runs first so that every call that matched
            # before still selects the same option.
            await evaluate('''(element, value) => {
                var i;
                for (i = 0; i < element.options.length; i++) {
                    if (element.options[i].innerHTML == value) {
                        element.options[i].selected = true;
                        return;
                    }
                }
                for (i = 0; i < element.options.length; i++) {
                    if (element.options[i].text == value) {
                        element.options[i].selected = true;
                        return;
                    }
                }
            }''', element, value)

    async def set_el_radio(self, text, idx=0, wait_after=1):
        """Click the ``el-radio`` label whose text contains ``text``, then pause.

        ``idx`` picks among the matching labels; ``wait_after`` is multiplied
        by 1000 before being handed to ``waitFor``.
        """
        radios = await self.page.xpath(
            '//label[contains(@class,"el-radio")]/span[@class="el-radio__label" and contains(text(), "{}")]'.format(
                text))
        await radios[idx].click()
        await self.page.waitFor(wait_after * 1000)

    async def get_el_radio(self, idx=0):
        """Return the stripped label text of the checked ``el-radio`` item."""
        value = await self.get_value(
            '//label[contains(@class,"el-radio") and contains(@class,"is-checked")]/span[@class="el-radio__label"]',
            idx)
        return value.strip()

    async def wait_for_all_xhr_finished(self, timeout=None):
        """Wait until XHR traffic on the wrapped page has settled.

        First polls every 0.1 s until the number of XHR requests seen has
        stayed the same for six consecutive polls, then waits until each of
        those requests has either finished or failed.

        :param timeout: optional upper bound in seconds for the whole wait;
            ``None`` (the default) waits without limit.
        :raises asyncio.TimeoutError: if ``timeout`` elapses first.
        """
        poll_interval = 0.1
        stable_polls = 6
        loop = asyncio.get_event_loop()
        deadline = None if timeout is None else loop.time() + timeout

        def check_deadline():
            if deadline is not None and loop.time() > deadline:
                raise asyncio.TimeoutError('XHR requests did not settle in time')

        def on_request(request):
            if request.resourceType == 'xhr':
                self.all_pending_requests.append(request)

        def on_settled(request):
            # Count only requests whose start was observed; a request that
            # began before the listeners were attached must not unbalance
            # the two lists.
            if request.resourceType != 'xhr':
                return
            if any(request is seen for seen in self.all_pending_requests):
                self.all_finished_requests.append(request)

        self.page.on('request', on_request)
        self.page.on('requestfinished', on_settled)
        # A failed request never emits 'requestfinished'; without this
        # listener a single failed XHR would keep the second loop spinning.
        self.page.on('requestfailed', on_settled)
        try:
            recent_counts = []
            while True:
                recent_counts.append(len(self.all_pending_requests))
                del recent_counts[:-stable_polls]  # keep the newest samples only
                if len(recent_counts) == stable_polls and len(set(recent_counts)) == 1:
                    break
                check_deadline()
                await asyncio.sleep(poll_interval)
            while len(self.all_finished_requests) < len(self.all_pending_requests):
                check_deadline()
                await asyncio.sleep(poll_interval)
        finally:
            # Detach from the same object the listeners were attached to,
            # even when the wait is interrupted by an error or timeout.
            self.page.remove_listener('request', on_request)
            self.page.remove_listener('requestfinished', on_settled)
            self.page.remove_listener('requestfailed', on_settled)
            self.all_pending_requests = []
            self.all_finished_requests = []

    # The original attribute list in __getattr__ named this camelCase
    # spelling; keep it available as an alias.
    waitForAllXhrFinished = wait_for_all_xhr_finished
BaseWebPage类负责启动浏览器并完成页面初始化:
class BaseWebPage(object):
    """Launch a browser and open a start URL in its first tab."""

    def __init__(self, start_url, headless=False):
        """Remember the URL to open and whether to launch headless."""
        self.url = start_url
        self.headless = headless

    async def navigate(self):
        """Launch the browser, open ``self.url`` and wait for XHR traffic to settle.

        The first page reported by the browser is wrapped in
        ``ExtendPageOrFrame`` and its viewport is set to the available screen
        size reported by ``window.screen``.

        :returns: ``(page, browser)``; the caller is responsible for closing
            the browser.
        :raises Exception: any error from the setup steps is re-raised after
            the browser has been closed.
        """
        browser = await pyppeteer.launch(headless=self.headless, args=['--start-maximized'])
        try:
            page = ExtendPageOrFrame((await browser.pages())[0])
            current_screen = await page.evaluate('''() => {
                return {
                    width: window.screen.availWidth,
                    height: window.screen.availHeight,
                };
            }''')
            await page.setViewport(current_screen)
            await page.goto(self.url)
            await page.wait_for_all_xhr_finished()
        except Exception:
            # The caller never receives the browser handle on failure, so
            # close it here instead of leaving the process running.
            await browser.close()
            raise
        return page, browser
测试代码(使用上述自定义类):
async def test_async():
    """Drive a login page and an iframe-hosted form through the wrapper classes.

    The URL is a placeholder (``ip:port``) and must be replaced with a real
    host before running. The browser is closed even when a step fails.
    """
    page, browser = await BaseWebPage('http://ip:port/static/html/login.html').navigate()
    try:
        await page.set_value('input[name=username]', '')
        print(await page.get_value('input[name=username]'))
        await page.type('input[name=password]', '')
        await page.click('button[type=submit]', wait_after=3)
        await page.click('//span[text()=\'策略中心\']')
        await page.click('//a[text()=\'优先拣选策略\']', wait_after=3)
        # The form lives inside an iframe; wrap its frame so the same
        # helpers can be used on it.
        iframe = await page.xpath('//iframe[@class="iframe"]')
        frame = await iframe[0].contentFrame()
        frame = ExtendPageOrFrame(frame)
        await frame.click('//table[@class="el-table__body"]/tbody/tr[1]/td[12]/div/button/span[contains(text(),"编辑")]')
        await frame.set_el_radio('每天')
        print(await frame.get_el_radio())
        await asyncio.sleep(2)
    finally:
        await browser.close()
def test():
    """Run the ``test_async`` coroutine to completion on the asyncio event loop."""
    asyncio.get_event_loop().run_until_complete(test_async())


# Start the demo only when this file is executed as a script.
if __name__ == '__main__':
    test()