Mokeke's Blog
返回文章列表

Redis 千万级 Key 批量删除:用 Lua 脚本从 10 小时优化到 10 分钟

问题场景

我们有一套数据管线的失败重试机制:当 Kafka 消息处理失败时,将消息写入 Redis 作为"失败队列",等待人工确认后重放或丢弃。

存储结构

采用"每条消息一个 Key"的散列模式(旧版设计):

Key:   {prefix}{topic}_{partition}_{offset}
Value: {"raw_id":"xxx", "reason":"missing_required_field:md5", "time":"2026-06-01 12:00:00", "raw_message":"..."}

辅助元数据:

  • {prefix}__count__ — 累计计数(只增不减,供监控用)
  • {prefix}__recent__ — 最近 20 条摘要(LIST)

问题规模

某队列由于上游持续推送缺少必填字段的消息,积累了 1,371 万条 失败记录。我们需要一个"按错误原因批量清理"的功能——只删除 reason 包含特定关键词的消息,保留其他。


方案演进

第一版:Python 逐条循环

最直觉的实现:

import redis
import json
 
def purge_by_reason(r, prefix, reason_filter):
    """按 reason 筛选删除失败消息"""
    cursor = 0
    deleted = 0
    while True:
        # Step 1: SCAN 一批 key
        cursor, keys = r.scan(cursor, match=f"{prefix}*", count=5000)
        
        if keys:
            # Step 2: 过滤掉元数据 key
            to_check = [k for k in keys 
                        if "__count__" not in k 
                        and "__recent__" not in k]
            
            # Step 3: Pipeline 批量取值
            pipe = r.pipeline(transaction=False)
            for k in to_check:
                pipe.get(k)
            vals = pipe.execute()
            
            # Step 4: Python 侧逐条解析 JSON,判断 reason
            to_delete = []
            for k, val in zip(to_check, vals):
                if val:
                    d = json.loads(val)  # 解析 JSON
                    if reason_filter in d.get("reason", ""):
                        to_delete.append(k)
            
            # Step 5: 删除匹配的 key
            if to_delete:
                r.unlink(*to_delete)
                deleted += len(to_delete)
        
        if cursor == 0:
            break
    
    return deleted

性能分析

对于 1300 万条 key,SCAN COUNT=5000,每轮处理流程:

┌─────────┐       ┌─────────┐
│  Python │       │  Redis  │
└────┬────┘       └────┬────┘
     │  SCAN 5000 keys  │
     │─────────────────→│   RTT #1: ~0.5ms
     │←─────────────────│   返回 5000 个 key name
     │                   │
     │  PIPELINE GET ×5000│
     │─────────────────→│   RTT #2: ~2ms(含序列化 5000 个 value)
     │←─────────────────│   返回 5000 个 JSON string(每个 ~500B)
     │                   │         → 网络传输 ~2.5MB
     │  [Python 解析 5000 次 JSON]
     │  [Python 字符串匹配]
     │                   │
     │  UNLINK matched   │
     │─────────────────→│   RTT #3: ~0.3ms
     │←─────────────────│
     └───────────────────┘
     
     单轮耗时: ~5-10ms (网络) + ~20ms (Python JSON 解析)
     总轮数: 13,000,000 / 5,000 = 2,600 轮
     预估总耗时: 2,600 × 30ms = ~78 秒 (理论)

实际远比理论慢,原因:

  1. 网络 RTT 不是 0.5ms:跨机房 Redis 通常 2-5ms
  2. Pipeline GET 的返回体积巨大:5000 × 500B = 2.5MB/轮,带宽成瓶颈
  3. Python GIL + json.loads:单线程解析 5000 条 JSON 比想象中慢
  4. Redis 单线程阻塞:大 Pipeline 执行期间其他请求排队

实测结果:跑了数小时仍未完成,前端 HTTP 请求早已超时。


第二版:Lua 脚本(服务端执行)

核心原理

Redis 内置 Lua 解释器,通过 EVAL / EVALSHA 命令可以在服务端原子地执行脚本。关键优势:

  1. 零网络往返:GET 取到的 value 不需要传回客户端,在 Lua 里直接判断
  2. 原子性:脚本执行期间不会被其他命令打断(单线程模型)
  3. 批量操作:一次调用完成 SCAN + GET + 判断 + UNLINK 全流程
┌─────────┐         ┌──────────────────────────────┐
│  Python │         │         Redis Server          │
└────┬────┘         │  ┌─────────────────────────┐  │
     │  EVALSHA     │  │      Lua Script          │  │
     │  (cursor,    │  │                          │  │
     │   pattern,   │  │  SCAN → keys[]          │  │
     │   reason)    │  │  for each key:           │  │
     │─────────────→│  │    val = GET(key)        │  │  ← 内存操作,无网络
     │              │  │    if match(val, reason) │  │
     │              │  │      batch += key        │  │
     │              │  │    if #batch >= 500       │  │
     │              │  │      UNLINK(batch)       │  │  ← 内部命令调用
     │              │  │  return {cursor, count}  │  │
     │←─────────────│  └─────────────────────────┘  │
     │  仅返回两个数字  └──────────────────────────────┘
     │  (next_cursor, deleted_count)

