
项目背景
很多做数据采集的同学都会遇到一个老问题:到底是一次性把网站的数据全部抓取下来,还是定期只更新新增和变化的部分?我之前在做二手房市场监测的时候,就碰到过这个选择。当时目标是对比不同城市、不同小区的挂牌房源,看看价格走势和交易活跃度。如果抓取策略不对,不仅会浪费资源,还可能导致数据质量不高。
所以,本文就结合「链家二手房」这个实际站点,聊聊全量抓取和增量采集的取舍,并通过一个实战小项目,展示如何结合代理 IP 技术去实现定期的数据获取和统计。
数据目标
目标字段(示例)
* 基础识别:house_id(从 URL 或页面特征提取)、title、url
* 位置维度:city、district(区)、bizcircle(商圈/板块)、community(小区名)
* 核心指标:total_price(万元)、unit_price(元/㎡)、area(㎡)、room_type(几室几厅)
* 时间戳:first_seen_at(首次发现时间)、last_seen_at(最后一次看到)
* 变更检测:content_hash(用于判断记录是否变更)
存储设计
* 使用 SQLite(轻量)/ PostgreSQL(生产可选)持久化记录;
* 以 house_id 作为主键,配合 content_hash 实现 幂等写入 与 增量更新;
* 定期产出 统计汇总,如“按区/小区的挂牌数量、均价、面积分布”。
统计示例
* district 维度:挂牌量、平均单价、价格分位
* community 维度:挂牌量 Top N、均价 Top N
* 趋势维度(可扩展):每日新增挂牌量、下架量(需引入“消失检测”)
技术选型
* 在数据获取方式上,常见有两种:
1. 全量抓取
o 每次任务都从头到尾抓一遍。
o 优点:不会漏数据。
o 缺点:压力大,耗时耗流量,重复数据多。
2. 增量采集
o 每次只采集“新增”或“变化”的部分,比如根据发布时间筛选。
o 优点:节省资源,数据更新快。
o 缺点:需要额外逻辑来判断哪些是新数据,哪些是修改过的数据。
我的经验是:
* 前期数据基线不足时,用全量抓取先把底子打好。
* 后期维护阶段,采用增量采集,避免重复抓取大量无效信息。
在网络层面,由于链家有一定的访问频率限制,所以必须结合代理池。这里我选用了爬虫代理服务,支持用户名密码认证,可以减少封禁风险。
模块实现(代码可直接运行/改造)
运行环境:Python 3.10+;安装依赖:pip install requests curl_cffi lxml beautifulsoup4 fake-useragent sqlalchemy pandas apscheduler
0)统一配置(目标入口、代理、数据库)
# -*- coding: utf-8 -*-
"""
Project: Beike (ke.com) second-hand housing scraper - full vs. incremental crawl.
Note: sample code for educational/demo purposes only; respect the target
site's terms of service and robots.txt.
"""
import os, re, time, random, hashlib, json, datetime as dt
from typing import List, Dict, Optional, Tuple
import requests
from curl_cffi import requests as cffi_requests  # more realistic TLS stack; fallback for strict sites
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from sqlalchemy import create_engine, text
import pandas as pd
from apscheduler.schedulers.blocking import BlockingScheduler
# -- Proxy settings (example provider: Yiniuyun crawler proxy, www.16yun.cn) --
# Replace with your real configuration (host, port, username, password).
PROXY_HOST = os.getenv("YINIU_HOST", "proxy.16yun.cn")  # example host
PROXY_PORT = os.getenv("YINIU_PORT", "3100")  # example port
PROXY_USER = os.getenv("YINIU_USER", "16YUN")
PROXY_PASS = os.getenv("YINIU_PASS", "16IP")
# Username/password-authenticated HTTP proxy URL, applied to both schemes below.
PROXY = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
PROXIES = {"http": PROXY, "https": PROXY}
# ------------------ Target & database ------------------
BASE_URL = "https://www.ke.com/ershoufang/"
CITY = "sh"  # city slug; switch as needed, e.g. bj, gz, sz, or use the root entry with filters
DB_URL = os.getenv("DB_URL", "sqlite:///houses.db")
engine = create_engine(DB_URL, echo=False, future=True)
# ------------------ Crawl-mode switch ------------------
MODE = os.getenv("MODE", "incremental")  # one of: 'full' / 'incremental'
1)建表 & 工具函数(主键、哈希、幂等)
# Schema: `house` keeps one row per listing (house_id is the PK, content_hash
# drives change detection); `cursor_state` is a tiny key/value store for
# crawl-progress bookkeeping.
DDL = """
CREATE TABLE IF NOT EXISTS house (
house_id TEXT PRIMARY KEY,
title TEXT,
url TEXT,
city TEXT,
district TEXT,
bizcircle TEXT,
community TEXT,
total_price REAL,
unit_price REAL,
area REAL,
room_type TEXT,
content_hash TEXT,
first_seen_at TEXT,
last_seen_at TEXT
);
CREATE TABLE IF NOT EXISTS cursor_state (
key TEXT PRIMARY KEY,
value TEXT
);
"""
with engine.begin() as conn:
    # text() executes a single statement at a time, so split the DDL on ';'
    # and run each CREATE TABLE separately.
    for ddl_chunk in DDL.split(";"):
        statement = ddl_chunk.strip()
        if statement:
            conn.execute(text(statement))
