Pyppeteer简介
Puppeteer 是 Google 基于 Node.js 开发的一个工具,有了它我们可以通过 JavaScript 来控制 Chrome 浏览器的一些操作,当然也可以用作网络爬虫上,其 API 极其完善,功能非常强大。 而 Pyppeteer 又是什么呢?它实际上是 Puppeteer 的 Python 版本的实现,但它不是 Google 开发的,而是一位来自日本的工程师依据 Puppeteer 的一些功能开发出来的非官方版本。
官方文档: https://miyakogi.github.io/pyppeteer/reference.html
下载
import asyncio

from pyppeteer import launch


async def main():
    """Open example.com in a headless Chromium and save a screenshot."""
    browser = await launch()
    page = await browser.newPage()
    await page.goto('http://example.com')
    # screenshot() takes an options dict; 'path' is where the PNG is written.
    await page.screenshot({'path': 'example.png'})
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())
评估页面上的脚本
import asyncio

from pyppeteer import launch


async def main():
    """Screenshot a page, then run a JS function in it via page.evaluate()."""
    browser = await launch()
    page = await browser.newPage()
    await page.goto('http://example.com')
    await page.screenshot({'path': 'example.png'})
    # evaluate() runs the given JS function in the page context and returns
    # its (JSON-serializable) result to Python.
    dimensions = await page.evaluate('''() => {
        return {
            width: document.documentElement.clientWidth,
            height: document.documentElement.clientHeight,
            deviceScaleFactor: window.devicePixelRatio,
        }
    }''')
    print(dimensions)
    # >>> {'width': 800, 'height': 600, 'deviceScaleFactor': 1}
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())
关键字参数的选项
1
2
3
|
{ 'headless' : True } # 默认为True无头 { 'headless' : False } # 改为False变成有头 browser = await launch({ 'headless' : False }) |
选择器
Page.querySelector()     # CSS selector, first match
Page.querySelectorAll()  # CSS selector, all matches
Page.xpath()             # XPath selector
参数Page.evaluate()和Page.querySelectorEval()
添加force_expr=True选项,这会强制pyppeteer将字符串视为表达式。
获取页面内容的示例:
# With force_expr=True, evaluate() treats the string as a JS *expression*
# instead of a function body:
#     content = await page.evaluate('document.body.textContent', force_expr=True)

import asyncio

from pyppeteer import launch


async def main():
    """Fetch a page headfully and print its full body text."""
    browser = await launch({'headless': False})
    page = await browser.newPage()
    await page.goto('https://www.cnblogs.com/guyouyin123/p/12669430.html#selenium%E9%80%89%E6%8B%A9%E5%99%A8%E9%80%89%E6%8B%A9')
    content = await page.evaluate('document.body.textContent', force_expr=True)
    print(content)
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())
获取元素内部文本的示例:
1
2
|
element = await page.querySelector('h1') title = await page.evaluate('(element) => element.textContent', element) |
基础用法
import asyncio

from pyppeteer import launch


