| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- """
- 数据产品模型
- 用于记录数据工厂加工完成后的数据产品信息
- """
- from __future__ import annotations
- from typing import Any
- from app import db
- from app.core.common.timezone_utils import now_china_naive
class DataProduct(db.Model):
    """Data product produced by the data factory.

    Each row describes one processed output table
    (``target_schema.target_table``), the dataflow it came from, simple
    row/column statistics, and the timestamps used to compute the
    "new data available" update hint (see ``_has_new_data``).
    """

    __tablename__ = "data_products"
    __table_args__ = {"schema": "public"}

    id = db.Column(db.Integer, primary_key=True)

    # Basic product information
    product_name = db.Column(db.String(200), nullable=False)
    product_name_en = db.Column(db.String(200), nullable=False)
    description = db.Column(db.Text, nullable=True)

    # Link to the source dataflow (stored as plain id/name, no foreign key)
    source_dataflow_id = db.Column(db.Integer, nullable=True)
    source_dataflow_name = db.Column(db.String(200), nullable=True)

    # Table that holds the product data
    target_table = db.Column(db.String(200), nullable=False)
    target_schema = db.Column(db.String(100), nullable=False, default="public")

    # Data statistics
    record_count = db.Column(db.BigInteger, nullable=False, default=0)
    column_count = db.Column(db.Integer, nullable=False, default=0)

    # Update-hint timestamps: new data exists when
    # last_updated_at is later than last_viewed_at
    last_updated_at = db.Column(db.DateTime, nullable=True)
    last_viewed_at = db.Column(db.DateTime, nullable=True)

    # Status
    status = db.Column(db.String(50), nullable=False, default="active")

    # Audit fields
    created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
    created_by = db.Column(db.String(100), nullable=False, default="system")
    updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)

    @staticmethod
    def _isoformat(value: Any) -> str | None:
        """Return ``value.isoformat()`` for a set timestamp, else ``None``."""
        return value.isoformat() if value else None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the product to a plain dict.

        Timestamps are rendered as ISO-8601 strings (or ``None`` when unset),
        and the derived ``has_new_data`` flag is included.

        Returns:
            A dict containing every column plus ``has_new_data``.
        """
        return {
            "id": self.id,
            "product_name": self.product_name,
            "product_name_en": self.product_name_en,
            "description": self.description,
            "source_dataflow_id": self.source_dataflow_id,
            "source_dataflow_name": self.source_dataflow_name,
            "target_table": self.target_table,
            "target_schema": self.target_schema,
            "record_count": self.record_count,
            "column_count": self.column_count,
            "last_updated_at": self._isoformat(self.last_updated_at),
            "last_viewed_at": self._isoformat(self.last_viewed_at),
            "status": self.status,
            "created_at": self._isoformat(self.created_at),
            "created_by": self.created_by,
            "updated_at": self._isoformat(self.updated_at),
            "has_new_data": self._has_new_data(),
        }

    def _has_new_data(self) -> bool:
        """Tell whether data changed since the product was last viewed.

        Returns:
            ``False`` if the data was never updated; ``True`` if it was
            updated but never viewed; otherwise whether
            ``last_updated_at > last_viewed_at``.
        """
        if self.last_updated_at is None:
            return False
        if self.last_viewed_at is None:
            return True
        return self.last_updated_at > self.last_viewed_at

    def mark_as_viewed(self) -> None:
        """Record that the product has been viewed now.

        Sets ``last_viewed_at`` and ``updated_at`` to the same timestamp so
        both fields describe the same event.
        """
        # Read the clock once: two separate calls would leave the fields
        # a few microseconds apart.
        now = now_china_naive()
        self.last_viewed_at = now
        self.updated_at = now

    def update_data_stats(
        self,
        record_count: int,
        column_count: int | None = None,
    ) -> None:
        """Refresh the data statistics and mark the data as updated.

        Args:
            record_count: New number of records.
            column_count: New number of columns; left unchanged when ``None``.
        """
        self.record_count = record_count
        if column_count is not None:
            self.column_count = column_count
        # One timestamp for both fields keeps them consistent.
        now = now_china_naive()
        self.last_updated_at = now
        self.updated_at = now

    def __repr__(self) -> str:
        return f"<DataProduct {self.product_name} ({self.target_table})>"
class DataOrder(db.Model):
    """Data order submitted by a user to request data.

    An order holds the free-text requirement, the entities an LLM extracted
    from it (domains, fields, purpose), the result of a connectivity check
    against the graph, its processing status, and links to the data product
    and dataflow eventually generated for it.
    """

    __tablename__ = "data_orders"
    __table_args__ = {"schema": "public"}

    # Status constants. Defined before the columns so the ``status`` column
    # default can reference them.
    STATUS_PENDING = "pending"
    STATUS_ANALYZING = "analyzing"
    STATUS_PENDING_APPROVAL = "pending_approval"  # awaiting approval
    STATUS_PROCESSING = "processing"
    STATUS_ONBOARD = "onboard"  # data product ready
    STATUS_COMPLETED = "completed"
    STATUS_REJECTED = "rejected"
    STATUS_NEED_SUPPLEMENT = "need_supplement"
    STATUS_MANUAL_REVIEW = "manual_review"
    STATUS_UPDATED = "updated"

    # Human-readable (Chinese) label for each status, used by ``to_dict``.
    STATUS_LABELS = {
        STATUS_PENDING: "待处理",
        STATUS_ANALYZING: "分析中",
        STATUS_PENDING_APPROVAL: "待审批",
        STATUS_PROCESSING: "加工中",
        STATUS_ONBOARD: "数据产品就绪",
        STATUS_COMPLETED: "已完成",
        STATUS_REJECTED: "已驳回",
        STATUS_NEED_SUPPLEMENT: "待补充",
        STATUS_MANUAL_REVIEW: "待人工处理",
        STATUS_UPDATED: "已更新",
    }

    id = db.Column(db.Integer, primary_key=True)

    # Basic order information
    order_no = db.Column(db.String(50), unique=True, nullable=False)  # order number
    title = db.Column(db.String(200), nullable=False)  # order title
    description = db.Column(db.Text, nullable=False)  # requirement description

    # LLM extraction results
    extracted_domains = db.Column(db.JSON, nullable=True)  # extracted business domains
    extracted_fields = db.Column(db.JSON, nullable=True)  # extracted data fields
    extraction_purpose = db.Column(db.Text, nullable=True)  # extracted data purpose

    # Graph analysis results
    graph_analysis = db.Column(db.JSON, nullable=True)  # connectivity analysis details
    can_connect = db.Column(db.Boolean, nullable=True)  # whether entities can be connected
    connection_path = db.Column(db.JSON, nullable=True)  # connecting path

    # Status: one of the STATUS_* values above -- pending, analyzing,
    # pending_approval, processing, onboard, completed, rejected,
    # need_supplement, manual_review, updated.
    status = db.Column(db.String(50), nullable=False, default=STATUS_PENDING)
    reject_reason = db.Column(db.Text, nullable=True)  # reason for rejection

    # Generated artifacts
    result_product_id = db.Column(db.Integer, nullable=True)  # generated data product id
    result_dataflow_id = db.Column(db.Integer, nullable=True)  # generated dataflow id

    # Audit fields
    created_by = db.Column(db.String(100), nullable=False, default="user")
    created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
    updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
    processed_by = db.Column(db.String(100), nullable=True)  # who processed the order
    processed_at = db.Column(db.DateTime, nullable=True)  # when it was processed

    @staticmethod
    def _isoformat(value: Any) -> str | None:
        """Return ``value.isoformat()`` for a set timestamp, else ``None``."""
        return value.isoformat() if value else None

    def _touch(self, processed_by: str | None = None) -> None:
        """Stamp ``updated_at``; also record the processor when one is given.

        ``updated_at`` and ``processed_at`` share one clock reading so they
        describe the same event. A falsy ``processed_by`` (``None`` or ``""``)
        leaves ``processed_by``/``processed_at`` untouched.
        """
        now = now_china_naive()
        self.updated_at = now
        if processed_by:
            self.processed_by = processed_by
            self.processed_at = now

    def to_dict(self) -> dict[str, Any]:
        """Serialize the order to a plain dict.

        Timestamps are rendered as ISO-8601 strings (or ``None`` when unset).
        ``status_label`` is looked up in ``STATUS_LABELS``, falling back to
        "未知" ("unknown") for unrecognized statuses.

        Returns:
            A dict containing every column plus ``status_label``.
        """
        return {
            "id": self.id,
            "order_no": self.order_no,
            "title": self.title,
            "description": self.description,
            "extracted_domains": self.extracted_domains,
            "extracted_fields": self.extracted_fields,
            "extraction_purpose": self.extraction_purpose,
            "graph_analysis": self.graph_analysis,
            "can_connect": self.can_connect,
            "connection_path": self.connection_path,
            "status": self.status,
            "status_label": self.STATUS_LABELS.get(self.status, "未知"),
            "reject_reason": self.reject_reason,
            "result_product_id": self.result_product_id,
            "result_dataflow_id": self.result_dataflow_id,
            "created_by": self.created_by,
            "created_at": self._isoformat(self.created_at),
            "updated_at": self._isoformat(self.updated_at),
            "processed_by": self.processed_by,
            "processed_at": self._isoformat(self.processed_at),
        }

    def update_status(self, new_status: str, processed_by: str | None = None) -> None:
        """Move the order to a new status.

        Args:
            new_status: Target status (normally one of the ``STATUS_*``
                constants; the value is not validated here).
            processed_by: Processor to record; when truthy, ``processed_by``
                and ``processed_at`` are updated as well.
        """
        self.status = new_status
        self._touch(processed_by)

    def set_extraction_result(
        self,
        domains: list[str] | None,
        fields: list[str] | None,
        purpose: str | None = None,
    ) -> None:
        """Store the LLM extraction results.

        Args:
            domains: Extracted business domains.
            fields: Extracted data fields.
            purpose: Extracted purpose of the data.
        """
        self.extracted_domains = domains
        self.extracted_fields = fields
        self.extraction_purpose = purpose
        self._touch()

    def set_graph_analysis(
        self,
        analysis: dict[str, Any] | None,
        can_connect: bool,
        connection_path: dict[str, Any] | None = None,
    ) -> None:
        """Store the graph connectivity analysis.

        Args:
            analysis: Detailed analysis result.
            can_connect: Whether the extracted entities can be connected.
            connection_path: The connecting path, if any.
        """
        self.graph_analysis = analysis
        self.can_connect = can_connect
        self.connection_path = connection_path
        self._touch()

    def set_result(
        self,
        product_id: int | None = None,
        dataflow_id: int | None = None,
    ) -> None:
        """Link the order to the artifacts generated for it.

        Arguments left as ``None`` do not overwrite an existing link.

        Args:
            product_id: Id of the generated data product.
            dataflow_id: Id of the generated dataflow.
        """
        if product_id is not None:
            self.result_product_id = product_id
        if dataflow_id is not None:
            self.result_dataflow_id = dataflow_id
        self._touch()

    def reject(self, reason: str, processed_by: str | None = None) -> None:
        """Reject the order.

        Args:
            reason: Why the order was rejected; stored in ``reject_reason``.
            processed_by: Processor to record; when truthy, ``processed_by``
                and ``processed_at`` are updated as well.
        """
        self.status = self.STATUS_REJECTED
        self.reject_reason = reason
        self._touch(processed_by)

    def __repr__(self) -> str:
        return f"<DataOrder {self.order_no} ({self.status})>"
|