def sha1(obj: Dict) -> str:
    """Stable content hash of *obj*'s core fields, used for change detection.

    Keys are sorted so logically-equal dicts always hash identically.
    """
    canonical = json.dumps(obj, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha1(canonical.encode("utf-8"))
    return digest.hexdigest()
def now_iso() -> str:
    """Current UTC time as 'YYYY-MM-DDTHH:MM:SSZ'.

    Fix: ``datetime.utcnow()`` is deprecated since Python 3.12 and returns a
    naive datetime; use the tz-aware ``now(timezone.utc)`` instead. The tzinfo
    is stripped before ``isoformat()`` so the output string is byte-identical
    to the old format (trailing 'Z', no '+00:00' offset).
    """
    now = dt.datetime.now(dt.timezone.utc)
    return now.replace(microsecond=0, tzinfo=None).isoformat() + "Z"
def load_state(key: str, default: str="") -> str:
    """Read one value from cursor_state; return *default* if the key is absent."""
    query = text("SELECT value FROM cursor_state WHERE key=:k")
    with engine.begin() as conn:
        row = conn.execute(query, {"k": key}).fetchone()
    if row is None:
        return default
    return row[0]
def save_state(key: str, value: str):
    """Upsert a key/value pair into cursor_state (SQLite ON CONFLICT syntax)."""
    upsert_sql = text("""
INSERT INTO cursor_state(key, value) VALUES(:k, :v)
ON CONFLICT(key) DO UPDATE SET value=excluded.value
""")
    with engine.begin() as conn:
        conn.execute(upsert_sql, {"k": key, "v": value})
2)请求层
# Shared User-Agent factory; `.random` yields a fresh UA string per session.
ua = UserAgent()
def get_session(use_cffi: bool=False):
    """
    Build a requests-compatible session with a rotating UA and the proxy pool.

    Defaults to plain ``requests``; pass ``use_cffi=True`` to switch to
    ``curl_cffi`` when the target enforces strict TLS/fingerprint checks.
    """
    sess = cffi_requests.Session() if use_cffi else requests.Session()
    default_headers = {
        "User-Agent": ua.random,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        # Cookies may be added here for session continuity (respect site terms).
    }
    sess.headers.update(default_headers)
    sess.proxies.update(PROXIES)
    # NOTE(review): requests does NOT honor a `Session.timeout` attribute —
    # timeouts must be passed per request. Kept for parity with the original.
    sess.timeout = 20
    return sess
def fetch_html(url: str, session=None, max_retry=5, sleep_base=1.2,
               timeout: float = 20) -> Optional[str]:
    """Fetch *url* with exponential backoff + jitter; return HTML or None.

    Fixes vs. the original:
    - ``session.get`` was called without a ``timeout``; requests ignores the
      ``Session.timeout`` attribute set in ``get_session``, so a stalled
      connection could hang the crawler forever. The timeout is now passed
      per request (new keyword-only-by-position default keeps the old
      signature backward compatible).
    - No longer sleeps after the final failed attempt.

    A response only counts as success when it is HTTP 200 *and* has an HTML
    Content-Type (soft rate-limit pages often return non-HTML bodies).
    """
    if session is None:
        session = get_session()
    for attempt in range(max_retry):
        try:
            resp = session.get(url, allow_redirects=True, timeout=timeout)
            if resp.status_code == 200 and "text/html" in resp.headers.get("Content-Type", ""):
                return resp.text
        except Exception:
            pass  # network/proxy error: fall through to backoff below
        if attempt < max_retry - 1:  # pointless to sleep after the last try
            time.sleep(sleep_base * (2 ** attempt) + random.random())
    return None
