为 data_pipeline 添加两个新参数来管理 vector 表数据:
- --backup-vector-tables:备份vector表数据
- --truncate-vector-tables:清空vector表数据(自动启用备份)

--backup-vector-tables 与 --truncate-vector-tables 可单独或组合使用;使用 --truncate-vector-tables 时自动启用 --backup-vector-tables。
影响范围:
- python -m data_pipeline.schema_workflow(包括使用 --skip-training-load 的情况)
- python -m data_pipeline.trainer.run_training
- POST /api/v0/data_pipeline/tasks/{task_id}/execute
schema_workflow 即使使用了 --skip-training-load 参数,仍然要支持 --backup-vector-tables 和 --truncate-vector-tables 参数的执行。
schema_workflow 的完整流程包含了 run_training 的调用,需要避免重复执行vector表管理操作。

表处理策略:
- langchain_pg_collection:只备份,不清空
- langchain_pg_embedding:备份并清空

# schema_workflow.py 新增参数
--backup-vector-tables    # 备份vector表数据
--truncate-vector-tables  # 清空vector表数据(自动启用备份)
# run_training.py 新增参数
--backup-vector-tables    # 备份vector表数据
--truncate-vector-tables  # 清空vector表数据(自动启用备份)

调用链:CLI参数 -> SchemaWorkflowOrchestrator -> process_training_files -> VectorTableManager
位置: data_pipeline/trainer/vector_table_manager.py
class VectorTableManager:
    """Vector table manager responsible for backing up and truncating the pgvector tables.

    This is the outline of the class surface; the full bodies of the async
    operations and ``get_connection`` are given separately in this design.
    """

    def __init__(self, task_output_dir: str, task_id: str = None):
        """
        Args:
            task_output_dir: Task output directory (backup files are stored under it).
            task_id: Task ID (used to obtain the task-scoped logger).

        Note:
            The database connection is obtained automatically from
            data_pipeline.config.SCHEMA_TOOLS_CONFIG; table-management settings
            come from its "vector_table_management" section (empty dict if absent).
        """
        self.task_output_dir = task_output_dir
        self.task_id = task_id

        # Read settings from data_pipeline.config
        from data_pipeline.config import SCHEMA_TOOLS_CONFIG
        self.config = SCHEMA_TOOLS_CONFIG.get("vector_table_management", {})

        # Task-scoped logger when a task ID is known, plain stdlib logger otherwise
        if task_id:
            from data_pipeline.dp_logging import get_logger
            self.logger = get_logger("VectorTableManager", task_id)
        else:
            import logging
            self.logger = logging.getLogger("VectorTableManager")

    async def backup_vector_tables(self) -> Dict[str, Any]:
        """Back up the vector tables (outline; full implementation given separately)."""

    async def truncate_vector_tables(self) -> Dict[str, Any]:
        """Truncate the vector tables - only langchain_pg_embedding (outline)."""

    async def execute_vector_management(self, backup: bool, truncate: bool) -> Dict[str, Any]:
        """Run the vector table management flow (outline)."""

    def get_connection(self):
        """Get a pgvector database connection configured from data_pipeline.config (outline)."""

    def _format_file_size(self, size_bytes: int) -> str:
        """Return a human-readable file size such as ``512 B``, ``45.6 KB`` or ``2.1 MB``.

        Args:
            size_bytes: Size in bytes (as returned by ``Path.stat().st_size``).

        Returns:
            Sizes below 1024 are shown as whole bytes; larger sizes are scaled
            by 1024 per unit (KB, MB, GB, TB) with one decimal place.
        """
        if size_bytes < 1024:
            return f"{int(size_bytes)} B"
        size = float(size_bytes)
        for unit in ("KB", "MB", "GB"):
            size /= 1024
            if size < 1024:
                return f"{size:.1f} {unit}"
        # Anything at or beyond 1024 GB is reported in TB
        return f"{size / 1024:.1f} TB"