async def main():
    # headless=False switches to headful (visible window) mode.
    # Pyppeteer accepts both dict and keyword arguments; Puppeteer only a dict.
    # To pin a specific Chromium binary:
    # exepath = r'C:\Users\Administrator\AppData\Local\pyppeteer\pyppeteer\local-chromium\575458\chrome-win32/chrome.exe'
    # browser = await launch({'executablePath': exepath, 'headless': False, 'slowMo': 30})
    browser = await launch(
        # headless=False,
        {'headless': False}
    )
    page = await browser.newPage()

    # Set the page viewport size.
    await page.setViewport(viewport={'width': 1280, 'height': 800})
    # Enable/disable JavaScript; with enabled=False the page is not JS-rendered.
    await page.setJavaScriptEnabled(enabled=True)
    # Navigation timeout is given in milliseconds.
    res = await page.goto('https://www.toutiao.com/', options={'timeout': 1000})
    resp_headers = res.headers  # response headers
    resp_status = res.status    # response status code

    # Wait a fixed amount of time...
    await asyncio.sleep(2)
    # ...or poll until a given element exists on the page.
    while not await page.querySelector('.t'):
        pass
    # Scroll to the bottom of the page.
    await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
    await asyncio.sleep(2)

    # Screenshot and save it.
    await page.screenshot({'path': 'toutiao.png'})

    # Print the page cookies.
    print(await page.cookies())

    """ Print page text """
    # Full HTML content.
    print(await page.content())

    # Run a JS *function* on the page (force_expr=False).
    dimensions = await page.evaluate(pageFunction='''() => {
        return {
            width: document.documentElement.clientWidth,    // page width
            height: document.documentElement.clientHeight,  // page height
            deviceScaleFactor: window.devicePixelRatio,     // pixel ratio
        }
    }''', force_expr=False)
    print(dimensions)

    # Text only: force_expr=True treats the string as a JS *expression*.
    content = await page.evaluate(pageFunction='document.body.textContent',
                                  force_expr=True)
    print(content)

    # Current page title.
    print(await page.title())

    # Scrape news items; XPath expressions also work.
    """
    # Pyppeteer's three element-selection APIs:
    Page.querySelector()     # CSS selector
    Page.querySelectorAll()
    Page.xpath()             # XPath expression
    # Short aliases: Page.J(), Page.JJ(), and Page.Jx()
    """
    element = await page.querySelector(".feed-infinite-wrapper > ul>li")  # first match only
    print(element)
    # Get the element's text by running JS against its handle.
    content = await page.evaluate('(element) => element.textContent', element)
    print(content)

    # elements = await page.xpath('//div[@class="title-box"]/a')
    elements = await page.querySelectorAll(".title-box a")
    for item in elements:
        # getProperty() returns a JSHandle, e.g.
        # <pyppeteer.execution_context.JSHandle object at 0x000002220E7FE518>
        print(await item.getProperty('textContent'))
        # jsonValue() converts the handle to a plain Python value.
        title_str = await (await item.getProperty('textContent')).jsonValue()
        title_link = await (await item.getProperty('href')).jsonValue()
        print(title_str)
        print(title_link)

    # Close the browser.
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())


import asyncio
import pyppeteer
from collections import namedtuple

Response = namedtuple("rs", "title url html cookies headers history status")


async def get_html(url):
    """Fetch one URL headlessly and return its title (prints debug info)."""
    browser = await pyppeteer.launch(headless=True, args=['--no-sandbox'])
    page = await browser.newPage()
    res = await page.goto(url, options={'timeout': 3000})
    data = await page.content()
    title = await page.title()
    resp_cookies = await page.cookies()  # cookies
    resp_headers = res.headers           # response headers
    resp_status = res.status             # response status
    print(data)
    print(title)
    print(resp_headers)
    print(resp_status)
    return title


if __name__ == '__main__':
    url_list = [
        "https://www.toutiao.com/",
        "http://jandan.net/ooxx/page-8#comments",
        "https://www.12306.cn/index/",
    ]
    task = [get_html(url) for url in url_list]
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(asyncio.gather(*task))
    for res in results:
        print(res)


# Sample response headers captured from one of the runs above:
headers = {
    'date': 'Sun, 28 Apr 2019 06:50:20 GMT',
    'server': 'Cmcc',
    'x-frame-options': 'SAMEORIGIN\nSAMEORIGIN',
    'last-modified': 'Fri, 26 Apr 2019 09:58:09 GMT',
    'accept-ranges': 'bytes',
    'cache-control': 'max-age=43200',
    'expires': 'Sun, 28 Apr 2019 18:50:20 GMT',
    'vary': 'Accept-Encoding,User-Agent',
    'content-encoding': 'gzip',
    'content-length': '19823',
    'content-type': 'text/html',
    'connection': 'Keep-alive',
    'via': '1.1 ID-0314217270751344 uproxy-17',
}
模拟输入
1
2
3
4
5
|
# 模拟输入 账号密码 {'delay': rand_int()} 为输入时间 await page. type ( '#TPL_username_1' , "sadfasdfasdf" ) await page. type ( '#TPL_password_1' , "123456789" , ) await page.waitFor( 1000 ) await page.click( "#J_SubmitStatic" ) |
使用 tkinter 获取屏幕高度和宽度
def screen_size():
    """Return the screen (width, height) in pixels using tkinter."""
    import tkinter
    tk = tkinter.Tk()
    width = tk.winfo_screenwidth()
    height = tk.winfo_screenheight()
    tk.quit()
    return width, height
爬取京东商城
import asyncio

import requests
from bs4 import BeautifulSoup
from pyppeteer import launch


def screen_size():
    """Return the screen (width, height) in pixels using tkinter."""
    import tkinter
    tk = tkinter.Tk()
    width = tk.winfo_screenwidth()
    height = tk.winfo_screenheight()
    tk.quit()
    return width, height