3)解析层(列表页抽取 + 详情增强可选)
贝壳列表页的结构在不同城市/版本可能有差异,以下选择“尽量稳健”的选择器(需要你根据线上实际 HTML 小幅调整)。
def parse_list(html: str) -> List[Dict]:
    """
    Parse one listing page into a list of house dicts.

    Each dict carries: house_id, title, url, district, bizcircle, community,
    total_price, unit_price, area, room_type; fields that cannot be parsed
    are left as None. Cards that fail entirely are skipped (best effort).

    Fixes vs. the original: the bare ``except:`` around the total-price float
    conversion (which also swallowed KeyboardInterrupt/SystemExit) and the
    bare ``except`` around the area parse are narrowed to the exceptions
    those conversions can actually raise.
    """
    soup = BeautifulSoup(html, "lxml")
    items: List[Dict] = []
    # Typical layout: ul.sellListContent > li; li.clear kept as a fallback
    # selector for layout variants across cities/site versions.
    for li in soup.select("ul.sellListContent > li, li.clear"):
        try:
            a = li.select_one("a[data-el='ershoufang']")
            if not a:
                continue
            url = a.get("href", "").strip()
            title = a.get_text(strip=True)
            # house_id is normally embedded in the URL (.../123456789.html);
            # fall back to an MD5 prefix of the URL for a stable key.
            m = re.search(r"/(\d+)\.html", url)
            house_id = m.group(1) if m else hashlib.md5(url.encode()).hexdigest()[:16]
            # Total price (unit: 万元).
            total_price = None
            tp = li.select_one(".totalPrice span")
            if tp:
                try:
                    total_price = float(tp.get_text(strip=True))
                except ValueError:  # non-numeric text, leave as None
                    pass
            # Unit price, e.g. "单价50012元/平" -> 50012.0
            unit_price = None
            up = li.select_one(".unitPrice span")
            if up:
                digits = re.findall(r"\d+", up.get_text())
                if digits:
                    unit_price = float(digits[0])
            # Area / room type / community live in .houseInfo;
            # district / bizcircle live in .positionInfo.
            area, room_type, community, district, bizcircle = None, None, None, None, None
            info = li.select_one(".houseInfo")
            if info:
                text_all = info.get_text(" ", strip=True)
                # Heuristic format: "Community | 2室1厅 | 60.12平米 | 南 北 | ..."
                parts = [p.strip() for p in text_all.split("|")]
                if parts:
                    community = parts[0]
                for p in parts:
                    if "平米" in p:
                        try:
                            area = float(re.findall(r"[0-9.]+", p)[0])
                        except (ValueError, IndexError):  # no parseable number
                            pass
                    if "室" in p:
                        room_type = p
            pos = li.select_one(".positionInfo")
            if pos:
                # e.g. "浦东 - 金桥"
                pos_text = pos.get_text(" ", strip=True)
                seg = [x.strip() for x in pos_text.split("-")]
                if len(seg) >= 1:
                    district = seg[0]
                if len(seg) >= 2:
                    bizcircle = seg[1]
            items.append({
                "house_id": house_id,
                "title": title,
                "url": url,
                "district": district,
                "bizcircle": bizcircle,
                "community": community,
                "total_price": total_price,
                "unit_price": unit_price,
                "area": area,
                "room_type": room_type,
            })
        except Exception:
            # Deliberately broad per-card guard: one malformed card must not
            # abort the whole page.
            continue
    return items
