test_vector_backup_only.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #!/usr/bin/env python3
  2. """
Standalone test for the Vector table backup feature.
Backs up only the langchain_pg_collection and langchain_pg_embedding tables.
  5. """
  6. import asyncio
  7. import os
  8. from pathlib import Path
  9. from datetime import datetime
  10. async def test_vector_backup():
  11. """测试vector表备份功能"""
  12. print("🧪 开始测试Vector表备份功能...")
  13. print("=" * 50)
  14. # 1. 设置测试输出目录
  15. test_dir = Path("./test_vector_backup_output")
  16. test_dir.mkdir(exist_ok=True)
  17. print(f"📁 测试输出目录: {test_dir.resolve()}")
  18. try:
  19. # 2. 导入VectorTableManager
  20. from data_pipeline.trainer.vector_table_manager import VectorTableManager
  21. # 3. 创建管理器实例
  22. task_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
  23. vector_manager = VectorTableManager(
  24. task_output_dir=str(test_dir),
  25. task_id=task_id
  26. )
  27. print(f"🆔 任务ID: {task_id}")
  28. print("🔧 VectorTableManager 创建成功")
  29. # 4. 执行备份(只备份,不清空)
  30. print("\n🗂️ 开始执行备份...")
  31. result = await vector_manager.execute_vector_management(
  32. backup=True, # 执行备份
  33. truncate=False # 不清空表
  34. )
  35. # 5. 显示结果
  36. print("\n📊 备份结果:")
  37. print("=" * 30)
  38. if result.get("backup_performed", False):
  39. print("✅ 备份状态: 已执行")
  40. tables_info = result.get("tables_backed_up", {})
  41. for table_name, info in tables_info.items():
  42. if info.get("success", False):
  43. print(f" ✅ {table_name}: {info['row_count']}行 -> {info['backup_file']} ({info['file_size']})")
  44. else:
  45. print(f" ❌ {table_name}: 失败 - {info.get('error', '未知错误')}")
  46. else:
  47. print("❌ 备份状态: 未执行")
  48. duration = result.get("duration", 0)
  49. print(f"⏱️ 总耗时: {duration:.2f}秒")
  50. errors = result.get("errors", [])
  51. if errors:
  52. print(f"⚠️ 错误信息: {'; '.join(errors)}")
  53. # 6. 检查生成的文件
  54. backup_dir = test_dir / "vector_bak"
  55. if backup_dir.exists():
  56. print(f"\n📂 备份文件目录: {backup_dir.resolve()}")
  57. backup_files = list(backup_dir.glob("*.csv"))
  58. if backup_files:
  59. print("📄 生成的备份文件:")
  60. for file in backup_files:
  61. file_size = file.stat().st_size
  62. print(f" 📄 {file.name} ({file_size} bytes)")
  63. else:
  64. print("⚠️ 未找到CSV备份文件")
  65. log_files = list(backup_dir.glob("*.txt"))
  66. if log_files:
  67. print("📋 日志文件:")
  68. for file in log_files:
  69. print(f" 📋 {file.name}")
  70. else:
  71. print("❌ 备份目录不存在")
  72. print("\n🎉 测试完成!")
  73. return True
  74. except Exception as e:
  75. print(f"\n❌ 测试失败: {e}")
  76. import traceback
  77. print("详细错误信息:")
  78. print(traceback.format_exc())
  79. return False
  80. def main():
  81. """主函数"""
  82. print("Vector表备份功能独立测试")
  83. print("测试目标: langchain_pg_collection, langchain_pg_embedding")
  84. print("数据库: 从 data_pipeline.config 自动获取连接配置")
  85. print()
  86. # 运行异步测试
  87. success = asyncio.run(test_vector_backup())
  88. if success:
  89. print("\n✅ 所有测试通过!")
  90. exit(0)
  91. else:
  92. print("\n❌ 测试失败!")
  93. exit(1)
  94. if __name__ == "__main__":
  95. main()