Redis 千万级 Key 批量删除:用 Lua 脚本从 10 小时优化到 10 分钟
问题场景
我们有一套数据管线的失败重试机制:当 Kafka 消息处理失败时,将消息写入 Redis 作为"失败队列",等待人工确认后重放或丢弃。
存储结构
采用"每条消息一个 Key"的散列模式(旧版设计):
Key: {prefix}{topic}_{partition}_{offset}
Value: {"raw_id":"xxx", "reason":"missing_required_field:md5", "time":"2026-06-01 12:00:00", "raw_message":"..."}
辅助元数据:
{prefix}__count__— 累计计数(只增不减,供监控用){prefix}__recent__— 最近 20 条摘要(LIST)
问题规模
某队列由于上游持续推送缺少必填字段的消息,积累了 1,371 万条 失败记录。我们需要一个"按错误原因批量清理"的功能——只删除 reason 包含特定关键词的消息,保留其他。
方案演进
第一版:Python 逐条循环
最直觉的实现:
import redis
import json
def purge_by_reason(r, prefix, reason_filter):
"""按 reason 筛选删除失败消息"""
cursor = 0
deleted = 0
while True:
# Step 1: SCAN 一批 key
cursor, keys = r.scan(cursor, match=f"{prefix}*", count=5000)
if keys:
# Step 2: 过滤掉元数据 key
to_check = [k for k in keys
if "__count__" not in k
and "__recent__" not in k]
# Step 3: Pipeline 批量取值
pipe = r.pipeline(transaction=False)
for k in to_check:
pipe.get(k)
vals = pipe.execute()
# Step 4: Python 侧逐条解析 JSON,判断 reason
to_delete = []
for k, val in zip(to_check, vals):
if val:
d = json.loads(val) # 解析 JSON
if reason_filter in d.get("reason", ""):
to_delete.append(k)
# Step 5: 删除匹配的 key
if to_delete:
r.unlink(*to_delete)
deleted += len(to_delete)
if cursor == 0:
break
return deleted性能分析
对于 1300 万条 key,SCAN COUNT=5000,每轮处理流程:
┌─────────┐ ┌─────────┐
│ Python │ │ Redis │
└────┬────┘ └────┬────┘
│ SCAN 5000 keys │
│─────────────────→│ RTT #1: ~0.5ms
│←─────────────────│ 返回 5000 个 key name
│ │
│ PIPELINE GET ×5000│
│─────────────────→│ RTT #2: ~2ms(含序列化 5000 个 value)
│←─────────────────│ 返回 5000 个 JSON string(每个 ~500B)
│ │ → 网络传输 ~2.5MB
│ [Python 解析 5000 次 JSON]
│ [Python 字符串匹配]
│ │
│ UNLINK matched │
│─────────────────→│ RTT #3: ~0.3ms
│←─────────────────│
└───────────────────┘
单轮耗时: ~5-10ms (网络) + ~20ms (Python JSON 解析)
总轮数: 13,000,000 / 5,000 = 2,600 轮
预估总耗时: 2,600 × 30ms = ~78 秒 (理论)
实际远比理论慢,原因:
- 网络 RTT 不是 0.5ms:跨机房 Redis 通常 2-5ms
- Pipeline GET 的返回体积巨大:5000 × 500B = 2.5MB/轮,带宽成瓶颈
- Python GIL + json.loads:单线程解析 5000 条 JSON 比想象中慢
- Redis 单线程阻塞:大 Pipeline 执行期间其他请求排队
实测结果:跑了数小时仍未完成,前端 HTTP 请求早已超时。
第二版:Lua 脚本(服务端执行)
核心原理
Redis 内置 Lua 解释器,通过 EVAL / EVALSHA 命令可以在服务端原子地执行脚本。关键优势:
- 零网络往返:GET 取到的 value 不需要传回客户端,在 Lua 里直接判断
- 原子性:脚本执行期间不会被其他命令打断(单线程模型)
- 批量操作:一次调用完成 SCAN + GET + 判断 + UNLINK 全流程
┌─────────┐ ┌──────────────────────────────┐
│ Python │ │ Redis Server │
└────┬────┘ │ ┌─────────────────────────┐ │
│ EVALSHA │ │ Lua Script │ │
│ (cursor, │ │ │ │
│ pattern, │ │ SCAN → keys[] │ │
│ reason) │ │ for each key: │ │
│─────────────→│ │ val = GET(key) │ │ ← 内存操作,无网络
│ │ │ if match(val, reason) │ │
│ │ │ batch += key │ │
│ │ │ if #batch >= 500 │ │
│ │ │ UNLINK(batch) │ │ ← 内部命令调用
│ │ │ return {cursor, count} │ │
│←─────────────│ └─────────────────────────┘ │
│ 仅返回两个数字 └──────────────────────────────┘
│ (next_cursor, deleted_count)
每轮只有 1 次网络往返,且返回的数据量极小(两个整数)。
全量删除 Lua 脚本(不过滤 reason)
当确认要清理整个队列时,无需读取 value,直接 SCAN + UNLINK:
lua_purge_all = r.register_script("""
local cursor = ARGV[1]
local pattern = ARGV[2]
-- 在 Redis 服务端执行 SCAN
local result = redis.call('SCAN', cursor, 'MATCH', pattern, 'COUNT', 10000)
local next_cursor = result[1]
local keys = result[2]
local count = 0
local batch = {}
for i, key in ipairs(keys) do
-- 跳过元数据 key(plain string.find,第4个参数=true 表示不用正则)
if not string.find(key, '__count__', 1, true)
and not string.find(key, '__recent__', 1, true)
and not string.find(key, '__messages__', 1, true) then
table.insert(batch, key)
-- 攒满 500 个统一 UNLINK,减少内部命令调用次数
if #batch >= 500 then
redis.call('UNLINK', unpack(batch))
count = count + #batch
batch = {}
end
end
end
-- 处理尾部不满 500 的残留
if #batch > 0 then
redis.call('UNLINK', unpack(batch))
count = count + #batch
end
-- 返回下一轮 cursor 和本轮删除数
return {next_cursor, count}
""")Python 调用方:
cursor = "0"
total_deleted = 0
while True:
# 每次调用只有 1 次网络往返
result = lua_purge_all(args=[cursor, f"{prefix}*"])
cursor = str(result[0]) # Redis 返回的是 bytes/int
total_deleted += result[1]
# 更新进度(供前端轮询)
progress["deleted"] = total_deleted
if cursor == "0": # SCAN 遍历完成
break按 reason 筛选删除 Lua 脚本
需要在 Redis 服务端执行 GET + 字符串匹配:
lua_purge_by_reason = r.register_script("""
local cursor = ARGV[1]
local pattern = ARGV[2]
local reason = ARGV[3] -- 要匹配的 reason 子串
local result = redis.call('SCAN', cursor, 'MATCH', pattern, 'COUNT', 10000)
local next_cursor = result[1]
local keys = result[2]
local count = 0
local batch = {}
for i, key in ipairs(keys) do
if not string.find(key, '__count__', 1, true)
and not string.find(key, '__recent__', 1, true)
and not string.find(key, '__messages__', 1, true) then
-- 在 Redis 服务端直接 GET,value 不出网络
local val = redis.call('GET', key)
-- string.find 做子串匹配
-- value 格式: {"reason":"missing_required_field:md5", ...}
-- 直接搜索 reason 子串即可,不需要 JSON 解析
if val and string.find(val, reason, 1, true) then
table.insert(batch, key)
if #batch >= 500 then
redis.call('UNLINK', unpack(batch))
count = count + #batch
batch = {}
end
end
end
end
if #batch > 0 then
redis.call('UNLINK', unpack(batch))
count = count + #batch
end
return {next_cursor, count}
""")调用方式完全相同,只是多传一个 reason 参数:
cursor = "0"
total_deleted = 0
while True:
result = lua_purge_by_reason(
args=[cursor, f"{prefix}*", "missing_required_field:md5"]
)
cursor = str(result[0])
total_deleted += result[1]
progress["deleted"] = total_deleted
if cursor == "0":
break为什么用 string.find 而不是 JSON 解析?
Redis 内置的 Lua 版本是 5.1(嵌入式精简版),可以通过 cjson.decode 解析 JSON:
local d = cjson.decode(val)
if d and d.reason and string.find(d.reason, reason, 1, true) then ...但 cjson.decode 在 Lua 中的性能远不如 C 层面的 string.find。我们的 value 格式是固定的:
{"raw_id":"xxx","reason":"missing_required_field:md5","time":"...","raw_message":"..."}reason 的值不会出现在其他字段中(raw_message 是 base64 或原始 Kafka 消息,不会包含 missing_required_field 这种字符串),所以直接用 string.find(val, reason, 1, true) 做子串匹配是安全且高效的。
如果你的场景中 reason 子串可能出现在 value 的其他位置,可以用更精确的模式:
-- 匹配 "reason":"...reason_filter..." 格式
if val and string.find(val, '"reason":"[^"]*' .. reason, 1, false) then性能对比
| 维度 | Python 循环 | Lua 脚本 |
|---|---|---|
| 网络往返 / 万条 key | ~20,000 次 | 1 次 |
| 数据传输量 / 万条 | ~5MB (value 全传回) | ~16B (仅 cursor+count) |
| CPU 开销位置 | Python 侧 (GIL) | Redis 侧 (C 内嵌 Lua) |
| 实测 1300 万 key | 8-10 小时 | ~10 分钟 |
| 对 Redis 其他请求的影响 | 长期占用连接 | 每轮阻塞 ~50-100ms |
工程实践:异步任务 + 进度条
千万级 key 即使用 Lua 也需要 ~10 分钟,不能在 HTTP 请求中同步等待。完整方案:
import threading
import uuid
# 内存中的任务状态
_tasks = {}
def purge_worker(task_id, prefix, reason_filter):
"""后台线程执行 Lua 脚本循环"""
task = _tasks[task_id]
try:
r = get_redis()
# 选择 Lua 脚本
if reason_filter:
script = lua_purge_by_reason
args_fn = lambda c: [c, f"{prefix}*", reason_filter]
else:
script = lua_purge_all
args_fn = lambda c: [c, f"{prefix}*"]
cursor = "0"
deleted = 0
while True:
result = script(args=args_fn(cursor))
cursor = str(result[0])
deleted += result[1]
task["deleted"] = deleted # 进度更新(线程安全:GIL 保证 int 赋值原子)
if cursor == "0":
break
task["status"] = "done"
task["message"] = f"完成,共删除 {deleted:,} 条"
except Exception as e:
task["status"] = "error"
task["message"] = str(e)
# API: 提交清理任务
@app.post("/api/purge")
def start_purge(body: dict):
task_id = str(uuid.uuid4())[:8]
_tasks[task_id] = {"status": "running", "deleted": 0}
t = threading.Thread(
target=purge_worker,
args=(task_id, body["prefix"], body.get("reason_filter", "")),
daemon=True
)
t.start()
return {"task_id": task_id, "message": "已启动"}
# API: 查询进度
@app.get("/api/purge_status/{task_id}")
def purge_status(task_id: str):
task = _tasks.get(task_id)
if not task:
raise HTTPException(404)
return task前端每 2 秒轮询进度:
const poll = setInterval(async () => {
const resp = await fetch(`/api/purge_status/${taskId}`);
const data = await resp.json();
// 更新进度条
const pct = Math.round((data.deleted / estimated) * 100);
progressBar.style.width = pct + '%';
progressText.textContent = `已删除 ${data.deleted.toLocaleString()} 条 (${pct}%)`;
if (data.status === 'done' || data.status === 'error') {
clearInterval(poll);
showResult(data.message);
}
}, 2000);注意事项
1. Lua 脚本会阻塞 Redis 主线程
Redis 是单线程模型,Lua 脚本执行期间所有其他命令排队等待。单次 SCAN 10000 + GET + UNLINK 大约耗时 50-100ms。如果你的 Redis 有严格的延迟要求(如 p99 < 10ms),可以:
- 把
COUNT调小(如 2000),牺牲总速度换取更短的单次阻塞 - 在业务低峰期执行
2. UNLINK vs DEL
DEL:同步释放内存,大 value 时会阻塞UNLINK:异步释放内存(另起线程回收),主线程只做标记删除
千万级 key 场景必须用 UNLINK。
3. SCAN COUNT 不是精确的
SCAN ... COUNT 10000 不保证返回恰好 10000 个 key,可能多也可能少。这只是给 Redis 的"提示",实际返回数取决于哈希表的桶分布。
4. Cluster 模式的限制
Redis Cluster 中,Lua 脚本只能操作同一个 slot 的 key。如果 key 分布在不同 slot,需要按 slot 分组执行。单机/哨兵模式无此限制。
5. 脚本注册 vs 每次 EVAL
register_script 会先 SCRIPT LOAD 得到 SHA,后续调用 EVALSHA,避免每次传输完整脚本文本。对于循环调用数千次的场景,这个优化有意义。
总结
| 优化阶段 | 方案 | 耗时 | 瓶颈 |
|---|---|---|---|
| V1 | Python SCAN + Pipeline GET + json.loads + UNLINK | 8-10h | 网络 RTT × 千万次 |
| V2 | Lua 脚本服务端执行 (SCAN + GET + string.find + UNLINK) | ~10min | Redis 内存带宽 |
核心收益来自消除网络往返——当你的操作模式是"读一个值、判断、决定删不删"时,把判断逻辑推到数据所在的位置(Redis 服务端)永远是对的。这也是 Redis Lua 脚本最经典的应用场景之一。