4)写库(幂等插入/更新 + 增量哈希)
def upsert_items(city: str, rows: List[Dict]) -> Tuple[int, int]:
    """
    Idempotently write listing rows into `house`; returns (inserted, updated).

    house_id is the primary key. content_hash of the core fields decides
    whether an existing row really changed; unchanged rows only get their
    last_seen_at refreshed.
    """
    insert_sql = text("""
INSERT INTO house (house_id, title, url, city, district, bizcircle, community, total_price,
unit_price, area, room_type, content_hash, first_seen_at, last_seen_at)
VALUES (:house_id, :title, :url, :city, :district, :bizcircle, :community, :total_price,
:unit_price, :area, :room_type, :content_hash, :first_seen_at, :last_seen_at)
""")
    update_sql = text("""
UPDATE house
SET title=:title, url=:url, city=:city, district=:district, bizcircle=:bizcircle,
community=:community, total_price=:total_price, unit_price=:unit_price,
area=:area, room_type=:room_type, content_hash=:content_hash, last_seen_at=:last_seen_at
WHERE house_id=:house_id
""")
    lookup_sql = text("SELECT content_hash, first_seen_at FROM house WHERE house_id=:id")
    touch_sql = text("UPDATE house SET last_seen_at=:t WHERE house_id=:id")
    inserted = updated = 0
    now = now_iso()
    with engine.begin() as conn:
        for row in rows:
            payload = {
                "title": row.get("title"),
                "url": row.get("url"),
                "city": city,
                "district": row.get("district"),
                "bizcircle": row.get("bizcircle"),
                "community": row.get("community"),
                "total_price": row.get("total_price"),
                "unit_price": row.get("unit_price"),
                "area": row.get("area"),
                "room_type": row.get("room_type"),
            }
            digest = sha1(payload)
            existing = conn.execute(lookup_sql, {"id": row["house_id"]}).fetchone()
            if existing is None:
                # Brand-new listing: stamp both first_seen_at and last_seen_at.
                params = dict(payload, house_id=row["house_id"], content_hash=digest,
                              first_seen_at=now, last_seen_at=now)
                conn.execute(insert_sql, params)
                inserted += 1
                continue
            old_hash, _first_seen = existing
            if digest == old_hash:
                # No content change: just record that the listing is still live.
                conn.execute(touch_sql, {"t": now, "id": row["house_id"]})
            else:
                params = dict(payload, house_id=row["house_id"], content_hash=digest,
                              last_seen_at=now)
                conn.execute(update_sql, params)
                updated += 1
    return inserted, updated
5)分页抓取与“模式切换”(全量 vs 增量)
以 城市 + 区 为示例(如上海“浦东”“闵行”等),也可切换为“关键词搜索”或“小区名精确搜索”。
def district_entry(city_code: str, district_slug: str, page: int) -> str:
    """
    Build a district listing-page URL, e.g.
    https://{city}.ke.com/ershoufang/{district}/pg{page}/
    """
    return "https://{0}.ke.com/ershoufang/{1}/pg{2}/".format(
        city_code, district_slug, page)
def crawl_district(city_code: str, district_slug: str, mode: str="incremental", max_pages: int=100):
    """
    Crawl one district's listing pages and upsert the results.

    mode:
      - 'full': scan pages 1..max_pages, stopping on an empty or failed page.
      - 'incremental': same scan, but stop early once two consecutive pages
        yield zero newly-inserted rows (heuristic; tune for your workload).
    """
    print(f"[{mode}] crawling district={district_slug}")
    session = get_session()
    pages_without_new = 0
    total_inserted = total_updated = 0
    for page_no in range(1, max_pages + 1):
        html = fetch_html(district_entry(city_code, district_slug, page_no), session=session)
        if not html:
            print(f" page {page_no}: fetch failed, stop.")
            break
        listings = parse_list(html)
        if not listings:
            print(f" page {page_no}: empty, stop.")
            break
        n_ins, n_upd = upsert_items(city_code, listings)
        total_inserted += n_ins
        total_updated += n_upd
        print(f" page {page_no}: inserted={n_ins}, updated={n_upd}, total={len(listings)}")
        if mode == "incremental":
            # Two consecutive pages without a single new row means we have
            # re-entered previously-seen territory — stop early.
            pages_without_new = pages_without_new + 1 if n_ins == 0 else 0
            if pages_without_new >= 2:
                print(" incremental early-stop (no new items).")
                break
        # Low frequency + jitter to reduce anti-bot pressure.
        time.sleep(random.uniform(1.2, 2.5))
    print(f"==> district {district_slug} done. inserted={total_inserted}, updated={total_updated}")
6)地理位置 / 小区名搜索入口(多维组合)
你可以维护一个“任务清单”:若使用 全量模式 就覆盖更广的区/小区集合;若 增量模式 则只盯住核心板块或关键词。
# Task lists: in full mode cover a broad set of districts/communities; in
# incremental mode keep only the core ones under watch.
DISTRICTS = [
"pudong", "minhang", "xuhui", "jingan", "huangpu", # example slugs; must match the site's actual URL slugs
]
COMMUNITIES = [
# Community-level precise search entries. Depending on city/site the syntax
# may be ?q= or /rs{keyword}/ — adjust to the real URL scheme.
# e.g. https://sh.ke.com/ershoufang/rs万科城市花园/
"万科城市花园", "仁恒河滨花园"
]
def community_search_url(city_code: str, keyword: str, page: int) -> str:
    """Search-by-community URL, common form: /ershoufang/rs{keyword}/pg{page}/."""
    return "https://{0}.ke.com/ershoufang/rs{1}/pg{2}/".format(
        city_code, keyword, page)
