在分布式任务调度系统中(如 AI 系统的定时任务管理),tasks 表往往是核心存储组件,用于记录任务的元数据(如任务 ID、due_time 执行时间、状态、负载等)。随着任务量的增长(例如每日百万级插入),单表或单库会面临性能瓶颈,如查询慢、锁竞争和存储膨胀。为了解决这些问题,我们采用分库分表(Sharding and Partitioning)策略,按 due_time(执行时间)进行分片。同时,结合 Redis Sorted Set (ZSET) 缓存即将执行的任务,以减少数据库 (DB) 访问频率。最后,通过 Redis Pub/Sub 或 PostgreSQL LISTEN/NOTIFY 实现事件监听机制,触发 scheduler 即时处理新插入或更新的任务。
下面,我将详细、具体、完整地介绍这个优化方案,包括概念解释、实现步骤、代码示例、优缺点分析以及潜在风险。方案假设数据库为 PostgreSQL(兼容其他 RDBMS 如 MySQL),缓存为 Redis,适用于高可用、高性能的 AI 调度场景(如模型推理任务的定时触发)。
1. 分库分表的概念与必要性
-
什么是分库分表?
- 分表 (Partitioning):在同一数据库实例内,将一张大表拆分成多个小表(子表),每个子表存储部分数据。分区键(Partition Key)决定数据路由到哪个子表。这是一种垂直拆分,通常用于单机优化。
- 分库 (Sharding):将数据分布到多个数据库实例(或服务器)上,每个库存储部分数据。分片键(Shard Key)决定数据路由到哪个库。这是一种水平拆分,适合跨机扩展。
- 在实践中,常结合使用:先按时间分表,再按用户或业务分库,形成“分库分表”。
-
为什么按 due_time 分片?
- 任务调度查询通常是时间范围查询,如
SELECT * FROM tasks WHERE due_time <= NOW() AND status = 'pending'。如果不分片,全表扫描会导致高 IO 和锁等待,尤其在亿级数据时。 - 按 due_time(执行时间)分片,能将查询局限在少数子表中(e.g., 只查当前月和未来1-2个月的表),减少扫描范围,提高查询效率 10-100 倍。
- 时间分片还便于数据归档:历史表可移到冷存储,释放热库空间。
- 相比按 task_id 或 user_id 分片,时间分片更适合定时任务,因为任务到期分布相对均匀,避免热点分片(e.g., 某些用户任务密集)。
- 任务调度查询通常是时间范围查询,如
-
分片粒度选择
- 按月分片:如
tasks_YYYYMM(e.g., tasks_202512 表示 2025 年 12 月的任务)。 - 为什么按月?如果按天(tasks_20251229),表数过多(每年 365 张),管理复杂;按年太粗,查询范围大。按月平衡了表数(每年 12 张)和查询效率。每个分片大小控制在 3-70 GB(参考 CrateDB 最佳实践)。
- 如果任务量极大,可进一步分库:e.g., 按年分库(db_2025),内部分表按月。
- 按月分片:如
2. 分库分表的具体实现
-
表结构设计
- 主表:tasks(虚拟表或视图,不存储数据)。
- 子表:tasks_YYYYMM,继承主表结构。
- 示例字段:
id (BIGINT PRIMARY KEY), due_time (TIMESTAMP), status (VARCHAR), payload (JSONB), version (INT FOR OPTIMISTIC LOCK)。 - 分区约束:在创建子表时添加 CHECK 约束,如
CHECK (due_time >= '2025-12-01' AND due_time < '2026-01-01')。
-
数据路由与查询
-
插入:根据 due_time 计算分片名(e.g., Python:
table_name = f"tasks_{due_time.strftime('%Y%m')}"),动态插入对应子表。 - 查询:scheduler 计算当前及未来分片(e.g., 当前月 + 下个月),并行查询这些子表,聚合结果。避免跨所有分片查询。
- 工具支持:用 ShardingSphere 或 Vitess 中间件自动路由;PostgreSQL 原生支持分区表(PARTITION BY RANGE(due_time))。
- 迁移历史数据:用 pg_dump/pg_restore 分批迁移;在线迁移工具如 pg_partman。
-
插入:根据 due_time 计算分片名(e.g., Python:
代码示例(Python + SQLAlchemy)
from sqlalchemy import create_engine, text
from datetime import datetime
engine = create_engine('postgresql://user:pass@host/db')
def get_table_name(due_time: datetime) -> str:
return f"tasks_{due_time.strftime('%Y%m')}"
def insert_task(task_id, due_time, payload):
table = get_table_name(due_time)
with engine.connect() as conn:
conn.execute(text(f"INSERT INTO {table} (id, due_time, payload) VALUES (:id, :due_time, :payload)"),
{'id': task_id, 'due_time': due_time, 'payload': payload})
conn.commit()
def fetch_due_tasks(now: datetime):
current_table = get_table_name(now)
next_table = get_table_name(now.replace(month=now.month % 12 + 1)) # 下个月
with engine.connect() as conn:
result = conn.execute(text(f"""
SELECT * FROM {current_table} WHERE due_time <= :now AND status = 'pending'
UNION ALL
SELECT * FROM {next_table} WHERE due_time <= :now AND status = 'pending'
"""), {'now': now}).fetchall()
return result
3. 结合 Redis ZSET 缓存即将任务
-
为什么用 ZSET 缓存?
- Scheduler 频繁查询即将到期任务(e.g., 每 5-10 秒轮询),直接查 DB 会增加负载。ZSET 作为有序集合(Sorted Set),以 due_time 作为 score(分数),可高效范围查询(ZRANGEBYSCORE)和移除(ZREM),性能远超 DB。
- 缓存范围:未来 2-3 分钟任务(e.g., score <= now + 180s),减少 DB 命中率 90%以上。过期任务自动从 ZSET 移除。
- 任务数据:ZSET 成员(member)为任务 ID 或 JSON 编码的任务详情(e.g., {'id':1, 'payload':'data'}),score 为 UNIX 时间戳。
-
实现流程
- 插入任务时:先写 DB,再加到 ZSET(ZADD 'upcoming_tasks' score member)。
- Scheduler 轮询:检查 ZSET 最小 score <= now(ZSCORE 或 ZRANGE),取出并处理,ZREM 移除。
- 一致性:用 Lua 脚本原子操作;如果 ZSET 丢失,重载从 DB。
代码示例(Python + Redis)
import redis
import time
import json
r = redis.Redis(host='localhost', port=6379)
def add_to_zset(task_id, due_time, payload):
score = int(due_time.timestamp())
member = json.dumps({'id': task_id, 'payload': payload})
r.zadd('upcoming_tasks', {member: score})
def fetch_upcoming_tasks():
now = int(time.time())
tasks = r.zrangebyscore('upcoming_tasks', '-inf', now) # 取出到期任务
if tasks:
r.zremrangebyscore('upcoming_tasks', '-inf', now) # 移除
return [json.loads(t) for t in tasks]
4. 监听任务插入/更新事件,触发 Scheduler 即时处理
-
为什么需要监听?
- 纯轮询 DB/ZSET 可能延迟(e.g., 5s 间隔内新任务未触发)。事件监听实现“即时”响应,提升实时性。
-
Redis Pub/Sub 机制
- Pub/Sub 是 Redis 的发布订阅模式,轻量、非持久化。
- 实现:插入/更新任务时,PUBLISH 频道(e.g., 'task_events')消息(e.g., JSON: {'action':'insert', 'task_id':1})。
- Scheduler SUBSCRIBE 'task_events',收到消息触发即时 poll ZSET/DB。
- 优点:简单、高性能(亚毫秒延迟),适合小规模。缺点:非持久化,断连丢失消息;占用连接。
-
PostgreSQL LISTEN/NOTIFY 机制
- LISTEN/NOTIFY 是 Postgres 内置 pub/sub,原生集成 DB,无需额外组件。
- 实现:用触发器 (TRIGGER) 在 INSERT/UPDATE tasks 表时 NOTIFY 频道(e.g., 'task_events')payload(字符串,限 8000 字节)。
- Scheduler LISTEN 'task_events',收到通知触发 poll。
- 优点:事务一致(通知在 COMMIT 后发送),无需 Redis;适合 DB 重场景。缺点:占用 DB 连接,规模大时连接池压力大。
-
比较与选择
- Redis Pub/Sub:更灵活(跨服务),但需维护 Redis 一致性。
- Postgres LISTEN/NOTIFY:更可靠(事务绑定),但限于 DB 生态。
- 建议:如果已用 Redis ZSET,优先 Pub/Sub;否则用 LISTEN/NOTIFY 简化架构。
代码示例(Postgres LISTEN/NOTIFY + Python psycopg2)
import psycopg2
import select
conn = psycopg2.connect('dbname=db user=user')
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute("LISTEN task_events;")
# 触发器创建(SQL)
# CREATE TRIGGER task_notify AFTER INSERT OR UPDATE ON tasks FOR EACH ROW EXECUTE PROCEDURE notify_trigger('task_events');
while True:
if select.select([conn], [], [], 5) == ([], [], []):
continue
conn.poll()
while conn.notifies:
notify = conn.notifies.pop(0)
print(f"Received: {notify.payload}") # e.g., '{"id":1,"action":"insert"}'
fetch_due_tasks() # 触发 scheduler 处理
5. 整体方案的优缺点与风险
-
优点:
- 性能提升:分片 + ZSET 减少 DB 负载 80-90%;事件监听实现亚秒级响应。
- 扩展性:易水平扩展分库;ZSET 支持动态调整缓存范围。
- 成本低:利用现有 Redis/Postgres,无需新 MQ。
-
缺点:
- 复杂度增:需处理跨分片查询、数据迁移。
- 一致性挑战:ZSET 与 DB 需同步(e.g., 失效用 Lua 脚本)。
- 监听开销:Pub/Sub 非持久;LISTEN 连接消耗。
-
风险与缓解:
- 热点分片:监控任务分布,动态调整粒度。
- 迁移风险:渐进分片,先测试小流量。
- 规模极限:任务 > 亿级时,考虑专用时序 DB 如 TimescaleDB。
这个方案已在类似系统中证明有效(如 Suprsend 的动态调度),适用于 AI 系统的高并发定时任务。如果需调整分片粒度或集成 K8s,我可进一步扩展。