|
|
@@ -17,6 +17,7 @@ from openai import OpenAI
|
|
|
from sqlalchemy import text
|
|
|
|
|
|
from app import db
|
|
|
+from app.core.common.timezone_utils import now_china_naive
|
|
|
from app.models.data_product import DataOrder, DataProduct
|
|
|
from app.services.neo4j_driver import neo4j_driver
|
|
|
|
|
|
@@ -547,8 +548,8 @@ class DataProductService:
|
|
|
existing.description = description
|
|
|
existing.source_dataflow_id = source_dataflow_id
|
|
|
existing.source_dataflow_name = source_dataflow_name
|
|
|
- existing.updated_at = datetime.utcnow()
|
|
|
- existing.last_updated_at = datetime.utcnow()
|
|
|
+ existing.updated_at = now_china_naive()
|
|
|
+ existing.last_updated_at = now_china_naive()
|
|
|
db.session.commit()
|
|
|
|
|
|
logger.info(
|
|
|
@@ -568,7 +569,7 @@ class DataProductService:
|
|
|
source_dataflow_id=source_dataflow_id, # type: ignore[arg-type]
|
|
|
source_dataflow_name=source_dataflow_name, # type: ignore[arg-type]
|
|
|
created_by=created_by, # type: ignore[arg-type]
|
|
|
- last_updated_at=datetime.utcnow(), # type: ignore[arg-type]
|
|
|
+ last_updated_at=now_china_naive(), # type: ignore[arg-type]
|
|
|
)
|
|
|
|
|
|
db.session.add(product)
|
|
|
@@ -611,8 +612,8 @@ class DataProductService:
|
|
|
if column_count is not None:
|
|
|
product.column_count = column_count
|
|
|
|
|
|
- product.last_updated_at = datetime.utcnow()
|
|
|
- product.updated_at = datetime.utcnow()
|
|
|
+ product.last_updated_at = now_china_naive()
|
|
|
+ product.updated_at = now_china_naive()
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
|
@@ -662,7 +663,7 @@ class DataProductService:
|
|
|
|
|
|
if not exists:
|
|
|
product.status = "error"
|
|
|
- product.updated_at = datetime.utcnow()
|
|
|
+ product.updated_at = now_china_naive()
|
|
|
db.session.commit()
|
|
|
return product
|
|
|
|
|
|
@@ -688,8 +689,8 @@ class DataProductService:
|
|
|
# 更新统计信息
|
|
|
product.record_count = record_count
|
|
|
product.column_count = column_count
|
|
|
- product.last_updated_at = datetime.utcnow()
|
|
|
- product.updated_at = datetime.utcnow()
|
|
|
+ product.last_updated_at = now_china_naive()
|
|
|
+ product.updated_at = now_china_naive()
|
|
|
product.status = "active"
|
|
|
|
|
|
db.session.commit()
|
|
|
@@ -1251,6 +1252,131 @@ class DataOrderService:
|
|
|
"error": str(e),
|
|
|
}
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def extract_output_domain_and_logic(
|
|
|
+ description: str,
|
|
|
+ input_domains: list[dict[str, Any]] | None = None,
|
|
|
+ ) -> dict[str, Any]:
|
|
|
+ """
|
|
|
+ 使用 LLM 从描述中提取输出 BusinessDomain 信息和数据加工处理逻辑
|
|
|
+
|
|
|
+ Args:
|
|
|
+ description: 需求描述
|
|
|
+ input_domains: 已匹配的输入 BusinessDomain 列表(用于提供上下文)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 提取结果,包含:
|
|
|
+ - output_domain: 输出 BusinessDomain 的信息
|
|
|
+ - name_zh: 中文名称
|
|
|
+ - name_en: 英文名称
|
|
|
+ - describe: 描述
|
|
|
+ - processing_logic: 数据加工处理逻辑描述
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ client = OpenAI(
|
|
|
+ api_key=current_app.config.get("LLM_API_KEY"),
|
|
|
+ base_url=current_app.config.get("LLM_BASE_URL"),
|
|
|
+ )
|
|
|
+
|
|
|
+ model = current_app.config.get("LLM_MODEL_NAME")
|
|
|
+
|
|
|
+ # 构建输入域上下文信息
|
|
|
+ input_context = ""
|
|
|
+ if input_domains:
|
|
|
+ domain_names = [
|
|
|
+ d.get("name_zh", d.get("name_en", "未知")) for d in input_domains
|
|
|
+ ]
|
|
|
+ input_context = f"\n已确定的输入数据源:{', '.join(domain_names)}"
|
|
|
+
|
|
|
+ prompt = f"""分析以下数据需求描述,提取输出数据产品信息和数据加工处理逻辑。
|
|
|
+{input_context}
|
|
|
+
|
|
|
+需求描述:{description}
|
|
|
+
|
|
|
+请严格按照以下JSON格式返回,不要添加任何解释或其他内容:
|
|
|
+{{
|
|
|
+ "output_domain": {{
|
|
|
+ "name_zh": "输出数据产品的中文名称",
|
|
|
+ "name_en": "output_product_english_name",
|
|
|
+ "describe": "输出数据产品的描述,说明这个数据产品包含什么内容"
|
|
|
+ }},
|
|
|
+ "processing_logic": "详细的数据加工处理逻辑,包括:1.需要从哪些源数据中提取什么字段;2.需要进行什么样的数据转换或计算;3.数据的过滤条件或筛选规则;4.最终输出数据的格式和字段"
|
|
|
+}}
|
|
|
+
|
|
|
+注意:
|
|
|
+1. output_domain.name_zh 应该是一个简洁明了的数据产品名称,如"会员消费分析报表"、"销售业绩汇总表"等
|
|
|
+2. output_domain.name_en 应该是英文名称,使用下划线连接,如"member_consumption_analysis"
|
|
|
+3. processing_logic 应该详细描述数据加工的完整流程,便于后续生成数据处理脚本
|
|
|
+"""
|
|
|
+
|
|
|
+ completion = client.chat.completions.create(
|
|
|
+ model=model, # type: ignore[arg-type]
|
|
|
+ messages=[
|
|
|
+ {
|
|
|
+ "role": "system",
|
|
|
+ "content": "你是一个专业的数据架构师,擅长从自然语言描述中提取数据产品定义和数据加工逻辑。"
|
|
|
+ "请严格按照要求的JSON格式返回结果。",
|
|
|
+ },
|
|
|
+ {"role": "user", "content": prompt},
|
|
|
+ ],
|
|
|
+ temperature=0.1,
|
|
|
+ max_tokens=2048,
|
|
|
+ )
|
|
|
+
|
|
|
+ response_text = (
|
|
|
+ completion.choices[0].message.content.strip() # type: ignore[union-attr]
|
|
|
+ )
|
|
|
+
|
|
|
+ # 尝试解析 JSON
|
|
|
+ # 清理可能的 markdown 代码块标记
|
|
|
+ if response_text.startswith("```"):
|
|
|
+ lines = response_text.split("\n")
|
|
|
+ # 移除首尾的代码块标记
|
|
|
+ if lines[0].startswith("```"):
|
|
|
+ lines = lines[1:]
|
|
|
+ if lines and lines[-1].strip() == "```":
|
|
|
+ lines = lines[:-1]
|
|
|
+ response_text = "\n".join(lines)
|
|
|
+
|
|
|
+ result = json.loads(response_text)
|
|
|
+
|
|
|
+ # 验证必要字段
|
|
|
+ if "output_domain" not in result:
|
|
|
+ result["output_domain"] = {
|
|
|
+ "name_zh": "数据产品",
|
|
|
+ "name_en": "data_product",
|
|
|
+ "describe": description[:200] if description else "",
|
|
|
+ }
|
|
|
+ if "processing_logic" not in result:
|
|
|
+ result["processing_logic"] = description
|
|
|
+
|
|
|
+ logger.info(f"LLM 输出域和处理逻辑提取成功: {result}")
|
|
|
+ return result
|
|
|
+
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ logger.error(f"LLM 返回结果解析失败: {str(e)}, response: {response_text}")
|
|
|
+ # 返回默认值
|
|
|
+ return {
|
|
|
+ "output_domain": {
|
|
|
+ "name_zh": "数据产品",
|
|
|
+ "name_en": "data_product",
|
|
|
+ "describe": description[:200] if description else "",
|
|
|
+ },
|
|
|
+ "processing_logic": description,
|
|
|
+ "error": "解析失败",
|
|
|
+ }
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"LLM 输出域和处理逻辑提取失败: {str(e)}")
|
|
|
+ return {
|
|
|
+ "output_domain": {
|
|
|
+ "name_zh": "数据产品",
|
|
|
+ "name_en": "data_product",
|
|
|
+ "describe": description[:200] if description else "",
|
|
|
+ },
|
|
|
+ "processing_logic": description,
|
|
|
+ "error": str(e),
|
|
|
+ }
|
|
|
+
|
|
|
@staticmethod
|
|
|
def find_matching_domains(domain_names: list[str]) -> list[dict[str, Any]]:
|
|
|
"""
|
|
|
@@ -1528,8 +1654,8 @@ class DataOrderService:
|
|
|
|
|
|
# 根据连通性结果更新状态
|
|
|
if can_connect:
|
|
|
- # 可连通,自动进入加工状态
|
|
|
- order.update_status(DataOrder.STATUS_PROCESSING)
|
|
|
+ # 可连通,进入待审批状态
|
|
|
+ order.update_status(DataOrder.STATUS_PENDING_APPROVAL)
|
|
|
else:
|
|
|
# 不可连通,需要人工处理
|
|
|
order.update_status(DataOrder.STATUS_MANUAL_REVIEW)
|
|
|
@@ -1547,32 +1673,55 @@ class DataOrderService:
|
|
|
def approve_order(
|
|
|
order_id: int,
|
|
|
processed_by: str = "admin",
|
|
|
- ) -> DataOrder | None:
|
|
|
+ ) -> dict[str, Any]:
|
|
|
"""
|
|
|
- 审批通过订单
|
|
|
+ 审批通过订单,并自动生成 BusinessDomain 和 DataFlow 资源
|
|
|
|
|
|
Args:
|
|
|
order_id: 订单ID
|
|
|
processed_by: 处理人
|
|
|
|
|
|
Returns:
|
|
|
- 更新后的订单对象
|
|
|
+ 包含订单信息和生成资源的字典:
|
|
|
+ - order: 更新后的订单对象字典
|
|
|
+ - generated_resources: 生成的资源信息
|
|
|
"""
|
|
|
try:
|
|
|
order = DataOrder.query.get(order_id)
|
|
|
if not order:
|
|
|
- return None
|
|
|
+ raise ValueError(f"订单不存在: order_id={order_id}")
|
|
|
|
|
|
- if order.status != DataOrder.STATUS_MANUAL_REVIEW:
|
|
|
- raise ValueError(f"订单状态不允许审批: {order.status}")
|
|
|
+ # 允许从 pending_approval 或 manual_review 状态审批
|
|
|
+ allowed_statuses = [
|
|
|
+ DataOrder.STATUS_PENDING_APPROVAL,
|
|
|
+ DataOrder.STATUS_MANUAL_REVIEW,
|
|
|
+ ]
|
|
|
+ if order.status not in allowed_statuses:
|
|
|
+ raise ValueError(
|
|
|
+ f"订单状态 {order.status} 不允许审批,"
|
|
|
+ f"只有 {allowed_statuses} 状态可以审批"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 自动生成资源
|
|
|
+ generated_resources = DataOrderService.generate_order_resources(order)
|
|
|
+
|
|
|
+ # 更新订单关联的 dataflow_id
|
|
|
+ order.result_dataflow_id = generated_resources["dataflow_id"]
|
|
|
|
|
|
+ # 更新状态为 processing
|
|
|
order.update_status(DataOrder.STATUS_PROCESSING, processed_by)
|
|
|
db.session.commit()
|
|
|
|
|
|
logger.info(
|
|
|
- f"订单审批通过: order_id={order_id}, processed_by={processed_by}"
|
|
|
+ f"订单审批通过并生成资源: order_id={order_id}, "
|
|
|
+ f"dataflow_id={generated_resources['dataflow_id']}, "
|
|
|
+ f"processed_by={processed_by}"
|
|
|
)
|
|
|
- return order
|
|
|
+
|
|
|
+ return {
|
|
|
+ "order": order.to_dict(),
|
|
|
+ "generated_resources": generated_resources,
|
|
|
+ }
|
|
|
|
|
|
except Exception as e:
|
|
|
db.session.rollback()
|
|
|
@@ -1617,18 +1766,419 @@ class DataOrderService:
|
|
|
|
|
|
@staticmethod
|
|
|
def complete_order(
|
|
|
+ order_id: int,
|
|
|
+ processed_by: str = "user",
|
|
|
+ ) -> DataOrder | None:
|
|
|
+ """
|
|
|
+ 标记订单为最终完成状态
|
|
|
+
|
|
|
+ 只允许从 onboard(数据产品就绪)状态标记完成
|
|
|
+
|
|
|
+ Args:
|
|
|
+ order_id: 订单ID
|
|
|
+ processed_by: 处理人
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 更新后的订单对象
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ order = DataOrder.query.get(order_id)
|
|
|
+ if not order:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 只允许从 onboard 状态标记完成
|
|
|
+ if order.status != DataOrder.STATUS_ONBOARD:
|
|
|
+ raise ValueError(
|
|
|
+ f"订单状态 {order.status} 不允许标记完成,"
|
|
|
+ f"只有 onboard 状态可以标记完成"
|
|
|
+ )
|
|
|
+
|
|
|
+ order.update_status(DataOrder.STATUS_COMPLETED, processed_by)
|
|
|
+ db.session.commit()
|
|
|
+
|
|
|
+ logger.info(f"订单已完成: order_id={order_id}, processed_by={processed_by}")
|
|
|
+ return order
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ db.session.rollback()
|
|
|
+ logger.error(f"完成订单失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def update_order(
|
|
|
+ order_id: int,
|
|
|
+ title: str | None = None,
|
|
|
+ description: str | None = None,
|
|
|
+ extracted_domains: list[str] | None = None,
|
|
|
+ extracted_fields: list[str] | None = None,
|
|
|
+ extraction_purpose: str | None = None,
|
|
|
+ ) -> DataOrder | None:
|
|
|
+ """
|
|
|
+ 更新数据订单(支持修改描述和提取结果)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ order_id: 订单ID
|
|
|
+ title: 订单标题(可选)
|
|
|
+ description: 需求描述(可选)
|
|
|
+ extracted_domains: 提取的业务领域列表(可选)
|
|
|
+ extracted_fields: 提取的数据字段列表(可选)
|
|
|
+ extraction_purpose: 数据用途(可选)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 更新后的订单对象
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ order = DataOrder.query.get(order_id)
|
|
|
+ if not order:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 只允许在特定状态下修改订单
|
|
|
+ allowed_statuses = [
|
|
|
+ DataOrder.STATUS_PENDING,
|
|
|
+ DataOrder.STATUS_MANUAL_REVIEW,
|
|
|
+ DataOrder.STATUS_NEED_SUPPLEMENT,
|
|
|
+ ]
|
|
|
+ if order.status not in allowed_statuses:
|
|
|
+ raise ValueError(
|
|
|
+ f"订单状态 {order.status} 不允许修改,"
|
|
|
+ f"只有 {allowed_statuses} 状态可以修改"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 更新基本信息
|
|
|
+ if title is not None:
|
|
|
+ order.title = title
|
|
|
+ if description is not None:
|
|
|
+ order.description = description
|
|
|
+
|
|
|
+ # 更新提取结果
|
|
|
+ if extracted_domains is not None:
|
|
|
+ order.extracted_domains = extracted_domains
|
|
|
+ if extracted_fields is not None:
|
|
|
+ order.extracted_fields = extracted_fields
|
|
|
+ if extraction_purpose is not None:
|
|
|
+ order.extraction_purpose = extraction_purpose
|
|
|
+
|
|
|
+ # 更新状态为待处理,重新进入处理流程
|
|
|
+ order.status = DataOrder.STATUS_PENDING
|
|
|
+ order.updated_at = now_china_naive()
|
|
|
+ db.session.commit()
|
|
|
+
|
|
|
+ logger.info(f"更新数据订单成功: order_id={order_id}")
|
|
|
+ return order
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ db.session.rollback()
|
|
|
+ logger.error(f"更新数据订单失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def generate_order_resources(order: DataOrder) -> dict[str, Any]:
|
|
|
+ """
|
|
|
+ 根据订单分析结果自动生成 BusinessDomain 和 DataFlow 资源
|
|
|
+
|
|
|
+ 流程:
|
|
|
+ 1. 使用 LLM 从 description 提取输出 BusinessDomain 信息和处理逻辑
|
|
|
+ 2. 创建输出 BusinessDomain 节点
|
|
|
+ 3. 创建 DataFlow 节点
|
|
|
+ 4. 建立 INPUT/OUTPUT 关系
|
|
|
+ 5. 在 task_list 表中创建任务记录
|
|
|
+
|
|
|
+ Args:
|
|
|
+ order: 数据订单对象
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 包含生成的资源信息的字典:
|
|
|
+ - target_business_domain_id: 目标 BusinessDomain 节点 ID
|
|
|
+ - dataflow_id: DataFlow 节点 ID
|
|
|
+ - input_domain_ids: 输入 BusinessDomain 节点 ID 列表
|
|
|
+ - task_id: task_list 表中的任务 ID
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ graph_analysis = order.graph_analysis or {}
|
|
|
+ matched_domains = graph_analysis.get("matched_domains", [])
|
|
|
+
|
|
|
+ if not matched_domains:
|
|
|
+ raise ValueError("订单没有匹配的业务领域,无法生成资源")
|
|
|
+
|
|
|
+ # 1. 使用 LLM 提取输出 BusinessDomain 信息和处理逻辑
|
|
|
+ extraction_result = DataOrderService.extract_output_domain_and_logic(
|
|
|
+ description=order.description,
|
|
|
+ input_domains=matched_domains,
|
|
|
+ )
|
|
|
+
|
|
|
+ output_domain_info = extraction_result.get("output_domain", {})
|
|
|
+ processing_logic = extraction_result.get("processing_logic", "")
|
|
|
+
|
|
|
+ # 获取输出域名称,使用 LLM 提取结果或回退到默认值
|
|
|
+ target_bd_name_zh = output_domain_info.get("name_zh") or order.title
|
|
|
+            target_bd_name_en = (
|
|
|
+                output_domain_info.get("name_en") or f"DP_{order.order_no}"
|
|
|
+            )
|
|
|
+            target_bd_describe = (
|
|
|
+                output_domain_info.get("describe")
|
|
|
+            ) or order.extraction_purpose or order.description
|
|
|
+
|
|
|
+ with neo4j_driver.get_session() as session:
|
|
|
+ # 2. 创建目标 BusinessDomain 节点(数据产品承载)
|
|
|
+ create_target_bd_query = """
|
|
|
+ CREATE (bd:BusinessDomain {
|
|
|
+ name_en: $name_en,
|
|
|
+ name_zh: $name_zh,
|
|
|
+ describe: $describe,
|
|
|
+ type: 'data_product',
|
|
|
+ created_at: datetime(),
|
|
|
+ created_by: $created_by,
|
|
|
+ source_order_id: $order_id
|
|
|
+ })
|
|
|
+ RETURN id(bd) as bd_id
|
|
|
+ """
|
|
|
+ result = session.run(
|
|
|
+ create_target_bd_query,
|
|
|
+ {
|
|
|
+ "name_en": target_bd_name_en,
|
|
|
+ "name_zh": target_bd_name_zh,
|
|
|
+ "describe": target_bd_describe,
|
|
|
+ "created_by": "system",
|
|
|
+ "order_id": order.id,
|
|
|
+ },
|
|
|
+ ).single()
|
|
|
+ if result is None:
|
|
|
+ raise ValueError("创建目标 BusinessDomain 失败")
|
|
|
+ target_bd_id = result["bd_id"]
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"创建目标 BusinessDomain: id={target_bd_id}, "
|
|
|
+ f"name_zh={target_bd_name_zh}, name_en={target_bd_name_en}"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 3. 创建 DataFlow 节点
|
|
|
+ dataflow_name_en = f"DF_{order.order_no}"
|
|
|
+ dataflow_name_zh = f"{target_bd_name_zh}_数据流程"
|
|
|
+
|
|
|
+ # 构建 script_requirement(包含完整的数据加工定义)
|
|
|
+ input_domain_names = [
|
|
|
+ d.get("name_zh", d.get("name_en", "")) for d in matched_domains
|
|
|
+ ]
|
|
|
+ input_domain_ids = [d["id"] for d in matched_domains]
|
|
|
+
|
|
|
+ # 构建结构化的 script_requirement(JSON 格式)
|
|
|
+ script_requirement_dict = {
|
|
|
+ "source_table": input_domain_ids,
|
|
|
+ "target_table": [target_bd_id],
|
|
|
+ "rule": processing_logic,
|
|
|
+ "description": order.description,
|
|
|
+ "purpose": order.extraction_purpose or "",
|
|
|
+ "fields": order.extracted_fields or [],
|
|
|
+ }
|
|
|
+ script_requirement_str = json.dumps(
|
|
|
+ script_requirement_dict, ensure_ascii=False
|
|
|
+ )
|
|
|
+
|
|
|
+ create_dataflow_query = """
|
|
|
+ CREATE (df:DataFlow {
|
|
|
+ name_en: $name_en,
|
|
|
+ name_zh: $name_zh,
|
|
|
+ script_requirement: $script_requirement,
|
|
|
+ script_type: 'pending',
|
|
|
+ update_mode: 'full',
|
|
|
+ status: 'inactive',
|
|
|
+ created_at: datetime(),
|
|
|
+ created_by: $created_by,
|
|
|
+ source_order_id: $order_id
|
|
|
+ })
|
|
|
+ RETURN id(df) as df_id
|
|
|
+ """
|
|
|
+ result = session.run(
|
|
|
+ create_dataflow_query,
|
|
|
+ {
|
|
|
+ "name_en": dataflow_name_en,
|
|
|
+ "name_zh": dataflow_name_zh,
|
|
|
+ "script_requirement": script_requirement_str,
|
|
|
+ "created_by": "system",
|
|
|
+ "order_id": order.id,
|
|
|
+ },
|
|
|
+ ).single()
|
|
|
+ if result is None:
|
|
|
+ raise ValueError("创建 DataFlow 失败")
|
|
|
+ dataflow_id = result["df_id"]
|
|
|
+
|
|
|
+ logger.info(f"创建 DataFlow: id={dataflow_id}, name={dataflow_name_en}")
|
|
|
+
|
|
|
+ # 4. 建立 INPUT 关系(源 BusinessDomain -> DataFlow)
|
|
|
+ for domain_id in input_domain_ids:
|
|
|
+ create_input_rel_query = """
|
|
|
+ MATCH (bd:BusinessDomain), (df:DataFlow)
|
|
|
+ WHERE id(bd) = $bd_id AND id(df) = $df_id
|
|
|
+ CREATE (bd)-[:INPUT]->(df)
|
|
|
+ """
|
|
|
+ session.run(
|
|
|
+ create_input_rel_query,
|
|
|
+ {"bd_id": domain_id, "df_id": dataflow_id},
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"建立 INPUT 关系: {input_domain_ids} -> {dataflow_id}")
|
|
|
+
|
|
|
+ # 5. 建立 OUTPUT 关系(DataFlow -> 目标 BusinessDomain)
|
|
|
+ create_output_rel_query = """
|
|
|
+ MATCH (df:DataFlow), (bd:BusinessDomain)
|
|
|
+ WHERE id(df) = $df_id AND id(bd) = $bd_id
|
|
|
+ CREATE (df)-[:OUTPUT]->(bd)
|
|
|
+ """
|
|
|
+ session.run(
|
|
|
+ create_output_rel_query,
|
|
|
+ {"df_id": dataflow_id, "bd_id": target_bd_id},
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"建立 OUTPUT 关系: {dataflow_id} -> {target_bd_id}")
|
|
|
+
|
|
|
+ # 6. 在 task_list 表中创建任务记录
|
|
|
+ task_id = DataOrderService._create_task_record(
|
|
|
+ order=order,
|
|
|
+ dataflow_name_en=dataflow_name_en,
|
|
|
+ dataflow_name_zh=dataflow_name_zh,
|
|
|
+ dataflow_id=dataflow_id,
|
|
|
+ input_domain_names=input_domain_names,
|
|
|
+ target_bd_name_zh=target_bd_name_zh,
|
|
|
+ processing_logic=processing_logic,
|
|
|
+ )
|
|
|
+
|
|
|
+ return {
|
|
|
+ "target_business_domain_id": target_bd_id,
|
|
|
+ "target_business_domain_name": target_bd_name_zh,
|
|
|
+ "dataflow_id": dataflow_id,
|
|
|
+ "dataflow_name": dataflow_name_en,
|
|
|
+ "input_domain_ids": input_domain_ids,
|
|
|
+ "task_id": task_id,
|
|
|
+ }
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"生成订单资源失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _create_task_record(
|
|
|
+ order: DataOrder,
|
|
|
+ dataflow_name_en: str,
|
|
|
+ dataflow_name_zh: str,
|
|
|
+ dataflow_id: int,
|
|
|
+ input_domain_names: list[str],
|
|
|
+ target_bd_name_zh: str,
|
|
|
+ processing_logic: str,
|
|
|
+ ) -> int | None:
|
|
|
+ """
|
|
|
+ 在 task_list 表中创建任务记录
|
|
|
+
|
|
|
+ Args:
|
|
|
+ order: 数据订单对象
|
|
|
+ dataflow_name_en: DataFlow 英文名称
|
|
|
+ dataflow_name_zh: DataFlow 中文名称
|
|
|
+ dataflow_id: DataFlow 节点 ID
|
|
|
+ input_domain_names: 输入域名称列表
|
|
|
+ target_bd_name_zh: 目标 BusinessDomain 中文名称
|
|
|
+ processing_logic: 数据加工处理逻辑
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 创建的任务 ID
|
|
|
+ """
|
|
|
+ from datetime import datetime
|
|
|
+
|
|
|
+ from sqlalchemy import text
|
|
|
+
|
|
|
+ try:
|
|
|
+            current_time = now_china_naive()
|
|
|
+
|
|
|
+ # 构建 Markdown 格式的任务描述
|
|
|
+ task_description_parts = [
|
|
|
+ f"# Task: {dataflow_name_en}\n",
|
|
|
+ "## DataFlow Configuration",
|
|
|
+ f"- **DataFlow ID**: {dataflow_id}",
|
|
|
+ f"- **DataFlow Name**: {dataflow_name_zh}",
|
|
|
+ f"- **Order ID**: {order.id}",
|
|
|
+ f"- **Order No**: {order.order_no}\n",
|
|
|
+ "## Source Tables",
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 添加输入域信息
|
|
|
+ for name in input_domain_names:
|
|
|
+ task_description_parts.append(f"- {name}")
|
|
|
+
|
|
|
+ task_description_parts.extend(
|
|
|
+ [
|
|
|
+ "",
|
|
|
+ "## Target Table",
|
|
|
+ f"- {target_bd_name_zh}\n",
|
|
|
+ "## Update Mode",
|
|
|
+ "- **Mode**: Full Refresh (全量更新)",
|
|
|
+ "- **Description**: 目标表将被清空后重新写入数据\n",
|
|
|
+ "## Request Content",
|
|
|
+ processing_logic or order.description,
|
|
|
+ "",
|
|
|
+ "## Implementation Steps",
|
|
|
+ "1. 连接数据源,读取源数据表",
|
|
|
+ "2. 根据处理逻辑执行数据转换",
|
|
|
+ "3. 写入目标数据表",
|
|
|
+ "4. 完成后回调更新订单状态为 onboard",
|
|
|
+ ]
|
|
|
+ )
|
|
|
+
|
|
|
+ task_description_md = "\n".join(task_description_parts)
|
|
|
+
|
|
|
+ # 脚本路径和名称
|
|
|
+ code_path = "datafactory/scripts"
|
|
|
+ code_name = f"{dataflow_name_en}.py"
|
|
|
+
|
|
|
+ # 插入 task_list 表
|
|
|
+ task_insert_sql = text(
|
|
|
+ "INSERT INTO public.task_list "
|
|
|
+ "(task_name, task_description, status, code_name, "
|
|
|
+ "code_path, create_by, create_time, update_time) "
|
|
|
+ "VALUES "
|
|
|
+ "(:task_name, :task_description, :status, :code_name, "
|
|
|
+ ":code_path, :create_by, :create_time, :update_time) "
|
|
|
+ "RETURNING task_id"
|
|
|
+ )
|
|
|
+
|
|
|
+ task_params = {
|
|
|
+ "task_name": dataflow_name_en,
|
|
|
+ "task_description": task_description_md,
|
|
|
+ "status": "pending",
|
|
|
+ "code_name": code_name,
|
|
|
+ "code_path": code_path,
|
|
|
+ "create_by": "system",
|
|
|
+ "create_time": current_time,
|
|
|
+ "update_time": current_time,
|
|
|
+ }
|
|
|
+
|
|
|
+ result = db.session.execute(task_insert_sql, task_params)
|
|
|
+ row = result.fetchone()
|
|
|
+ task_id = row[0] if row else None
|
|
|
+ db.session.commit()
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"成功创建任务记录: task_id={task_id}, task_name={dataflow_name_en}"
|
|
|
+ )
|
|
|
+ return task_id
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ db.session.rollback()
|
|
|
+ logger.error(f"创建任务记录失败: {str(e)}")
|
|
|
+ # 任务记录创建失败不阻塞主流程,返回 None
|
|
|
+ return None
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def set_order_onboard(
|
|
|
order_id: int,
|
|
|
product_id: int | None = None,
|
|
|
dataflow_id: int | None = None,
|
|
|
- processed_by: str = "system",
|
|
|
+ processed_by: str = "n8n-workflow",
|
|
|
) -> DataOrder | None:
|
|
|
"""
|
|
|
- 完成订单
|
|
|
+ 设置订单为数据产品就绪状态(供数据工厂回调)
|
|
|
|
|
|
Args:
|
|
|
order_id: 订单ID
|
|
|
- product_id: 生成的数据产品ID
|
|
|
- dataflow_id: 生成的数据流ID
|
|
|
+ product_id: 生成的数据产品ID(可选)
|
|
|
+ dataflow_id: 数据流ID(可选)
|
|
|
processed_by: 处理人
|
|
|
|
|
|
Returns:
|
|
|
@@ -1639,19 +2189,31 @@ class DataOrderService:
|
|
|
if not order:
|
|
|
return None
|
|
|
|
|
|
- order.set_result(product_id, dataflow_id)
|
|
|
- order.update_status(DataOrder.STATUS_COMPLETED, processed_by)
|
|
|
+ # 只允许从 processing 状态转换
|
|
|
+ if order.status != DataOrder.STATUS_PROCESSING:
|
|
|
+ raise ValueError(
|
|
|
+ f"订单状态 {order.status} 不允许设置为 onboard,"
|
|
|
+ f"只有 processing 状态可以转换"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 更新关联信息
|
|
|
+ if product_id is not None:
|
|
|
+ order.result_product_id = product_id
|
|
|
+ if dataflow_id is not None:
|
|
|
+ order.result_dataflow_id = dataflow_id
|
|
|
+
|
|
|
+ order.update_status(DataOrder.STATUS_ONBOARD, processed_by)
|
|
|
db.session.commit()
|
|
|
|
|
|
logger.info(
|
|
|
- f"订单已完成: order_id={order_id}, product_id={product_id}, "
|
|
|
- f"dataflow_id={dataflow_id}"
|
|
|
+ f"订单设置为 onboard: order_id={order_id}, "
|
|
|
+ f"product_id={product_id}, dataflow_id={dataflow_id}"
|
|
|
)
|
|
|
return order
|
|
|
|
|
|
except Exception as e:
|
|
|
db.session.rollback()
|
|
|
- logger.error(f"完成订单失败: {str(e)}")
|
|
|
+ logger.error(f"设置订单 onboard 状态失败: {str(e)}")
|
|
|
raise
|
|
|
|
|
|
@staticmethod
|