你的观察非常准确!对于大多数应用场景,只需要保留最近几个checkpoint就够了,中间大量的checkpoint只是在浪费存储空间。从你的截图可以看到,thread `wang1:20250729235038043`
有36个checkpoint,但实际上只有最新的几个有意义。TTL确实不适合这个场景:它按时间"一刀切"地删除所有过期的checkpoint,而不是按数量保留最新的几个——一个长时间不活跃的thread甚至可能连最新的checkpoint也被删掉。
TTL的局限性:
实际需求:
def cleanup_old_checkpoints(checkpointer, thread_id: str, keep_count: int = 10):
    """Keep the newest ``keep_count`` checkpoints of a thread and delete the rest.

    Best-effort: a checkpoint that fails to delete is reported and skipped,
    and a failure while listing is reported instead of raised.

    Args:
        checkpointer: Saver providing ``list(config)`` and ``delete(config)``.
            NOTE(review): LangGraph's ``BaseCheckpointSaver`` documents
            ``delete_thread(thread_id)`` rather than a per-checkpoint
            ``delete(config)``; confirm your saver actually provides ``delete``.
        thread_id: Thread whose checkpoints are pruned.
        keep_count: How many of the newest checkpoints to keep (>= 0).

    Returns:
        Number of checkpoints actually deleted (0 if nothing was pruned or
        listing failed).

    Raises:
        ValueError: If ``keep_count`` is negative. A negative slice bound would
            otherwise silently select the wrong checkpoints.
    """
    if keep_count < 0:
        raise ValueError(f"keep_count must be >= 0, got {keep_count}")
    try:
        config = {"configurable": {"thread_id": thread_id}}
        # Assumes list() yields checkpoints newest first (the slicing below
        # relies on it) -- confirm for the saver in use.
        all_checkpoints = list(checkpointer.list(config))
        print(f"Thread {thread_id}: 总共{len(all_checkpoints)}个checkpoint")
        if len(all_checkpoints) <= keep_count:
            print("无需清理")
            return 0
        # Keep the newest keep_count, delete everything older.
        to_delete = all_checkpoints[keep_count:]
        print(f"将删除{len(to_delete)}个旧checkpoint")
        deleted = 0
        for checkpoint_tuple in to_delete:
            try:
                checkpoint_id = checkpoint_tuple.checkpoint["id"]
                # Start from the checkpoint's own config so extra keys such as
                # checkpoint_ns (presumably non-empty for subgraph checkpoints)
                # are preserved, then pin thread/checkpoint ids as before.
                source_config = getattr(checkpoint_tuple, "config", None) or {}
                configurable = dict(source_config.get("configurable", {}))
                configurable["thread_id"] = thread_id
                configurable["checkpoint_id"] = checkpoint_id
                checkpointer.delete({"configurable": configurable})
                deleted += 1
                print(f"已删除: {checkpoint_id[:8]}...")
            except Exception as e:
                print(f"删除失败: {e}")
        return deleted
    except Exception as e:
        print(f"清理失败: {e}")
        return 0