async def main(url):
    """Scrape one JD search-result page; return a list of item dicts."""
    # browser = await launch({'headless': False, 'args': ['--no-sandbox'], })
    browser = await launch({'args': ['--no-sandbox'], })
    page = await browser.newPage()
    width, height = screen_size()
    await page.setViewport(viewport={"width": width, "height": height})
    await page.setJavaScriptEnabled(enabled=True)
    await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
    await page.goto(url)
    # await asyncio.sleep(2)
    # Scroll to the bottom so lazily-loaded items render.
    await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
    await asyncio.sleep(1)
    # content = await page.content()
    li_list = await page.xpath('//*[@id="J_goodsList"]/ul/li')
    item_list = []
    for li in li_list:
        a = await li.xpath('.//div[@class="p-img"]/a')
        detail_url = await (await a[0].getProperty("href")).jsonValue()
        promo_words = await (await a[0].getProperty("title")).jsonValue()
        a_ = await li.xpath('.//div[@class="p-commit"]/strong/a')
        p_commit = await (await a_[0].getProperty("textContent")).jsonValue()
        i = await li.xpath('./div/div[3]/strong/i')
        price = await (await i[0].getProperty("textContent")).jsonValue()
        em = await li.xpath('./div/div[4]/a/em')
        title = await (await em[0].getProperty("textContent")).jsonValue()
        item = {
            "title": title,
            "detail_url": detail_url,
            "promo_words": promo_words,
            'p_commit': p_commit,
            'price': price,
        }
        item_list.append(item)
    await page_close(browser)
    return item_list


async def page_close(browser):
    """Close every open tab, then the browser itself."""
    for _page in await browser.pages():
        await _page.close()
    await browser.close()


msg = "手机"
url = "https://search.jd.com/Search?keyword={}&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq={}&cid2=653&cid3=655&page={}"

task_list = []
for i in range(1, 6):
    page = i * 2 - 1  # JD numbers result pages 1, 3, 5, ...
    # BUG FIX: the original rebound ``url`` to the formatted result, so after
    # the first iteration the template placeholders were gone and every later
    # task reused page 1's URL. Format into a fresh name instead.
    page_url = url.format(msg, msg, page)
    task_list.append(main(page_url))

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*task_list))
# print(results, len(results))
for i in results:
    print(i, len(i))

# Equivalent parsing with BeautifulSoup over page.content():
# soup = BeautifulSoup(content, 'lxml')
# div = soup.find('div', id='J_goodsList')
# for i, li in enumerate(div.find_all('li', class_='gl-item')):
#     if li.select('.p-img a'):
#         print(li.select('.p-img a')[0]['href'], i)
#         print(li.select('.p-price i')[0].get_text(), i)
#         print(li.select('.p-name em')[0].text, i)
#     else:
#         print("#" * 200)
#         print(li)
爬取淘宝网
taobao.py
import asyncio
import time

from pyppeteer.launcher import launch

from alifunc import mouse_slide, input_time_random
from exe_js import js1, js3, js4, js5


def screen_size():
    """Return the screen (width, height) in pixels using tkinter."""
    import tkinter
    tk = tkinter.Tk()
    width = tk.winfo_screenwidth()
    height = tk.winfo_screenheight()
    tk.quit()
    return width, height


async def main(username, pwd, url):
    """Log in to Taobao, handling the slider captcha if it appears."""
    browser = await launch({'headless': False, 'args': ['--no-sandbox'], },
                           userDataDir='./userdata',
                           args=['--window-size=1366,768'])
    page = await browser.newPage()
    width, height = screen_size()
    await page.setViewport(viewport={"width": width, "height": height})
    await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
    await page.goto(url)

    # Patch navigator properties so webdriver detection scripts see a
    # normal browser (see exe_js.py).
    await page.evaluate(js1)
    await page.evaluate(js3)
    await page.evaluate(js4)
    await page.evaluate(js5)

    # Switch from QR-code login to the username/password form.
    pwd_login = await page.querySelector('.J_Quick2Static')
    # print(await (await pwd_login.getProperty('textContent')).jsonValue())
    await pwd_login.click()

    # Type with a randomized per-keystroke delay to mimic a human.
    await page.type('#TPL_username_1', username, {'delay': input_time_random() - 50})
    await page.type('#TPL_password_1', pwd, {'delay': input_time_random()})
    await page.screenshot({'path': './headless-test-result.png'})
    time.sleep(2)

    slider = await page.Jeval('#nocaptcha', 'node => node.style')  # did a slider captcha appear?
    if slider:
        print('出现滑块情况判定')
        await page.screenshot({'path': './headless-login-slide.png'})
        flag = await mouse_slide(page=page)
        if flag:
            print(page.url)
            await page.keyboard.press('Enter')
            await get_cookie(page)
    else:
        await page.keyboard.press('Enter')
        await page.waitFor(20)
        await page.waitForNavigation()
        try:
            global error
            error = await page.Jeval('.error', 'node => node.textContent')
        except Exception as e:
            error = None
            print(e, "错啦")
        finally:
            if error:
                print('确保账户安全重新入输入')
            else:
                print(page.url)
                # Already carrying the login cookie; further navigation works.
                # await get_search(page)
                await get_cookie(page)
    await page_close(browser)


