| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- """
- 数据产品模型
- 用于记录数据工厂加工完成后的数据产品信息
- """
- from __future__ import annotations
- from datetime import datetime
- from typing import Any
- from app import db
class DataProduct(db.Model):
    """Data product model.

    Records information about a data product produced by data-factory
    processing: where it came from, which table holds it, basic size
    statistics, and the timestamps used to drive the "new data" hint.

    All timestamps written by this class come from ``datetime.utcnow`` and
    are therefore naive datetimes expressed in UTC.
    """

    __tablename__ = "data_products"
    __table_args__ = {"schema": "public"}

    id = db.Column(db.Integer, primary_key=True)

    # Basic product information
    product_name = db.Column(db.String(200), nullable=False)
    product_name_en = db.Column(db.String(200), nullable=False)
    description = db.Column(db.Text, nullable=True)

    # Source linkage
    source_dataflow_id = db.Column(db.Integer, nullable=True)
    source_dataflow_name = db.Column(db.String(200), nullable=True)

    # Target table information
    target_table = db.Column(db.String(200), nullable=False)
    target_schema = db.Column(db.String(100), nullable=False, default="public")

    # Data statistics
    record_count = db.Column(db.BigInteger, nullable=False, default=0)
    column_count = db.Column(db.Integer, nullable=False, default=0)

    # Timestamps backing the update hint (see _has_new_data)
    last_updated_at = db.Column(db.DateTime, nullable=True)
    last_viewed_at = db.Column(db.DateTime, nullable=True)

    # Status
    status = db.Column(db.String(50), nullable=False, default="active")

    # Audit fields
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    created_by = db.Column(db.String(100), nullable=False, default="system")
    updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)

    @staticmethod
    def _isoformat(value: datetime | None) -> str | None:
        """Return ``value`` as an ISO 8601 string, or None when it is unset."""
        return value.isoformat() if value is not None else None

    def to_dict(self) -> dict[str, Any]:
        """Convert the model to a dictionary.

        Returns:
            A dictionary with every column value (datetimes rendered as
            ISO 8601 strings, or None when unset) plus a computed
            ``has_new_data`` flag.
        """
        return {
            "id": self.id,
            "product_name": self.product_name,
            "product_name_en": self.product_name_en,
            "description": self.description,
            "source_dataflow_id": self.source_dataflow_id,
            "source_dataflow_name": self.source_dataflow_name,
            "target_table": self.target_table,
            "target_schema": self.target_schema,
            "record_count": self.record_count,
            "column_count": self.column_count,
            "last_updated_at": self._isoformat(self.last_updated_at),
            "last_viewed_at": self._isoformat(self.last_viewed_at),
            "status": self.status,
            "created_at": self._isoformat(self.created_at),
            "created_by": self.created_by,
            "updated_at": self._isoformat(self.updated_at),
            "has_new_data": self._has_new_data(),
        }

    def _has_new_data(self) -> bool:
        """Tell whether there is data the product has not been viewed with.

        Returns:
            False when the data has never been updated; True when it has
            been updated but never viewed; otherwise whether
            ``last_updated_at`` is later than ``last_viewed_at``.
        """
        if self.last_updated_at is None:
            return False
        if self.last_viewed_at is None:
            return True
        return self.last_updated_at > self.last_viewed_at

    def mark_as_viewed(self) -> None:
        """Mark the product as viewed by refreshing ``last_viewed_at``.

        ``updated_at`` is set to the same instant.
        """
        # Read the clock once so both fields carry the identical timestamp.
        now = datetime.utcnow()
        self.last_viewed_at = now
        self.updated_at = now

    def update_data_stats(
        self,
        record_count: int,
        column_count: int | None = None,
    ) -> None:
        """Update the data statistics and stamp the data as freshly updated.

        Args:
            record_count: New number of records.
            column_count: New number of columns; when None the stored
                column count is left unchanged.
        """
        self.record_count = record_count
        if column_count is not None:
            self.column_count = column_count
        # Read the clock once so both fields carry the identical timestamp.
        now = datetime.utcnow()
        self.last_updated_at = now
        self.updated_at = now

    def __repr__(self) -> str:
        return f"<DataProduct {self.product_name} ({self.target_table})>"
|