# Prune your thread right away, keeping only its 5 newest checkpoints.
# NOTE(review): in langgraph-checkpoint-redis, RedisSaver.from_conn_string
# appears to be a context manager (`with RedisSaver.from_conn_string(url) as
# checkpointer: ...`); confirm that assigning its return value directly gives a
# usable saver. RedisSaver is also not imported in this snippet.
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
cleanup_old_checkpoints(checkpointer, "wang1:20250729235038043", keep_count=5)
class AutoCleanupCheckpointer:
    """Checkpointer wrapper that periodically prunes a thread's old checkpoints.

    Every ``cleanup_frequency`` calls to :meth:`put` for the same thread, that
    thread is pruned down to its ``keep_count`` newest checkpoints. Any other
    attribute is delegated to the wrapped checkpointer.

    NOTE(review): only the synchronous ``put`` is intercepted; ``aput`` reaches
    the base checkpointer through ``__getattr__`` and never triggers cleanup.
    NOTE(review): the cleanup runs synchronously inside ``put``, i.e. on the
    graph's execution path.

    Args:
        base_checkpointer: The real saver; must provide ``put``, ``list`` and
            ``delete``.
        keep_count: Newest checkpoints to keep per thread (>= 0).
        cleanup_frequency: Writes to one thread between cleanups (> 0).

    Raises:
        ValueError: On a non-positive ``cleanup_frequency`` or a negative
            ``keep_count``.
    """

    def __init__(self, base_checkpointer, keep_count=10, cleanup_frequency=20):
        if cleanup_frequency <= 0:
            # Would otherwise fail later with ZeroDivisionError inside put().
            raise ValueError("cleanup_frequency must be positive")
        if keep_count < 0:
            raise ValueError("keep_count must be >= 0")
        self.base_checkpointer = base_checkpointer
        self.keep_count = keep_count
        self.cleanup_frequency = cleanup_frequency
        # Total put() calls across all threads.
        self.operation_count = 0
        # put() calls per thread id. The cadence is tracked per thread so every
        # thread is pruned after its own N writes, not only whichever thread
        # happens to make the N-th write overall.
        self._thread_counts = {}

    def put(self, config, checkpoint, metadata, new_versions):
        """Save through the base checkpointer, then prune the thread every N writes."""
        result = self.base_checkpointer.put(config, checkpoint, metadata, new_versions)
        self.operation_count += 1
        thread_id = config.get("configurable", {}).get("thread_id")
        if thread_id:
            count = self._thread_counts.get(thread_id, 0) + 1
            self._thread_counts[thread_id] = count
            if count % self.cleanup_frequency == 0:
                self._auto_cleanup(thread_id)
        return result

    def _auto_cleanup(self, thread_id):
        """Best-effort deletion of all but the newest ``keep_count`` checkpoints.

        Errors are reported and swallowed so a failed cleanup never fails the
        ``put`` that triggered it.
        """
        try:
            config = {"configurable": {"thread_id": thread_id}}
            # Assumes list() yields newest first -- confirm for your saver.
            all_checkpoints = list(self.base_checkpointer.list(config))
            if len(all_checkpoints) <= self.keep_count:
                return
            to_delete = all_checkpoints[self.keep_count:]
            deleted = 0
            for cp in to_delete:
                try:
                    # Keep the checkpoint's own config keys (e.g. checkpoint_ns),
                    # then pin thread/checkpoint ids.
                    source_config = getattr(cp, "config", None) or {}
                    configurable = dict(source_config.get("configurable", {}))
                    configurable["thread_id"] = thread_id
                    configurable["checkpoint_id"] = cp.checkpoint["id"]
                    self.base_checkpointer.delete({"configurable": configurable})
                    deleted += 1
                except Exception as e:
                    print(f"自动清理删除失败: {e}")
            print(f"自动清理{thread_id}:删除{deleted}个旧checkpoint")
        except Exception as e:
            print(f"自动清理失败: {e}")

    def __getattr__(self, name):
        # __getattr__ only runs for missing attributes. Guard the wrapped object
        # itself so copying/unpickling a half-initialised instance raises
        # AttributeError instead of recursing forever.
        if name == "base_checkpointer":
            raise AttributeError(name)
        return getattr(self.base_checkpointer, name)
# Compile the graph with the auto-cleaning checkpointer: based on the number of
# put() calls (cleanup_frequency, default 20) the wrapper periodically prunes a
# thread down to its 10 newest checkpoints.
# NOTE(review): RedisSaver.from_conn_string appears to be a context manager in
# langgraph-checkpoint-redis (`with ... as saver:`); confirm the direct
# assignment below yields a usable saver.
# NOTE(review): `builder` is not defined in this snippet; presumably your
# StateGraph builder -- confirm.
base_checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
auto_checkpointer = AutoCleanupCheckpointer(base_checkpointer, keep_count=10)
graph = builder.compile(checkpointer=auto_checkpointer)
import threading
import time
def start_checkpoint_cleanup_service(checkpointer, keep_count=10, interval_hours=1, thread_id_provider=None):
    """Start a daemon thread that prunes every known thread periodically.

    The first pass runs immediately, then one pass every ``interval_hours``.

    Args:
        checkpointer: Saver handed to ``cleanup_old_checkpoints``.
        keep_count: Newest checkpoints to keep per thread.
        interval_hours: Hours between passes; must be positive.
        thread_id_provider: Optional zero-argument callable returning the thread
            ids to clean. Defaults to the module-level ``get_all_thread_ids``.

    Returns:
        The started ``threading.Thread``.

    Raises:
        ValueError: If ``interval_hours`` is not positive. Zero would hammer
            the store in a busy loop. A negative value makes ``time.sleep``
            raise outside the worker's ``try``, silently killing the thread.
    """
    if interval_hours <= 0:
        raise ValueError("interval_hours must be positive")
    interval_seconds = interval_hours * 3600

    def cleanup_worker():
        while True:
            try:
                print("开始定期checkpoint清理...")
                # Resolved on every pass so the default provider may be defined
                # after this function in the module.
                provider = thread_id_provider if thread_id_provider is not None else get_all_thread_ids
                for thread_id in provider():
                    cleanup_old_checkpoints(checkpointer, thread_id, keep_count)
                print("定期清理完成")
            except Exception as e:
                print(f"定期清理出错: {e}")
            # Wait for the next pass.
            time.sleep(interval_seconds)

    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()
    print(f"后台清理服务已启动,每{interval_hours}小时清理一次")
    return cleanup_thread