每轮只有 1 次网络往返,且返回的数据量极小(两个整数)。

全量删除 Lua 脚本(不过滤 reason)

当确认要清理整个队列时,无需读取 value,直接 SCAN + UNLINK:

lua_purge_all = r.register_script("""
    local cursor = ARGV[1]
    local pattern = ARGV[2]
    
    -- 在 Redis 服务端执行 SCAN
    local result = redis.call('SCAN', cursor, 'MATCH', pattern, 'COUNT', 10000)
    local next_cursor = result[1]
    local keys = result[2]
    
    local count = 0
    local batch = {}
    
    for i, key in ipairs(keys) do
        -- 跳过元数据 key(plain string.find,第4个参数=true 表示不用正则)
        if not string.find(key, '__count__', 1, true)
           and not string.find(key, '__recent__', 1, true)
           and not string.find(key, '__messages__', 1, true) then
            table.insert(batch, key)
            -- 攒满 500 个统一 UNLINK,减少内部命令调用次数
            if #batch >= 500 then
                redis.call('UNLINK', unpack(batch))
                count = count + #batch
                batch = {}
            end
        end
    end
    
    -- 处理尾部不满 500 的残留
    if #batch > 0 then
        redis.call('UNLINK', unpack(batch))
        count = count + #batch
    end
    
    -- 返回下一轮 cursor 和本轮删除数
    return {next_cursor, count}
""")

Python 调用方:

cursor = "0"
total_deleted = 0
 
while True:
    # 每次调用只有 1 次网络往返
    result = lua_purge_all(args=[cursor, f"{prefix}*"])
    cursor = str(result[0])     # Redis 返回的是 bytes/int
    total_deleted += result[1]
    
    # 更新进度(供前端轮询)
    progress["deleted"] = total_deleted
    
    if cursor == "0":  # SCAN 遍历完成
        break

按 reason 筛选删除 Lua 脚本

需要在 Redis 服务端执行 GET + 字符串匹配:

lua_purge_by_reason = r.register_script("""
    local cursor = ARGV[1]
    local pattern = ARGV[2]
    local reason = ARGV[3]    -- 要匹配的 reason 子串
    
    local result = redis.call('SCAN', cursor, 'MATCH', pattern, 'COUNT', 10000)
    local next_cursor = result[1]
    local keys = result[2]
    
    local count = 0
    local batch = {}
    
    for i, key in ipairs(keys) do
        if not string.find(key, '__count__', 1, true)
           and not string.find(key, '__recent__', 1, true)
           and not string.find(key, '__messages__', 1, true) then
            
            -- 在 Redis 服务端直接 GET,value 不出网络
            local val = redis.call('GET', key)
            
            -- string.find 做子串匹配
            -- value 格式: {"reason":"missing_required_field:md5", ...}
            -- 直接搜索 reason 子串即可,不需要 JSON 解析
            if val and string.find(val, reason, 1, true) then
                table.insert(batch, key)
                if #batch >= 500 then
                    redis.call('UNLINK', unpack(batch))
                    count = count + #batch
                    batch = {}
                end
            end
        end
    end
    
    if #batch > 0 then
        redis.call('UNLINK', unpack(batch))
        count = count + #batch
    end
    
    return {next_cursor, count}
""")

调用方式完全相同,只是多传一个 reason 参数:

cursor = "0"
total_deleted = 0
 
while True:
    result = lua_purge_by_reason(
        args=[cursor, f"{prefix}*", "missing_required_field:md5"]
    )
    cursor = str(result[0])
    total_deleted += result[1]
    progress["deleted"] = total_deleted
    
    if cursor == "0":
        break

为什么用 string.find 而不是 JSON 解析?

Redis 内置的 Lua 版本是 5.1(嵌入式精简版),可以通过 cjson.decode 解析 JSON:

local d = cjson.decode(val)
if d and d.reason and string.find(d.reason, reason, 1, true) then ...

cjson.decode 在 Lua 中的性能远不如 C 层面的 string.find。我们的 value 格式是固定的:

{"raw_id":"xxx","reason":"missing_required_field:md5","time":"...","raw_message":"..."}

reason 的值不会出现在其他字段中(raw_message 是 base64 或原始 Kafka 消息,不会包含 missing_required_field 这种字符串),所以直接用 string.find(val, reason, 1, true) 做子串匹配是安全且高效的。

如果你的场景中 reason 子串可能出现在 value 的其他位置,可以用更精确的模式:

-- 匹配 "reason":"...reason_filter..." 格式
if val and string.find(val, '"reason":"[^"]*' .. reason, 1, false) then

性能对比

维度Python 循环Lua 脚本
网络往返 / 万条 key~20,000 次1 次
数据传输量 / 万条~5MB (value 全传回)~16B (仅 cursor+count)
CPU 开销位置Python 侧 (GIL)Redis 侧 (C 内嵌 Lua)
实测 1300 万 key8-10 小时~10 分钟
对 Redis 其他请求的影响长期占用连接每轮阻塞 ~50-100ms

工程实践:异步任务 + 进度条

千万级 key 即使用 Lua 也需要 ~10 分钟,不能在 HTTP 请求中同步等待。完整方案:

import threading
import uuid
 
# 内存中的任务状态
_tasks = {}
 
def purge_worker(task_id, prefix, reason_filter):
    """后台线程执行 Lua 脚本循环"""
    task = _tasks[task_id]
    try:
        r = get_redis()
        
        # 选择 Lua 脚本
        if reason_filter:
            script = lua_purge_by_reason
            args_fn = lambda c: [c, f"{prefix}*", reason_filter]
        else:
            script = lua_purge_all
            args_fn = lambda c: [c, f"{prefix}*"]
        
        cursor = "0"
        deleted = 0
        while True:
            result = script(args=args_fn(cursor))
            cursor = str(result[0])
            deleted += result[1]
            task["deleted"] = deleted  # 进度更新(线程安全:GIL 保证 int 赋值原子)
            if cursor == "0":
                break
        
        task["status"] = "done"
        task["message"] = f"完成,共删除 {deleted:,} 条"
    except Exception as e:
        task["status"] = "error"
        task["message"] = str(e)
 
 
# API: 提交清理任务
@app.post("/api/purge")
def start_purge(body: dict):
    task_id = str(uuid.uuid4())[:8]
    _tasks[task_id] = {"status": "running", "deleted": 0}
    
    t = threading.Thread(
        target=purge_worker,
        args=(task_id, body["prefix"], body.get("reason_filter", "")),
        daemon=True
    )
    t.start()
    
    return {"task_id": task_id, "message": "已启动"}
 
 
# API: 查询进度
@app.get("/api/purge_status/{task_id}")
def purge_status(task_id: str):
    task = _tasks.get(task_id)
    if not task:
        raise HTTPException(404)
    return task

前端每 2 秒轮询进度:

const poll = setInterval(async () => {
    const resp = await fetch(`/api/purge_status/${taskId}`);
    const data = await resp.json();
    
    // 更新进度条
    const pct = Math.round((data.deleted / estimated) * 100);
    progressBar.style.width = pct + '%';
    progressText.textContent = `已删除 ${data.deleted.toLocaleString()} 条 (${pct}%)`;
    
    if (data.status === 'done' || data.status === 'error') {
        clearInterval(poll);
        showResult(data.message);
    }
}, 2000);

注意事项

1. Lua 脚本会阻塞 Redis 主线程

Redis 是单线程模型,Lua 脚本执行期间所有其他命令排队等待。单次 SCAN 10000 + GET + UNLINK 大约耗时 50-100ms。如果你的 Redis 有严格的延迟要求(如 p99 < 10ms),可以:

  • COUNT 调小(如 2000),牺牲总速度换取更短的单次阻塞
  • 在业务低峰期执行
  • DEL:同步释放内存,大 value 时会阻塞
  • UNLINK:异步释放内存(另起线程回收),主线程只做标记删除

千万级 key 场景必须用 UNLINK

3. SCAN COUNT 不是精确的

SCAN ... COUNT 10000 不保证返回恰好 10000 个 key,可能多也可能少。这只是给 Redis 的"提示",实际返回数取决于哈希表的桶分布。

4. Cluster 模式的限制

Redis Cluster 中,Lua 脚本只能操作同一个 slot 的 key。如果 key 分布在不同 slot,需要按 slot 分组执行。单机/哨兵模式无此限制。

5. 脚本注册 vs 每次 EVAL

register_script 会先 SCRIPT LOAD 得到 SHA,后续调用 EVALSHA,避免每次传输完整脚本文本。对于循环调用数千次的场景,这个优化有意义。


总结

优化阶段方案耗时瓶颈
V1Python SCAN + Pipeline GET + json.loads + UNLINK8-10h网络 RTT × 千万次
V2Lua 脚本服务端执行 (SCAN + GET + string.find + UNLINK)~10minRedis 内存带宽

核心收益来自消除网络往返——当你的操作模式是"读一个值、判断、决定删不删"时,把判断逻辑推到数据所在的位置(Redis 服务端)永远是对的。这也是 Redis Lua 脚本最经典的应用场景之一。