全量抓取还是增量采集?二手房数据采集实战解析

爬虫代理

项目背景

很多做数据采集的同学都会遇到一个老问题:到底是一次性把网站的数据全部抓取下来,还是定期只更新新增和变化的部分?我之前在做二手房市场监测的时候,就碰到过这个选择。当时目标是对比不同城市、不同小区的挂牌房源,看看价格走势和交易活跃度。如果抓取策略不对,不仅会浪费资源,还可能导致数据质量不高。

所以,本文就结合「链家二手房」这个实际站点,聊聊全量抓取和增量采集的取舍,并通过一个实战小项目,展示如何结合代理 IP 技术去实现定期的数据获取和统计。

数据目标

目标字段(示例)

* 基础识别:house_id(从 URL 或页面特征提取)、title、url

* 位置维度:city、district(区)、bizcircle(商圈/板块)、community(小区名)

* 核心指标:total_price(万元)、unit_price(元/?)、area(?)、room_type(几室几厅)

* 时间戳:first_seen_at(首次发现时间)、last_seen_at(最后一次看到)

* 变更检测:content_hash(用于判断记录是否变更)

存储设计

* 使用 SQLite(轻量)/ PostgreSQL(生产可选)持久化记录;

* 以 house_id 作为主键,配合 content_hash 实现 幂等写入 与 增量更新;

* 定期产出 统计汇总,如“按区/小区的挂牌数量、均价、面积分布”。

统计示例

* district 维度:挂牌量、平均单价、价格分位

* community 维度:挂牌量 Top N、均价 Top N

* 趋势维度(可扩展):每日新增挂牌量、下架量(需引入“消失检测”)

技术选型

* 在数据获取方式上,常见有两种:

1. 全量抓取

o 每次任务都从头到尾抓一遍。

o 优点:不会漏数据。

o 缺点:压力大,耗时耗流量,重复数据多。

2. 增量采集

o 每次只采集“新增”或“变化”的部分,比如根据发布时间筛选。

o 优点:节省资源,数据更新快。

o 缺点:需要额外逻辑来判断哪些是新数据,哪些是修改过的数据。

我的经验是:

* 前期数据基线不足时,用全量抓取先把底子打好。

* 后期维护阶段,采用增量采集,避免重复抓取大量无效信息。

在网络层面,由于链家有一定的访问频率限制,所以必须结合代理池。这里我选用了爬虫代理服务,支持用户名密码认证,可以减少封禁风险。

模块实现(代码可直接运行/改造)

运行环境:Python 3.10+安装依赖:pip install requests curl_cffi lxml beautifulsoup4 fake-useragent sqlalchemy pandas apscheduler

0)统一配置(目标入口、代理、数据库)

# -*- coding: utf-8 -*-

"""

项目:贝壳二手房抓取 - 全量 vs 增量

说明:示例代码仅作教学演示,请遵守目标站点条款与 robots.txt。

"""

import os, re, time, random, hashlib, json, datetime as dt

from typing import List, Dict, Optional, Tuple

import requests

from curl_cffi import requests as cffi_requests  # 更拟真TLS栈,可在受限站点兜底

from bs4 import BeautifulSoup

from fake_useragent import UserAgent

from sqlalchemy import create_engine, text

import pandas as pd

from apscheduler.schedulers.blocking import BlockingScheduler

# -- 代理(参考:亿牛云爬虫代理 www.16yun.cn)--------

# 请替换为你的真实配置(域名、端口、用户名、密码)

PROXY_HOST = os.getenv("YINIU_HOST", "proxy.16yun.cn")    # 示例域名

PROXY_PORT = os.getenv("YINIU_PORT", "3100")                  # 示例端口

PROXY_USER = os.getenv("YINIU_USER", "16YUN")

PROXY_PASS = os.getenv("YINIU_PASS", "16IP")

PROXY = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"

PROXIES = {"http": PROXY, "https": PROXY}

# ------------------ 目标与数据库 ------------------

BASE_URL = "https://www.ke.com/ershoufang/"

CITY = "sh"  # 城市简码可按需切换,如:bj、gz、sz;或直接用根入口配合筛选

DB_URL = os.getenv("DB_URL", "sqlite:///houses.db")

engine = create_engine(DB_URL, echo=False, future=True)

# ------------------ 抓取模式开关 ------------------

MODE = os.getenv("MODE", "incremental")  # 可选:'full' / 'incremental'

1)建表 & 工具函数(主键、哈希、幂等)