# Prune your thread now, keeping only its 5 newest checkpoints.
# NOTE(review): RedisSaver.from_conn_string appears to be a context manager in
# langgraph-checkpoint-redis (`with ... as checkpointer:`); confirm the direct
# assignment below yields a usable saver.
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
cleanup_old_checkpoints(checkpointer, "wang1:20250729235038043", keep_count=5)
# Goes from 36 checkpoints down to 5, saving a lot of storage.
# Pick ONE of the options below -- as written, this snippet runs both.
# Option A: integrate into the app (recommended); pruning happens inside put().
auto_checkpointer = AutoCleanupCheckpointer(checkpointer, keep_count=10)
graph = builder.compile(checkpointer=auto_checkpointer)
# Option B: standalone periodic cleanup on a daemon thread.
# NOTE(review): by default it relies on get_all_thread_ids(), which is only
# defined in the Flask section further down.
start_checkpoint_cleanup_service(checkpointer, keep_count=10, interval_hours=2)
你的观察非常准确:
推荐方案:保留最近10个checkpoint + 按写入次数定期自动清理(例如每20次写入清理一次)
这样既能支持必要的回滚调试,又能有效控制存储成本!
你的担心完全合理!在graph流程中同步执行checkpoint删除确实会拖慢响应,需要删除的checkpoint越多,影响越明显。
# Baseline: the response time the user expects.
def chat_node(state):
    """Call the LLM on the conversation so far and append its reply.

    NOTE(review): ``llm`` is assumed to be defined elsewhere.
    """
    response = llm.invoke(state["messages"])  # typically 1-3 s
    return {"messages": [response]}


# Anti-pattern: the same node, but pruning checkpoints synchronously.
def chat_node_with_cleanup(state, config=None):
    """Like ``chat_node`` but also prunes checkpoints inline -- shown as what NOT to do.

    Args:
        state: Graph state with a ``"messages"`` list.
        config: Run config. The original snippet referenced an undefined
            ``thread_id`` (NameError). It is now read from
            ``config["configurable"]["thread_id"]``.
            NOTE(review): LangGraph passes the run config to nodes whose
            signature declares ``config`` -- confirm for your version.
            ``llm`` and ``checkpointer`` are assumed to be defined elsewhere.
    """
    response = llm.invoke(state["messages"])  # typically 1-3 s
    thread_id = ((config or {}).get("configurable") or {}).get("thread_id")
    if thread_id:
        # Pruning ~36 checkpoints here may add another 1-5 s to the reply.
        cleanup_old_checkpoints(checkpointer, thread_id, keep_count=10)
    return {"messages": [response]}
# What the user sees: a ~2 s reply turns into ~6 s.
import threading
import queue
class BackgroundCheckpointCleaner:
    """Prune old checkpoints on a background thread so requests never wait.

    Producers call :meth:`request_cleanup` (non-blocking). A single daemon
    worker started by :meth:`start_worker` drains the queue and prunes each
    requested thread down to its ``keep_count`` newest checkpoints.

    Args:
        checkpointer: Saver providing ``list(config)`` and ``delete(config)``.
        keep_count: Newest checkpoints to keep per thread.
    """

    def __init__(self, checkpointer, keep_count=10):
        self.checkpointer = checkpointer
        self.keep_count = keep_count
        self.cleanup_queue = queue.Queue()
        self.worker_thread = None

    def request_cleanup(self, thread_id: str):
        """Queue ``thread_id`` for cleanup and return immediately."""
        self.cleanup_queue.put(thread_id)
        print(f"已加入清理队列: {thread_id}")

    def start_worker(self):
        """Start the daemon worker thread; a no-op if it is already running.

        Returns:
            The worker ``threading.Thread``.
        """
        # A second call must not spawn a second worker pruning the same
        # threads concurrently.
        if self.worker_thread is not None and self.worker_thread.is_alive():
            return self.worker_thread

        def worker():
            while True:
                # Blocks until work arrives; daemon thread, so it does not keep
                # the process alive.
                thread_id = self.cleanup_queue.get()
                try:
                    print(f"后台清理开始: {thread_id}")
                    self._do_cleanup(thread_id)
                    print(f"后台清理完成: {thread_id}")
                except Exception as e:
                    print(f"后台清理出错: {e}")
                finally:
                    # Always acknowledge the item so cleanup_queue.join()
                    # cannot hang after a failure.
                    self.cleanup_queue.task_done()

        self.worker_thread = threading.Thread(target=worker, daemon=True)
        self.worker_thread.start()
        print("后台清理服务已启动")
        return self.worker_thread

    def _do_cleanup(self, thread_id: str):
        """Delete all but the newest ``keep_count`` checkpoints of ``thread_id``.

        Runs on the worker thread. A failed delete is reported and skipped so
        the remaining checkpoints are still processed.

        Returns:
            Number of checkpoints deleted.
        """
        try:
            config = {"configurable": {"thread_id": thread_id}}
            # Assumes list() yields newest first -- confirm for your saver.
            all_checkpoints = list(self.checkpointer.list(config))
            if len(all_checkpoints) <= self.keep_count:
                return 0
            to_delete = all_checkpoints[self.keep_count:]
            deleted = 0
            for cp in to_delete:
                try:
                    # Keep the checkpoint's own config keys (e.g. checkpoint_ns),
                    # then pin thread/checkpoint ids.
                    source_config = getattr(cp, "config", None) or {}
                    configurable = dict(source_config.get("configurable", {}))
                    configurable["thread_id"] = thread_id
                    configurable["checkpoint_id"] = cp.checkpoint["id"]
                    self.checkpointer.delete({"configurable": configurable})
                    deleted += 1
                except Exception as e:
                    print(f"后台删除checkpoint失败: {e}")
            print(f"后台删除了{deleted}个checkpoint")
            return deleted
        except Exception as e:
            print(f"后台清理失败: {e}")
            return 0