def crawl_community(city_code: str, keyword: str, mode: str="incremental", max_pages: int=50):
    """
    Crawl the search results for one community keyword and upsert the results.

    Same paging/early-stop strategy as crawl_district: in 'incremental' mode,
    stop after two consecutive pages with zero newly-inserted rows.
    """
    print(f"[{mode}] crawling community={keyword}")
    session = get_session()
    pages_without_new = 0
    total_inserted = total_updated = 0
    for page_no in range(1, max_pages + 1):
        html = fetch_html(community_search_url(city_code, keyword, page_no), session=session)
        if not html:
            print(f" page {page_no}: fetch failed, stop.")
            break
        listings = parse_list(html)
        if not listings:
            print(f" page {page_no}: empty, stop.")
            break
        n_ins, n_upd = upsert_items(city_code, listings)
        total_inserted += n_ins
        total_updated += n_upd
        print(f" page {page_no}: inserted={n_ins}, updated={n_upd}, total={len(listings)}")
        if mode == "incremental":
            # Early stop once we only see already-known listings.
            pages_without_new = pages_without_new + 1 if n_ins == 0 else 0
            if pages_without_new >= 2:
                print(" incremental early-stop (no new items).")
                break
        time.sleep(random.uniform(1.0, 2.0))
    print(f"==> community {keyword} done. inserted={total_inserted}, updated={total_updated}")
7)统计与导出(定期归纳)
def daily_stats(date_tag: Optional[str]=None) -> Dict[str, pd.DataFrame]:
    """
    Build and export the standard summary views.

    Computes listing counts and price aggregates per district and per
    community, writes them as CSVs under exports_{date_tag}/, and returns
    the DataFrames keyed by view name. Returns {} when the table is empty.
    """
    with engine.begin() as conn:
        df = pd.read_sql(text("SELECT * FROM house"), conn)
    if df.empty:
        print("No data yet.")
        return {}
    # Basic outlier filter: drop rows with missing or non-positive unit price.
    df = df[(df["unit_price"].notna()) & (df["unit_price"] > 0)]
    # District-level view: volume, mean and median unit price.
    district_view = (
        df.groupby("district")
        .agg(
            listings=("house_id", "nunique"),
            avg_unit_price=("unit_price", "mean"),
            p50_unit_price=("unit_price", "median"),
        )
        .reset_index()
        .sort_values("listings", ascending=False)
    )
    # Community-level view: volume and mean unit price.
    community_view = (
        df.groupby("community")
        .agg(
            listings=("house_id", "nunique"),
            avg_unit_price=("unit_price", "mean"),
        )
        .reset_index()
        .sort_values("listings", ascending=False)
    )
    # Top-20 communities by listing volume.
    top_communities = community_view.head(20).copy()
    date_tag = date_tag or dt.datetime.now().strftime("%Y%m%d")
    out_dir = f"exports_{date_tag}"
    os.makedirs(out_dir, exist_ok=True)
    district_view.to_csv(os.path.join(out_dir, "district_stats.csv"), index=False)
    community_view.to_csv(os.path.join(out_dir, "community_stats.csv"), index=False)
    top_communities.to_csv(os.path.join(out_dir, "top_community.csv"), index=False)
    print(f"Exported to {out_dir}/")
    return {"district": district_view, "community": community_view, "top_community": top_communities}
8)调度(每天 08:00 / 16:00 运行)
def job_run():
    """One scheduled crawl run: districts, then community keywords, then stats."""
    mode = MODE  # switched via the MODE environment variable
    # 1) District-level entries (deeper paging in full mode).
    for district in DISTRICTS:
        crawl_district(CITY, district, mode=mode, max_pages=80 if mode == "full" else 30)
    # 2) Community-keyword entries.
    for keyword in COMMUNITIES:
        crawl_community(CITY, keyword, mode=mode, max_pages=40 if mode == "full" else 15)
    # 3) Aggregate and export the daily stats.
    daily_stats()
if __name__ == "__main__":
    # Option A: run once immediately.
    job_run()
    # Option B: run on a schedule with APScheduler (08:00 and 16:00 daily).
    # scheduler = BlockingScheduler(timezone="Asia/Shanghai")
    # scheduler.add_job(job_run, "cron", hour="8,16", minute=0)
    # scheduler.start()
结语
在真实业务里,“全量 vs 增量”从来不是二选一,而是 阶段性权衡 与 工程化妥协。建议你将两种模式都纳入框架能力:用全量做“基线校准”,用增量做“日常维护”,再辅以内容哈希、早停策略、代理与频控,既稳且快,长期运营成本最低。