DDL = """

CREATE TABLE IF NOT EXISTS house (

    house_id TEXT PRIMARY KEY,

    title TEXT,

    url TEXT,

    city TEXT,

    district TEXT,

    bizcircle TEXT,

    community TEXT,

    total_price REAL,

    unit_price REAL,

    area REAL,

    room_type TEXT,

    content_hash TEXT,

    first_seen_at TEXT,

    last_seen_at TEXT

);

CREATE TABLE IF NOT EXISTS cursor_state (

    key TEXT PRIMARY KEY,

    value TEXT

);

"""

with engine.begin() as conn:

    for stmt in DDL.strip().split(";"):

        s = stmt.strip()

        if s:

            conn.execute(text(s))

def sha1(obj: Dict) -> str:

    """对核心字段做内容哈希,用于变更检测。"""

    payload = json.dumps(obj, sort_keys=True, ensure_ascii=False)

    return hashlib.sha1(payload.encode("utf-8")).hexdigest()

def now_iso():

    return dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"

def load_state(key: str, default: str="") -> str:

    with engine.begin() as conn:

        r = conn.execute(text("SELECT value FROM cursor_state WHERE key=:k"), {"k": key}).fetchone()

        return r[0] if r else default

def save_state(key: str, value: str):

    with engine.begin() as conn:

        conn.execute(text("""

            INSERT INTO cursor_state(key, value) VALUES(:k, :v)

            ON CONFLICT(key) DO UPDATE SET value=excluded.value

        """), {"k": key, "v": value})

2)请求层

ua = UserAgent()

def get_session(use_cffi: bool=False):

    """

    默认使用 requests;当遇到严格 TLS/指纹校验时,可切到 curl_cffi(use_cffi=True)。

    """

    s = cffi_requests.Session() if use_cffi else requests.Session()

    s.headers.update({

        "User-Agent": ua.random,

        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",

        "Accept-Language": "zh-CN,zh;q=0.9",

        "Connection": "keep-alive",

        # 可按需设置 Cookie,提高连续性(遵守站点条款)

    })

    s.proxies.update(PROXIES)

    s.timeout = 20

    return s

def fetch_html(url: str, session=None, max_retry=5, sleep_base=1.2) -> Optional[str]:

    """带退避重试与随机抖动的请求封装。"""

    if session is None:

        session = get_session()

    for i in range(max_retry):

        try:

            resp = session.get(url, allow_redirects=True)

            if resp.status_code == 200 and "text/html" in resp.headers.get("Content-Type", ""):

                return resp.text

            # 软性限流/异常,退避

            time.sleep(sleep_base * (2 ** i) + random.random())

        except Exception:

            time.sleep(sleep_base * (2 ** i) + random.random())

    return None

3)解析层(列表页抽取 + 详情增强可选)

贝壳列表页的结构在不同城市/版本可能有差异,以下选择“尽量稳健”的选择器(需要你根据线上实际 HTML 小幅调整)。

def parse_list(html: str) -> List[Dict]:

    soup = BeautifulSoup(html, "lxml")

    items = []

    # 典型结构:ul.sellListContent > li 或 div.content > div.leftContent > ul > li

    for li in soup.select("ul.sellListContent > li, li.clear"):

        try:

            a = li.select_one("a[data-el='ershoufang']")

            if not a:

                continue

            url = a.get("href", "").strip()

            title = a.get_text(strip=True)

            # house_id 可能包含在 URL 中,例如 .../123456789.html

            m = re.search(r"/(\d+)\.html", url)

            house_id = m.group(1) if m else hashlib.md5(url.encode()).hexdigest()[:16]

            # 价格与单价

            total_price = None

            tp = li.select_one(".totalPrice span")

            if tp:

                try:

                    total_price = float(tp.get_text(strip=True))

                except: pass

            unit_price = None

            up = li.select_one(".unitPrice span")

            if up:

                # e.g. "单价50012元/平"

                digits = re.findall(r"\d+", up.get_text())

                if digits:

                    unit_price = float(digits[0])

            # 面积/户型/小区等在 .houseInfo 或 .positionInfo 中

            area, room_type, community, district, bizcircle = None, None, None, None, None

            info = li.select_one(".houseInfo")

            if info:

                text_all = info.get_text(" ", strip=True)

                # 经验规则:如 "某小区 | 2室1厅 | 60.12平米 | 南 北 | ..."

                parts = [p.strip() for p in text_all.split("|")]

                if parts:

                    community = parts[0]

                for p in parts:

                    if "平米" in p:

                        try: area = float(re.findall(r"[0-9.]+", p)[0])

                        except: pass

                    if "室" in p:

                        room_type = p

            pos = li.select_one(".positionInfo")

            if pos:

                # 形如 "浦东 - 金桥"

                pos_text = pos.get_text(" ", strip=True)

                seg = [x.strip() for x in pos_text.split("-")]

                if len(seg) >= 1: district = seg[0]

                if len(seg) >= 2: bizcircle = seg[1]

            items.append({

                "house_id": house_id,

                "title": title,

                "url": url,

                "district": district,

                "bizcircle": bizcircle,

                "community": community,

                "total_price": total_price,

                "unit_price": unit_price,

                "area": area,

                "room_type": room_type

            })

        except Exception:

            continue

    return items