async def page_close(browser):
    """Close every open tab, then the browser itself."""
    for _page in await browser.pages():
        await _page.close()
    await browser.close()


async def get_search(page):
    # https://s.taobao.com/search?q={query}&p4ppushleft=1%2C48&s={offset: 0, 44, ...}&sort=sale-desc
    await page.goto("https://s.taobao.com/search?q=气球")
    await asyncio.sleep(5)
    # print(await page.content())


async def get_cookie(page):
    """Serialize the post-login cookies into a 'name=value;...' string."""
    res = await page.content()
    cookies_list = await page.cookies()
    cookies = ''
    for cookie in cookies_list:
        str_cookie = '{0}={1};'
        str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
        cookies += str_cookie
    print(cookies)
    # Put the cookie into a pool so later requests can rotate cookies
    # (avoids bans) and scrape search results with them.
    return cookies


if __name__ == '__main__':
    username = 'username'
    pwd = 'password'
    url = "https://login.taobao.com/member/login.jhtml?spm=a21bo.2017.754894437.1.5af911d9qqVAb1&f=top&redirectURL=https%3A%2F%2Fwww.taobao.com%2F"
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(username, pwd, url))
exe_js.py
# JS snippets injected via page.evaluate() to defeat webdriver detection.

# Report navigator.webdriver as false.
js1 = '''() => {
    Object.defineProperties(navigator, {
        webdriver: {
            get: () => false
        }
    })
}'''

# Debug helper: show the current navigator.webdriver value.
js2 = '''() => {
    alert(window.navigator.webdriver)
}'''

# Fake the window.navigator.chrome object that headless Chromium lacks.
js3 = '''() => {
    window.navigator.chrome = {
        runtime: {},
        // etc.
    };
}'''

# Provide a plausible navigator.languages list.
js4 = '''() => {
    Object.defineProperty(navigator, 'languages', {
        get: () => ['en-US', 'en']
    });
}'''

# Provide a non-empty navigator.plugins list.
js5 = '''() => {
    Object.defineProperty(navigator, 'plugins', {
        get: () => [1, 2, 3, 4, 5, 6],
    });
}'''
alifunc.py
import asyncio
import random
import time

from retrying import retry  # automatic retry on failure


def retry_if_result_none(result):
    # Retry predicate for @retry: retry while the call returned None.
    return result is None


@retry(retry_on_result=retry_if_result_none, )
async def mouse_slide(page=None):
    """Drag the slider captcha; return 1 on success, None to trigger a retry."""
    await asyncio.sleep(3)
    try:
        await page.hover('#nc_1_n1z')
        await page.mouse.down()
        # Drag far right with a randomized duration to look human.
        await page.mouse.move(2000, 0, {'delay': random.randint(1000, 2000)})
        await page.mouse.up()
    except Exception as e:
        print(e, ' :slide login False')
        return None
    else:
        await asyncio.sleep(3)
        slider_again = await page.Jeval('.nc-lang-cnt', 'node => node.textContent')
        if slider_again != '验证通过':
            return None
        else:
            await page.screenshot({'path': './headless-slide-result.png'})
            print('验证通过')
            return 1


def input_time_random():
    # Random per-keystroke typing delay in milliseconds.
    return random.randint(100, 151)
利用获取到的cookie 爬取搜索内容
import json
import re

import requests

