""" 数据产品模型 用于记录数据工厂加工完成后的数据产品信息 """ from __future__ import annotations from datetime import datetime from typing import Any from app import db class DataProduct(db.Model): """数据产品模型,记录数据工厂加工后的数据产品信息""" __tablename__ = "data_products" __table_args__ = {"schema": "public"} id = db.Column(db.Integer, primary_key=True) # 数据产品基本信息 product_name = db.Column(db.String(200), nullable=False) product_name_en = db.Column(db.String(200), nullable=False) description = db.Column(db.Text, nullable=True) # 关联信息 source_dataflow_id = db.Column(db.Integer, nullable=True) source_dataflow_name = db.Column(db.String(200), nullable=True) # 目标表信息 target_table = db.Column(db.String(200), nullable=False) target_schema = db.Column(db.String(100), nullable=False, default="public") # 数据统计信息 record_count = db.Column(db.BigInteger, nullable=False, default=0) column_count = db.Column(db.Integer, nullable=False, default=0) # 更新提示相关 last_updated_at = db.Column(db.DateTime, nullable=True) last_viewed_at = db.Column(db.DateTime, nullable=True) # 状态信息 status = db.Column(db.String(50), nullable=False, default="active") # 审计字段 created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) created_by = db.Column(db.String(100), nullable=False, default="system") updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) def to_dict(self) -> dict[str, Any]: """ 将模型转换为字典 Returns: 包含所有字段的字典 """ return { "id": self.id, "product_name": self.product_name, "product_name_en": self.product_name_en, "description": self.description, "source_dataflow_id": self.source_dataflow_id, "source_dataflow_name": self.source_dataflow_name, "target_table": self.target_table, "target_schema": self.target_schema, "record_count": self.record_count, "column_count": self.column_count, "last_updated_at": ( self.last_updated_at.isoformat() if self.last_updated_at else None ), "last_viewed_at": ( self.last_viewed_at.isoformat() if self.last_viewed_at else None ), "status": self.status, "created_at": self.created_at.isoformat() if self.created_at else None, "created_by": self.created_by, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "has_new_data": self._has_new_data(), } def _has_new_data(self) -> bool: """ 判断是否有新数据(用于更新提示) Returns: 如果 last_updated_at > last_viewed_at 则返回 True """ if self.last_updated_at is None: return False if self.last_viewed_at is None: return True return self.last_updated_at > self.last_viewed_at def mark_as_viewed(self) -> None: """标记为已查看,更新 last_viewed_at 时间""" self.last_viewed_at = datetime.utcnow() self.updated_at = datetime.utcnow() def update_data_stats( self, record_count: int, column_count: int | None = None, ) -> None: """ 更新数据统计信息 Args: record_count: 记录数 column_count: 列数(可选) """ self.record_count = record_count if column_count is not None: self.column_count = column_count self.last_updated_at = datetime.utcnow() self.updated_at = datetime.utcnow() def __repr__(self) -> str: return f""