4)写库(幂等插入/更新 + 增量哈希)

def upsert_items(city: str, rows: List[Dict]) -> Tuple[int, int]:

    """

    返回 (inserted, updated)

    - 以 house_id 为主键;

    - 通过 content_hash 判定变更;

    """

    inserted = updated = 0

    now = now_iso()

    with engine.begin() as conn:

        for r in rows:

            payload = {

                "title": r.get("title"),

                "url": r.get("url"),

                "city": city,

                "district": r.get("district"),

                "bizcircle": r.get("bizcircle"),

                "community": r.get("community"),

                "total_price": r.get("total_price"),

                "unit_price": r.get("unit_price"),

                "area": r.get("area"),

                "room_type": r.get("room_type"),

            }

            h = sha1(payload)

            # 查询是否存在

            cur = conn.execute(text("SELECT content_hash, first_seen_at FROM house WHERE house_id=:id"),

                              {"id": r["house_id"]}).fetchone()

            if cur is None:

                # 新插入

                conn.execute(text("""

                    INSERT INTO house (house_id, title, url, city, district, bizcircle, community, total_price,

                                      unit_price, area, room_type, content_hash, first_seen_at, last_seen_at)

                    VALUES (:house_id, :title, :url, :city, :district, :bizcircle, :community, :total_price,

                            :unit_price, :area, :room_type, :content_hash, :first_seen_at, :last_seen_at)

                """), {

                    **{"house_id": r["house_id"]}, **payload,

                    "content_hash": h, "first_seen_at": now, "last_seen_at": now

                })

                inserted += 1

            else:

                old_hash, first_seen = cur

                if h != old_hash:

                    # 内容变更才更新

                    conn.execute(text("""

                        UPDATE house

                        SET title=:title, url=:url, city=:city, district=:district, bizcircle=:bizcircle,

                            community=:community, total_price=:total_price, unit_price=:unit_price,

                            area=:area, room_type=:room_type, content_hash=:content_hash, last_seen_at=:last_seen_at

                        WHERE house_id=:house_id

                    """), {**{"house_id": r["house_id"]}, **payload, "content_hash": h, "last_seen_at": now})

                    updated += 1

                else:

                    # 无变更,仅刷新最后看到时间(可选)

                    conn.execute(text("UPDATE house SET last_seen_at=:t WHERE house_id=:id"),

                                {"t": now, "id": r["house_id"]})

    return inserted, updated

5)分页抓取与“模式切换”(全量 vs 增量)

以 城市 + 区 为示例(如上海“浦东”“闵行”等),也可切换为“关键词搜索”或“小区名精确搜索”。

def district_entry(city_code: str, district_slug: str, page: int) -> str:

    """

    入口 URL 形态举例:

    https://{city}.ke.com/ershoufang/{district}/pg{page}/

    """

    return f"https://{city_code}.ke.com/ershoufang/{district_slug}/pg{page}/"

def crawl_district(city_code: str, district_slug: str, mode: str="incremental", max_pages: int=100):

    """

    mode:

      - 'full':从第1页扫到 max_pages 或遇到空页停止

      - 'incremental':从第1页开始,直到出现全部“已见过”的连续页范围即可停止

    增量的停止条件可根据业务调优(例如最近3页全为已见记录则停)。

    """

    print(f"[{mode}] crawling district={district_slug}")

    session = get_session()

    seen_streak = 0

    total_inserted = total_updated = 0

    for pg in range(1, max_pages + 1):

        url = district_entry(city_code, district_slug, pg)

        html = fetch_html(url, session=session)

        if not html:

            print(f"  page {pg}: fetch failed, stop.")

            break

        items = parse_list(html)

        if not items:

            print(f"  page {pg}: empty, stop.")

            break

        inserted, updated = upsert_items(city_code, items)

        total_inserted += inserted

        total_updated += updated

        print(f"  page {pg}: inserted={inserted}, updated={updated}, total={len(items)}")

        if mode == "incremental":

            # 简单启发式:若连续2页均无新增(inserted==0),认为到达“旧区间”,可停止

            if inserted == 0:

                seen_streak += 1

                if seen_streak >= 2:

                    print("  incremental early-stop (no new items).")

                    break

            else:

                seen_streak = 0

        # 低频+抖动,减轻风控

        time.sleep(random.uniform(1.2, 2.5))

    print(f"==> district {district_slug} done. inserted={total_inserted}, updated={total_updated}")