# Module-level cleanup service; start_worker() launches its background worker
# thread (a daemon thread) right away.
cleanup_service = BackgroundCheckpointCleaner(checkpointer, keep_count=10)
cleanup_service.start_worker()
class SmartCheckpointer:
    """Checkpointer wrapper that queues background cleanup once a thread grows.

    Counts :meth:`put` calls per thread. When a thread reaches ``threshold``
    writes, it is handed to ``cleanup_service.request_cleanup`` (non-blocking),
    and its counter is reset to the number of checkpoints that cleanup keeps.
    Everything else is delegated to the wrapped checkpointer.

    NOTE(review): counters live in memory and start at 0 on every process
    start, so a thread that already has many checkpoints is not cleaned until
    it receives ``threshold`` new writes. Only synchronous ``put`` is
    intercepted; ``aput`` goes straight to the base checkpointer. If
    ``threshold`` <= the service's ``keep_count``, every write re-queues the
    thread.

    Args:
        base_checkpointer: The real saver.
        cleanup_service: Object with ``request_cleanup(thread_id)`` and,
            ideally, a ``keep_count`` attribute.
        threshold: Writes per thread that trigger a cleanup request.
    """

    def __init__(self, base_checkpointer, cleanup_service, threshold=30):
        self.base_checkpointer = base_checkpointer
        self.cleanup_service = cleanup_service
        self.threshold = threshold
        # Estimated checkpoint count per thread id.
        self.checkpoint_counts = {}

    def put(self, config, checkpoint, metadata, new_versions):
        """Save through the base checkpointer; queue cleanup at the threshold."""
        result = self.base_checkpointer.put(config, checkpoint, metadata, new_versions)
        thread_id = config.get("configurable", {}).get("thread_id")
        if thread_id:
            count = self.checkpoint_counts.get(thread_id, 0) + 1
            self.checkpoint_counts[thread_id] = count
            if count >= self.threshold:
                self.cleanup_service.request_cleanup(thread_id)  # non-blocking
                # After cleanup the thread holds the service's keep_count
                # checkpoints, so continue counting from there. The original
                # hard-coded 10, which is wrong for any other keep_count.
                self.checkpoint_counts[thread_id] = getattr(self.cleanup_service, "keep_count", 10)
        return result

    def __getattr__(self, name):
        # Guard the wrapped object itself so copying/unpickling a
        # half-initialised instance raises AttributeError instead of recursing.
        if name == "base_checkpointer":
            raise AttributeError(name)
        return getattr(self.base_checkpointer, name)
# main.py
from langgraph.checkpoint.redis import RedisSaver
# Initialise the Redis checkpointer.
# NOTE(review): RedisSaver.from_conn_string appears to be a context manager in
# langgraph-checkpoint-redis (`with ... as checkpointer:`); confirm the direct
# assignment below yields a usable saver.
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
# Start the background cleanup worker.
cleanup_service = BackgroundCheckpointCleaner(checkpointer, keep_count=10)
cleanup_service.start_worker()
# Wrap the saver: once a thread reaches 30 counted writes, it is queued for
# background cleanup.
smart_checkpointer = SmartCheckpointer(checkpointer, cleanup_service, threshold=30)
# Compile the graph.
# NOTE(review): `builder` is not defined in this snippet; presumably your
# StateGraph builder -- confirm.
graph = builder.compile(checkpointer=smart_checkpointer)
def chat_api(user_input: str, thread_id: str):
    """Run one chat turn on ``thread_id`` and return the reply text.

    The request only waits for the graph run itself; checkpoint pruning is
    left to whatever cleanup the compiled graph's checkpointer triggers.

    NOTE(review): HumanMessage is not imported in this snippet; presumably
    ``from langchain_core.messages import HumanMessage`` -- confirm.
    """
    run_config = {"configurable": {"thread_id": thread_id}}
    turn_input = {"messages": [HumanMessage(user_input)]}
    final_state = graph.invoke(turn_input, run_config)
    last_message = final_state["messages"][-1]
    return last_message.content
import schedule
import threading
def setup_scheduled_cleanup(thread_ids=None, check_interval_seconds=60):
    """Register a daily 02:00 job that queues background cleanup for known threads.

    Args:
        thread_ids: Thread ids to clean each day. Defaults to the single thread
            hard-coded in the original example.
        check_interval_seconds: How often the scheduler thread calls
            ``schedule.run_pending()``. The job starts up to this many seconds
            after 02:00. The original polled hourly, which could delay it by
            almost an hour.

    Raises:
        ValueError: If ``check_interval_seconds`` is not positive.
    """
    import time  # not imported by this snippet's own import lines

    if check_interval_seconds <= 0:
        raise ValueError("check_interval_seconds must be positive")
    active_threads = list(thread_ids) if thread_ids is not None else ["wang1:20250729235038043"]

    def daily_cleanup():
        print("开始每日checkpoint清理...")
        for thread_id in active_threads:
            cleanup_service.request_cleanup(thread_id)
        print("每日清理任务已提交到后台队列")

    schedule.every().day.at("02:00").do(daily_cleanup)

    def run_scheduler():
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # Keep the scheduler thread alive if a job raises.
                print(f"定时任务出错: {e}")
            time.sleep(check_interval_seconds)

    threading.Thread(target=run_scheduler, daemon=True).start()
    print("定时清理已设置:每天凌晨2点执行")