async def execute_vector_management(self, backup: bool, truncate: bool) -> Dict[str, Any]:
    """Main flow of vector table management: optional backup, then optional truncate.

    Data is never truncated without a backup: ``truncate=True`` forces
    ``backup=True``, and a backup with any failed table cancels the truncate.

    Args:
        backup: Back up the tables configured in ``supported_tables``.
        truncate: Truncate the tables configured in ``truncate_tables`` (implies backup).

    Returns:
        Result dict with ``backup_performed``, ``truncate_performed``,
        ``tables_backed_up``, ``truncate_results``, ``errors``,
        ``backup_directory`` and ``duration`` (seconds). When neither flag is
        set, only the two ``*_performed`` keys are returned.

    Raises:
        Any exception from the backup/truncate steps is logged and re-raised.
    """
    start_time = time.time()

    # 1. Parameter validation: truncate automatically enables backup
    if truncate and not backup:
        backup = True
        self.logger.info("🔄 启用truncate时自动启用backup")

    if not backup and not truncate:
        self.logger.info("⏭️ 未启用vector表管理,跳过操作")
        return {"backup_performed": False, "truncate_performed": False}

    # 2. Result skeleton
    result = {
        "backup_performed": backup,
        "truncate_performed": truncate,
        "tables_backed_up": {},
        "truncate_results": {},
        "errors": [],
        "backup_directory": None,
        "duration": 0
    }

    try:
        # 3. Backup directory; the default mirrors the shipped config so a missing
        #    config section does not raise KeyError
        backup_dir = Path(self.task_output_dir) / self.config.get("backup_directory", "vector_bak")
        if backup:
            backup_dir.mkdir(parents=True, exist_ok=True)
            result["backup_directory"] = str(backup_dir)
            self.logger.info(f"📁 备份目录: {backup_dir}")

        # 4. Backup
        if backup:
            self.logger.info("🗂️ 开始备份vector表...")
            backup_results = await self.backup_vector_tables()
            result["tables_backed_up"] = backup_results

            # Any failed table cancels the truncate: never delete data we could not save
            backup_failed = any(not r.get("success", False) for r in backup_results.values())
            if backup_failed:
                result["errors"].append("部分表备份失败")
                if truncate:
                    self.logger.error("❌ 备份失败,取消清空操作")
                    result["truncate_performed"] = False
                    truncate = False

        # 5. Truncate (only reached when the backup succeeded)
        if truncate:
            self.logger.info("🗑️ 开始清空vector表...")
            truncate_results = await self.truncate_vector_tables()
            result["truncate_results"] = truncate_results

            truncate_failed = any(not r.get("success", False) for r in truncate_results.values())
            if truncate_failed:
                result["errors"].append("部分表清空失败")

        # 6. Total duration - computed BEFORE the log is written so the log
        #    records the real elapsed time instead of 0
        result["duration"] = time.time() - start_time

        # 7. Backup log file
        if backup and backup_dir.exists():
            self._write_backup_log(backup_dir, result)

        # 8. Final status
        if result["errors"]:
            self.logger.warning(f"⚠️ Vector表管理完成,但有错误: {'; '.join(result['errors'])}")
        else:
            self.logger.info(f"✅ Vector表管理完成,耗时: {result['duration']:.2f}秒")

        return result

    except Exception as e:
        result["duration"] = time.time() - start_time
        result["errors"].append(f"执行失败: {str(e)}")
        self.logger.error(f"❌ Vector表管理失败: {e}")
        raise
配置获取层次:
VectorTableManager
↓
data_pipeline.config.SCHEMA_TOOLS_CONFIG["default_db_connection"]
↓
app_config.PGVECTOR_CONFIG (在config.py中自动继承)
def get_connection(self):
    """Open a psycopg2 connection to the pgvector database.

    Lookup order:
      1. ``self.config["default_db_connection"]``, used as a DSN when it is set;
      2. otherwise the keyword settings from ``app_config.PGVECTOR_CONFIG``.

    NOTE(review): ``self.config`` is the ``vector_table_management`` sub-dict of
    SCHEMA_TOOLS_CONFIG, and the config shown in this design does not define
    ``default_db_connection`` there, so path 2 appears to be the one actually
    taken - confirm whether the top-level key was intended.

    The returned connection is in autocommit mode. Connection errors are logged
    and re-raised.
    """
    import psycopg2

    try:
        dsn = self.config.get("default_db_connection")
        if not dsn:
            # Fall back to the pgvector settings from app_config
            import app_config
            pg_settings = app_config.PGVECTOR_CONFIG
            connection = psycopg2.connect(**{
                "host": pg_settings.get('host'),
                "port": pg_settings.get('port'),
                "database": pg_settings.get('dbname'),
                "user": pg_settings.get('user'),
                "password": pg_settings.get('password'),
            })
        else:
            connection = psycopg2.connect(dsn)

        # Autocommit, to avoid transaction issues (original rationale)
        connection.autocommit = True
        return connection
    except Exception as err:
        self.logger.error(f"pgvector数据库连接失败: {err}")
        raise