6)地理位置 / 小区名搜索入口(多维组合)

你可以维护一个“任务清单”:若使用 全量模式 就覆盖更广的区/小区集合;若 增量模式 则只盯住核心板块或关键词。

DISTRICTS = [

    "pudong", "minhang", "xuhui", "jingan", "huangpu",  # 示例,需对应站点实际 slug

]

COMMUNITIES = [

    # 小区维度的精准检索入口(不同城市/站点可能使用 ?q= 或 /rs{keyword}/ 语法,需按实际调整)

    # 例如: https://sh.ke.com/ershoufang/rs万科城市花园/

    "万科城市花园", "仁恒河滨花园"

]

def community_search_url(city_code: str, keyword: str, page: int) -> str:

    # 常见形态:/ershoufang/rs{keyword}/pg{page}/

    return f"https://{city_code}.ke.com/ershoufang/rs{keyword}/pg{page}/"

def crawl_community(city_code: str, keyword: str, mode: str="incremental", max_pages: int=50):

    print(f"[{mode}] crawling community={keyword}")

    session = get_session()

    seen_streak = 0

    total_inserted = total_updated = 0

    for pg in range(1, max_pages + 1):

        url = community_search_url(city_code, keyword, pg)

        html = fetch_html(url, session=session)

        if not html:

            print(f"  page {pg}: fetch failed, stop.")

            break

        items = parse_list(html)

        if not items:

            print(f"  page {pg}: empty, stop.")

            break

        inserted, updated = upsert_items(city_code, items)

        total_inserted += inserted

        total_updated += updated

        print(f"  page {pg}: inserted={inserted}, updated={updated}, total={len(items)}")

        if mode == "incremental":

            if inserted == 0:

                seen_streak += 1

                if seen_streak >= 2:

                    print("  incremental early-stop (no new items).")

                    break

            else:

                seen_streak = 0

        time.sleep(random.uniform(1.0, 2.0))

    print(f"==> community {keyword} done. inserted={total_inserted}, updated={total_updated}")

7)统计与导出(定期归纳)

def daily_stats(date_tag: Optional[str]=None) -> Dict[str, pd.DataFrame]:

    """

    产出几个常用视图:按区/小区的挂牌量与均价。

    """

    with engine.begin() as conn:

        df = pd.read_sql(text("SELECT * FROM house"), conn)

    if df.empty:

        print("No data yet.")

        return {}

    # 过滤异常值(可自定义)

    df = df[(df["unit_price"].notna()) & (df["unit_price"] > 0)]

    # district 维度

    g_d = df.groupby("district").agg(

        listings=("house_id", "nunique"),

        avg_unit_price=("unit_price", "mean"),

        p50_unit_price=("unit_price", "median")

    ).reset_index().sort_values("listings", ascending=False)

    # community 维度

    g_c = df.groupby("community").agg(

        listings=("house_id", "nunique"),

        avg_unit_price=("unit_price", "mean")

    ).reset_index().sort_values("listings", ascending=False)

    # Top榜(示例)

    top_comm = g_c.head(20).copy()

    date_tag = date_tag or dt.datetime.now().strftime("%Y%m%d")

    out_dir = f"exports_{date_tag}"

    os.makedirs(out_dir, exist_ok=True)

    g_d.to_csv(os.path.join(out_dir, "district_stats.csv"), index=False)

    g_c.to_csv(os.path.join(out_dir, "community_stats.csv"), index=False)

    top_comm.to_csv(os.path.join(out_dir, "top_community.csv"), index=False)

    print(f"Exported to {out_dir}/")

    return {"district": g_d, "community": g_c, "top_community": top_comm}

8)调度(每天 08:00 / 16:00 运行)

def job_run():

    mode = MODE  # 环境变量切换

    # 1) 区维度

    for d in DISTRICTS:

        crawl_district(CITY, d, mode=mode, max_pages=80 if mode=="full" else 30)

    # 2) 小区关键词维度

    for kw in COMMUNITIES:

        crawl_community(CITY, kw, mode=mode, max_pages=40 if mode=="full" else 15)

    # 3) 统计导出

    daily_stats()

if __name__ == "__main__":

    # 方式A:直接执行一次

    job_run()

    # 方式B:APScheduler 定时

    # scheduler = BlockingScheduler(timezone="Asia/Shanghai")

    # scheduler.add_job(job_run, "cron", hour="8,16", minute=0)

    # scheduler.start()

结语

在真实业务里,“全量 vs 增量”从来不是二选一,而是 阶段性权衡 与 工程化妥协。建议你将两种模式都纳入框架能力:用全量做“基线校准”,用增量做“日常维护”,再辅以内容哈希、早停策略、代理与频控,既稳且快,长期运营成本最低。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容