DataFlow_rule提取优化说明.md 7.6 KB

DataFlow _save_to_pg_database 函数优化说明

优化概述

_save_to_pg_database 函数中添加了从 script_requirement 中提取 rule 字段并保存到 script_content 的功能。

修改的文件

文件路径: app/core/data_flow/dataflows.py

函数: _save_to_pg_database

script_requirement 数据格式

{
  "code": 28,
  "rule": "rule",
  "source_table": [2317, 2307],
  "target_table": [164]
}

优化逻辑

1. 提取 rule 字段

script_requirement 中提取 rule 字段的值:

# 如果 script_requirement 是字典,直接提取 rule
if isinstance(script_requirement_raw, dict):
    rule_from_requirement = script_requirement_raw.get('rule', '')

# 如果 script_requirement 是字符串,先解析再提取 rule
elif isinstance(script_requirement_raw, str):
    try:
        parsed_req = json.loads(script_requirement)
        if isinstance(parsed_req, dict):
            rule_from_requirement = parsed_req.get('rule', '')
    except (json.JSONDecodeError, TypeError):
        pass

2. 保存到 script_content

优先级规则

  1. 优先使用前端传入的 script_content
  2. 如果 script_content 为空,则使用从 script_requirement 提取的 rule
# 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule
script_content = data.get('script_content', '')
if not script_content and rule_from_requirement:
    script_content = rule_from_requirement
    logger.info(f"script_content为空,使用从script_requirement提取的rule: {rule_from_requirement}")

修改详情

位置: 第 297-327 行

修改前:

script_requirement_raw = data.get('script_requirement', None)
if script_requirement_raw is not None:
    if isinstance(script_requirement_raw, (dict, list)):
        script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
    else:
        script_requirement = str(script_requirement_raw)
else:
    script_requirement = ''

script_content = data.get('script_content', '')

修改后:

script_requirement_raw = data.get('script_requirement', None)
rule_from_requirement = ''  # 用于保存从 script_requirement 中提取的 rule

if script_requirement_raw is not None:
    # 如果是字典,提取 rule 字段
    if isinstance(script_requirement_raw, dict):
        rule_from_requirement = script_requirement_raw.get('rule', '')
        script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
    elif isinstance(script_requirement_raw, list):
        script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
    else:
        # 如果已经是字符串,尝试解析以提取 rule
        script_requirement = str(script_requirement_raw)
        try:
            parsed_req = json.loads(script_requirement)
            if isinstance(parsed_req, dict):
                rule_from_requirement = parsed_req.get('rule', '')
        except (json.JSONDecodeError, TypeError):
            pass
else:
    script_requirement = ''

# 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule
script_content = data.get('script_content', '')
if not script_content and rule_from_requirement:
    script_content = rule_from_requirement
    logger.info(f"script_content为空,使用从script_requirement提取的rule: {rule_from_requirement}")

使用场景

场景 1: 前端提供 script_content

{
  "name_zh": "数据流1",
  "script_content": "自定义脚本内容",
  "script_requirement": {
    "code": 28,
    "rule": "mapping_rule",
    "source_table": [2317, 2307],
    "target_table": [164]
  }
}

结果:

  • script_content = "自定义脚本内容"
  • 使用前端提供的值,不使用 rule

场景 2: 前端未提供 script_content

{
  "name_zh": "数据流2",
  "script_requirement": {
    "code": 28,
    "rule": "mapping_rule",
    "source_table": [2317, 2307],
    "target_table": [164]
  }
}

结果:

  • script_content = "mapping_rule"
  • 自动从 script_requirement 提取 rule

场景 3: script_content 和 rule 都为空

{
  "name_zh": "数据流3",
  "script_requirement": {
    "code": 28,
    "source_table": [2317, 2307],
    "target_table": [164]
  }
}

结果:

  • script_content = ""
  • 保持为空字符串

场景 4: script_requirement 为字符串格式

{
  "name_zh": "数据流4",
  "script_requirement": "{\"code\": 28, \"rule\": \"string_rule\", \"source_table\": [100], \"target_table\": [200]}"
}

结果:

  • 先解析 JSON 字符串
  • 提取 rule = "string_rule"
  • script_content = "string_rule"

数据流向

前端数据
    │
    ├─→ script_content (如果有值)
    │       │
    │       └─→ 直接使用 ✅
    │
    └─→ script_requirement
            │
            ├─→ 解析 JSON
            │
            ├─→ 提取 rule 字段
            │
            └─→ 如果 script_content 为空
                    │
                    └─→ 使用 rule 作为 script_content ✅

保存到数据库

PostgreSQL

: dags.data_transform_scripts

字段 值来源 说明
script_requirement script_requirement (JSON字符串) 完整的需求配置
script_content script_content 或 rule 优先使用前端值,否则使用rule

示例数据:

INSERT INTO dags.data_transform_scripts 
(script_name, script_requirement, script_content)
VALUES 
('数据流示例', 
 '{"code": 28, "rule": "mapping_rule", "source_table": [2317], "target_table": [164]}',
 'mapping_rule');

日志输出

当使用 rule 填充 script_content 时,会记录日志:

INFO: script_content为空,使用从script_requirement提取的rule: mapping_rule

优势

自动填充: 前端不需要同时传递 script_contentrule
向后兼容: 如果前端提供 script_content,优先使用前端值
灵活处理: 支持 script_requirement 为字典或字符串格式
错误容忍: JSON 解析失败不会中断流程
日志追踪: 记录何时使用 rule 填充 script_content

注意事项

  1. 优先级: 前端的 script_content 优先级最高,不会被 rule 覆盖
  2. 空值检查: 只有当 script_content 为空或不存在时,才使用 rule
  3. 类型安全: 支持 script_requirement 为字典、字符串或 None
  4. 错误处理: JSON 解析失败会被捕获,不影响主流程

测试用例

测试 1: 有 script_content,有 rule

输入:

{
  "script_content": "custom_script",
  "script_requirement": {"code": 1, "rule": "auto_rule"}
}

预期: script_content = "custom_script"

测试 2: 无 script_content,有 rule

输入:

{
  "script_requirement": {"code": 1, "rule": "auto_rule"}
}

预期: script_content = "auto_rule"

测试 3: 无 script_content,无 rule

输入:

{
  "script_requirement": {"code": 1}
}

预期: script_content = ""

测试 4: script_requirement 为字符串

输入:

{
  "script_requirement": "{\"code\": 1, \"rule\": \"json_rule\"}"
}

预期: script_content = "json_rule"

相关接口

  • POST /api/dataflow/add-dataflow - 创建数据流(应用此优化)
  • PUT /api/dataflow/update-dataflow/<id> - 更新数据流

更新历史

  • 2024-11-28: 添加从 script_requirement 提取 rule 并填充 script_content 的功能