# data_structures.py
  1. from dataclasses import dataclass, field
  2. from typing import List, Dict, Optional, Any, Union
  3. from enum import Enum
  4. import hashlib
  5. import json
  6. class FieldType(Enum):
  7. """字段类型枚举"""
  8. INTEGER = "integer"
  9. VARCHAR = "varchar"
  10. TEXT = "text"
  11. TIMESTAMP = "timestamp"
  12. DATE = "date"
  13. BOOLEAN = "boolean"
  14. NUMERIC = "numeric"
  15. ENUM = "enum"
  16. JSON = "json"
  17. UUID = "uuid"
  18. OTHER = "other"
  19. class ProcessingStatus(Enum):
  20. """处理状态枚举"""
  21. PENDING = "pending"
  22. RUNNING = "running"
  23. SUCCESS = "success"
  24. FAILED = "failed"
  25. SKIPPED = "skipped"
  26. @dataclass
  27. class FieldInfo:
  28. """字段信息标准结构"""
  29. name: str
  30. type: str
  31. nullable: bool
  32. default_value: Optional[str] = None
  33. comment: Optional[str] = None
  34. original_comment: Optional[str] = None # 原始注释
  35. generated_comment: Optional[str] = None # LLM生成的注释
  36. is_primary_key: bool = False
  37. is_foreign_key: bool = False
  38. is_enum: bool = False
  39. enum_values: Optional[List[str]] = None
  40. enum_description: Optional[str] = None
  41. max_length: Optional[int] = None
  42. precision: Optional[int] = None
  43. scale: Optional[int] = None
  44. def to_dict(self) -> Dict[str, Any]:
  45. """转换为字典格式"""
  46. return {
  47. 'name': self.name,
  48. 'type': self.type,
  49. 'nullable': self.nullable,
  50. 'default_value': self.default_value,
  51. 'comment': self.comment,
  52. 'is_primary_key': self.is_primary_key,
  53. 'is_foreign_key': self.is_foreign_key,
  54. 'is_enum': self.is_enum,
  55. 'enum_values': self.enum_values
  56. }
  57. @dataclass
  58. class TableMetadata:
  59. """表元数据标准结构"""
  60. schema_name: str
  61. table_name: str
  62. full_name: str # schema.table_name
  63. comment: Optional[str] = None
  64. original_comment: Optional[str] = None # 原始注释
  65. generated_comment: Optional[str] = None # LLM生成的注释
  66. fields: List[FieldInfo] = field(default_factory=list)
  67. sample_data: List[Dict[str, Any]] = field(default_factory=list)
  68. row_count: Optional[int] = None
  69. table_size: Optional[str] = None # 表大小(如 "1.2 MB")
  70. created_date: Optional[str] = None
  71. @property
  72. def safe_file_name(self) -> str:
  73. """生成安全的文件名"""
  74. if self.schema_name.lower() == 'public':
  75. return self.table_name
  76. return f"{self.schema_name}__{self.table_name}".replace('.', '__').replace('-', '_').replace(' ', '_')
  77. def get_metadata_hash(self) -> str:
  78. """计算元数据哈希值,用于增量更新判断"""
  79. hash_data = {
  80. 'schema_name': self.schema_name,
  81. 'table_name': self.table_name,
  82. 'fields': [f.to_dict() for f in self.fields],
  83. 'comment': self.original_comment
  84. }
  85. return hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest()
  86. @dataclass
  87. class ProcessingResult:
  88. """工具处理结果标准结构"""
  89. success: bool
  90. data: Optional[Any] = None
  91. error_message: Optional[str] = None
  92. warnings: List[str] = field(default_factory=list)
  93. execution_time: Optional[float] = None
  94. metadata: Dict[str, Any] = field(default_factory=dict)
  95. def add_warning(self, warning: str):
  96. """添加警告信息"""
  97. self.warnings.append(warning)
  98. def to_dict(self) -> Dict[str, Any]:
  99. """转换为字典格式"""
  100. return {
  101. 'success': self.success,
  102. 'data': self.data,
  103. 'error_message': self.error_message,
  104. 'warnings': self.warnings,
  105. 'execution_time': self.execution_time,
  106. 'metadata': self.metadata
  107. }
  108. @dataclass
  109. class TableProcessingContext:
  110. """表处理上下文"""
  111. table_metadata: TableMetadata
  112. business_context: str
  113. output_dir: str
  114. pipeline: str
  115. vn: Any # vanna实例
  116. file_manager: Any
  117. current_step: str = "initialized"
  118. step_results: Dict[str, ProcessingResult] = field(default_factory=dict)
  119. start_time: Optional[float] = None
  120. def update_step(self, step_name: str, result: ProcessingResult):
  121. """更新步骤结果"""
  122. self.current_step = step_name
  123. self.step_results[step_name] = result