Web Crawler Basics

0. Introduction to Web Crawlers

A web crawler is a program or script that automatically fetches information from the World Wide Web according to a set of rules.

1. Handling HTTP Requests

The first step of any crawler is to fetch the HTTP response for a given URL.

1.1 The requests library

1) Preparation
pip install requests
Install the PostMan Chrome app (for composing and testing requests)
Packet capture / inspecting request details: Charles, Fiddler

2) Basic operations with the requests library
import requests
from PIL import Image
from io import BytesIO

def simple_get(url):
    print('simple_get')
    r = requests.get(url)
    print(dir(r))
    print('\t' + str(r.status_code))
    print('\t' + str(r.headers))
    print('\t' + str(r.encoding))
    print('\t' + r.text)

def get_with_header(url):
    ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'
    r = requests.get(url, headers={'user-agent':ua})
    print(r.text)

def get_with_parameters(url):
    r = requests.get(url, params={'key1':'hello', 'key2':'world'})
    print(dir(r))
    print(r.url) # the final URL used for the request, including the encoded query string
    print(r.text)

def get_json_data(url):
    r = requests.get(url)
    jdata = r.json()
    print(type(jdata))
    print(jdata)

def get_using_cookie(url):
    r = requests.get(url, cookies={'key1':'hello', 'key2':'world'})
    for k, v in r.cookies.items():
        print(k, v)

# Wrap the downloaded bytes in BytesIO so PIL can open them as a file-like object
def get_image(url):
    r = requests.get(url)
    image = Image.open(BytesIO(r.content)) 
    image.save('test.jpg')

if __name__ == '__main__':
    # simple_get('http://httpbin.org/get')
    # get_with_header('http://httpbin.org/get')
    # get_with_parameters('http://httpbin.org/get')
    # get_json_data('http://apis.juhe.cn/mobile/get?phone=13429667914&key=05ff96fc7ca7ee1ff48ee772bddd0320')
    get_using_cookie('http://www.baidu.com')
    get_image('http://n.sinaimg.cn/news/20170717/vrST-fyiavtv9510530.jpg')

3) Encoding issues
In Python, decode() and encode() convert between bytes and text.
In Python 3, str is the Unicode text type, so the conversions are:
               decode                encode
        bytes ---------> str (text) ---------> bytes
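
For reference, the round trip in Python 3 terms (GBK is just an example encoding):

raw = '中文'.encode('gbk')    # str -> bytes: b'\xd6\xd0\xce\xc4'
print(type(raw))              # <class 'bytes'>
text = raw.decode('gbk')      # bytes -> str
print(text)                   # 中文

The example below uses the same idea to fix a page whose encoding requests guessed incorrectly: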


import requests

r = requests.get('http://www.ip138.com/post/')
print(r.text)
print(r.encoding)
b = r.text.encode('ISO-8859-1') # undo requests' default ISO-8859-1 decoding to recover the raw bytes
t = b.decode('gbk')             # decode with the page's real encoding (GBK)
with open('post.html', 'w+', encoding='gbk') as f:
    f.write(t)

2. Parsing XML/JSON

2.1 JSON

  • Use the built-in json library
  • dump(s): convert a dict to JSON text
  • load(s): convert JSON text back to a dict (see the sketch below)
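
A minimal sketch of the four calls (dumps/loads work on strings, dump/load on file objects; the file name is arbitrary):

import json

data = {'name': 'July', 'stars': 10}
text = json.dumps(data)              # dict -> JSON string
print(json.loads(text))              # JSON string -> dict

with open('sample.json', 'w', encoding='utf-8') as f:
    json.dump(data, f)               # dict -> JSON file
with open('sample.json', encoding='utf-8') as f:
    print(json.load(f))              # JSON file -> dict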

2.2 XML

  • DOM approach: simple to code and easy to understand, but the whole XML document has to be read into memory and parsed into a tree, so it uses a lot of memory and parses slowly
  • SAX approach: parses while reading, uses little memory and is fast; the drawback is that you have to handle the events yourself, so the code is harder to write and understand
#1 sample.xml
<collection shelf="New Arrivals">
    <movie title="Enemy Behind">
        <type>War, Thriller</type>
        <format>DVD</format>
        <year>2003</year>
        <rating>PG</rating>
        <stars>10</stars>
        <description>Talk about a US-Japan war</description>
    </movie>
    <movie title="Transformers">
        <type>Anime, Science Fiction</type>
        <format>DVD</format>
        <year>1989</year>
        <rating>R</rating>
        <stars>8</stars>
        <description>A schientific fiction</description>
    </movie>
    <movie title="Trigun">
        <type>Anime, Action</type>
        <format>DVD</format>
        <episodes>4</episodes>
        <rating>PG</rating>
        <stars>10</stars>
        <description>Vash the Stampede!</description>
    </movie>
    <movie title="Ishtar">
        <type>Comedy</type>
        <format>VHS</format>
        <rating>PG</rating>
        <stars>2</stars>
        <description>Viewable boredom</description>
    </movie>
</collection>




#2 dom.py
# coding:utf-8

import xml.dom.minidom

DT = xml.dom.minidom.parse('sample.xml')
COLLECTION = DT.documentElement
if COLLECTION.hasAttribute('shelf'):
   print('Root element : %s' % COLLECTION.getAttribute('shelf'))
# Get all movies and print their details
MOVIES = COLLECTION.getElementsByTagName('movie')
for movie in MOVIES:
    type_ = movie.getElementsByTagName('type')[0]
    format_ = movie.getElementsByTagName('format')[0]
    rating = movie.getElementsByTagName('rating')[0]
    description = movie.getElementsByTagName('description')[0]
    print('*****Movie*****')
    print('\tTitle: %s' % movie.getAttribute('title'))
    print('\tType: %s' % type_.childNodes[0].data)
    print('\tFormat: %s' % format_.childNodes[0].data)
    print('\tRating: %s' % rating.childNodes[0].data)
    print('\tDescription: %s' % description.childNodes[0].data)


#3 sax.py
# coding:utf-8

import xml.sax

class MovieHandler(xml.sax.ContentHandler):
    ''' SAX parser '''
    def __init__(self):
        self.CurrentData = ''
        self.type = ''
        self.format = ''
        self.year = ''
        self.rating = ''
        self.stars = ''
        self.description = ''

    # Process the start of an element
    def startElement(self, tag, attributes):
        self.CurrentData = tag
        if tag == 'movie':
            print('*****Movie*****')
            title = attributes['title']
            print('\tTitle:%s' % title)

    # Process the end of an element
    def endElement(self, tag):
        if self.CurrentData == 'type':
            print('\tType:%s' % self.type)
        elif self.CurrentData == 'format':
            print('\tFormat:%s' % self.format)
        elif self.CurrentData == 'year':
            print('\tYear:%s' % self.year)
        elif self.CurrentData == 'rating':
            print('\tRating:%s' % self.rating)
        elif self.CurrentData == 'stars':
            print('\tStars:%s' % self.stars)
        elif self.CurrentData == 'description':
            print('\tDescription:%s' % self.description)
        self.CurrentData = ''

    # Handle character data between tags
    def characters(self, content):
        if self.CurrentData == 'type':
            self.type = content
        elif self.CurrentData == 'format':
            self.format = content
        elif self.CurrentData == 'year':
            self.year = content
        elif self.CurrentData == 'rating':
            self.rating = content
        elif self.CurrentData == 'stars':
            self.stars = content
        elif self.CurrentData == 'description':
            self.description = content
  
if __name__ == '__main__':
   PARSER = xml.sax.make_parser()
   PARSER.setFeature(xml.sax.handler.feature_namespaces, 0)
   PARSER.setContentHandler(MovieHandler())
   PARSER.parse('sample.xml')

3. CSS/XPath Selector Basics

http://www.jianshu.com/p/c2ddb7a226c2
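
As a minimal illustration (lxml is used here as one possible library; its cssselect() call additionally requires the cssselect package, and the HTML snippet is made up):

from lxml import html

doc = html.fromstring('<div id="nav"><ul><li class="item"><a href="/a">A</a></li></ul></div>')

# CSS selector syntax: tag, #id, .class, parent > child
print(doc.cssselect('div#nav li.item > a')[0].text)                  # A

# Equivalent XPath: / for a child, // for any descendant, @ for an attribute
print(doc.xpath('//div[@id="nav"]//li[@class="item"]/a/@href')[0])   # /a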

4. Parsing Static Pages

'''
On Python 3.x the parser lives in the standard html.parser module (used below);
on Python 2.x the equivalent import was: from HTMLParser import HTMLParser
'''
from html.parser import HTMLParser

class MyHTMLParser(HTMLParser):
    def handle_starttag(self, tag, attrs):
        print('<%s>' % tag)

    def handle_endtag(self, tag):
        print('</%s>' % tag)

    def handle_startendtag(self, tag, attrs):
        print('<%s/>' % tag)

    def handle_data(self, data):
        print(data)

    def handle_comment(self, data):
        print('<!-- -->')

    def handle_entityref(self, name):
        print('&%s;' % name)

    def handle_charref(self, name):
        print('&#%s;' % name)

parser = MyHTMLParser()
parser.feed('<html><head></head><body><p>Some <a href=\"#\">html</a> tutorial...<br>END</p></body></html>')

pip install beautifulsoup4 (requires html5lib or lxml as the parser backend)

""" Get ZipCodes """
# coding:utf-8

import requests
from bs4 import BeautifulSoup

BSLIB = 'html5lib'
BASE_URL = 'http://www.ip138.com'
UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'
HEADERS = {'user-agent':UA}

def handle_zip_code(province, url):
    r = requests.get(url, headers=HEADERS)
    s = BeautifulSoup(r.text.encode("iso-8859-1").decode("gbk"), BSLIB)
    print(province)
    rows = s.select('table.t12 > tbody > tr')
    for i in range(1, len(rows)):
        items = rows[i].select('td')
        a_1 = items[0].text.strip()
        if a_1:
            print('\t%s, %s' % (a_1, items[1].text.strip()))
        if len(items) >= 4:
            a_2 = items[3].text.strip()
            if a_2:
                print('\t%s, %s' % (a_2, items[4].text.strip()))

if __name__ == '__main__':
    R = requests.get(BASE_URL + '/post/', headers=HEADERS)
    TEXT = R.text.encode('iso-8859-1').decode('gbk')
    S = BeautifulSoup(TEXT, BSLIB)
    for item in S.select('div#newAlexa > table > tbody > tr > td > a'):
        handle_zip_code(item.text, BASE_URL + item.get('href'))
        #print(item.text, BASE_URL + item.get('href'))
        
""" 获取杭州男装衬衫数据 """
# coding:utf-8

import requests
from bs4 import BeautifulSoup

BSLIB = 'html5lib'
UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'
HEADERS = {'user-agent':UA}

def get_products_info(url):
    r = requests.get(url, headers=HEADERS)
    s = BeautifulSoup(r.text, BSLIB)
    pages = s.select('div.pagem.product_list_pager > a')
    next_page = pages[-2].get('href')
    for item in s.select('ul.item > li > a'):
        item_url = item.get('href').strip()
        if '/shop/' in item_url:
            continue
        name = item.select('p.item_title')[0].text.strip()
        code = item.select('p.item_info > span')[0].text.strip()
        price = item.select('div.item_info > span')[0].text.strip()
        print(item_url)
        print('\t' + name)
        print('\t' + code)
        print('\t' + price)
    return next_page if next_page != url else None

if __name__ == '__main__':
    NEXT_PAGE = 'http://www.17huo.com/list-man-0-50011123-0--2m0.html'
    pages = 0
    while NEXT_PAGE:
        NEXT_PAGE = get_products_info(NEXT_PAGE)
        pages += 1
        if pages == 3:
            break

5. Multi-process and Multi-thread Concurrency

import os
from multiprocessing import Process

def run_proc(v_1, v_2):
    print('Run child process %s %s (%s)...' % (v_1, v_2, os.getpid()))

if __name__ == '__main__':
    p = Process(target=run_proc, args=('hello', 'world'))
    print('Start child process.')
    p.start()
    p.join()
    print('Child process end.')
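
# Example: a process pool with multiprocessing.Pool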
import os
from multiprocessing import Pool
from multiprocessing import cpu_count

def run_proc(v):
    if v % 5 == 0:
        print('Run child process %s (%s)...' % (v, os.getpid()))

if __name__ == '__main__':
    print(cpu_count())
    p = Pool(cpu_count())
    for i in range(500):
        p.apply_async(run_proc, args=(i, ))
    p.close()
    p.join()
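
# Example: sharing a multiprocessing.Queue between two processes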
import os
from multiprocessing import Process
from multiprocessing import Queue

def run_proc(q):
    while not q.empty():
        v = q.get(True)
        print('Run child process %s (%s)...' % (v, os.getpid()))

if __name__ == '__main__':
    q = Queue()
    for i in range(100):
        q.put(i)
    p_1 = Process(target=run_proc, args=(q, ))
    p_2 = Process(target=run_proc, args=(q, ))
    print('Start')
    p_1.start()
    p_2.start()
    p_1.join()
    p_2.join()
    print('End')
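
# Example: creating and joining a single thread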
import threading

def func(v_1, v_2):
    print(threading.current_thread().name, ':', v_1, v_2)

t = threading.Thread(target=func, name='t1', args=('hello', 'world'))
t.start()
t.join()
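
# Example: a thread pool built with the threadpool package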
import threadpool
from multiprocessing import cpu_count

def run_thread(v):
    if v % 5 == 0:
        print('value = ' + str(v))

if __name__ == '__main__':
    requests = threadpool.makeRequests(run_thread, [i for i in range(500)])
    pool = threadpool.ThreadPool(cpu_count())
    [pool.putRequest(req) for req in requests]
    pool.wait()

# Example: protecting a shared variable with threading.Lock
import threading
from multiprocessing import cpu_count

balance = 0
lock = threading.Lock()

def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        lock.acquire()
        try:
            change_it(n)
        finally:
            lock.release()
    print(balance)


if __name__ == '__main__':
    threads = []
    for i in range(cpu_count()):
        t = threading.Thread(target=run_thread, args=(500, ))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

6. Efficient Asynchronous Requests and Coroutines

6.1 The difference between synchronous and asynchronous I/O

(Figure: async.jpg, comparing synchronous and asynchronous I/O)

6.2 How asynchrony is implemented

#1 An asynchronous request example with grequests
import grequests

urls = ['http://news.sina.com.cn',
        'http://www.baidu.com',
        'http://www.so.com',
        'http://www.csdn.com']

rs = [grequests.get(url) for url in urls]
for r in grequests.map(rs):
    print(r.url, r.status_code, r.encoding)


#2 How grequests implements asynchrony (excerpt from its source)
# https://github.com/kennethreitz/grequests/blob/master/grequests.py

get = partial(AsyncRequest, 'GET')

def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
    ......
    requests = list(requests)

    pool = Pool(size) if size else None
    jobs = [send(r, pool, stream=stream) for r in requests]
    gevent.joinall(jobs, timeout=gtimeout)

6.3 Coroutines
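
grequests (excerpted above) is built on gevent greenlets, a coroutine library. A minimal sketch of driving gevent directly, assuming gevent and requests are installed (the URLs are just the ones used earlier):

from gevent import monkey
monkey.patch_all()          # make blocking socket calls cooperative; do this before importing requests

import gevent
import requests

def fetch(url):
    r = requests.get(url)
    return url, r.status_code

urls = ['http://www.baidu.com', 'http://www.so.com']
jobs = [gevent.spawn(fetch, url) for url in urls]   # each call runs in its own greenlet
gevent.joinall(jobs)                                # greenlets switch while waiting on network I/O
for job in jobs:
    print(job.value)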

7. Dealing with Anti-crawling Measures
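
A minimal, illustrative sketch of the usual first-line tactics, built only from pieces already used in these notes (realistic User-Agent headers, pacing requests, reusing a session); the user-agent strings are the ones used elsewhere here and the delay values are placeholders:

import random
import time
import requests

USER_AGENTS = [
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36',
]

session = requests.Session()              # reuses cookies and TCP connections

def polite_get(url):
    headers = {'user-agent': random.choice(USER_AGENTS)}  # vary the user agent
    r = session.get(url, headers=headers, timeout=10)
    time.sleep(random.uniform(1, 3))      # pace requests to stay under rate limits
    return r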

8. JavaScript and Selenium WebDriver

Selenium is a tool for testing web applications. Selenium tests run directly in a browser, exactly as a real user would operate it, and the supported browsers include IE, Mozilla Firefox, Safari, Google Chrome, Opera, and others. Its main use is compatibility testing: checking that an application works well across different browsers and operating systems.

  • How to scroll to the bottom of a page:

    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

  • Typing into text boxes: logging in to Douban with Selenium (the script below)
  • How to take a screenshot (save_screenshot in the same script)
  • Further reading:

https://www.gitbook.com/book/wangxiwei/webdriver-python/details

# coding: utf-8

import time
from selenium import webdriver

if __name__ == '__main__':
    DRIVER = webdriver.Chrome() # uppercase DRIVER to satisfy Pylint's constant-naming check
    try:
        DRIVER.get('https://www.douban.com/')
        EMAIL = DRIVER.find_element_by_css_selector('input#form_email') # e-mail input box
        PASSWORD = DRIVER.find_element_by_css_selector('input#form_password') # password input box
        LOGIN = DRIVER.find_element_by_css_selector('input.bn-submit') # login button
        EMAIL.send_keys('regdzz.lin@gmail.com')
        PASSWORD.send_keys('Julyedu123!')
        LOGIN.click() # submit the form
        time.sleep(15)
        DRIVER.save_screenshot('douban.jpg')
    except Exception as e:
        pass
    finally:
        if DRIVER:
            DRIVER.quit()
#1.util.py
# coding: utf-8

from selenium import webdriver
import importlib
import os
import time

def sleep(seconds = 1):
    time.sleep(seconds)

def create_chrome_driver():
    options = webdriver.ChromeOptions()
    options.add_argument('--ignore-certificate-errors')
    driver = webdriver.Chrome(chrome_options = options)
    driver.implicitly_wait(5)
    driver.maximize_window()
    return driver

def find_element_by_css_selector(item, selector):
    try:
        return item.find_element_by_css_selector(selector)
    except:
        return None
        
def find_elements_by_css_selector(item, selector):
    try:
        return item.find_elements_by_css_selector(selector)
    except:
        return []


#2.bally.py
# coding: utf-8

import sys
sys.path.append('../')     # util.py lives in the parent directory
import util

PREFIX = 'www.bally.cn'

def parse(driver, url):
    products = []
    driver.get(url)
    driver.execute_script('window.scrollBy(0,50000)')
    util.sleep(3)
    elements = util.find_elements_by_css_selector(driver, 'a.js-producttile_link')
    for element in elements:
        products.append(element.get_attribute('href').strip())
    return products

if __name__ == '__main__':
    # Sample: http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/
    driver = util.create_chrome_driver()
    for product in parse(driver, sys.argv[1]):
        print(product)
    driver.quit()

#3. Put chromedriver.exe in C:\Program Files (x86)\Google\Chrome\Application, add that directory to the PATH environment variable, and restart the computer

#4. Run it as follows:
python bally.py "http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/"
#1. bulgari.py
# coding: utf-8

import sys
sys.path.append('../')
import util

PREFIX = 'www.bulgari.com'

def parse(driver, url):
    products = []
    driver.get(url)
    while True:
        elements = util.find_elements_by_css_selector(driver, 'a.bul-btn-more')
        cont = False
        for element in elements:
            if element.is_displayed():
                cont = True
                driver.execute_script('arguments[0].click();', element)
                util.sleep(3)
        if not cont:
            break
    elements = util.find_elements_by_css_selector(driver, 'a.product-link')
    for element in elements:
        products.append(element.get_attribute('href').strip())
    return products
    
if __name__ == '__main__':
    # Sample: https://www.bulgari.com/zh-cn/products.html?root_level=317&sign=594
    driver = util.create_chrome_driver()
    for product in parse(driver, sys.argv[1]):
        print(product)
    driver.quit()

#2. python bulgari.py "https://www.bulgari.com/zh-cn/products.html?root_level=317&sign=594"

9. Simulating a Login

# The captcha is entered manually
# The page returned after logging in is written to contacts.txt

import requests
import html5lib
import re
from bs4 import BeautifulSoup


s = requests.Session()
url_login = 'https://accounts.douban.com/login'

formdata = {
    'redir':'https://www.douban.com',
    'form_email': 'regdzz.lin@gmail.com',
    'form_password': 'Julyedu123!'}
headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'}

r = s.post(url_login, data=formdata, headers = headers)
content = r.text
soup = BeautifulSoup(content, 'html5lib')
captcha = soup.find('img', id='captcha_image')
if captcha:
    captcha_url = captcha['src']
    re_captcha_id = r'<input type="hidden" name="captcha-id" value="(.*?)"/'
    captcha_id = re.findall(re_captcha_id, content)[0] # findall returns a list; take the first match
    print(captcha_id)
    print(captcha_url)
    captcha_text = input('Please input the captcha:')
    formdata['captcha-solution'] = captcha_text
    formdata['captcha-id'] = captcha_id
    r = s.post(url_login, data = formdata, headers = headers)
with open('contacts.txt', 'w+', encoding = 'utf-8') as f:
    f.write(r.text)

# Logging in with a saved cookie

import requests

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'}
cookies = {'cookie': 'bid=qD8OOq_UqUU; ll="108296"; gr_user_id=25a4f7f8-1ffc-4c28-a6ba-83de7be683ae; viewed="24703171_22993903"; _vwo_uuid_v2=C89857F2349D6C77F158F6E66E92AD1D|c8dc205a22f2ec9bc9d286f6d52caeb7; _pk_ref.100001.8cb4=%5B%22%22%2C%22%22%2C1500439696%2C%22https%3A%2F%2Fwww.google.co.jp%2F%22%5D; ps=y; __yadk_uid=CcYA3jvmrNmFkHObkXxZbYfIEN7TW7PL; ap=1; __utmt=1; dbcl2="163994952:qm6cKevpMog"; _ga=GA1.2.482272683.1493810699; _gid=GA1.2.278142237.1500440795; _gat_UA-7019765-1=1; ck=hoCg; _pk_id.100001.8cb4=d37b273bf333df1d.1500364347.2.1500442245.1500364347.; _pk_ses.100001.8cb4=*; push_noty_num=0; push_doumail_num=0; __utma=30149280.482272683.1493810699.1500364350.1500439699.16; __utmb=30149280.15.10.1500439699; __utmc=30149280; __utmz=30149280.1500364350.15.11.utmcsr=google|utmccn=(organic)|utmcmd=organic|utmctr=(not%20provided); __utmv=30149280.16399'}
url = 'http://www.douban.com'
r = requests.get(url, cookies = cookies, headers = headers)
with open('douban_2.txt', 'wb+') as f:
    f.write(r.content)

10. Using Dynamic Language Features

""" Async and dynamic """

import grequests

stocks = ['sh600005',
          'sh600006',
          'sh600007',
          'sh600008',
          'sh600009',
          'sh600010']

data = []
rs = [grequests.get('http://hq.sinajs.cn/list=' + stock) for stock in stocks]
for r in grequests.map(rs):
    text = r.text[4:]                                  # strip the leading 'var ' from the JavaScript snippet
    exec(text)                                         # defines a variable named like hq_str_sh600005
    items = (eval('hq_str_' + r.url[-8:])).split(',')  # look that variable up again by name
    data.append({'name':items[0], 'date':items[-3], 'open':items[1]})
for d in data:
    print('name: %s, open: %s, date: %s' % (d['name'], d['open'], d['date']))

11. Using Reflection

""" Load modules dynamically """
# coding:utf-8

import importlib
import os
import traceback
import util

def load_modules(folder, name=''):
    if folder.endswith('/'):
        folder = folder[:-1]
    modules = {}
    for f in os.listdir(folder):
        if not os.path.isfile(folder + '/' + f):
            continue
        if name and (f != (name + '.py')):
            continue
        if f.endswith('.py'):
            module = importlib.import_module(folder + '.' + f[:-3])
            modules[module.PREFIX] = module
            print("Module for '%s' loaded" % module.PREFIX)
    return modules

if __name__ == '__main__':
    modules = load_modules('mod')
    urls = ['http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/',
             'https://www.bulgari.com/zh-cn/products.html?root_level=317&sign=594',
             'http://www.baidu.com']
    for url in urls:
        driver = None
        try:
            prefix = url.split('//')[1].split('/')[0]
            if prefix in modules:
                print(url)
                driver = util.create_chrome_driver()
                products = modules[prefix].parse(driver, url)
                for product in products:
                    print('\t' + product)
            else:
                raise Exception("Can't parse %s" % prefix)
        except Exception as e:
            print('%s\n%s' % (e, traceback.format_exc()))
        finally:
            if driver:
                driver.quit()

12. A Minimal ORM Framework

  • Install XAMPP (Apache + MySQL + PHP + Perl), an integrated package for setting up a local web and database stack
  • Start MySQL and Apache; clicking MySQL's Admin button opens http://localhost/phpmyadmin/ in the browser
  • If you get the error "peewee.ImproperlyConfigured: MySQLdb or pymysql must be installed",
    install the driver: pip install PyMySQL
  • The code below relies on metaclasses; see the reference below

https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319106919344c4ef8b1e04c48778bb45796e0335839000

#1. Configuration file: ds_config.py
# coding:utf-8

DB = {'host':'localhost',
      'user':'root',
      'passwd':'',
      'database':'july_sample',
      'charset':'utf8'}

QUEUE = {'host':'localhost',
         'port':'6379',
         'prefix':'dks'}

TS_NEW = 'new'
TS_INPROGRESS = 'inprogress'
TS_FINISHED = 'finished'

JS_NEW = 'new'
JS_FAILED = 'failed'
JS_FINISHED = 'finished'

#2. Creating the database tables
# coding:utf-8

import inspect
import datetime as dt
import sys
import traceback
from peewee import BooleanField # peewee field and model classes
from peewee import CharField
from peewee import DateTimeField
from peewee import IntegerField
from peewee import Model
from peewee import MySQLDatabase
from peewee import TextField

class Task(Model):
    status = CharField()
    jobs = IntegerField(default=0)
    finished = IntegerField(default=0)
    unfinished = IntegerField(default=0)
    failed = IntegerField(default=0)
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

class Job(Model):
    task_id = IntegerField()
    source_id = IntegerField()
    status = CharField()
    message = TextField(default='')
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

class Source(Model):
    url = TextField()
    enabled = BooleanField(default=True)
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

class Result(Model):
    content = TextField()
    source_id = IntegerField()
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

def init_database(config):
    db = MySQLDatabase(host=config['host'],
                       user=config['user'],
                       passwd=config['passwd'],
                       database=config['database'],
                       charset=config['charset'])
    for var in dir(sys.modules[__name__]): # bind every table class in this module to the database
        if var != 'Model':
            obj = eval(var)
            if inspect.isclass(obj) and issubclass(obj, Model):
                '''
                _meta is the metadata object that peewee's Model metaclass attaches to each model class;
                see: https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319106919344c4ef8b1e04c48778bb45796e0335839000
                '''
                obj._meta.database = db
    return db

if __name__ == '__main__':
    DATABASE = None
    try:
        import ds_config as dc
        DATABASE = init_database(dc.DB)
        TABLES = []
        for var in dir(sys.modules[__name__]): # collect every table class defined in this module
            if var != 'Model':
                obj = eval(var)
                if inspect.isclass(obj) and issubclass(obj, Model):
                    TABLES.append(obj)
        DATABASE.connect()
        DATABASE.create_tables(TABLES, safe=True) # safe=True avoids errors if the tables already exist
    except Exception as e:
        print('%s\n%s' % (e, traceback.format_exc()))
    finally:
        if DATABASE:
            DATABASE.close()

13. A Redis-based Distributed Queue

  • Download Redis: grab the zip package and simply unzip it

https://github.com/MicrosoftArchive/redis/releases

  • Why Redis works well as a queue service:
    1) more efficient than polling a database
    2) simple structure
    3) friendlier to a distributed setup
#file: ds_queue.py

# coding:utf-8

import redis

class Queue:
    def __init__(self, name, config):
        self._db = redis.Redis(host=config['host'], port=config['port'])
        self.key = '%s:%s' % (config['prefix'], name)

    def qsize(self):
        return self._db.llen(self.key)

    def empty(self):
        return self.qsize() == 0

    def put(self, item):
        self._db.rpush(self.key, item)

    def get(self, block=True, timeout=None):
        if block: # blocking: wait until an item is available (or the timeout expires)
            item = self._db.blpop(self.key, timeout=timeout)
            if item:
                item = item[1] # blpop returns a (key, value) tuple
        else: # non-blocking: return None immediately if the queue is empty
            item = self._db.lpop(self.key)
        return item

    '''
    Exercise:
    implement get_k, which fetches up to k items in a single call;
    it must support both blocking and non-blocking modes,
    and return all remaining items if fewer than k are available.
    '''
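
A quick usage sketch of this class, assuming a local redis-server is running and using the QUEUE settings from ds_config.py ('demo' is an arbitrary queue name):

import ds_config as dc
import ds_queue as dq

q = dq.Queue('demo', dc.QUEUE)     # the underlying redis key is 'dks:demo'
q.put('job-1')
q.put('job-2')
print(q.qsize())                   # 2
print(q.get(block=False))          # b'job-1' (redis-py returns bytes)
print(q.get(timeout=1))            # b'job-2'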

14. Designing a Distributed Crawler Framework

Question: what needs to be considered?

  • task distribution
  • writing results back
  • retrying on errors
  • scaling out dynamically

(Figure: arch, the distributed crawler architecture)
The flow is:
    1) the scheduler writes the task and its job information into the queue
    2) the workers are the clients: each client takes a job from the task queue, runs it, and writes the result back to the database

15. Walking Through the Distributed Crawler Code

  • ds_config.py: configuration (database and redis queue settings)
  • ds_database.py: declares the table schema and initializes the database connection (the data layer: which sources to crawl, the crawl results, and error messages)
  • ds_queue.py: the distributed queue
  • ds_scheduler.py: the task dispatcher; creates new tasks and pushes each task's jobs onto the queue
  • ds_worker.py: the client that does the actual crawling
  • spider_sample.py: the spider implementation
Steps to run:
step1. (database) Create the tables and insert the sources to crawl
    python ds_database.py
    Run the statements in data.sql to insert rows into the source table
step2. (redis queue) Start redis-server
    cd into the redis directory and run redis-server.exe
step3. (scheduler)
    Create a task: python ds_scheduler.py -a create
        The task and job tables now contain rows.
        In redis-cli.exe, "keys *" shows "dks:1", and "lrange dks:1 0 -1" lists the jobs.
    Retry a task: python ds_scheduler.py -a retry -t 1
        If python ds_worker.py is interrupted with Ctrl-C, the redis queue has already been drained; retrying pushes the unfinished jobs back onto the queue, and running python ds_worker.py again reprocesses the four jobs in the redis queue.
step4. (worker)
    python ds_worker.py starts four browsers to parse the data sources in the source table; each job's final status is written back to the job table, and the scraped data is written to the result table.
    ds_worker.py uses spider_sample.py as the default spider, and spider_sample.py delegates to the parsers in the parsers folder: bally.py and bulgari.py.
# What each database table is for
source: the sources to crawl
task: a unit of work to run; starting a task creates a series of jobs
job: each job is bound to one source; once generated, jobs are pushed onto the redis queue, each client pops jobs from the queue and runs them, and each job's success or failure is written back to the job table
result: the final scraped results
#data.sql

delete from result;
delete from source;
delete from task;

insert into source(url, enabled, created_at, updated_at) values('http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%9D%B4%E5%AD%90/', True, now(), now());
insert into source(url, enabled, created_at, updated_at) values('http://www.bally.cn/zh/%E9%80%89%E8%B4%AD%E7%94%B7%E5%A3%AB%E7%B3%BB%E5%88%97/%E9%9E%8B%E5%B1%A5/%E9%A9%BE%E9%A9%B6%E9%9E%8B/', True, now(), now());
insert into source(url, enabled, created_at, updated_at) values('https://www.bulgari.com/zh-cn/products.html?root_level=861&product_detail_one=228#', True, now(), now());
insert into source(url, enabled, created_at, updated_at) values('https://www.bulgari.com/zh-cn/products.html?root_level=861&product_detail_one=218', True, now(), now());
# ds_config.py

# coding:utf-8

DB = {'host':'localhost',
      'user':'root',
      'passwd':'',
      'database':'july_sample',
      'charset':'utf8'}

QUEUE = {'host':'localhost',
         'port':'6379',
         'prefix':'dks'}

TS_NEW = 'new'
TS_INPROGRESS = 'inprogress'
TS_FINISHED = 'finished'

JS_NEW = 'new'
JS_FAILED = 'failed'
JS_FINISHED = 'finished'
#ds_database.py

# coding:utf-8

import inspect
import datetime as dt
import sys
import traceback
from peewee import BooleanField # peewee field and model classes
from peewee import CharField
from peewee import DateTimeField
from peewee import IntegerField
from peewee import Model
from peewee import MySQLDatabase
from peewee import TextField

class Task(Model):
    status = CharField()
    jobs = IntegerField(default=0)
    finished = IntegerField(default=0)
    unfinished = IntegerField(default=0)
    failed = IntegerField(default=0)
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

class Job(Model):
    task_id = IntegerField()
    source_id = IntegerField()
    status = CharField()
    message = TextField(default='')
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

class Source(Model):
    url = TextField()
    enabled = BooleanField(default=True)
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

class Result(Model):
    content = TextField()
    source_id = IntegerField()
    created_at = DateTimeField(default=dt.datetime.now)
    updated_at = DateTimeField(default=dt.datetime.now)
    class Meta:
        database = None

def init_database(config):
    db = MySQLDatabase(host=config['host'],
                       user=config['user'],
                       passwd=config['passwd'],
                       database=config['database'],
                       charset=config['charset'])
    for var in dir(sys.modules[__name__]): # bind every table class in this module to the database
        if var != 'Model':
            obj = eval(var)
            if inspect.isclass(obj) and issubclass(obj, Model):
                '''
                _meta is the metadata object that peewee's Model metaclass attaches to each model class;
                see: https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319106919344c4ef8b1e04c48778bb45796e0335839000
                '''
                obj._meta.database = db
    return db

if __name__ == '__main__':
    DATABASE = None
    try:
        import ds_config as dc
        DATABASE = init_database(dc.DB)
        TABLES = []
        for var in dir(sys.modules[__name__]): # collect every table class defined in this module
            if var != 'Model':
                obj = eval(var)
                if inspect.isclass(obj) and issubclass(obj, Model):
                    TABLES.append(obj)
        DATABASE.connect()
        DATABASE.create_tables(TABLES, safe=True) # safe=True avoids errors if the tables already exist
    except Exception as e:
        print('%s\n%s' % (e, traceback.format_exc()))
    finally:
        if DATABASE:
            DATABASE.close()

#ds_queue.py 

# coding:utf-8

import redis

class Queue:
    def __init__(self, name, config):
        self._db = redis.Redis(host=config['host'], port=config['port'])
        self.key = '%s:%s' % (config['prefix'], name)

    def qsize(self):
        return self._db.llen(self.key)

    def empty(self):
        return self.qsize() == 0

    def put(self, item):
        self._db.rpush(self.key, item)

    def get(self, block=True, timeout=None):
        if block: # blocking: wait until an item is available (or the timeout expires)
            item = self._db.blpop(self.key, timeout=timeout)
            if item:
                item = item[1] # blpop returns a (key, value) tuple
        else: # non-blocking: return None immediately if the queue is empty
            item = self._db.lpop(self.key)
        return item

    '''
    Exercise:
    implement get_k, which fetches up to k items in a single call;
    it must support both blocking and non-blocking modes,
    and return all remaining items if fewer than k are available.
    '''

#ds_scheduler.py

# coding:utf-8

import argparse
import importlib
import ds_database as dd
import ds_queue as dq

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c',
                        '--config',
                        default='ds_config') # config module name, without the .py extension
    parser.add_argument('-a',
                        '--action',
                        default='create') # action type: create / view / retry
    parser.add_argument('-t',
                        '--task_id',
                        default=0,
                        type=int) # the task_id the action applies to
    return parser.parse_args()

def create_task(config):
    task = dd.Task(status=config.TS_NEW)
    task.save()
    queue = dq.Queue(task.id, config.QUEUE) # create the job queue for this task
    # select * from source where enabled = True
    sources = dd.Source.select().where(dd.Source.enabled)
    # create a job for each source and push it onto the queue
    for source in sources:
        job = dd.Job(task_id=task.id, source_id=source.id, status=config.JS_NEW)
        job.save()
        
        queue.put({'id':job.id, 'source_id':source.id, 'url':source.url})
    task.status = config.TS_INPROGRESS
    task.jobs = len(sources)
    task.save()
    print('Task %d is scheduled with %d jobs.' % (task.id, task.jobs))
    
def view_task(task_id, config):
    r = dd.Task.select().where(dd.Task.id == task_id)
    if r:
        task = r.get()
        if task.status == config.TS_INPROGRESS:
            # once you understand this logic, try re-implementing it yourself
            jobs = dd.Job.select().where(dd.Job.task_id == task_id)
            unfinished = 0
            finished = 0
            failed = 0
            for job in jobs:
                if job.status == config.JS_FINISHED:
                    finished += 1
                elif job.status == config.JS_FAILED:
                    failed += 1
                elif job.status == config.JS_NEW:
                    unfinished += 1
            task.unfinished = unfinished
            task.finished = finished
            task.failed = failed
            if (task.jobs == (finished + failed)) and (unfinished == 0):
                task.status = config.TS_FINISHED
            task.save()
            print('%d jobs for task %d: %d finished, %d failed.' % (task.jobs, task.id, task.finished, task.failed))
        elif task.status == config.TS_FINISHED:
            print('%d jobs for task %d: %d finished, %d failed.' % (task.jobs, task.id, task.finished, task.failed))
        elif task.status == config.TS_NEW:
            print('Task %d not scheduled yet.' % task_id)
    else:
        raise Exception('Task %d not found.' % task_id)

def retry(task_id, config):
    r = dd.Task.select().where(dd.Task.id == task_id)
    if r:
        task = r.get()
        queue = dq.Queue(task.id, config.QUEUE) # locate the queue that belongs to this task
        jobs = dd.Job.select().where((dd.Job.task_id == task_id) & (dd.Job.status != config.JS_FINISHED))
        for job in jobs:
            source = dd.Source.select().where(dd.Source.id == job.source_id).get()
            queue.put({'id':job.id, 'source_id':source.id, 'url':source.url})
            job.status = config.JS_NEW
            job.save()
        task.status = config.TS_INPROGRESS
        task.failed = 0
        task.unfinished = len(jobs)
        task.save()
        print('Retry %d jobs of task %d' % (len(jobs), task_id))
    else:
        raise Exception('Task %d not found.' % task_id)

if __name__ == '__main__':
    ARGS = parse_args()
    CONFIG = importlib.import_module(ARGS.config)
    dd.init_database(CONFIG.DB)
    if ARGS.action == 'create':
        create_task(CONFIG)
    elif ARGS.action == 'view':
        view_task(ARGS.task_id, CONFIG)
    elif ARGS.action == 'retry':
        retry(ARGS.task_id, CONFIG)
    else:
        raise Exception('Unknown action: %s' % ARGS.action)
#ds_worker.py

# coding:utf-8

import argparse
import datetime as dt
import importlib
import multiprocessing
import threading
import time
import traceback
import ds_database as dd
import ds_queue as dq

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c',
                        '--config',
                        default='ds_config') # config module name, without the .py extension
    parser.add_argument('-s',
                        '--spider',
                        help='',
                        default='spider_sample') # spider implementation module, without the .py extension
    parser.add_argument('-t',
                        '--thread_count',
                        help='',
                        default=0,
                        type=int) # number of worker threads
    return parser.parse_args()

def worker(spider, database, config):
    while True:
        try:
            for task in dd.Task.select().where(dd.Task.status == config.TS_INPROGRESS):
                print('Process task %d.' % task.id)
                queue = dq.Queue(task.id, config.QUEUE)
                while not queue.empty():
                    job = queue.get(False)
                    if job:
                        parse_result = spider.parse(eval(job), config)
                        content = parse_result['content']
                        if content:
                            result = dd.Result.select().where(dd.Result.source_id == parse_result['source_id'])
                            if result: # update the existing row
                                result = result.get()
                                result.updated_at = dt.datetime.now()
                            else: # insert a new row
                                result = dd.Result()
                                result.source_id = parse_result['source_id']
                            result.content = content
                            result.save()
                        # update the job's status
                        job = dd.Job.select().where(dd.Job.id == parse_result['id']).get()
                        job.status = parse_result['status']
                        job.message = parse_result['message']
                        job.updated_at = dt.datetime.now()
                        job.save()
                    else:
                        pass # the queue returned nothing; nothing to do
                time.sleep(15)
        except Exception as e:
            print('%s\n%s' % (e, traceback.format_exc()))

if __name__ == '__main__':
    ARGS = parse_args()
    THREAD_COUNT = ARGS.thread_count
    CONFIG = importlib.import_module(ARGS.config)
    SPIDER = importlib.import_module(ARGS.spider) # dynamically load the spider implementation
    DATABASE = dd.init_database(CONFIG.DB)
    threads = []
    for i in range(multiprocessing.cpu_count() if THREAD_COUNT <= 0 else THREAD_COUNT):
        threads.append(threading.Thread(target=worker, args=(SPIDER, DATABASE, CONFIG)))
    for t in threads:
        t.daemon = True # otherwise Ctrl-C cannot terminate the program
        t.start()
    try:
        while True:
            time.sleep(100)
    except KeyboardInterrupt:
        exit()

#spider_sample.py 

# coding:utf-8

import sys
import traceback
import util

PARSERS = util.load_parsers('parsers')

def parse(job, config):
    driver = None
    result = {'id':job['id'],
              'source_id':job['source_id'],
              'status':config.JS_FAILED,  # the job status defaults to failed
              'content':'',
              'message':''}
    try:
        url = job['url']
        prefix = url.split('//')[1].split('/')[0]
        if prefix in PARSERS:
            driver = util.create_chrome_driver()
            content = PARSERS[prefix](driver, url)
            result['content'] = content
            result['status'] = config.JS_FINISHED
        else:
            raise Exception('No parser for %s.' % url)
    except Exception as e:
        result['message'] = '%s\n%s' % (e, traceback.format_exc())
    finally:
        if driver:
            driver.quit()
        return result
