# data_product.py
"""
Data product model.

Records information about the data products that exist once the data factory
has finished processing.
"""
  5. from __future__ import annotations
  6. from datetime import datetime
  7. from typing import Any
  8. from app import db
  9. class DataProduct(db.Model):
  10. """数据产品模型,记录数据工厂加工后的数据产品信息"""
  11. __tablename__ = "data_products"
  12. __table_args__ = {"schema": "public"}
  13. id = db.Column(db.Integer, primary_key=True)
  14. # 数据产品基本信息
  15. product_name = db.Column(db.String(200), nullable=False)
  16. product_name_en = db.Column(db.String(200), nullable=False)
  17. description = db.Column(db.Text, nullable=True)
  18. # 关联信息
  19. source_dataflow_id = db.Column(db.Integer, nullable=True)
  20. source_dataflow_name = db.Column(db.String(200), nullable=True)
  21. # 目标表信息
  22. target_table = db.Column(db.String(200), nullable=False)
  23. target_schema = db.Column(db.String(100), nullable=False, default="public")
  24. # 数据统计信息
  25. record_count = db.Column(db.BigInteger, nullable=False, default=0)
  26. column_count = db.Column(db.Integer, nullable=False, default=0)
  27. # 更新提示相关
  28. last_updated_at = db.Column(db.DateTime, nullable=True)
  29. last_viewed_at = db.Column(db.DateTime, nullable=True)
  30. # 状态信息
  31. status = db.Column(db.String(50), nullable=False, default="active")
  32. # 审计字段
  33. created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
  34. created_by = db.Column(db.String(100), nullable=False, default="system")
  35. updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
  36. def to_dict(self) -> dict[str, Any]:
  37. """
  38. 将模型转换为字典
  39. Returns:
  40. 包含所有字段的字典
  41. """
  42. return {
  43. "id": self.id,
  44. "product_name": self.product_name,
  45. "product_name_en": self.product_name_en,
  46. "description": self.description,
  47. "source_dataflow_id": self.source_dataflow_id,
  48. "source_dataflow_name": self.source_dataflow_name,
  49. "target_table": self.target_table,
  50. "target_schema": self.target_schema,
  51. "record_count": self.record_count,
  52. "column_count": self.column_count,
  53. "last_updated_at": (
  54. self.last_updated_at.isoformat() if self.last_updated_at else None
  55. ),
  56. "last_viewed_at": (
  57. self.last_viewed_at.isoformat() if self.last_viewed_at else None
  58. ),
  59. "status": self.status,
  60. "created_at": self.created_at.isoformat() if self.created_at else None,
  61. "created_by": self.created_by,
  62. "updated_at": self.updated_at.isoformat() if self.updated_at else None,
  63. "has_new_data": self._has_new_data(),
  64. }
  65. def _has_new_data(self) -> bool:
  66. """
  67. 判断是否有新数据(用于更新提示)
  68. Returns:
  69. 如果 last_updated_at > last_viewed_at 则返回 True
  70. """
  71. if self.last_updated_at is None:
  72. return False
  73. if self.last_viewed_at is None:
  74. return True
  75. return self.last_updated_at > self.last_viewed_at
  76. def mark_as_viewed(self) -> None:
  77. """标记为已查看,更新 last_viewed_at 时间"""
  78. self.last_viewed_at = datetime.utcnow()
  79. self.updated_at = datetime.utcnow()
  80. def update_data_stats(
  81. self,
  82. record_count: int,
  83. column_count: int | None = None,
  84. ) -> None:
  85. """
  86. 更新数据统计信息
  87. Args:
  88. record_count: 记录数
  89. column_count: 列数(可选)
  90. """
  91. self.record_count = record_count
  92. if column_count is not None:
  93. self.column_count = column_count
  94. self.last_updated_at = datetime.utcnow()
  95. self.updated_at = datetime.utcnow()
  96. def __repr__(self) -> str:
  97. return f"<DataProduct {self.product_name} ({self.target_table})>"