data_product.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. """
  2. 数据产品模型
  3. 用于记录数据工厂加工完成后的数据产品信息
  4. """
  5. from __future__ import annotations
  6. from typing import Any
  7. from app import db
  8. from app.core.common.timezone_utils import now_china_naive
  class DataProduct(db.Model):
      """Data product model.

      Records a data product produced by the data factory: its names, the
      dataflow it came from, the target table that holds it, basic row/column
      statistics, and the timestamps used to drive the "new data" hint.
      """

      __tablename__ = "data_products"
      __table_args__ = {"schema": "public"}

      id = db.Column(db.Integer, primary_key=True)

      # Basic product information
      product_name = db.Column(db.String(200), nullable=False)
      product_name_en = db.Column(db.String(200), nullable=False)
      description = db.Column(db.Text, nullable=True)

      # Source linkage (plain id/name columns; no foreign key is declared here)
      source_dataflow_id = db.Column(db.Integer, nullable=True)
      source_dataflow_name = db.Column(db.String(200), nullable=True)

      # Target table location
      target_table = db.Column(db.String(200), nullable=False)
      target_schema = db.Column(db.String(100), nullable=False, default="public")

      # Data statistics
      record_count = db.Column(db.BigInteger, nullable=False, default=0)
      column_count = db.Column(db.Integer, nullable=False, default=0)

      # Update-hint timestamps: _has_new_data() compares these two columns
      last_updated_at = db.Column(db.DateTime, nullable=True)
      last_viewed_at = db.Column(db.DateTime, nullable=True)

      # Status
      status = db.Column(db.String(50), nullable=False, default="active")

      # Audit fields
      created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
      created_by = db.Column(db.String(100), nullable=False, default="system")
      updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)

      def to_dict(self) -> dict[str, Any]:
          """Serialize the model into a plain dict.

          Datetime columns are rendered with ``isoformat()``; ``None`` values
          stay ``None``. The derived ``has_new_data`` flag is included as well.

          Returns:
              A dict containing every column plus ``has_new_data``.
          """
          return {
              "id": self.id,
              "product_name": self.product_name,
              "product_name_en": self.product_name_en,
              "description": self.description,
              "source_dataflow_id": self.source_dataflow_id,
              "source_dataflow_name": self.source_dataflow_name,
              "target_table": self.target_table,
              "target_schema": self.target_schema,
              "record_count": self.record_count,
              "column_count": self.column_count,
              "last_updated_at": (
                  self.last_updated_at.isoformat() if self.last_updated_at else None
              ),
              "last_viewed_at": (
                  self.last_viewed_at.isoformat() if self.last_viewed_at else None
              ),
              "status": self.status,
              "created_at": self.created_at.isoformat() if self.created_at else None,
              "created_by": self.created_by,
              "updated_at": self.updated_at.isoformat() if self.updated_at else None,
              "has_new_data": self._has_new_data(),
          }

      def _has_new_data(self) -> bool:
          """Tell whether data changed since the product was last viewed.

          Returns:
              ``False`` if the data was never updated; ``True`` if it was
              updated but never viewed; otherwise whether
              ``last_updated_at > last_viewed_at``.
          """
          if self.last_updated_at is None:
              return False
          if self.last_viewed_at is None:
              return True
          return self.last_updated_at > self.last_viewed_at

      def mark_as_viewed(self) -> None:
          """Record that the product has been viewed.

          Sets ``last_viewed_at`` and ``updated_at``. A single timestamp is
          taken so both columns hold exactly the same instant.
          """
          now = now_china_naive()
          self.last_viewed_at = now
          self.updated_at = now

      def update_data_stats(
          self,
          record_count: int,
          column_count: int | None = None,
      ) -> None:
          """Update the data statistics and mark the data as refreshed.

          Args:
              record_count: New record count (always written).
              column_count: New column count; when ``None`` the stored value
                  is left unchanged.

          ``last_updated_at`` and ``updated_at`` are set from one shared
          timestamp so they stay identical.
          """
          self.record_count = record_count
          if column_count is not None:
              self.column_count = column_count
          now = now_china_naive()
          self.last_updated_at = now
          self.updated_at = now

      def __repr__(self) -> str:
          return f"<DataProduct {self.product_name} ({self.target_table})>"
  class DataOrder(db.Model):
      """Data order model.

      Records a data-demand order submitted by a user. Entities (business
      domains, data fields, purpose) are extracted from the description by an
      LLM, and their connectivity is then checked in the graph.
      """

      __tablename__ = "data_orders"
      __table_args__ = {"schema": "public"}

      # Status constants. Declared before the columns so the ``status`` column
      # default (and STATUS_LABELS) can reference them instead of repeating
      # string literals.
      STATUS_PENDING = "pending"
      STATUS_ANALYZING = "analyzing"
      STATUS_PENDING_APPROVAL = "pending_approval"  # awaiting approval
      STATUS_PROCESSING = "processing"
      STATUS_ONBOARD = "onboard"  # data product ready
      STATUS_COMPLETED = "completed"
      STATUS_REJECTED = "rejected"
      STATUS_NEED_SUPPLEMENT = "need_supplement"
      STATUS_MANUAL_REVIEW = "manual_review"
      STATUS_UPDATED = "updated"

      # Display label for each status value; to_dict() falls back to a
      # generic "unknown" label for values not listed here.
      STATUS_LABELS = {
          STATUS_PENDING: "待处理",
          STATUS_ANALYZING: "分析中",
          STATUS_PENDING_APPROVAL: "待审批",
          STATUS_PROCESSING: "加工中",
          STATUS_ONBOARD: "数据产品就绪",
          STATUS_COMPLETED: "已完成",
          STATUS_REJECTED: "已驳回",
          STATUS_NEED_SUPPLEMENT: "待补充",
          STATUS_MANUAL_REVIEW: "待人工处理",
          STATUS_UPDATED: "已更新",
      }

      id = db.Column(db.Integer, primary_key=True)

      # Basic order information
      order_no = db.Column(db.String(50), unique=True, nullable=False)  # order number
      title = db.Column(db.String(200), nullable=False)  # order title
      description = db.Column(db.Text, nullable=False)  # requirement description

      # LLM extraction results
      extracted_domains = db.Column(db.JSON, nullable=True)  # extracted business domains
      extracted_fields = db.Column(db.JSON, nullable=True)  # extracted data fields
      extraction_purpose = db.Column(db.Text, nullable=True)  # extracted data purpose

      # Graph analysis results
      graph_analysis = db.Column(db.JSON, nullable=True)  # connectivity analysis details
      can_connect = db.Column(db.Boolean, nullable=True)  # whether entities can connect
      connection_path = db.Column(db.JSON, nullable=True)  # connection path

      # Status management; the STATUS_* constants declared in this class list
      # the known values (the column itself does not enforce them).
      status = db.Column(db.String(50), nullable=False, default=STATUS_PENDING)
      reject_reason = db.Column(db.Text, nullable=True)  # rejection reason

      # Linked results
      result_product_id = db.Column(db.Integer, nullable=True)  # generated data product id
      result_dataflow_id = db.Column(db.Integer, nullable=True)  # generated dataflow id
      data_source = db.Column(db.Integer, nullable=True)  # specified data-source node id

      # Audit fields
      created_by = db.Column(db.String(100), nullable=False, default="user")
      created_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
      updated_at = db.Column(db.DateTime, nullable=False, default=now_china_naive)
      processed_by = db.Column(db.String(100), nullable=True)  # handler
      processed_at = db.Column(db.DateTime, nullable=True)  # handling time

      def to_dict(self) -> dict[str, Any]:
          """Serialize the model into a plain dict.

          Adds a ``status_label`` looked up in STATUS_LABELS (with a fallback
          label for unknown statuses). Datetime columns are rendered with
          ``isoformat()``; ``None`` values stay ``None``.

          Returns:
              A dict containing every column plus ``status_label``.
          """
          return {
              "id": self.id,
              "order_no": self.order_no,
              "title": self.title,
              "description": self.description,
              "extracted_domains": self.extracted_domains,
              "extracted_fields": self.extracted_fields,
              "extraction_purpose": self.extraction_purpose,
              "graph_analysis": self.graph_analysis,
              "can_connect": self.can_connect,
              "connection_path": self.connection_path,
              "status": self.status,
              "status_label": self.STATUS_LABELS.get(self.status, "未知"),
              "reject_reason": self.reject_reason,
              "result_product_id": self.result_product_id,
              "result_dataflow_id": self.result_dataflow_id,
              "data_source": self.data_source,
              "created_by": self.created_by,
              "created_at": self.created_at.isoformat() if self.created_at else None,
              "updated_at": self.updated_at.isoformat() if self.updated_at else None,
              "processed_by": self.processed_by,
              "processed_at": (
                  self.processed_at.isoformat() if self.processed_at else None
              ),
          }

      def update_status(self, new_status: str, processed_by: str | None = None) -> None:
          """Set the order status and bump ``updated_at``.

          The value is not validated; unknown values are stored as given.

          Args:
              new_status: New status value, normally one of the STATUS_*
                  constants.
              processed_by: Handler name. When truthy, ``processed_by`` and
                  ``processed_at`` are recorded too; ``None`` or an empty
                  string leaves them untouched.

          A single timestamp is used so ``updated_at`` and ``processed_at``
          hold exactly the same instant.
          """
          now = now_china_naive()
          self.status = new_status
          self.updated_at = now
          if processed_by:
              self.processed_by = processed_by
              self.processed_at = now

      def set_extraction_result(
          self,
          domains: list[str] | None,
          fields: list[str] | None,
          purpose: str | None = None,
      ) -> None:
          """Store the LLM extraction result.

          All three columns are overwritten, including ``extraction_purpose``
          when ``purpose`` is omitted (it becomes ``None``).

          Args:
              domains: Extracted business domains.
              fields: Extracted data fields.
              purpose: Extracted data purpose.
          """
          self.extracted_domains = domains
          self.extracted_fields = fields
          self.extraction_purpose = purpose
          self.updated_at = now_china_naive()

      def set_graph_analysis(
          self,
          analysis: dict[str, Any] | None,
          can_connect: bool,
          connection_path: dict[str, Any] | None = None,
      ) -> None:
          """Store the graph connectivity analysis result.

          Args:
              analysis: Analysis details.
              can_connect: Whether the extracted entities can be connected.
              connection_path: Connection path; overwritten with ``None`` when
                  omitted.
          """
          self.graph_analysis = analysis
          self.can_connect = can_connect
          self.connection_path = connection_path
          self.updated_at = now_china_naive()

      def set_result(
          self,
          product_id: int | None = None,
          dataflow_id: int | None = None,
      ) -> None:
          """Link the order to the data product / dataflow it produced.

          Only arguments that are not ``None`` are written; ``updated_at`` is
          bumped in every case.

          Args:
              product_id: Id of the generated data product.
              dataflow_id: Id of the generated dataflow.
          """
          if product_id is not None:
              self.result_product_id = product_id
          if dataflow_id is not None:
              self.result_dataflow_id = dataflow_id
          self.updated_at = now_china_naive()

      def reject(self, reason: str, processed_by: str | None = None) -> None:
          """Reject the order.

          Stores ``reason`` and then delegates to :meth:`update_status` with
          STATUS_REJECTED, so status/timestamp/handler bookkeeping follows
          exactly the same rules as any other status change.

          Args:
              reason: Rejection reason.
              processed_by: Handler name (recorded only when truthy).
          """
          self.reject_reason = reason
          self.update_status(self.STATUS_REJECTED, processed_by)

      def __repr__(self) -> str:
          return f"<DataOrder {self.order_no} ({self.status})>"