0. Introduction to web crawlers
A web crawler is a program or script that automatically fetches information from the World Wide Web according to a set of rules.
1. Handling HTTP requests
The first step of any crawler is to fetch the HTTP response for a URL.
1.1 The requests library
1) Preparation
pip install requests
Install the Chrome app PostMan
Capture packets / inspect request details with Charles or Fiddler
2) Working with the requests library
import requests
from PIL import Image
from io import BytesIO
def simple_get(url):
print('simple_get')
r = requests.get(url)
print(dir(r))
print('\t' + str(r.status_code))
print('\t' + str(r.headers))
print('\t' + str(r.encoding))
print('\t' + r.text)
def get_with_header(url):
ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'
r = requests.get(url, headers={'user-agent':ua})
print(r.text)
def get_with_parameters(url):
r = requests.get("http://httpbin.org/get", params={'key1':'hello', 'key2':'world'})
print(dir(r))
print(r.url) # The url used for request
print(r.text)
def get_json_data(url):
r = requests.get(url)
jdata = r.json()
print(type(jdata))
print(jdata)
def get_using_cookie(url):
r = requests.get(url, cookies={'key1':'hello', 'key2':'world'})
for k, v in r.cookies.items():
print(k, v)
# After downloading the image, wrap the raw bytes in BytesIO so PIL can open them as an in-memory stream
def get_image(url):
r = requests.get(url)
image = Image.open(BytesIO(r.content))
image.save('test.jpg')
if __name__ == '__main__':
# simple_get('http://httpbin.org/get')
# get_with_header('http://httpbin.org/get')
# get_with_parameters('http://httpbin.org/get')
# get_json_data('http://apis.juhe.cn/mobile/get?phone=13429667914&key=05ff96fc7ca7ee1ff48ee772bddd0320')
cookies = get_using_cookie('http://www.baidu.com')
get_image('http://n.sinaimg.cn/news/20170717/vrST-fyiavtv9510530.jpg')
3) Encoding issues
In Python, decode() and encode() convert between raw bytes and text. In Python 3 the str type holds Unicode text, so the conversion chain is:
        decode           encode
bytes ----------> str ----------> bytes
In the example below, requests falls back to ISO-8859-1 because the response headers do not declare a charset, so r.text is mis-decoded; encoding it back to ISO-8859-1 recovers the original bytes, which are then decoded as GBK.
import requests
r = requests.get('http://www.ip138.com/post/')
print(r.text)
print(r.encoding)
b = r.text.encode('ISO-8859-1')
t = b.decode('gbk')
with open('post.html', 'w+', encoding='gbk') as f:
f.write(t)
2. Parsing XML/JSON
2.1 JSON
- The built-in json library
- dump(s): serialize a dict to JSON text
- load(s): parse JSON text into a dict (a short example follows)
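As a minimal sketch of those calls (dumps/loads work with strings, while dump/load take a file object):
import json

movie = {'title': 'Trigun', 'stars': 10, 'type': ['Anime', 'Action']}

text = json.dumps(movie, ensure_ascii=False)  # dict -> JSON text
print(text)

data = json.loads(text)                       # JSON text -> dict
print(data['title'], data['stars'])

with open('movie.json', 'w', encoding='utf-8') as f:
    json.dump(movie, f)                       # dict -> JSON file
with open('movie.json', encoding='utf-8') as f:
    print(json.load(f))                       # JSON file -> dict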
2.2 XML
- DOM: simple to write and easy to understand, but the whole XML document has to be read into memory and parsed into a tree, so it uses a lot of memory and is slow
- SAX: parses while reading, uses little memory and is fast; the downside is that you handle the events yourself, so the code is harder to write and understand
#1 sample.xml
<collection shelf="New Arrivals">
<movie title="Enemy Behind">
<type>War, Thriller</type>
<format>DVD</format>
<year>2003</year>
<rating>PG</rating>
<stars>10</stars>
<description>Talk about a US-Japan war</description>
</movie>
<movie title="Transformers">
<type>Anime, Science Fiction</type>
<format>DVD</format>
<year>1989</year>
<rating>R</rating>
<stars>8</stars>
<description>A schientific fiction</description>
</movie>
<movie title="Trigun">
<type>Anime, Action</type>
<format>DVD</format>
<episodes>4</episodes>
<rating>PG</rating>
<stars>10</stars>
<description>Vash the Stampede!</description>
</movie>
<movie title="Ishtar">
<type>Comedy</type>
<format>VHS</format>
<rating>PG</rating>
<stars>2</stars>
<description>Viewable boredom</description>
</movie>
</collection>
#2 dom.py
# coding:utf-8
import xml.dom.minidom
from xml.dom.minidom import parse
DT = xml.dom.minidom.parse('sample.xml')
COLLECTION = DT.documentElement
if COLLECTION.hasAttribute('shelf'):
print('Root element : %s' % COLLECTION.getAttribute('shelf'))
# Get all films and print detail information
MOVIES = COLLECTION.getElementsByTagName('movie')
# Print the details of each movie
for movie in MOVIES:
type_ = movie.getElementsByTagName('type')[0]
format_ = movie.getElementsByTagName('format')[0]
rating = movie.getElementsByTagName('rating')[0]
description = movie.getElementsByTagName('description')[0]
print('*****Movie*****')
print('\tTitle: %s' % movie.getAttribute('title'))
print('\tType: %s' % type_.childNodes[0].data)
print('\tFormat: %s' % format_.childNodes[0].data)
print('\tRating: %s' % rating.childNodes[0].data)
print('\tDescription: %s' % description.childNodes[0].data)
#3 sax.py
# coding:utf-8
import xml.sax
class MovieHandler(xml.sax.ContentHandler):
''' SAX parser '''
def __init__(self):
self.CurrentData = ''
self.type = ''
self.format = ''
self.year = ''
self.rating = ''
self.stars = ''
self.description = ''
# Process element start
def startElement(self, tag, attributes):
self.CurrentData = tag
if tag == 'movie':
print('*****Movie*****')
title = attributes['title']
print('\tTitle:%s' % title)
# Process element end
def endElement(self, tag):
if self.CurrentData == 'type':
print('\tType:%s' % self.type)
elif self.CurrentData == 'format':
print('\tFormat:%s' % self.format)
elif self.CurrentData == 'year':
print('\tYear:%s' % self.year)
elif self.CurrentData == 'rating':
print('\tRating:%s' % self.rating)
elif self.CurrentData == 'stars':
print('\tStars:%s' % self.stars)
elif self.CurrentData == 'description':
print('\tDescription:%s' % self.description)
self.CurrentData = ''
# Handle character data
def characters(self, content):
if self.CurrentData == 'type':
self.type = content
elif self.CurrentData == 'format':
self.format = content
elif self.CurrentData == 'year':
self.year = content
elif self.CurrentData == 'rating':
self.rating = content
elif self.CurrentData == 'stars':
self.stars = content
elif self.CurrentData == 'description':
self.description = content
if __name__ == '__main__':
PARSER = xml.sax.make_parser()
PARSER.setFeature(xml.sax.handler.feature_namespaces, 0)
PARSER.setContentHandler(MovieHandler())
PARSER.parse('sample.xml')
3. Basics of CSS/XPath locators
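The notes leave this section without code; as a minimal illustration (assuming lxml is installed, which is also one of the BeautifulSoup backends mentioned below), here is the same link located first with a CSS selector and then with an XPath expression:
from bs4 import BeautifulSoup
from lxml import etree

HTML = '<div id="news"><ul><li><a class="hot" href="/a.html">Headline</a></li></ul></div>'

# CSS selector syntax: tag, #id, .class, parent > child
soup = BeautifulSoup(HTML, 'html5lib')
print(soup.select('div#news > ul > li > a.hot')[0].get('href'))

# XPath syntax: / for a child, // for any descendant, [@attr="value"] for predicates, @attr to take an attribute
tree = etree.HTML(HTML)
print(tree.xpath('//div[@id="news"]/ul/li/a[@class="hot"]/@href')[0])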
4. Parsing static web pages
'''
On Python 3.x the parser lives in the built-in html.parser module,
so no extra package or patching of site-packages is needed.
'''
from html.parser import HTMLParser
class MyHTMLParser(HTMLParser):
def handle_starttag(self, tag, attrs):
print('<%s>' % tag)
def handle_endtag(self, tag):
print('</%s>' % tag)
def handle_startendtag(self, tag, attrs):
print('<%s/>' % tag)
def handle_data(self, data):
print(data)
def handle_comment(self, data):
print('<!-- -->')
def handle_entityref(self, name):
print('&%s;' % name)
def handle_charref(self, name):
print('&#%s;' % name)
parser = MyHTMLParser()
parser.feed('<html><head></head><body><p>Some <a href=\"#\">html</a> tutorial...<br>END</p></body></html>')
pip install beautifulsoup4 (also needs html5lib or lxml as the parser backend)
""" Get ZipCodes """
# coding:utf-8
import requests
from bs4 import BeautifulSoup
BSLIB = 'html5lib'
BASE_URL = 'http://www.ip138.com'
UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'
HEADERS = {'user-agent':UA}
def handle_zip_code(province, url):
r = requests.get(url, headers=HEADERS)
s = BeautifulSoup(r.text.encode("iso-8859-1").decode("gbk"), BSLIB)
print(province)
rows = s.select('table.t12 > tbody > tr')
for i in range(1, len(rows)):
items = rows[i].select('td')
a_1 = items[0].text.strip()
if a_1:
print('\t%s, %s' % (a_1, items[1].text.strip()))
if len(items) >= 5: # items[3] and items[4] hold a second name/zip-code pair
a_2 = items[3].text.strip()
if a_2:
print('\t%s, %s' % (a_2, items[4].text.strip()))
if __name__ == '__main__':
R = requests.get(BASE_URL + '/post/', headers=HEADERS)
TEXT = R.text.encode('iso-8859-1').decode('gbk')
S = BeautifulSoup(TEXT, BSLIB)
for item in S.select('div#newAlexa > table > tbody > tr > td > a'):
handle_zip_code(item.text, BASE_URL + item.get('href'))
#print(item.text, BASE_URL + item.get('href'))
""" 获取杭州男装衬衫数据 """
# coding:utf-8
import requests
from bs4 import BeautifulSoup
BSLIB = 'html5lib'
UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'
HEADERS = {'user-agent':UA}
def get_products_info(url):
page_url = url  # remember the page we are on; url is reused for the item links below
r = requests.get(url, headers=HEADERS)
s = BeautifulSoup(r.text, BSLIB)
pages = s.select('div.pagem.product_list_pager > a')
next_page = pages[-2].get('href')
for item in s.select('ul.item > li > a'):
url = item.get('href').strip()
if '/shop/' in url:
continue
name = item.select('p.item_title')[0].text.strip()
code = item.select('p.item_info > span')[0].text.strip()
price = item.select('div.item_info > span')[0].text.strip()
print(url)
print('\t' + name)
print('\t' + code)
print('\t' + price)
return next_page if next_page != page_url else None  # stop when the 'next' link points back to this page
if __name__ == '__main__':
NEXT_PAGE = 'http://www.17huo.com/list-man-0-50011123-0--2m0.html'
pages = 0
while NEXT_PAGE:
NEXT_PAGE = get_products_info(NEXT_PAGE)
pages += 1
if pages == 3:
break
5. Multi-process and multi-thread concurrency
import os
from multiprocessing import Process
def run_proc(v_1, v_2):
print('Run child process %s %s (%s)...' % (v_1, v_2, os.getpid()))
if __name__ == '__main__':
p = Process(target=run_proc, args=('hello', 'world'))
print('Start child process.')
p.start()
p.join()
print('Child process end.')
import os
from multiprocessing import Pool
from multiprocessing import Queue
from multiprocessing import cpu_count
def run_proc(v):
if v % 5 == 0:
print('Run child process %s (%s)...' % (v, os.getpid()))
if __name__ == '__main__':
print(cpu_count())
p = Pool(cpu_count())
for i in range(500):
p.apply_async(run_proc, args=(i, ))
p.close()
p.join()
import os
from multiprocessing import Process
from multiprocessing import Queue
def run_proc(q):
while not q.empty():
v = q.get(True)
print('Run child process %s (%s)...' % (v, os.getpid()))
if __name__ == '__main__':
q = Queue()
for i in range(100):
q.put(i)
p_1 = Process(target=run_proc, args=(q, ))
p_2 = Process(target=run_proc, args=(q, ))
print('Start')
p_1.start()
p_2.start()
p_1.join()
p_2.join()
print('End')
import threading
def func(v_1, v_2):
print(threading.current_thread().name, ':', v_1, v_2)
t = threading.Thread(target=func, name='t1', args=('hello', 'world'))
t.start()
t.join()
import grequests
import threadpool
from multiprocessing import cpu_count
def run_thread(v):
if v % 5 == 0:
print('value = ' + str(v))
if __name__ == '__main__':
requests = threadpool.makeRequests(run_thread, [i for i in range(500)])
pool = threadpool.ThreadPool(cpu_count())
[pool.putRequest(req) for req in requests]
pool.wait()
import threading
from multiprocessing import cpu_count
balance = 0
lock = threading.Lock()
def change_it(n):
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
lock.acquire()
try:
change_it(n)
finally:
lock.release()
print(balance)
if __name__ == '__main__':
threads = []
for i in range(cpu_count()):
t = threading.Thread(target=run_thread, args=(500, ))
t.start()
threads.append(t)
for t in threads:
t.join()
6. Efficient asynchronous requests and coroutines
6.1 The difference between synchronous and asynchronous IO
With synchronous IO the program blocks until each request finishes before issuing the next one; with asynchronous IO many requests are in flight at once and the program handles each response as it arrives, so the total time is closer to that of the slowest request than to the sum of them all.
6.2 How asynchrony is implemented
#1 Asynchronous request example
import grequests
urls = ['http://news.sina.com.cn',
'http://www.baidu.com',
'http://www.so.com',
'http://www.csdn.com']
rs = [grequests.get(url) for url in urls]
for r in grequests.map(rs):
print(r.url, r.status_code, r.encoding)
#2. How grequests implements asynchrony
# https://github.com/kennethreitz/grequests/blob/master/grequests.py
get = partial(AsyncRequest, 'GET')
def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
......
requests = list(requests)
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs, timeout=gtimeout)
6.3 Coroutines
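The notes stop at this heading. For context, a coroutine is a function whose execution can be suspended while it waits on IO and resumed later, which is how gevent (the library underneath grequests) serves many requests in a single thread. A minimal sketch, assuming gevent is installed:
import gevent
from gevent import monkey
monkey.patch_socket()  # make socket operations cooperative so a waiting greenlet yields

import requests

def fetch(url):
    # while this greenlet waits on the network, gevent switches to another one
    r = requests.get(url)
    print(url, r.status_code)

jobs = [gevent.spawn(fetch, url) for url in ['http://www.baidu.com', 'http://www.so.com']]
gevent.joinall(jobs, timeout=10)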
7. How to cope with anti-crawling measures
- Disguise the crawler as a browser
- Don't crawl too fast
- Use multiple IPs (https://github.com/qiyeboy/IPProxys); a small sketch combining this with the first point follows
- Use several different accounts
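A minimal sketch with requests; the user-agent strings are the ones used earlier in these notes, and the proxy address is a placeholder you would replace with one taken from a proxy pool such as IPProxys:
import random
import requests

UA_LIST = [
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36',
]
PROXIES = {'http': 'http://127.0.0.1:8888'}  # placeholder; drop proxies= below to test without a proxy

r = requests.get('http://httpbin.org/get',
                 headers={'user-agent': random.choice(UA_LIST)},  # look like a normal browser
                 proxies=PROXIES,
                 timeout=10)
print(r.status_code, r.json()['headers'].get('User-Agent'))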
8. JavaScript and Selenium WebDriver
Selenium is a tool for testing web applications. Selenium tests run directly in the browser, exactly as a real user would operate it. Supported browsers include IE, Mozilla Firefox, Safari, Google Chrome, Opera and others. Its main use is compatibility testing: checking that your application works properly across different browsers and operating systems.
- How to scroll to the bottom of the page
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
- Typing into text boxes: logging in to Douban with Selenium
- How to take a screenshot
- Further reading
https://www.gitbook.com/book/wangxiwei/webdriver-python/details
# coding: utf-8
import time
from selenium import webdriver
if __name__ == '__main__':
DRIVER = webdriver.Chrome() # upper-case name keeps Pylint's constant-naming check quiet
try:
DRIVER.get('https://www.douban.com/')
EMAIL = DRIVER.find_element_by_css_selector('input#form_email') # username/e-mail input box
PASSWORD = DRIVER.find_element_by_css_selector('input#form_password') # password input box
LOGIN = DRIVER.find_element_by_css_selector('input.bn-submit')
EMAIL.send_keys('regdzz.lin@gmail.com')
PASSWORD.send_keys('Julyedu123!')
LOGIN.click() # submit the form
time.sleep(15)
DRIVER.save_screenshot('douban.jpg')
except Exception as e:
pass
finally:
if DRIVER:
DRIVER.quit()
#1.util.py
# coding: utf-8
from selenium import webdriver
import importlib
import os
import time
def sleep(seconds = 1):
time.sleep(seconds)
def create_chrome_driver():
options = webdriver.ChromeOptions()
options.add_argument('--ignore-certificate-errors')
driver = webdriver.Chrome(chrome_options = options)
driver.implicitly_wait(5)
driver.maximize_window()
return driver
def find_element_by_css_selector(item, selector):
try:
return item.find_element_by_css_selector(selector)
except:
return None
def find_elements_by_css_selector(item, selector):
try:
return item.find_elements_by_css_selector(selector)
except:
return []
#2.bally.py
# coding: utf-8
import sys
sys.path.append('../') # util.py lives in the parent directory
import util
PREFIX = 'www.bally.cn'
def parse(driver, url):
products = []
driver.get(url)
driver.execute_script('window.scrollBy(0,50000)')
util.sleep(3)
elements = util.find_elements_by_css_selector(driver, 'a.js-producttile_link')
for element in elements:
products.append(element.get_attribute('href').strip())
return products
if __name__ == '__main__':
# Sample: http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/
driver = util.create_chrome_driver()
for product in parse(driver, sys.argv[1]):
print(product)
driver.quit()
#3. Put chromedriver.exe under C:\Program Files (x86)\Google\Chrome\Application, add that directory to the PATH environment variable, and restart the computer
#4. Then run:
python bally.py "http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/"
#1. bulgari.py
# coding: utf-8
import sys
sys.path.append('../')
import util
PREFIX = 'www.bulgari.com'
def parse(driver, url):
products = []
driver.get(url)
while True:
elements = util.find_elements_by_css_selector(driver, 'a.bul-btn-more')
cont = False
for element in elements:
if element.is_displayed():
cont = True
driver.execute_script('arguments[0].click();', element)
util.sleep(3)
if not cont:
break
elements = util.find_elements_by_css_selector(driver, 'a.product-link')
for element in elements:
products.append(element.get_attribute('href').strip())
return products
if __name__ == '__main__':
# Sample: https://www.bulgari.com/zh-cn/products.html?root_level=317&sign=594
driver = util.create_chrome_driver()
for product in parse(driver, sys.argv[1]):
print(product)
driver.quit()
#2. python bulgari.py "https://www.bulgari.com/zh-cn/products.html?root_level=317&sign=594"
9. Simulating a login
# The captcha is entered manually when prompted
# The page content after logging in is written to contacts.txt
import requests
import html5lib
import re
from bs4 import BeautifulSoup
s = requests.Session()
url_login = 'https://accounts.douban.com/login'
formdata = {
'redir':'https://www.douban.com',
'form_email': 'regdzz.lin@gmail.com',
'form_password': 'Julyedu123!'}
headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'}
r = s.post(url_login, data=formdata, headers = headers)
content = r.text
soup = BeautifulSoup(content, 'html5lib')
captcha = soup.find('img', id='captcha_image')
if captcha:
captcha_url = captcha['src']
re_captcha_id = r'<input type="hidden" name="captcha-id" value="(.*?)"/'
captcha_id = re.findall(re_captcha_id, content)
print(captcha_id)
print(captcha_url)
captcha_text = input('Please input the captcha:')
formdata['captcha-solution'] = captcha_text
formdata['captcha-id'] = captcha_id[0]  # findall returns a list; the form expects a single id
r = s.post(url_login, data = formdata, headers = headers)
with open('contacts.txt', 'w+', encoding = 'utf-8') as f:
f.write(r.text)
# Logging in by reusing cookies
import requests
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'}
cookies = {'cookie': 'bid=qD8OOq_UqUU; ll="108296"; gr_user_id=25a4f7f8-1ffc-4c28-a6ba-83de7be683ae; viewed="24703171_22993903"; _vwo_uuid_v2=C89857F2349D6C77F158F6E66E92AD1D|c8dc205a22f2ec9bc9d286f6d52caeb7; _pk_ref.100001.8cb4=%5B%22%22%2C%22%22%2C1500439696%2C%22https%3A%2F%2Fwww.google.co.jp%2F%22%5D; ps=y; __yadk_uid=CcYA3jvmrNmFkHObkXxZbYfIEN7TW7PL; ap=1; __utmt=1; dbcl2="163994952:qm6cKevpMog"; _ga=GA1.2.482272683.1493810699; _gid=GA1.2.278142237.1500440795; _gat_UA-7019765-1=1; ck=hoCg; _pk_id.100001.8cb4=d37b273bf333df1d.1500364347.2.1500442245.1500364347.; _pk_ses.100001.8cb4=*; push_noty_num=0; push_doumail_num=0; __utma=30149280.482272683.1493810699.1500364350.1500439699.16; __utmb=30149280.15.10.1500439699; __utmc=30149280; __utmz=30149280.1500364350.15.11.utmcsr=google|utmccn=(organic)|utmcmd=organic|utmctr=(not%20provided); __utmv=30149280.16399'}
url = 'http://www.douban.com'
r = requests.get(url, cookies = cookies, headers = headers)
with open('douban_2.txt', 'wb+') as f:
f.write(r.content)
10. Using dynamic language features
""" Async and dynamic """
import grequests
import threadpool
from multiprocessing import Queue
from multiprocessing import cpu_count
stocks = ['sh600005',
'sh600006',
'sh600007',
'sh600008',
'sh600009',
'sh600010']
data = []
rs = [grequests.get('http://hq.sinajs.cn/list=' + stock) for stock in stocks]
for r in grequests.map(rs):
text = r.text[4:]
exec(text)
items = (eval('hq_str_' + r.url[-8:])).split(',')
data.append({'name':items[0], 'date':items[-3], 'open':items[1]})
for d in data:
print('name: %s, open: %s, date: %s' % (d['name'], d['open'], d['date']))
11. Using reflection
""" Load modules dynamically """
# coding:utf-8
import importlib
import os
import traceback
import util
def load_modulers(folder, name=''):
if folder.endswith('/'):
folder = folder[:-1]
modules = {}
for f in os.listdir(folder):
if not os.path.isfile(folder + '/' + f):
continue
if name and (f != (name + '.py')):
continue
if f.endswith('.py'):
module = importlib.import_module(folder + '.' + f[:-3])
modules[module.PREFIX] = module
print("Module for '%s' loaded" % module.PREFIX)
return modules
if __name__ == '__main__':
modules = load_modulers('mod')
urls = ['http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/',
'https://www.bulgari.com/zh-cn/products.html?root_level=317&sign=594',
'http://www.baidu.com']
for url in urls:
driver = None
try:
prefix = url.split('//')[1].split('/')[0]
if prefix in modules:
print(url)
driver = util.create_chrome_driver()
products = modules[prefix].parse(driver, url)
for product in products:
print('\t' + product)
else:
raise Exception("Can't parse %s" % prefix)
except Exception as e:
print('%s\n%s' % (e, traceback.format_exc()))
finally:
if driver:
driver.quit()
12. A simple ORM framework
- Install XAMPP (Apache + MySQL + PHP + Perl), an all-in-one web server bundle
- Start MySQL and Apache; clicking MySQL's Admin button opens http://localhost/phpmyadmin/ in the browser
- If you hit the error peewee.ImproperlyConfigured: MySQLdb or pymysql must be installed,
install the driver with: pip install PyMySQL
- The code relies on metaclasses; a toy illustration follows, and see the reference linked in the code below
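A toy illustration of the metaclass idea (ModelMeta and the _meta dict below are made up for illustration; peewee's real implementation is far more elaborate):
# A metaclass runs when a class is defined and can record or rewrite the class body.
class ModelMeta(type):
    def __new__(mcs, name, bases, attrs):
        cls = super().__new__(mcs, name, bases, attrs)
        # collect the declared attributes into a per-class registry,
        # similar in spirit to peewee's Model._meta
        cls._meta = {'fields': [k for k in attrs if not k.startswith('_')]}
        return cls

class Model(metaclass=ModelMeta):
    pass

class Task(Model):
    status = 'CharField'
    jobs = 'IntegerField'

print(Task._meta)  # {'fields': ['status', 'jobs']}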
#1. Configuration file: ds_config.py
# coding:utf-8
DB = {'host':'localhost',
'user':'root',
'passwd':'',
'database':'july_sample',
'charset':'utf8'}
QUEUE = {'host':'localhost',
'port':'6379',
'prefix':'dks'}
TS_NEW = 'new'
TS_INPROGRESS = 'inprogress'
TS_FINISHED = 'finished'
JS_NEW = 'new'
JS_FAILED = 'failed'
JS_FINISHED = 'finished'
#2. Creating the database tables (ds_database.py)
# coding:utf-8
import inspect
import datetime as dt
import sys
import traceback
from peewee import BooleanField # peewee field and model classes
from peewee import CharField
from peewee import DateTimeField
from peewee import IntegerField
from peewee import Model
from peewee import MySQLDatabase
from peewee import TextField
class Task(Model):
status = CharField()
jobs = IntegerField(default=0)
finished = IntegerField(default=0)
unfinished = IntegerField(default=0)
failed = IntegerField(default=0)
created_at = DateTimeField(default=dt.datetime.now)
updated_at = DateTimeField(default=dt.datetime.now)
class Meta:
database = None
class Job(Model):
task_id = IntegerField()
source_id = IntegerField()
status = CharField()
message = TextField(default='')
created_at = DateTimeField(default=dt.datetime.now)
updated_at = DateTimeField(default=dt.datetime.now)
class Meta:
database = None
class Source(Model):
url = TextField()
enabled = BooleanField(default=True)
created_at = DateTimeField(default=dt.datetime.now)
updated_at = DateTimeField(default=dt.datetime.now)
class Meta:
database = None
class Result(Model):
content = TextField()
source_id = IntegerField()
created_at = DateTimeField(default=dt.datetime.now)
updated_at = DateTimeField(default=dt.datetime.now)
class Meta:
database = None
def init_database(config):
db = MySQLDatabase(host=config['host'],
user=config['user'],
passwd=config['passwd'],
database=config['database'],
charset=config['charset'])
for var in dir(sys.modules[__name__]): # dynamically discover every table (Model) class in this module
if var != 'Model':
obj = eval(var)
if inspect.isclass(obj) and issubclass(obj, Model):
# _meta accesses the model metadata that peewee's Model metaclass builds.
# For background on metaclasses see:
# https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319106919344c4ef8b1e04c48778bb45796e0335839000
obj._meta.database = db
return db
if __name__ == '__main__':
DATABASE = None
try:
import ds_config as dc
DATABASE = init_database(dc.DB)
TABLES = []
for var in dir(sys.modules[__name__]): # dynamically discover every table (Model) class in this module
if var != 'Model':
obj = eval(var)
if inspect.isclass(obj) and issubclass(obj, Model):
TABLES.append(obj)
DATABASE.connect()
DATABASE.create_tables(TABLES, safe=True) # safe=True avoids re-creating tables that already exist
except Exception as e:
print('%s\n%s' % (e, traceback.print_exc()))
finally:
if DATABASE:
DATABASE.close()
13. A distributed queue based on Redis
- Getting Redis: download the zip package and simply unzip it
- Advantages of building the queue service on Redis:
1) more efficient than a database
2) simple structure
3) friendlier to distributed deployment
#file: ds_queue.py
# coding:utf-8
import redis
class Queue:
def __init__(self, name, config):
self._db = redis.Redis(host=config['host'], port=config['port'])
self.key = '%s:%s' % (config['prefix'], name)
def qsize(self):
return self._db.llen(self.key)
def empty(self):
return self.qsize() == 0
def put(self, item):
self._db.rpush(self.key, item)
def get(self, block=True, timeout=None):
if block: # blocking call
item = self._db.blpop(self.key, timeout=timeout)
if item:
item = item[1]
else: # non-blocking call
item = self._db.lpop(self.key)
return item
'''
Exercise:
implement get_k, which fetches up to k items in one call;
it should support both blocking and non-blocking use,
and if fewer than k items are available it returns all of them.
'''
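One possible sketch of the exercise, reusing the blpop/lpop calls above (a starting point, not the reference answer):
def get_k(self, k, block=True, timeout=None):
    # Fetch up to k items; if fewer are available, return whatever is there.
    items = []
    if block:
        first = self._db.blpop(self.key, timeout=timeout)  # wait for at least one item
        if first:
            items.append(first[1])
    while len(items) < k:
        item = self._db.lpop(self.key)  # drain the rest without blocking
        if item is None:
            break
        items.append(item)
    return items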
14. Designing a distributed crawler framework
Question: what do we need to take care of?
- task dispatching
- writing results back
- retrying on errors
- scaling out dynamically
In short:
1) the scheduler writes task and job information into the queue
2) the workers are the clients: a client takes a job from the queue, executes it, and writes the result back to the database
15. Walking through the distributed crawler code
- ds_config.py configuration (database and Redis queue settings)
- ds_database.py declares the table structure and initializes the database connection (the data layer: which sources to crawl, the crawl results, error messages)
- ds_queue.py the distributed queue
- ds_scheduler.py the task dispatcher: creates new tasks and pushes the jobs belonging to a task onto the queue
- ds_worker.py the client that actually does the crawling
- spider_sample.py the spider implementation
How to run it:
step 1. (database) Create the tables and insert the data sources to crawl
python ds_database.py
Run the statements in data.sql to insert the sources into the source table
step 2. (Redis queue) Start redis-server
Change into the Redis directory and run redis-server.exe
step 3. (scheduler)
Create a task: python ds_scheduler.py -a create
The task and job tables in the database now contain rows
In redis-cli.exe, keys * shows "dks:1"; lrange dks:1 0 -1 shows the jobs
Retry a task: python ds_scheduler.py -a retry -t 1
If python ds_worker.py is interrupted with Ctrl-C, the Redis queue has already been drained; a retry puts those jobs back onto the queue, after which running python ds_worker.py again re-processes the four jobs in the Redis queue
step 4. (worker)
python ds_worker.py starts four browser instances to parse the sources in the source table; each job's final status is written to the job table and the scraped data is written to the result table
The default spider used by ds_worker.py is spider_sample.py, which dispatches to the parsers in the parsers folder: bally.py and bulgari.py
# What each database table is for
source table: the sources to be crawled
task table: the tasks to run; starting a task creates a set of jobs
job table: each job is bound to one source; once created, a job is pushed onto the Redis queue, each client pops jobs from the queue and executes them, and whether a job succeeded is written back to its row
result table: stores the final results
#data.sql
delete from result;
delete from source;
delete from task;
insert into source(url, enabled, created_at, updated_at) values('http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/', True, now(), now());
insert into source(url, enabled, created_at, updated_at) values('http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%A9%BE%E9%A9%B6%E9%9E%8B/', True, now(), now());
insert into source(url, enabled, created_at, updated_at) values('https://www.bulgari.com/zh-cn/products.html?root_level=861&product_detail_one=228#', True, now(), now());
insert into source(url, enabled, created_at, updated_at) values('https://www.bulgari.com/zh-cn/products.html?root_level=861&product_detail_one=218', True, now(), now());
# ds_config.py (identical to the listing in section 12)
# ds_database.py (identical to the listing in section 12)
# ds_queue.py (identical to the listing in section 13, including the get_k exercise)
#ds_scheduler.py
# coding:utf-8
import argparse
import datetime as datetime
import importlib
import ds_database as dd
import ds_queue as dq
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-c',
'--config',
default='ds_config') # config module to use; do not add the .py suffix
parser.add_argument('-a',
'--action',
default='create') # action type: create / view / retry
parser.add_argument('-t',
'--task_id',
default=0,
type=int) # task_id the action applies to
return parser.parse_args()
def create_task(config):
task = dd.Task(status=config.TS_NEW)
task.save()
queue = dq.Queue(task.id, config.QUEUE) # create the job queue for this task
# select * from source where enabled = True
sources = dd.Source.select().where(dd.Source.enabled)
# Create the jobs and dispatch them onto the queue
for source in sources:
job = dd.Job(task_id=task.id, source_id=source.id, status=config.JS_NEW)
job.save()
queue.put({'id':job.id, 'source_id':source.id, 'url':source.url})
task.status = config.TS_INPROGRESS
task.jobs = len(sources)
task.save()
print('Task %d is scheduled with %d jobs.' % (task.id, task.jobs))
def view_task(task_id, config):
r = dd.Task.select().where(dd.Task.id == task_id)
if r:
task = r.get()
if task.status == config.TS_INPROGRESS:
# Once you understand this block, try re-implementing it yourself
jobs = dd.Job.select().where(dd.Job.task_id == task_id)
unfinished = 0
finished = 0
failed = 0
for job in jobs:
if job.status == config.JS_FINISHED:
finished += 1
elif job.status == config.JS_FAILED:
failed += 1
elif job.status == config.JS_NEW:
unfinished += 1
task.unfinished = unfinished
task.finished = finished
task.failed = failed
if (task.jobs == (finished + failed)) and (unfinished == 0):
task.status = config.TS_FINISHED
task.save()
print('%d jobs for task %d: %d finished, %d failed.' % (task.jobs, task.id, task.finished, task.failed))
elif task.status == config.TS_FINISHED:
print('%d jobs for task %d: %d finished, %d failed.' % (task.jobs, task.id, task.finished, task.failed))
elif task.status == config.TS_NEW:
print('Task %d not scheduled yet.' % task_id)
else:
raise Exception('Task %d not found.' % task_id)
def retry(task_id, config):
r = dd.Task.select().where(dd.Task.id == task_id)
if r:
task = r.get()
queue = dq.Queue(task.id, config.QUEUE) # the queue that belongs to this task
jobs = dd.Job.select().where((dd.Job.task_id == task_id) & (dd.Job.status != config.JS_FINISHED))
for job in jobs:
source = dd.Source.select().where(dd.Source.id == job.source_id).get()
queue.put({'id':job.id, 'source_id':source.id, 'url':source.url})
job.status = config.JS_NEW
job.save()
task.status = config.TS_INPROGRESS
task.failed = 0
task.unfinished = len(jobs)
task.save()
print('Retry %d jobs of task %d' % (len(jobs), task_id))
else:
raise Exception('Task %d not found.' % task_id)
if __name__ == '__main__':
ARGS = parse_args()
CONFIG = importlib.import_module(ARGS.config)
dd.init_database(CONFIG.DB)
if ARGS.action == 'create':
create_task(CONFIG)
elif ARGS.action == 'view':
view_task(ARGS.task_id, CONFIG)
elif ARGS.action == 'retry':
retry(ARGS.task_id, CONFIG)
else:
raise Exception('Unknown action: %s' % ARGS.action)
#ds_worker.py
# coding:utf-8
import argparse
import datetime as dt
import importlib
import multiprocessing
import threading
import time
import traceback
import ds_database as dd
import ds_queue as dq
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-c',
'--config',
default='ds_config') # config module to use; do not add the .py suffix
parser.add_argument('-s',
'--spider',
help='',
default='spider_sample') # spider implementation module; do not add the .py suffix
parser.add_argument('-t',
'--thread_count',
help='',
default=0,
type=int) # number of worker threads
return parser.parse_args()
def woker(spider, database, config):
while True:
try:
for task in dd.Task.select().where(dd.Task.status == config.TS_INPROGRESS):
print('Process task %d.' % task.id)
queue = dq.Queue(task.id, config.QUEUE)
while not queue.empty():
job = queue.get(False)
if job:
parse_result = spider.parse(eval(job), config)
content = parse_result['content']
if content:
result = dd.Result.select().where(dd.Result.source_id == parse_result['source_id'])
if result: # update the existing result
result = result.get()
result.updated_at = dt.datetime.now()
else: # insert a new record
result = dd.Result()
result.source_id = parse_result['source_id']
result.content = content
result.save()
# Update the job status
job = dd.Job.select().where(dd.Job.id == parse_result['id']).get()
job.status = parse_result['status']
job.message = parse_result['message']
job.updated_at = dt.datetime.now()
job.save()
else:
pass # empty job, nothing to do
time.sleep(15)
except Exception as e:
print('%s\n%s' % (e, traceback.format_exc()))
if __name__ == '__main__':
ARGS = parse_args()
THREAD_COUNT = ARGS.thread_count
CONFIG = importlib.import_module(ARGS.config)
SPIDER = importlib.import_module(ARGS.spider) # dynamically load the spider implementation
DATABASE = dd.init_database(CONFIG.DB)
threads = []
for i in range(multiprocessing.cpu_count() if THREAD_COUNT <= 0 else THREAD_COUNT):
threads.append(threading.Thread(target=woker, args=(SPIDER, DATABASE, CONFIG)))
for t in threads:
t.daemon = True # otherwise Ctrl-C cannot terminate the program
t.start()
try:
while True:
time.sleep(100)
except KeyboardInterrupt:
exit()
#spider_sample.py
# coding:utf-8
import sys
import traceback
import util
PARSERS = util.load_parsers('parsers')
def parse(job, config):
driver = None
result = {'id':job['id'],
'source_id':job['source_id'],
'status':config.JS_FAILED, # job status defaults to failed
'content':'',
'message':''}
try:
url = job['url']
prefix = url.split('//')[1].split('/')[0]
if prefix in PARSERS:
driver = util.create_chrome_driver()
content = PARSERS[prefix](driver, url)
result['content'] = content
result['status'] = config.JS_FINISHED
else:
raise Exception('No parser for %s.' % url)
except Exception as e:
result['message'] = '%s\n%s' % (e, traceback.format_exc())
finally:
if driver:
driver.quit()
return result