# Cookie obtained via the pyppeteer login above; in practice keep a cookie
# pool and pick one at random per request.
cookie = '_tb_token_=edd7e354dee53;t=fed8f4ca1946ca1e73223cfae04bc589;sg=20f;cna=2uJSFdQGmDMCAbfFWXWAC4Jv;cookie2=1db6cd63ad358170ea13319f7a862c33;_l_g_=Ug%3D%3D;v=0;unb=3150916610;skt=49cbfd5e01d1b550;cookie1=BxVRmD3sh19TaAU6lH88bHw5oq%2BgcAGcRe229Hj5DTA%3D;csg=cf45a9e2;uc3=vt3=F8dByEazRMnQZDe%2F9qI%3D&id2=UNGTqfZ61Z3rsA%3D%3D&nk2=oicxO%2BHX4Pg%3D&lg2=U%2BGCWk%2F75gdr5Q%3D%3D;existShop=MTU1Njg3MDM3MA%3D%3D;tracknick=%5Cu7433150322;lgc=%5Cu7433150322;_cc_=V32FPkk%2Fhw%3D%3D;mt=ci=86_1;dnk=%5Cu7433150322;_nk_=%5Cu7433150322;cookie17=UNGTqfZ61Z3rsA%3D%3D;tg=0;enc=tThHs6Sn3BAl8v1fu3J4tMpgzA1n%2BLzxjib0vDAtGsXJCb4hqQZ7Z9fHIzsN0WghdcKEsoeKz6mBwPUpyzLOZw%3D%3D;JSESSIONID=B3F383B3467EC60F8CA425935232D395;l=bBMspAhrveV5732DBOCanurza77OSIRYYuPzaNbMi_5pm6T_G4QOlC03xF96VjfRswYBqh6Mygv9-etuZ;hng=CN%7Czh-CN%7CCNY%7C156;isg=BLi41Q8PENDal3xUVsA-aPbfiWaKiRzB6vcTu_IpBPOmDVj3mjHsO86vxUQYW9SD;uc1=cookie16=W5iHLLyFPlMGbLDwA%2BdvAGZqLg%3D%3D&cookie21=W5iHLLyFeYZ1WM9hVnmS&cookie15=UIHiLt3xD8xYTw%3D%3D&existShop=false&pas=0&cookie14=UoTZ4ttqLhxJww%3D%3D&tag=8&lng=zh_CN;thw=cn;x=e%3D1%26p%3D*%26s%3D0%26c%3D0%26f%3D0%26g%3D0%26t%3D0;swfstore=34617;'

headers = {
    'cookie': cookie,
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36",
}

rep = requests.get('https://s.taobao.com/search?q=手机&p4ppushleft=1%2C48&s=0&sort=sale-desc ',
                   headers=headers)
rep.encoding = 'utf-8'
res = rep.text
print(res)

# The result data is embedded in the page as the g_page_config JS object;
# extract the JSON between 'g_page_config = ' and 'g_srp_loadCss'.
r = re.compile(r'g_page_config = (.*?)g_srp_loadCss', re.S)
res = r.findall(res)
data = res[0].strip().rstrip(';')
dic_data = json.loads(data)
auctions = dic_data.get('mods')['itemlist']['data']['auctions']
# print(auctions, len(auctions))
for item in auctions[1:]:
    print(item)
    break
针对iframe 的操作
page.frames 获取所有的 iframe 列表 需要判断操作的是哪一个 iframe 跟操作 page 一样操作
import asyncio

from pyppeteer import launch


async def main(url):
    """Click the password-login link inside the QQ login iframe."""
    w = await launch({'headless': False, 'args': ['--no-sandbox'], })
    page = await w.newPage()
    await page.setViewport({"width": 1366, 'height': 800})
    await page.goto(url)
    try:
        await asyncio.sleep(1)
        # page.frames lists every iframe; inspect it to find the right one.
        frame = page.frames
        print(frame)
        title = await frame[1].title()
        print(title)
        await asyncio.sleep(1)
        # A Frame supports the same selector/click API as a Page.
        login = await frame[1].querySelector('#switcher_plogin')
        print(login)
        await login.click()
        await asyncio.sleep(20)
    except Exception as e:
        print(e, "EEEEEEEEE")
    for _page in await w.pages():
        await _page.close()
    await w.close()


asyncio.get_event_loop().run_until_complete(main("https://i.qq.com/?rd=1"))
# asyncio.get_event_loop().run_until_complete(main("https://www.gushici.com/"))
未完传送门:pyppeteer执行js绕过webdriver监测方法下
以上就是pyppeteer执行js绕过webdriver监测方法上的详细内容,更多关于pyppeteer执行js绕过webdriver监测的资料请关注服务器之家其它相关文章!
原文链接:https://www.cnblogs.com/guyouyin123/p/12758373.html