# data_product.py
"""
Data product models.

Used to record information about data products once the data factory has
finished processing them.
"""
  5. from __future__ import annotations
  6. from typing import Any
  7. from app import db
  8. from app.core.common.timezone_utils import now_china_naive
  9. class DataProduct(db.Model):
  10. """数据产品模型,记录数据工厂加工后的数据产品信息"""
  11. __tablename__ = "data_products"
  12. __table_args__ = {"schema": "public"}
  13. id = db.Column(db.Integer, primary_key=True)
  14. # 数据产品基本信息
  15. product_name = db.Column(db.String(200), nullable=False)
  16. product_name_en = db.Column(db.String(200), nullable=False)
  17. description = db.Column(db.Text, nullable=True)
  18. # 关联信息
  19. source_dataflow_id = db.Column(db.Integer, nullable=True)
  20. source_dataflow_name = db.Column(db.String(200), nullable=True)
  21. # 目标表信息
  22. target_table = db.Column(db.String(200), nullable=False)
  23. target_schema = db.Column(db.String(100), nullable=False, default="public")
  24. # 数据统计信息
  25. record_count = db.Column(db.BigInteger, nullable=False, default=0)
  26. column_count = db.Column(db.Integer, nullable=False, default=0)
  27. # 更新提示相关
  28. last_updated_at = db.Column(db.DateTime, nullable=True)
  29. last_viewed_at = db.Column(db.DateTime, nullable=True)
  30. # 状态信息
  31. status = db.Column(db.String(50), nullable=False, default="active")
  32. # 审计字段
  33. created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
  34. created_by = db.Column(db.String(100), nullable=False, default="system")
  35. updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
  36. def to_dict(self) -> dict[str, Any]:
  37. """
  38. 将模型转换为字典
  39. Returns:
  40. 包含所有字段的字典
  41. """
  42. return {
  43. "id": self.id,
  44. "product_name": self.product_name,
  45. "product_name_en": self.product_name_en,
  46. "description": self.description,
  47. "source_dataflow_id": self.source_dataflow_id,
  48. "source_dataflow_name": self.source_dataflow_name,
  49. "target_table": self.target_table,
  50. "target_schema": self.target_schema,
  51. "record_count": self.record_count,
  52. "column_count": self.column_count,
  53. "last_updated_at": (
  54. self.last_updated_at.isoformat() if self.last_updated_at else None
  55. ),
  56. "last_viewed_at": (
  57. self.last_viewed_at.isoformat() if self.last_viewed_at else None
  58. ),
  59. "status": self.status,
  60. "created_at": self.created_at.isoformat() if self.created_at else None,
  61. "created_by": self.created_by,
  62. "updated_at": self.updated_at.isoformat() if self.updated_at else None,
  63. "has_new_data": self._has_new_data(),
  64. }
  65. def _has_new_data(self) -> bool:
  66. """
  67. 判断是否有新数据(用于更新提示)
  68. Returns:
  69. 如果 last_updated_at > last_viewed_at 则返回 True
  70. """
  71. if self.last_updated_at is None:
  72. return False
  73. if self.last_viewed_at is None:
  74. return True
  75. return self.last_updated_at > self.last_viewed_at
  76. def mark_as_viewed(self) -> None:
  77. """标记为已查看,更新 last_viewed_at 时间"""
  78. self.last_viewed_at = now_china_naive()
  79. self.updated_at = now_china_naive()
  80. def update_data_stats(
  81. self,
  82. record_count: int,
  83. column_count: int | None = None,
  84. ) -> None:
  85. """
  86. 更新数据统计信息
  87. Args:
  88. record_count: 记录数
  89. column_count: 列数(可选)
  90. """
  91. self.record_count = record_count
  92. if column_count is not None:
  93. self.column_count = column_count
  94. self.last_updated_at = now_china_naive()
  95. self.updated_at = now_china_naive()
  96. def __repr__(self) -> str:
  97. return f"<DataProduct {self.product_name} ({self.target_table})>"
  98. class DataOrder(db.Model):
  99. """
  100. 数据订单模型
  101. 用于记录用户提交的数据需求订单,通过 LLM 提取实体并在图谱中检测连通性
  102. """
  103. __tablename__ = "data_orders"
  104. __table_args__ = {"schema": "public"}
  105. id = db.Column(db.Integer, primary_key=True)
  106. # 订单基本信息
  107. order_no = db.Column(db.String(50), unique=True, nullable=False) # 订单编号
  108. title = db.Column(db.String(200), nullable=False) # 订单标题
  109. description = db.Column(db.Text, nullable=False) # 需求描述
  110. # LLM 提取结果
  111. extracted_domains = db.Column(db.JSON, nullable=True) # 提取的业务领域列表
  112. extracted_fields = db.Column(db.JSON, nullable=True) # 提取的数据字段列表
  113. extraction_purpose = db.Column(db.Text, nullable=True) # 提取的数据用途
  114. # 图谱分析结果
  115. graph_analysis = db.Column(db.JSON, nullable=True) # 连通性分析结果
  116. can_connect = db.Column(db.Boolean, nullable=True) # 是否可连通
  117. connection_path = db.Column(db.JSON, nullable=True) # 连通路径
  118. # 状态管理
  119. # pending-待处理, analyzing-分析中, processing-加工中,
  120. # completed-完成, rejected-驳回, need_supplement-待补充,
  121. # manual_review-待人工处理, updated-已更新
  122. status = db.Column(db.String(50), nullable=False, default="pending")
  123. reject_reason = db.Column(db.Text, nullable=True) # 驳回原因
  124. # 关联数据
  125. result_product_id = db.Column(db.Integer, nullable=True) # 生成的数据产品ID
  126. result_dataflow_id = db.Column(db.Integer, nullable=True) # 生成的数据流ID
  127. # 审计字段
  128. created_by = db.Column(db.String(100), nullable=False, default="user")
  129. created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
  130. updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
  131. processed_by = db.Column(db.String(100), nullable=True) # 处理人
  132. processed_at = db.Column(db.DateTime, nullable=True) # 处理时间
  133. # 状态常量
  134. STATUS_PENDING = "pending"
  135. STATUS_ANALYZING = "analyzing"
  136. STATUS_PENDING_APPROVAL = "pending_approval" # 待审批
  137. STATUS_PROCESSING = "processing"
  138. STATUS_ONBOARD = "onboard" # 数据产品就绪
  139. STATUS_COMPLETED = "completed"
  140. STATUS_REJECTED = "rejected"
  141. STATUS_NEED_SUPPLEMENT = "need_supplement"
  142. STATUS_MANUAL_REVIEW = "manual_review"
  143. STATUS_UPDATED = "updated"
  144. # 状态标签映射
  145. STATUS_LABELS = {
  146. "pending": "待处理",
  147. "analyzing": "分析中",
  148. "pending_approval": "待审批",
  149. "processing": "加工中",
  150. "onboard": "数据产品就绪",
  151. "completed": "已完成",
  152. "rejected": "已驳回",
  153. "need_supplement": "待补充",
  154. "manual_review": "待人工处理",
  155. "updated": "已更新",
  156. }
  157. def to_dict(self) -> dict[str, Any]:
  158. """
  159. 将模型转换为字典
  160. Returns:
  161. 包含所有字段的字典
  162. """
  163. return {
  164. "id": self.id,
  165. "order_no": self.order_no,
  166. "title": self.title,
  167. "description": self.description,
  168. "extracted_domains": self.extracted_domains,
  169. "extracted_fields": self.extracted_fields,
  170. "extraction_purpose": self.extraction_purpose,
  171. "graph_analysis": self.graph_analysis,
  172. "can_connect": self.can_connect,
  173. "connection_path": self.connection_path,
  174. "status": self.status,
  175. "status_label": self.STATUS_LABELS.get(self.status, "未知"),
  176. "reject_reason": self.reject_reason,
  177. "result_product_id": self.result_product_id,
  178. "result_dataflow_id": self.result_dataflow_id,
  179. "created_by": self.created_by,
  180. "created_at": self.created_at.isoformat() if self.created_at else None,
  181. "updated_at": self.updated_at.isoformat() if self.updated_at else None,
  182. "processed_by": self.processed_by,
  183. "processed_at": (
  184. self.processed_at.isoformat() if self.processed_at else None
  185. ),
  186. }
  187. def update_status(self, new_status: str, processed_by: str | None = None) -> None:
  188. """
  189. 更新订单状态
  190. Args:
  191. new_status: 新状态
  192. processed_by: 处理人
  193. """
  194. self.status = new_status
  195. self.updated_at = now_china_naive()
  196. if processed_by:
  197. self.processed_by = processed_by
  198. self.processed_at = now_china_naive()
  199. def set_extraction_result(
  200. self,
  201. domains: list[str] | None,
  202. fields: list[str] | None,
  203. purpose: str | None = None,
  204. ) -> None:
  205. """
  206. 设置 LLM 提取结果
  207. Args:
  208. domains: 提取的业务领域列表
  209. fields: 提取的数据字段列表
  210. purpose: 数据用途
  211. """
  212. self.extracted_domains = domains
  213. self.extracted_fields = fields
  214. self.extraction_purpose = purpose
  215. self.updated_at = now_china_naive()
  216. def set_graph_analysis(
  217. self,
  218. analysis: dict[str, Any] | None,
  219. can_connect: bool,
  220. connection_path: dict[str, Any] | None = None,
  221. ) -> None:
  222. """
  223. 设置图谱分析结果
  224. Args:
  225. analysis: 分析结果详情
  226. can_connect: 是否可连通
  227. connection_path: 连通路径
  228. """
  229. self.graph_analysis = analysis
  230. self.can_connect = can_connect
  231. self.connection_path = connection_path
  232. self.updated_at = now_china_naive()
  233. def set_result(
  234. self,
  235. product_id: int | None = None,
  236. dataflow_id: int | None = None,
  237. ) -> None:
  238. """
  239. 设置订单结果关联
  240. Args:
  241. product_id: 生成的数据产品ID
  242. dataflow_id: 生成的数据流ID
  243. """
  244. if product_id is not None:
  245. self.result_product_id = product_id
  246. if dataflow_id is not None:
  247. self.result_dataflow_id = dataflow_id
  248. self.updated_at = now_china_naive()
  249. def reject(self, reason: str, processed_by: str | None = None) -> None:
  250. """
  251. 驳回订单
  252. Args:
  253. reason: 驳回原因
  254. processed_by: 处理人
  255. """
  256. self.status = self.STATUS_REJECTED
  257. self.reject_reason = reason
  258. self.updated_at = now_china_naive()
  259. if processed_by:
  260. self.processed_by = processed_by
  261. self.processed_at = now_china_naive()
  262. def __repr__(self) -> str:
  263. return f"<DataOrder {self.order_no} ({self.status})>"