DataFlow_script_requirement优化说明.md 7.3 KB

DataFlow create_dataflow 函数优化说明

优化概述

create_dataflow 函数中新增了对 script_requirement 属性的处理,将其作为 JSON 字符串保存到 Neo4j 的 DataFlow 节点中,同时也保存到 PostgreSQL 数据库中。

修改的文件

文件路径: app/core/data_flow/dataflows.py

前端数据格式

前端上传的数据流配置数据格式示例:

{
  "name_zh": "科室对照表映射到数据模型",
  "name_en": "deparment_table_mapping",
  "category": "应用类",
  "leader": "system",
  "organization": "citu",
  "script_type": "python",
  "update_mode": "append",
  "frequency": "月",
  "tag": null,
  "describe": null,
  "status": "active",
  "script_requirement": {
    "code": 28,
    "rule": "rule",
    "source_table": [
      2317,
      2307
    ],
    "target_table": [
      164
    ]
  }
}

具体修改

1. 在 create_dataflow 函数中添加 script_requirement 处理

位置: 第 197-221 行

修改内容:

# 处理 script_requirement,将其转换为 JSON 字符串
script_requirement = data.get('script_requirement', None)
if script_requirement is not None:
    # 如果是字典或列表,转换为 JSON 字符串
    if isinstance(script_requirement, (dict, list)):
        script_requirement_str = json.dumps(script_requirement, ensure_ascii=False)
    else:
        # 如果已经是字符串,直接使用
        script_requirement_str = str(script_requirement)
else:
    script_requirement_str = ''

# 准备节点数据
node_data = {
    'name_zh': dataflow_name,
    'name_en': name_en,
    'category': data.get('category', ''),
    'organization': data.get('organization', ''),
    'leader': data.get('leader', ''),
    'frequency': data.get('frequency', ''),
    'tag': data.get('tag', ''),
    'describe': data.get('describe', ''),
    'status': data.get('status', 'inactive'),
    'update_mode': data.get('update_mode', 'append'),
    'script_requirement': script_requirement_str,  # 新增的字段
    'created_at': get_formatted_time(),
    'updated_at': get_formatted_time()
}

功能说明:

  1. 从前端数据中获取 script_requirement
  2. 如果是字典或列表类型,使用 json.dumps() 转换为 JSON 字符串
  3. 如果已经是字符串,直接使用
  4. 如果不存在,设置为空字符串
  5. 将 JSON 字符串保存到 Neo4j 节点的 script_requirement 属性中

2. 在 _save_to_pg_database 函数中添加 script_requirement 处理

位置: 第 297-310 行

修改内容:

# 提取脚本相关信息
# 处理 script_requirement,确保保存为 JSON 字符串
script_requirement_raw = data.get('script_requirement', None)
if script_requirement_raw is not None:
    # 如果是字典或列表,转换为 JSON 字符串
    if isinstance(script_requirement_raw, (dict, list)):
        script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
    else:
        # 如果已经是字符串,直接使用
        script_requirement = str(script_requirement_raw)
else:
    script_requirement = ''

script_content = data.get('script_content', '')

功能说明:

  1. create_dataflow 函数中的处理逻辑一致
  2. 确保保存到 PostgreSQL 数据库的 script_requirement 字段也是 JSON 字符串格式
  3. 保证 Neo4j 和 PostgreSQL 中的数据格式一致

script_requirement 数据结构

script_requirement 是一个 JSON 对象,包含以下字段:

字段 类型 说明 示例
code Number 代码标识 28
rule String 规则名称 "rule"
source_table Array 源表节点ID数组 [2317, 2307]
target_table Array 目标表节点ID数组 [164]

保存格式:

在数据库中,script_requirement 以 JSON 字符串形式保存:

"{\"code\": 28, \"rule\": \"rule\", \"source_table\": [2317, 2307], \"target_table\": [164]}"

存储位置

1. Neo4j 图数据库

  • 节点类型: DataFlow
  • 属性名: script_requirement
  • 数据类型: String (JSON 格式)

2. PostgreSQL 数据库

  • 表名: dags.data_transform_scripts
  • 字段名: script_requirement
  • 数据类型: TEXT (JSON 格式)

使用示例

创建 DataFlow 时提供 script_requirement

dataflow_data = {
    "name_zh": "科室对照表映射到数据模型",
    "name_en": "deparment_table_mapping",
    "category": "应用类",
    "leader": "system",
    "organization": "citu",
    "script_type": "python",
    "update_mode": "append",
    "frequency": "月",
    "status": "active",
    "describe": "将科室对照表数据映射到数据模型",
    "script_requirement": {
        "code": 28,
        "rule": "mapping_rule",
        "source_table": [2317, 2307],
        "target_table": [164]
    }
}

# 创建数据流
result = DataFlowService.create_dataflow(dataflow_data)

从 Neo4j 查询包含 script_requirement 的 DataFlow

MATCH (df:DataFlow {name_zh: "科室对照表映射到数据模型"})
RETURN df.script_requirement

解析 script_requirement

import json

# 从数据库获取 script_requirement 字符串
script_requirement_str = dataflow_node['script_requirement']

# 解析 JSON 字符串
if script_requirement_str:
    script_requirement = json.loads(script_requirement_str)
    
    code = script_requirement.get('code')
    rule = script_requirement.get('rule')
    source_tables = script_requirement.get('source_table', [])
    target_tables = script_requirement.get('target_table', [])
    
    print(f"Code: {code}")
    print(f"Rule: {rule}")
    print(f"Source tables: {source_tables}")
    print(f"Target tables: {target_tables}")

兼容性说明

  1. 向后兼容: 如果前端不提供 script_requirement,默认保存为空字符串,不影响现有功能
  2. 格式灵活: 支持前端传递字典、列表或字符串格式,都会正确转换为 JSON 字符串
  3. 数据一致性: Neo4j 和 PostgreSQL 中的 script_requirement 格式保持一致

注意事项

  1. JSON 格式: script_requirement 在数据库中以 JSON 字符串形式保存,使用时需要解析
  2. 编码问题: 使用 ensure_ascii=False 确保中文字符正确保存
  3. 空值处理: 如果 script_requirementNone 或不存在,保存为空字符串 ''
  4. 类型检查: 代码会检查 script_requirement 的类型,确保正确转换

测试建议

测试用例 1: 完整的 script_requirement

{
  "name_zh": "测试数据流1",
  "describe": "测试描述",
  "script_requirement": {
    "code": 1,
    "rule": "test_rule",
    "source_table": [100, 101],
    "target_table": [200]
  }
}

测试用例 2: 空的 script_requirement

{
  "name_zh": "测试数据流2",
  "describe": "测试描述",
  "script_requirement": null
}

测试用例 3: script_requirement 为字符串

{
  "name_zh": "测试数据流3",
  "describe": "测试描述",
  "script_requirement": "{\"code\": 3, \"rule\": \"string_rule\"}"
}

相关接口

  • POST /api/dataflow/add-dataflow - 创建数据流
  • GET /api/dataflow/get-dataflow/<id> - 获取数据流详情(包含 script_requirement)
  • PUT /api/dataflow/update-dataflow/<id> - 更新数据流(可更新 script_requirement)

更新历史

  • 2024-11-28: 初始版本,添加 script_requirement 字段支持