def _write_backup_log(self, backup_dir: Path, result: Dict[str, Any]):
    """Write a human-readable backup report to ``<backup_dir>/vector_backup_log.txt``.

    The report contains the backup time, task ID, duration, per-table backup
    status, per-table truncate status (or "Not performed") and the collected
    errors. Any failure while writing is logged as a warning, never raised.
    """
    log_path = backup_dir / "vector_backup_log.txt"
    try:
        with open(log_path, 'w', encoding='utf-8') as out:
            write = out.write
            write("=== Vector Table Backup Log ===\n")
            write(f"Backup Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            write(f"Task ID: {self.task_id or 'Unknown'}\n")
            write(f"Duration: {result.get('duration', 0):.2f}s\n\n")

            # Per-table backup status
            write("Tables Backup Status:\n")
            for name, details in result.get("tables_backed_up", {}).items():
                if not details.get("success", False):
                    write(f"✗ {name}: FAILED - {details.get('error', 'Unknown error')}\n")
                    continue
                write(f"✓ {name}: {details['row_count']} rows -> {details['backup_file']} ({details['file_size']})\n")

            # Truncate status
            write("\nTruncate Status:\n")
            if not result.get("truncate_performed", False):
                write("- Not performed\n")
            else:
                for name, details in result.get("truncate_results", {}).items():
                    if not details.get("success", False):
                        write(f"✗ {name}: FAILED - {details.get('error', 'Unknown error')}\n")
                        continue
                    write(f"✓ {name}: TRUNCATED ({details['rows_before']} rows removed)\n")

            # Error summary
            collected_errors = result.get("errors")
            if collected_errors:
                write(f"\nErrors: {'; '.join(result['errors'])}\n")
    except Exception as exc:
        self.logger.warning(f"写入备份日志失败: {exc}")
{task_output_dir}/vector_bak/langchain_pg_collection_{timestamp}.csv
{task_output_dir}/vector_bak/langchain_pg_embedding_{timestamp}.csv
时间戳格式:YYYYMMDD_HHMMSS
步骤1: DDL/MD生成
步骤2: Question-SQL生成
步骤3: SQL验证(可选)
步骤4: 训练数据加载
├── 4.1 Vector表管理(新增)
│ ├── 备份vector表(如果启用)
│ └── 清空vector表(如果启用)
└── 4.2 加载训练数据
前置步骤: Vector表管理(新增)
├── 备份vector表(如果启用)
└── 清空vector表(如果启用)
主要步骤: 训练数据加载
data_pipeline/training_data/manual_20250720_121007/
├── *.ddl # DDL文件
├── *.md # 文档文件
├── *.json # QS文件
├── data_pipeline.log # 任务日志(直接在根目录)
├── vector_bak/ # 新增:vector备份目录
│ ├── langchain_pg_collection_20250720_121007.csv
│ ├── langchain_pg_embedding_20250720_121007.csv
│ └── vector_backup_log.txt # 备份操作日志
└── task_config.json # 任务配置文件
=== Vector Table Backup Log ===
Backup Time: 2025-07-20 12:10:07
Task ID: manual_20250720_121007
Database: highway_db
Tables Backup Status:
✓ langchain_pg_collection: 1,234 rows -> langchain_pg_collection_20250720_121007.csv (45.6 KB)
✓ langchain_pg_embedding: 12,345 rows -> langchain_pg_embedding_20250720_121007.csv (2.1 MB)
Truncate Status:
✓ langchain_pg_embedding: TRUNCATED
- langchain_pg_collection: SKIPPED (collection table preserved)
要求:在脚本作业的日志最后的summary阶段,必须总结是否执行了备份和truncate。
在 print_final_summary()
方法中添加vector管理总结:
def print_final_summary(self, report: Dict[str, Any]):
    """Log the end-of-run summary, including whether vector backup/truncate ran.

    Args:
        report: Workflow report. ``report["vector_management_stats"]`` holds the
            result of ``VectorTableManager.execute_vector_management``, or is
            absent/None when vector management was not enabled.
    """
    # ... existing summary logic ...

    def _succeeded(info) -> bool:
        # Per-table entries are result dicts (failed ones carry success=False);
        # plain "SUCCESS"/"FAILED" strings are accepted as well.
        if isinstance(info, dict):
            return bool(info.get("success", True))
        return str(info).upper() == "SUCCESS"

    # New: vector table management summary
    vector_stats = report.get("vector_management_stats")
    if vector_stats:
        self.logger.info("📊 Vector表管理:")

        if vector_stats.get("backup_performed", False):
            backed_up = vector_stats.get("tables_backed_up") or {}
            ok_infos = [info for info in backed_up.values() if _succeeded(info)]
            # file_size is already a formatted string (e.g. "45.6 KB"), so the sizes are
            # listed rather than summed (summing strings raised TypeError)
            sizes = ", ".join(
                str(info["file_size"]) for info in ok_infos
                if isinstance(info, dict) and info.get("file_size")
            )
            icon = "✅" if len(ok_infos) == len(backed_up) else "⚠️"
            self.logger.info(f" {icon} 备份执行: {len(ok_infos)}/{len(backed_up)}个表成功,文件大小: {sizes or '未知'}")
        else:
            self.logger.info(" - 备份执行: 未执行")

        if vector_stats.get("truncate_performed", False):
            truncate_results = vector_stats.get("truncate_results") or {}
            cleared = [name for name, info in truncate_results.items() if _succeeded(info)]
            failed = [name for name in truncate_results if name not in cleared]
            if cleared:
                self.logger.info(f" ✅ 清空执行: {', '.join(cleared)}表已清空")
            if failed:
                self.logger.info(f" ❌ 清空失败: {', '.join(failed)}")
            if not truncate_results:
                self.logger.info(" - 清空执行: 无清空结果")
        else:
            self.logger.info(" - 清空执行: 未执行")

        duration = vector_stats.get("duration", 0)
        self.logger.info(f" ⏱️ 执行耗时: {duration:.1f}秒")
    else:
        self.logger.info("📊 Vector表管理: 未执行(未启用相关参数)")
在 main()
函数的最终统计部分添加vector管理报告:
def main():
    # ... existing logic ...

    # Run training; vector table management (if requested) runs inside as a pre-step.
    # process_training_files returns (process_successful, vector_stats).
    process_successful, vector_stats = process_training_files(data_path, task_id,
                                                              backup_vector_tables,
                                                              truncate_vector_tables)

    # ... existing success statistics ...

    # New: vector table management summary
    print("\n===== Vector表管理统计 =====")
    if vector_stats:
        if vector_stats.get("backup_performed", False):
            tables_info = vector_stats.get("tables_backed_up", {})
            # Failed tables only carry "success"/"error": they must not be counted as
            # backed up, and indexing their row_count would raise KeyError.
            succeeded = {name for name, info in tables_info.items() if info.get("success", True)}
            print(f"✓ 备份执行: 成功备份 {len(succeeded)} 个表")
            for table_name, info in tables_info.items():
                if table_name in succeeded:
                    print(f" - {table_name}: {info.get('row_count')}行 -> {info.get('backup_file')} ({info.get('file_size')})")
                else:
                    print(f" ✗ {table_name}: 备份失败 - {info.get('error', 'Unknown error')}")
        if vector_stats.get("truncate_performed", False):
            for table_name, info in vector_stats.get("truncate_results", {}).items():
                ok = info.get("success", False) if isinstance(info, dict) else info == "SUCCESS"
                if ok:
                    print(f"✓ 清空执行: {table_name}表已清空")
                else:
                    error_text = info.get("error", "Unknown error") if isinstance(info, dict) else info
                    print(f"✗ 清空失败: {table_name} - {error_text}")
        for error in vector_stats.get("errors", []):
            print(f"⚠ {error}")
        print(f"✓ 总耗时: {vector_stats.get('duration', 0):.1f}秒")
    else:
        print("- 未执行vector表管理操作")
    print("===========================")
新增参数:
# Opt-in flags for vector table management (store_true: absent -> False)
parser.add_argument("--backup-vector-tables", help="备份vector表数据到任务目录", action="store_true")
parser.add_argument("--truncate-vector-tables", help="清空vector表数据(自动启用备份)", action="store_true")
修改 SchemaWorkflowOrchestrator 构造函数:
def __init__(self, ..., backup_vector_tables: bool = False, truncate_vector_tables: bool = False):
# 参数验证和自动启用逻辑
if truncate_vector_tables:
backup_vector_tables = True
修改 _execute_step_4_training_data_load:
async def _execute_step_4_training_data_load(self):
    """Step 4: vector table management (at most once per run), then training data load.

    Vector management runs here only if this workflow run has not already
    stored a ``vector_management`` artifact, so a backup/truncate is never
    repeated. The training loader is always called with its own vector flags
    disabled for the same reason.
    """
    # New: vector table management, skipped when this run already performed it
    artifacts = self.workflow_state.get("artifacts", {})
    vector_requested = self.backup_vector_tables or self.truncate_vector_tables
    if vector_requested and artifacts.get("vector_management") is None:
        await self._execute_vector_table_management()

    # Existing: training data load.
    # NOTE(review): training_data_dir is presumably resolved by the existing step-4 code (not shown).
    # process_training_files returns (process_successful, vector_stats); unpack it,
    # because the tuple itself is always truthy and would hide a failed load.
    load_successful, _ = process_training_files(
        training_data_dir,
        self.task_id,
        backup_vector_tables=False,  # avoid repeating vector management
        truncate_vector_tables=False  # avoid repeating vector management
    )
    # ... existing handling of load_successful ...
新增独立的Vector表管理方法:
async def _execute_vector_table_management(self):
    """Run vector table management on its own (also used with --skip-training-load).

    Does nothing when neither vector flag is set, or when this workflow run has
    already stored a ``vector_management`` result. Both the complete workflow
    and step 4 invoke this method, and a backup/truncate must happen at most
    once per run.

    Raises:
        Re-raises any error from VectorTableManager after logging it.
    """
    if not (self.backup_vector_tables or self.truncate_vector_tables):
        return

    # Idempotence guard: never back up / truncate twice in the same run
    if self.workflow_state.get("artifacts", {}).get("vector_management") is not None:
        self.logger.info("⏭️ Vector表管理已执行,跳过重复操作")
        return

    self.logger.info("🗂️ 开始执行Vector表管理...")

    try:
        from data_pipeline.trainer.vector_table_manager import VectorTableManager

        vector_manager = VectorTableManager(
            task_output_dir=str(self.output_dir),
            task_id=self.task_id
        )

        # Run backup / truncate
        vector_stats = await vector_manager.execute_vector_management(
            backup=self.backup_vector_tables,
            truncate=self.truncate_vector_tables
        )

        # Record the result in the workflow state (also marks it as done for this run)
        self.workflow_state["artifacts"]["vector_management"] = vector_stats

        self.logger.info("✅ Vector表管理完成")

    except Exception as e:
        self.logger.error(f"❌ Vector表管理失败: {e}")
        raise
修改主工作流以支持--skip-training-load场景:
async def execute_complete_workflow(self) -> Dict[str, Any]:
    """Run the complete workflow (steps 1-4).

    Vector table management runs on its own before step 4 whenever a vector
    flag is set, so it also happens when the training load is skipped.
    """
    # ... existing steps 1-3 ...

    # Standalone vector table management (before, or instead of, the training load)
    vector_management_requested = self.backup_vector_tables or self.truncate_vector_tables
    if vector_management_requested:
        await self._execute_vector_table_management()

    # Step 4: training data load, unless disabled
    if not self.enable_training_data_load:
        self.logger.info("⏭️ 跳过训练数据加载步骤")
    else:
        await self._execute_step_4_training_data_load()
新增参数处理:
# Opt-in vector table management flags (store_true: absent -> False)
parser.add_argument(
    '--backup-vector-tables',
    action='store_true',
    help='备份vector表数据',
)
parser.add_argument(
    '--truncate-vector-tables',
    action='store_true',
    help='清空vector表数据(自动启用备份)',
)
修改 process_training_files 函数:
def process_training_files(data_path, task_id=None, backup_vector_tables=False, truncate_vector_tables=False):
    """Load training data from ``data_path``, optionally managing vector tables first.

    Args:
        data_path: Training-data directory; also used as the base directory for vector backups.
        task_id: Optional task ID used for logging.
        backup_vector_tables: Back up the vector tables before loading.
        truncate_vector_tables: Truncate the vector tables before loading (implies backup).

    Returns:
        ``(process_successful, vector_stats)``; ``vector_stats`` is None when no
        vector table management was requested.

    Note:
        The vector step uses ``asyncio.run``, which cannot be called while an
        event loop is already running in this thread; async callers should pass
        both vector flags as False.
    """
    # Truncating without a backup is never allowed
    if truncate_vector_tables:
        backup_vector_tables = True

    # Pre-step: vector table management (backup is set whenever truncate is)
    vector_stats = None
    if backup_vector_tables:
        manager = VectorTableManager(data_path, task_id)
        vector_stats = asyncio.run(
            manager.execute_vector_management(backup_vector_tables, truncate_vector_tables)
        )

    # ... existing training data processing ...

    # Return the vector statistics too so the final summary can include them
    return process_successful, vector_stats
SimpleWorkflowExecutor 修改:
def __init__(self, task_id: str, backup_vector_tables: bool = False, truncate_vector_tables: bool = False):
    """Create a workflow executor for ``task_id``.

    Args:
        task_id: Task identifier.
        backup_vector_tables: Back up the vector tables during the workflow.
        truncate_vector_tables: Truncate the vector tables (the orchestrator also enables backup).
    """
    # ... existing initialization ...

    # Kept as given and passed through to SchemaWorkflowOrchestrator, which applies
    # the "truncate implies backup" rule itself.
    self.backup_vector_tables = backup_vector_tables
    self.truncate_vector_tables = truncate_vector_tables
API 路由处理(后续步骤,当前不实现):
{
"execution_mode": "complete",
"step_name": null,
"backup_vector_tables": false,
"truncate_vector_tables": false
}
SQL命令设计:
-- Set the client encoding to UTF-8 for the exported text
SET client_encoding TO 'UTF8';
-- Export the data to .tmp files first; they are renamed to .csv once the export completes
-- NOTE(review): COPY ... TO '<path>' is executed by the database server, so the path is
-- presumably resolved on the server host and requires server-side file-write privileges.
-- Confirm the server can write to {backup_path}, or export client-side with COPY ... TO STDOUT.
COPY langchain_pg_collection TO '{backup_path}/langchain_pg_collection_{timestamp}.csv.tmp' WITH CSV HEADER;
COPY langchain_pg_embedding TO '{backup_path}/langchain_pg_embedding_{timestamp}.csv.tmp' WITH CSV HEADER;
Python实现方式:
async def backup_vector_tables(self) -> Dict[str, Any]:
    """Back up each configured vector table to a CSV file in the task directory.

    Each table in ``config["supported_tables"]`` is exported with a client-side
    ``COPY ... TO STDOUT WITH CSV HEADER`` into
    ``<task_output_dir>/<backup_directory>/<table>_<timestamp>.csv.tmp``. The
    file is renamed to ``.csv`` only after the export finished, so a ``.csv``
    file is always complete.

    Returns:
        ``{table_name: {...}}``. On success the entry has ``success=True``,
        ``row_count``, ``file_size`` (human-readable), ``file_size_bytes``,
        ``backup_file`` (file name) and ``duration``. On failure it has
        ``success=False`` and ``error``. A failing table does not stop the others.
    """
    from contextlib import closing

    # 1. Backup directory (same default as execute_vector_management)
    backup_dir = Path(self.task_output_dir) / self.config.get("backup_directory", "vector_bak")
    backup_dir.mkdir(parents=True, exist_ok=True)

    # 2. One timestamp shared by all files of this run
    timestamp = datetime.now().strftime(self.config.get("timestamp_format", "%Y%m%d_%H%M%S"))
    tables = self.config.get("supported_tables", ["langchain_pg_collection", "langchain_pg_embedding"])

    # 3. Back up each table independently
    results = {}
    for table_name in tables:
        # Paths are defined before the try so the cleanup below can always reference them
        temp_file = backup_dir / f"{table_name}_{timestamp}.csv.tmp"
        final_file = backup_dir / f"{table_name}_{timestamp}.csv"
        try:
            start_time = time.time()
            # closing(): psycopg2's own connection context manager only ends the
            # transaction and does not close the connection
            with closing(self.get_connection()) as conn, conn.cursor() as cursor:
                cursor.execute("SET client_encoding TO 'UTF8'")
                # Client-side COPY: rows stream to this process, so the file is written
                # locally even when the database runs on another host.
                # Table names come from the trusted config, not from user input.
                with open(temp_file, "wb") as out:
                    cursor.copy_expert(f"COPY {table_name} TO STDOUT WITH CSV HEADER", out)
                row_count = cursor.rowcount
                if row_count is None or row_count < 0:
                    # rowcount may be unavailable after copy_expert; count explicitly
                    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
                    row_count = cursor.fetchone()[0]

            # 4. Export complete: publish it under the final name (.tmp -> .csv)
            temp_file.replace(final_file)
            size_bytes = final_file.stat().st_size
            results[table_name] = {
                "success": True,
                "row_count": row_count,
                "file_size": self._format_file_size(size_bytes),
                "file_size_bytes": size_bytes,
                "backup_file": final_file.name,
                "duration": time.time() - start_time
            }
            self.logger.info(f"✅ {table_name} 备份成功: {row_count}行 -> {final_file.name}")
        except Exception as e:
            results[table_name] = {
                "success": False,
                "error": str(e)
            }
            self.logger.error(f"❌ {table_name} 备份失败: {e}")
            # Remove a partially written temp file so no broken backup is left behind
            if temp_file.exists():
                temp_file.unlink()

    return results

# Note: get_connection() is defined elsewhere in the class and is not repeated here
关键设计点:
导出时先写入 .csv.tmp 临时文件,完成后再重命名为 .csv,确保 .csv 文件总是完整的备份。
SQL命令设计:
-- Only the embedding table is truncated; the collection table is kept
TRUNCATE TABLE langchain_pg_embedding;
Python实现方式:
async def truncate_vector_tables(self) -> Dict[str, Any]:
    """Truncate the configured vector tables (normally only langchain_pg_embedding).

    For each table in ``config["truncate_tables"]`` the rows are counted, the
    table is truncated, and it is counted again to verify it is empty.

    Returns:
        ``{table_name: {...}}``. On success the entry has ``success=True``,
        ``rows_before``, ``rows_after`` (0) and ``duration``. On failure it has
        ``success=False`` and ``error``.
    """
    from contextlib import closing

    results = {}
    # Default mirrors the shipped config: only the embedding table is truncated
    truncate_tables = self.config.get("truncate_tables", ["langchain_pg_embedding"])

    for table_name in truncate_tables:
        try:
            # Table names come from the trusted config, not from user input
            count_sql = f"SELECT COUNT(*) FROM {table_name}"
            start_time = time.time()
            # closing(): psycopg2's own connection context manager does not close the connection
            with closing(self.get_connection()) as conn, conn.cursor() as cursor:
                # 1. Row count before truncating (for statistics)
                cursor.execute(count_sql)
                rows_before = cursor.fetchone()[0]
                # 2. Truncate
                cursor.execute(f"TRUNCATE TABLE {table_name}")
                # 3. Verify the table is now empty
                cursor.execute(count_sql)
                rows_after = cursor.fetchone()[0]

            duration = time.time() - start_time
            if rows_after != 0:
                raise Exception(f"清空失败,表中仍有 {rows_after} 行数据")

            results[table_name] = {
                "success": True,
                "rows_before": rows_before,
                "rows_after": rows_after,
                "duration": duration
            }
            self.logger.info(f"✅ {table_name} 清空成功: {rows_before}行 -> 0行")
        except Exception as e:
            results[table_name] = {
                "success": False,
                "error": str(e)
            }
            self.logger.error(f"❌ {table_name} 清空失败: {e}")

    return results
关键设计点:
只清空 langchain_pg_embedding 表,保留 langchain_pg_collection 表。

数据恢复参考SQL(从备份CSV导入):
SET client_encoding TO 'UTF8';
COPY langchain_pg_collection FROM '{backup_path}/langchain_pg_collection_{timestamp}.csv' WITH CSV HEADER;
COPY langchain_pg_embedding FROM '{backup_path}/langchain_pg_embedding_{timestamp}.csv' WITH CSV HEADER;
# New configuration entries in data_pipeline/config.py
SCHEMA_TOOLS_CONFIG = {
    # ... existing configuration ...

    # New: vector table management settings (read by VectorTableManager via self.config)
    "vector_table_management": {
        "backup_enabled": True,  # NOTE(review): not read by any code shown in this design
        "backup_directory": "vector_bak",  # sub-directory of the task output dir holding the backups
        "supported_tables": [  # tables exported by the backup step
            "langchain_pg_collection",
            "langchain_pg_embedding"
        ],
        "truncate_tables": [  # tables emptied by the truncate step
            "langchain_pg_embedding"  # only the embedding table is truncated
        ],
        "timestamp_format": "%Y%m%d_%H%M%S",  # strftime format used in backup file names
        "backup_temp_suffix": ".tmp"  # NOTE(review): the backup code hard-codes ".csv.tmp" instead of reading this
    }
}
统计信息将出现在以下位置:
统计信息格式:
{
    "vector_management_stats": {
        "backup_performed": True,
        "truncate_performed": True,
        "tables_backed_up": {
            "langchain_pg_collection": {
                "success": True,
                "row_count": 1234,
                "file_size": "45.6 KB",
                "backup_file": "langchain_pg_collection_20250720_121007.csv",
                "duration": 0.8
            },
            "langchain_pg_embedding": {
                "success": True,
                "row_count": 12345,
                "file_size": "2.1 MB",
                "backup_file": "langchain_pg_embedding_20250720_121007.csv",
                "duration": 10.9
            }
        },
        "truncate_results": {
            "langchain_pg_embedding": {
                "success": True,
                "rows_before": 12345,
                "rows_after": 0,
                "duration": 0.3
            }
        },
        "errors": [],
        "duration": 12.5,
        "backup_directory": "/path/to/task/vector_bak"
    }
}
脚本总结示例:
📊 工作流程执行统计
===================
✅ Vector表管理:
- 备份执行: 是
- 清空执行: 是
- 备份文件: 2个 (共2.15MB)
- 执行耗时: 12.5秒
或者(如果未执行vector管理):
📊 工作流程执行统计
===================
- Vector表管理: 未执行(未启用相关参数)
当前执行 API:POST /api/v0/data_pipeline/tasks/{task_id}/execute
请求体格式:
{
"execution_mode": "complete|step",
"step_name": "ddl_generation|qa_generation|sql_validation|training_load"
}
方案1: 在请求体中添加 vector 管理参数
{
"execution_mode": "complete",
"step_name": null,
"vector_options": {
"backup_vector_tables": false,
"truncate_vector_tables": false
}
}
方案2: 扁平化参数结构
{
"execution_mode": "complete",
"step_name": null,
"backup_vector_tables": false,
"truncate_vector_tables": false
}
响应中包含 vector 管理操作的结果:
{
"success": true,
"task_id": "manual_20250720_121007",
"execution_mode": "complete",
"result": {
"workflow_state": {...},
"vector_management": {
"backup_performed": true,
"truncate_performed": true,
"backup_files": [...],
"statistics": {...}
}
}
}
这个设计提供了一个完整的 vector 表管理功能,包括:
根据用户反馈,已对以下问题进行了修正:
- 数据库连接:从 data_pipeline.config.SCHEMA_TOOLS_CONFIG 获取(该配置从 app_config.PGVECTOR_CONFIG 继承),而不是通过参数传递
- 日志目录:不使用 logs/ 子目录(任务日志直接位于任务根目录)
- 参数:--backup-vector-tables 和 --truncate-vector-tables 都是可选参数,默认不启用
- 临时文件:采用 .csv.tmp 临时文件设计,确保导出过程的原子性——导出时先写 .csv.tmp 文件,完成后重命名为 .csv 文件
- 数据库访问:使用 psycopg2 连接pgvector数据库(配置从 data_pipeline.config 获取)
- 主流程:新增 execute_vector_management() 方法设计
- --skip-training-load 支持:完善了 schema_workflow 使用 --skip-training-load 时仍支持vector表管理的设计——新增 _execute_vector_table_management() 方法,在 schema_workflow 中独立执行vector管理,调用 run_training 时禁用vector管理参数(设为False)

使用示例(--backup-vector-tables 和 --truncate-vector-tables):
# 1. 完整工作流 + 备份和清空vector表
python -m data_pipeline.schema_workflow --db-connection "postgresql://postgres:postgres@localhost:6432/highway_db" --table-list ./data_pipeline/tables.txt --business-context "高速公路服务区管理系统" --truncate-vector-tables
# 2. 跳过训练但执行vector表管理
python -m data_pipeline.schema_workflow --db-connection "postgresql://postgres:postgres@localhost:6432/highway_db" --table-list ./data_pipeline/tables.txt --business-context "高速公路服务区管理系统" --skip-training-load --backup-vector-tables
# 3. 跳过训练并清空vector表
python -m data_pipeline.schema_workflow --db-connection "postgresql://postgres:postgres@localhost:6432/highway_db" --table-list ./data_pipeline/tables.txt --business-context "高速公路服务区管理系统" --skip-training-load --truncate-vector-tables
# 4. 独立训练脚本 + vector表管理
python -m data_pipeline.trainer.run_training --data_path "./training_data/" --backup-vector-tables --truncate-vector-tables
# 5. 只备份不清空
python -m data_pipeline.trainer.run_training --data_path "./training_data/" --backup-vector-tables
参数说明:
- --backup-vector-tables:备份 langchain_pg_collection 和 langchain_pg_embedding 表
- --truncate-vector-tables:清空 langchain_pg_embedding 表(自动启用备份)

核心原则是安全优先,确保在任何情况下都不会意外丢失数据。