""" 数据产品模型 用于记录数据工厂加工完成后的数据产品信息 """ from __future__ import annotations from typing import Any from app import db from app.core.common.timezone_utils import now_china_naive class DataProduct(db.Model): """数据产品模型,记录数据工厂加工后的数据产品信息""" __tablename__ = "data_products" __table_args__ = {"schema": "public"} id = db.Column(db.Integer, primary_key=True) # 数据产品基本信息 product_name = db.Column(db.String(200), nullable=False) product_name_en = db.Column(db.String(200), nullable=False) description = db.Column(db.Text, nullable=True) # 关联信息 source_dataflow_id = db.Column(db.Integer, nullable=True) source_dataflow_name = db.Column(db.String(200), nullable=True) # 目标表信息 target_table = db.Column(db.String(200), nullable=False) target_schema = db.Column(db.String(100), nullable=False, default="public") # 数据统计信息 record_count = db.Column(db.BigInteger, nullable=False, default=0) column_count = db.Column(db.Integer, nullable=False, default=0) # 更新提示相关 last_updated_at = db.Column(db.DateTime, nullable=True) last_viewed_at = db.Column(db.DateTime, nullable=True) # 状态信息 status = db.Column(db.String(50), nullable=False, default="active") # 审计字段 created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive) created_by = db.Column(db.String(100), nullable=False, default="system") updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive) def to_dict(self) -> dict[str, Any]: """ 将模型转换为字典 Returns: 包含所有字段的字典 """ return { "id": self.id, "product_name": self.product_name, "product_name_en": self.product_name_en, "description": self.description, "source_dataflow_id": self.source_dataflow_id, "source_dataflow_name": self.source_dataflow_name, "target_table": self.target_table, "target_schema": self.target_schema, "record_count": self.record_count, "column_count": self.column_count, "last_updated_at": ( self.last_updated_at.isoformat() if self.last_updated_at else None ), "last_viewed_at": ( self.last_viewed_at.isoformat() if self.last_viewed_at else None ), "status": self.status, "created_at": self.created_at.isoformat() if self.created_at else None, "created_by": self.created_by, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "has_new_data": self._has_new_data(), } def _has_new_data(self) -> bool: """ 判断是否有新数据(用于更新提示) Returns: 如果 last_updated_at > last_viewed_at 则返回 True """ if self.last_updated_at is None: return False if self.last_viewed_at is None: return True return self.last_updated_at > self.last_viewed_at def mark_as_viewed(self) -> None: """标记为已查看,更新 last_viewed_at 时间""" self.last_viewed_at = now_china_naive() self.updated_at = now_china_naive() def update_data_stats( self, record_count: int, column_count: int | None = None, ) -> None: """ 更新数据统计信息 Args: record_count: 记录数 column_count: 列数(可选) """ self.record_count = record_count if column_count is not None: self.column_count = column_count self.last_updated_at = now_china_naive() self.updated_at = now_china_naive() def __repr__(self) -> str: return f"" class DataOrder(db.Model): """ 数据订单模型 用于记录用户提交的数据需求订单,通过 LLM 提取实体并在图谱中检测连通性 """ __tablename__ = "data_orders" __table_args__ = {"schema": "public"} id = db.Column(db.Integer, primary_key=True) # 订单基本信息 order_no = db.Column(db.String(50), unique=True, nullable=False) # 订单编号 title = db.Column(db.String(200), nullable=False) # 订单标题 description = db.Column(db.Text, nullable=False) # 需求描述 # LLM 提取结果 extracted_domains = db.Column(db.JSON, nullable=True) # 提取的业务领域列表 extracted_fields = db.Column(db.JSON, nullable=True) # 提取的数据字段列表 extraction_purpose = db.Column(db.Text, nullable=True) # 提取的数据用途 # 图谱分析结果 graph_analysis = db.Column(db.JSON, nullable=True) # 连通性分析结果 can_connect = db.Column(db.Boolean, nullable=True) # 是否可连通 connection_path = db.Column(db.JSON, nullable=True) # 连通路径 # 状态管理 # pending-待处理, analyzing-分析中, processing-加工中, # completed-完成, rejected-驳回, need_supplement-待补充, # manual_review-待人工处理, updated-已更新 status = db.Column(db.String(50), nullable=False, default="pending") reject_reason = db.Column(db.Text, nullable=True) # 驳回原因 # 关联数据 result_product_id = db.Column(db.Integer, nullable=True) # 生成的数据产品ID result_dataflow_id = db.Column(db.Integer, nullable=True) # 生成的数据流ID # 审计字段 created_by = db.Column(db.String(100), nullable=False, default="user") created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive) updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive) processed_by = db.Column(db.String(100), nullable=True) # 处理人 processed_at = db.Column(db.DateTime, nullable=True) # 处理时间 # 状态常量 STATUS_PENDING = "pending" STATUS_ANALYZING = "analyzing" STATUS_PENDING_APPROVAL = "pending_approval" # 待审批 STATUS_PROCESSING = "processing" STATUS_ONBOARD = "onboard" # 数据产品就绪 STATUS_COMPLETED = "completed" STATUS_REJECTED = "rejected" STATUS_NEED_SUPPLEMENT = "need_supplement" STATUS_MANUAL_REVIEW = "manual_review" STATUS_UPDATED = "updated" # 状态标签映射 STATUS_LABELS = { "pending": "待处理", "analyzing": "分析中", "pending_approval": "待审批", "processing": "加工中", "onboard": "数据产品就绪", "completed": "已完成", "rejected": "已驳回", "need_supplement": "待补充", "manual_review": "待人工处理", "updated": "已更新", } def to_dict(self) -> dict[str, Any]: """ 将模型转换为字典 Returns: 包含所有字段的字典 """ return { "id": self.id, "order_no": self.order_no, "title": self.title, "description": self.description, "extracted_domains": self.extracted_domains, "extracted_fields": self.extracted_fields, "extraction_purpose": self.extraction_purpose, "graph_analysis": self.graph_analysis, "can_connect": self.can_connect, "connection_path": self.connection_path, "status": self.status, "status_label": self.STATUS_LABELS.get(self.status, "未知"), "reject_reason": self.reject_reason, "result_product_id": self.result_product_id, "result_dataflow_id": self.result_dataflow_id, "created_by": self.created_by, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "processed_by": self.processed_by, "processed_at": ( self.processed_at.isoformat() if self.processed_at else None ), } def update_status(self, new_status: str, processed_by: str | None = None) -> None: """ 更新订单状态 Args: new_status: 新状态 processed_by: 处理人 """ self.status = new_status self.updated_at = now_china_naive() if processed_by: self.processed_by = processed_by self.processed_at = now_china_naive() def set_extraction_result( self, domains: list[str] | None, fields: list[str] | None, purpose: str | None = None, ) -> None: """ 设置 LLM 提取结果 Args: domains: 提取的业务领域列表 fields: 提取的数据字段列表 purpose: 数据用途 """ self.extracted_domains = domains self.extracted_fields = fields self.extraction_purpose = purpose self.updated_at = now_china_naive() def set_graph_analysis( self, analysis: dict[str, Any] | None, can_connect: bool, connection_path: dict[str, Any] | None = None, ) -> None: """ 设置图谱分析结果 Args: analysis: 分析结果详情 can_connect: 是否可连通 connection_path: 连通路径 """ self.graph_analysis = analysis self.can_connect = can_connect self.connection_path = connection_path self.updated_at = now_china_naive() def set_result( self, product_id: int | None = None, dataflow_id: int | None = None, ) -> None: """ 设置订单结果关联 Args: product_id: 生成的数据产品ID dataflow_id: 生成的数据流ID """ if product_id is not None: self.result_product_id = product_id if dataflow_id is not None: self.result_dataflow_id = dataflow_id self.updated_at = now_china_naive() def reject(self, reason: str, processed_by: str | None = None) -> None: """ 驳回订单 Args: reason: 驳回原因 processed_by: 处理人 """ self.status = self.STATUS_REJECTED self.reject_reason = reason self.updated_at = now_china_naive() if processed_by: self.processed_by = processed_by self.processed_at = now_china_naive() def __repr__(self) -> str: return f""