123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- from dataclasses import dataclass, field
- from typing import List, Dict, Optional, Any, Union
- from enum import Enum
- import hashlib
- import json
class FieldType(Enum):
    """Enumeration of field (column) type categories.

    Every member's value is the lowercase string form of its name.
    """

    INTEGER = "integer"
    VARCHAR = "varchar"
    TEXT = "text"
    TIMESTAMP = "timestamp"
    DATE = "date"
    BOOLEAN = "boolean"
    NUMERIC = "numeric"
    ENUM = "enum"
    JSON = "json"
    UUID = "uuid"
    # NOTE(review): presumably the fallback for types not listed above;
    # confirm against the code that maps database types to this enum.
    OTHER = "other"
class ProcessingStatus(Enum):
    """Enumeration of processing states.

    Every member's value is the lowercase string form of its name.
    """

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class FieldInfo:
    """Standard structure holding the information about one field.

    ``name``, ``type`` and ``nullable`` are required; every other attribute
    has a default. Note that ``to_dict`` exports only a subset of the
    attributes declared here.
    """

    # Required attributes.
    name: str
    type: str
    nullable: bool

    # Default value and comments.
    default_value: Optional[str] = None
    comment: Optional[str] = None
    original_comment: Optional[str] = None  # original comment
    generated_comment: Optional[str] = None  # comment generated by an LLM

    # Key and enum flags.
    is_primary_key: bool = False
    is_foreign_key: bool = False
    is_enum: bool = False
    enum_values: Optional[List[str]] = None
    enum_description: Optional[str] = None

    # Size information.
    max_length: Optional[int] = None
    precision: Optional[int] = None
    scale: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert the field to a dictionary.

        Returns:
            A dict with the keys ``name``, ``type``, ``nullable``,
            ``default_value``, ``comment``, ``is_primary_key``,
            ``is_foreign_key``, ``is_enum`` and ``enum_values``, in that
            order. The remaining attributes are not exported. Values are
            the attribute objects themselves, not copies.
        """
        exported_attrs = (
            'name',
            'type',
            'nullable',
            'default_value',
            'comment',
            'is_primary_key',
            'is_foreign_key',
            'is_enum',
            'enum_values',
        )
        return {attr: getattr(self, attr) for attr in exported_attrs}
@dataclass
class TableMetadata:
    """Standard structure holding the metadata of one table.

    ``schema_name``, ``table_name`` and ``full_name`` are required; every
    other attribute has a default.
    """

    schema_name: str
    table_name: str
    full_name: str  # schema.table_name
    comment: Optional[str] = None
    original_comment: Optional[str] = None  # original comment
    generated_comment: Optional[str] = None  # comment generated by an LLM
    fields: List[FieldInfo] = field(default_factory=list)
    sample_data: List[Dict[str, Any]] = field(default_factory=list)
    row_count: Optional[int] = None
    table_size: Optional[str] = None  # table size (e.g. "1.2 MB")
    created_date: Optional[str] = None

    @property
    def safe_file_name(self) -> str:
        """Build a file-name-safe identifier for this table.

        For the ``public`` schema (compared case-insensitively) the name is
        based on the table name alone; otherwise it is
        ``<schema_name>__<table_name>``. In both cases ``.`` becomes ``__``
        and ``-`` and spaces become ``_``.

        Returns:
            The sanitized name.
        """
        if self.schema_name.lower() == 'public':
            raw_name = self.table_name
        else:
            raw_name = f"{self.schema_name}__{self.table_name}"
        # Sanitize on every path: previously the 'public' branch returned the
        # table name untouched, so the same table name produced differently
        # sanitized file names depending on its schema.
        return raw_name.replace('.', '__').replace('-', '_').replace(' ', '_')

    def get_metadata_hash(self) -> str:
        """Compute a hash of the metadata, used to decide incremental updates.

        The hash covers the schema name, the table name, each field's
        ``to_dict()`` output and ``original_comment``; no other attribute
        contributes to it.

        Returns:
            The hexadecimal MD5 digest of the JSON-serialized data.
        """
        hash_data = {
            'schema_name': self.schema_name,
            'table_name': self.table_name,
            'fields': [f.to_dict() for f in self.fields],
            'comment': self.original_comment
        }
        # sort_keys makes the serialization independent of key order.
        serialized = json.dumps(hash_data, sort_keys=True)
        return hashlib.md5(serialized.encode()).hexdigest()
@dataclass
class ProcessingResult:
    """Standard structure for the result returned by a tool.

    Only ``success`` is required. ``warnings`` and ``metadata`` default to a
    fresh empty list and dict for each instance.
    """

    success: bool
    data: Optional[Any] = None
    error_message: Optional[str] = None
    warnings: List[str] = field(default_factory=list)
    execution_time: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def add_warning(self, warning: str):
        """Append a warning message to ``warnings``."""
        self.warnings.append(warning)

    def to_dict(self) -> Dict[str, Any]:
        """Convert the result to a dictionary.

        Returns:
            A dict with one entry per attribute, in declaration order.
            Values are the attribute objects themselves, not copies.
        """
        exported_attrs = (
            'success',
            'data',
            'error_message',
            'warnings',
            'execution_time',
            'metadata',
        )
        return {attr: getattr(self, attr) for attr in exported_attrs}
@dataclass
class TableProcessingContext:
    """Context carried along while one table is processed.

    The first six attributes are required. ``current_step`` starts as
    ``"initialized"`` and ``step_results`` starts as an empty dict.
    """

    # Required inputs.
    table_metadata: TableMetadata
    business_context: str
    output_dir: str
    pipeline: str
    vn: Any  # vanna instance
    file_manager: Any

    # Progress tracking.
    current_step: str = "initialized"
    step_results: Dict[str, ProcessingResult] = field(default_factory=dict)
    start_time: Optional[float] = None

    def update_step(self, step_name: str, result: ProcessingResult):
        """Record the result of a step.

        Sets ``current_step`` to ``step_name`` and stores ``result`` under
        that name in ``step_results``, replacing any earlier entry.

        Args:
            step_name: Name of the step.
            result: Result of the step.
        """
        self.current_step = step_name
        self.step_results[step_name] = result
|