# One-off cleanup script (run offline).
def cleanup_existing_checkpoints(thread_id="wang1:20250729235038043", keep_count=10, checkpointer=None):
    """Prune one thread down to its newest checkpoints, once.

    Args:
        thread_id: Thread to prune (defaults to the thread from the example).
        keep_count: Newest checkpoints to keep (>= 0).
        checkpointer: An already-open saver to use. If omitted, a Redis saver
            for ``redis://localhost:6379`` is opened for the duration of the
            call.

    Returns:
        Number of checkpoints deleted.

    Raises:
        ValueError: If ``keep_count`` is negative.
    """
    if keep_count < 0:
        raise ValueError("keep_count must be >= 0")
    if checkpointer is not None:
        return _prune_existing_thread(checkpointer, thread_id, keep_count)
    # NOTE(review): in langgraph-checkpoint-redis, from_conn_string appears to be
    # a context manager that yields the saver and closes it on exit. The
    # original assigned its return value directly -- confirm against your
    # installed version.
    with RedisSaver.from_conn_string("redis://localhost:6379") as saver:
        return _prune_existing_thread(saver, thread_id, keep_count)


def _prune_existing_thread(checkpointer, thread_id, keep_count):
    """Delete all but the newest ``keep_count`` checkpoints; return how many were deleted."""
    config = {"configurable": {"thread_id": thread_id}}
    # Assumes list() yields newest first -- confirm for your saver.
    all_checkpoints = list(checkpointer.list(config))
    print(f"当前有{len(all_checkpoints)}个checkpoint")
    if len(all_checkpoints) <= keep_count:
        print("无需清理")
        return 0
    to_delete = all_checkpoints[keep_count:]
    deleted = 0
    for cp in to_delete:
        try:
            # Keep the checkpoint's own config keys (e.g. checkpoint_ns), then
            # pin thread/checkpoint ids.
            source_config = getattr(cp, "config", None) or {}
            configurable = dict(source_config.get("configurable", {}))
            configurable["thread_id"] = thread_id
            configurable["checkpoint_id"] = cp.checkpoint["id"]
            checkpointer.delete({"configurable": configurable})
            deleted += 1
        except Exception as e:
            # Report and continue instead of aborting the whole script.
            print(f"删除checkpoint失败: {e}")
    print(f"删除了{deleted}个旧checkpoint")
    return deleted
# Run the one-off cleanup when this file is executed as a script.
if __name__ == "__main__":
    cleanup_existing_checkpoints()
你的直觉是对的!在graph流程中执行checkpoint删除会明显影响用户体验。
推荐策略:
核心原则:用户响应优先,清理操作后台执行
这样既保证了快速响应,又能有效管理存储空间!
| 方案 | 优势 | 劣势 | 推荐场景 |
|---|---|---|---|
| Flask内置定时器 | 简单,集成度高 | 影响主进程,不够健壮 | 开发测试 |
| 外部脚本+cron | 独立进程,稳定 | 需要额外配置 | 生产环境推荐 |
| Celery | 专业,功能强大 | 配置复杂,需要Redis/RabbitMQ作为消息代理 | 大型应用 |
| APScheduler | 功能丰富,易集成 | 在Flask进程内运行 | 中小型应用 |
# app.py
from flask import Flask, jsonify, request
from langgraph.checkpoint.redis import RedisSaver
import time
import threading
app = Flask(__name__)
# Initialise the checkpointer shared by the cleanup routes below.
# NOTE(review): RedisSaver.from_conn_string appears to be a context manager in
# langgraph-checkpoint-redis (`with ... as checkpointer:`); confirm the direct
# assignment below yields a usable saver.
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
def cleanup_thread_checkpoints(thread_id: str, keep_count: int = 10):
    """Keep the newest ``keep_count`` checkpoints of one thread; delete the rest.

    Uses the module-level ``checkpointer``. ``keep_count`` may come straight
    from an HTTP request body, so it is validated before anything is deleted.

    Returns:
        A JSON-serialisable dict, one of:
        ``{"status": "no_cleanup_needed", "total": n}``;
        ``{"status": "success", "total_checkpoints", "deleted_count",
        "failed_count", "remaining_count"}``;
        ``{"status": "error", "message": str}``.
    """
    # Reject bools (an int subclass), non-integers and negatives. A negative
    # value would slice from the wrong end; a string would only fail deep
    # inside the slicing.
    if isinstance(keep_count, bool) or not isinstance(keep_count, int) or keep_count < 0:
        return {
            "status": "error",
            "message": f"keep_count must be a non-negative integer, got {keep_count!r}",
        }
    try:
        config = {"configurable": {"thread_id": thread_id}}
        # Assumes list() yields newest first -- confirm for your saver.
        all_checkpoints = list(checkpointer.list(config))
        if len(all_checkpoints) <= keep_count:
            return {"status": "no_cleanup_needed", "total": len(all_checkpoints)}
        to_delete = all_checkpoints[keep_count:]
        deleted_count = 0
        failed_count = 0
        for checkpoint_tuple in to_delete:
            try:
                # Keep the checkpoint's own config keys (e.g. checkpoint_ns),
                # then pin thread/checkpoint ids.
                source_config = getattr(checkpoint_tuple, "config", None) or {}
                configurable = dict(source_config.get("configurable", {}))
                configurable["thread_id"] = thread_id
                configurable["checkpoint_id"] = checkpoint_tuple.checkpoint["id"]
                checkpointer.delete({"configurable": configurable})
                deleted_count += 1
            except Exception as e:
                failed_count += 1
                print(f"删除checkpoint失败: {e}")
        return {
            "status": "success",
            "total_checkpoints": len(all_checkpoints),
            "deleted_count": deleted_count,
            "failed_count": failed_count,
            "remaining_count": len(all_checkpoints) - deleted_count,
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
def get_all_thread_ids():
    """Return the distinct thread ids that currently have checkpoints in Redis.

    Replace this with your own lookup if you already track thread ids
    elsewhere.

    Returns:
        A list of thread ids in no particular order; an empty list if Redis
        cannot be reached or scanned.
    """
    # Option 1: if you keep thread ids in a database, query them there:
    # return db.query("SELECT DISTINCT thread_id FROM conversations")
    # Option 2: scan the checkpoint keys in Redis.
    try:
        import redis

        redis_client = redis.from_url("redis://localhost:6379")
        prefix = "checkpoint:"
        thread_ids = set()
        for key in redis_client.scan_iter(match=prefix + "*"):
            key_str = key.decode("utf-8") if isinstance(key, bytes) else key
            # NOTE(review): assumes keys look like
            # "checkpoint:<thread_id>:<checkpoint_ns>:<checkpoint_id>" -- confirm
            # against your langgraph-checkpoint-redis version. Thread ids may
            # contain ':' themselves (e.g. "wang1:20250729235038043"), which the
            # original split(':')[1] truncated to "wang1". Split from the right
            # instead: the last two fields are namespace and checkpoint id, and
            # everything before them is the thread id. This still assumes the
            # namespace itself contains no ':'.
            fields = key_str[len(prefix):].rsplit(":", 2)
            if len(fields) == 3 and fields[0]:
                thread_ids.add(fields[0])
        return list(thread_ids)
    except Exception as e:
        print(f"获取thread_id失败: {e}")
        return []
@app.route('/api/cleanup/thread/<thread_id>', methods=['POST'])
def cleanup_single_thread(thread_id):
    """Prune one thread's checkpoints.

    Optional JSON body: ``{"keep_count": <int>}`` (default 10).
    NOTE(review): the default ``<thread_id>`` converter does not match ids
    containing '/'.
    """
    # get_json(silent=True) returns None instead of raising (400/415) when the
    # body is missing, malformed or not sent as JSON. `request.json` raises in
    # those cases, so a body-less POST failed instead of defaulting to 10.
    payload = request.get_json(silent=True)
    keep_count = payload.get('keep_count', 10) if isinstance(payload, dict) else 10
    result = cleanup_thread_checkpoints(thread_id, keep_count)
    return jsonify(result)
@app.route('/api/cleanup/all', methods=['POST'])
def cleanup_all_threads():
    """Prune every known thread to its newest ``keep_count`` checkpoints.

    Optional JSON body: ``{"keep_count": <int>}`` (default 10). Responds with
    per-thread results, the number of threads whose cleanup errored, and the
    total number of checkpoints deleted.
    NOTE(review): runs synchronously inside the request; with many threads
    this can take long (the scheduler client in this file waits up to 300 s).
    """
    # Tolerate a missing/non-JSON body instead of failing with 400/415.
    payload = request.get_json(silent=True)
    keep_count = payload.get('keep_count', 10) if isinstance(payload, dict) else 10
    thread_ids = get_all_thread_ids()
    results = {}
    total_deleted = 0
    failed_threads = 0
    for thread_id in thread_ids:
        result = cleanup_thread_checkpoints(thread_id, keep_count)
        results[thread_id] = result
        if result["status"] == "success":
            total_deleted += result["deleted_count"]
        elif result["status"] == "error":
            failed_threads += 1
    return jsonify({
        "status": "completed",
        "processed_threads": len(thread_ids),
        "failed_threads": failed_threads,
        "total_deleted": total_deleted,
        "results": results,
    })
@app.route('/api/cleanup/stats', methods=['GET'])
def cleanup_stats():
    """Report how many checkpoints each known thread currently has.

    Threads whose listing fails are reported as ``"error: ..."`` and are not
    included in ``total_checkpoints``.
    """
    thread_ids = get_all_thread_ids()
    stats = {}
    total_checkpoints = 0
    for thread_id in thread_ids:
        try:
            config = {"configurable": {"thread_id": thread_id}}
            # Count by iterating instead of building a list of every checkpoint
            # tuple (each presumably holds the deserialised checkpoint) just to
            # take len().
            count = sum(1 for _ in checkpointer.list(config))
            stats[thread_id] = count
            total_checkpoints += count
        except Exception as e:
            stats[thread_id] = f"error: {e}"
    return jsonify({
        "total_threads": len(thread_ids),
        "total_checkpoints": total_checkpoints,
        "thread_stats": stats,
    })
# 你的其他API路由...
@app.route('/api/chat', methods=['POST'])
def chat():
    """Chat endpoint placeholder -- put your chat logic here."""
    # A Flask view must return a response. The original `pass` returned None,
    # which Flask rejects with a 500 error, so answer 501 until implemented.
    return jsonify({"status": "error", "message": "not implemented"}), 501
# Development entry point.
# NOTE(review): debug=True enables Flask's interactive debugger and reloader.
# Flask's docs warn against debug mode in production, and the /api/cleanup/*
# routes above delete data without any authentication.
if __name__ == '__main__':
    app.run(debug=True)
# cleanup_scheduler.py
import requests
import time
import schedule
import logging
from datetime import datetime
# Configure logging: INFO level and above, with timestamp and level in each line.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class CheckpointCleanupScheduler:
    """HTTP client that triggers checkpoint cleanup through the Flask API.

    Args:
        flask_api_url: Base URL of the Flask app; a trailing '/' is ignored.
        keep_count: Newest checkpoints to keep per thread, sent with every
            cleanup request.
    """

    def __init__(self, flask_api_url="http://localhost:5000", keep_count=10):
        # Strip trailing slashes so f"{api_url}/api/..." never produces "//".
        self.api_url = flask_api_url.rstrip("/")
        self.keep_count = keep_count

    def cleanup_all_checkpoints(self):
        """POST /api/cleanup/all; return True on HTTP 200, False otherwise."""
        try:
            logger.info("开始定时清理所有checkpoint...")
            response = requests.post(
                f"{self.api_url}/api/cleanup/all",
                json={"keep_count": self.keep_count},
                timeout=300,  # the server cleans every thread inside this request
            )
            if response.status_code == 200:
                result = response.json()
                logger.info("清理完成: 处理了%s个thread,删除了%s个checkpoint",
                            result['processed_threads'], result['total_deleted'])
                return True
            logger.error("清理失败: HTTP %s", response.status_code)
            return False
        except Exception as e:
            logger.error("清理出错: %s", e)
            return False

    def get_cleanup_stats(self):
        """GET /api/cleanup/stats; return the parsed JSON, or None on failure."""
        try:
            response = requests.get(f"{self.api_url}/api/cleanup/stats", timeout=30)
            if response.status_code == 200:
                stats = response.json()
                logger.info("当前状态: %s个thread,共%s个checkpoint",
                            stats['total_threads'], stats['total_checkpoints'])
                return stats
            logger.error("获取统计失败: HTTP %s", response.status_code)
            return None
        except Exception as e:
            logger.error("获取统计出错: %s", e)
            return None

    def cleanup_specific_thread(self, thread_id: str):
        """POST /api/cleanup/thread/<thread_id>; return True on HTTP 200."""
        from urllib.parse import quote

        # Percent-encode the id so characters such as '?', '#' or spaces cannot
        # break the URL. NOTE(review): Flask's default <thread_id> converter
        # still won't match ids containing '/', even when encoded.
        url = f"{self.api_url}/api/cleanup/thread/{quote(thread_id, safe='')}"
        try:
            response = requests.post(url, json={"keep_count": self.keep_count}, timeout=60)
            if response.status_code == 200:
                result = response.json()
                logger.info("清理thread %s: %s", thread_id, result)
                return True
            logger.error("清理thread %s失败: HTTP %s", thread_id, response.status_code)
            return False
        except Exception as e:
            logger.error("清理thread %s出错: %s", thread_id, e)
            return False
# Module-level scheduler client used by the scheduled jobs in this script.
scheduler = CheckpointCleanupScheduler(
    flask_api_url="http://localhost:5000",  # base URL of your Flask app
    keep_count=10  # keep the 10 newest checkpoints per thread
)
# Scheduled jobs.
def daily_cleanup():
    """Daily job: log current stats, then ask the Flask API to prune all threads."""
    logger.info("=== 开始每日checkpoint清理 ===")
    # Called for its logging side effect; the returned stats dict was unused.
    scheduler.get_cleanup_stats()
    if scheduler.cleanup_all_checkpoints():
        logger.info("=== 每日清理完成 ===")
    else:
        logger.error("=== 每日清理失败 ===")
def weekly_stats():
    """Weekly job: log a banner, then fetch and log checkpoint statistics."""
    banner = "=== 每周checkpoint统计 ==="
    logger.info(banner)
    scheduler.get_cleanup_stats()
# Register the jobs with the `schedule` module; they only fire when
# schedule.run_pending() is called (main() below does that every minute).
schedule.every().day.at("02:00").do(daily_cleanup)  # clean up every day at 02:00
schedule.every().monday.at("09:00").do(weekly_stats)  # stats report every Monday at 09:00
# Main loop.
def main():
    """Poll the scheduled jobs once a minute until interrupted with Ctrl+C."""
    logger.info("Checkpoint清理调度器已启动")
    logger.info("清理计划: 每天凌晨2:00执行")
    logger.info("统计计划: 每周一上午9:00执行")
    try:
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # Log with traceback and keep the scheduler alive.
                logger.exception("调度器出错: %s", e)
            time.sleep(60)  # check once a minute
    except KeyboardInterrupt:
        # Catching outside the loop also covers Ctrl+C during the sleep that
        # follows an error. The original only caught it in the happy path.
        logger.info("调度器已停止")
# Script entry point: run the scheduler loop.
if __name__ == "__main__":
    main()
# /etc/systemd/system/checkpoint-cleanup.service
# Runs cleanup_scheduler.py as a long-lived service that is restarted on exit.
[Unit]
Description=Checkpoint Cleanup Scheduler
# NOTE(review): network.target does not wait for the network to be fully up,
# nor for the Flask API this scheduler calls; consider network-online.target
# (with Wants=) and/or ordering after the Flask app's unit -- confirm.
After=network.target
[Service]
Type=simple
User=your_user
WorkingDirectory=/path/to/your/project
# NOTE(review): this replaces PATH entirely with the venv's bin directory.
# ExecStart uses an absolute interpreter path, so the script itself still
# starts, but anything relying on /usr/bin being on PATH would not.
Environment=PATH=/path/to/your/venv/bin
ExecStart=/path/to/your/venv/bin/python cleanup_scheduler.py
# Restart 10 seconds after the process exits for any reason.
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
# Enable the service: reload unit files, enable it at boot, start it now.
sudo systemctl daemon-reload
sudo systemctl enable checkpoint-cleanup.service
sudo systemctl start checkpoint-cleanup.service
# Check its status (logs: journalctl -u checkpoint-cleanup.service).
sudo systemctl status checkpoint-cleanup.service
# cleanup_once.py - 单次执行的清理脚本
import requests
import sys
def cleanup_once(api_url="http://localhost:5000", keep_count=10):
    """Ask the Flask API to prune all threads once; return a process exit code.

    Args:
        api_url: Base URL of the Flask app; a trailing '/' is ignored.
        keep_count: Newest checkpoints to keep per thread.

    Returns:
        0 on HTTP 200, 1 on any other status or on error (network, bad JSON).
    """
    try:
        response = requests.post(
            f"{api_url.rstrip('/')}/api/cleanup/all",
            json={"keep_count": keep_count},
            timeout=300,  # the server cleans every thread inside this request
        )
        if response.status_code == 200:
            result = response.json()
            print(f"清理成功: 删除了{result['total_deleted']}个checkpoint")
            return 0
        print(f"清理失败: HTTP {response.status_code}")
        return 1
    except Exception as e:
        print(f"清理出错: {e}")
        return 1
# Use the 0/1 result as the process exit status, so cron or monitoring can
# detect failures.
if __name__ == "__main__":
    sys.exit(cleanup_once())
# Add to the crontab: run `crontab -e`, then add the schedule line below in
# the editor (it is a crontab entry, not a shell command).
crontab -e
# Run the cleanup every day at 02:00. Both stdout and stderr are appended to
# the log file, so the crontab's user must be able to write to it.
0 2 * * * /path/to/venv/bin/python /path/to/cleanup_once.py >> /var/log/checkpoint_cleanup.log 2>&1
# app.py - 如果坚持在Flask内部做定时
import threading
import time
def start_cleanup_thread(keep_count=10, interval_hours=24):
    """Start a daemon thread that prunes all threads every ``interval_hours``.

    The first pass happens only after the first interval has elapsed (as
    before), so a process restarted more often than that never cleans.

    Args:
        keep_count: Newest checkpoints to keep per thread.
        interval_hours: Hours between passes; must be positive.

    Returns:
        The started ``threading.Thread``.

    Raises:
        ValueError: If ``interval_hours`` is not positive.
    """
    if interval_hours <= 0:
        raise ValueError("interval_hours must be positive")
    interval_seconds = interval_hours * 3600

    def cleanup_worker():
        while True:
            try:
                # Wait for the interval, then clean.
                time.sleep(interval_seconds)
                for thread_id in get_all_thread_ids():
                    result = cleanup_thread_checkpoints(thread_id, keep_count=keep_count)
                    # cleanup_thread_checkpoints reports failures in its result
                    # instead of raising; surface them rather than ignore them.
                    if result.get("status") == "error":
                        print(f"清理thread {thread_id}失败: {result.get('message')}")
                print("定时清理完成")
            except Exception as e:
                print(f"定时清理出错: {e}")

    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()
    print("内置清理线程已启动")
    return cleanup_thread
# Start the in-process cleanup thread, then run the development server.
if __name__ == '__main__':
    start_cleanup_thread()  # start the cleanup thread
    # use_reloader=False: with debug=True, Flask's reloader re-executes this
    # module in a child process while the parent keeps running as a watcher.
    # The cleanup thread would then be started twice and every pass would
    # run in both processes.
    app.run(debug=True, use_reloader=False)
对于生产环境,我强烈推荐:
这样的架构: