# DataFlow _save_to_pg_database 函数优化说明 ## 优化概述 在 `_save_to_pg_database` 函数中添加了从 `script_requirement` 中提取 `rule` 字段并保存到 `script_content` 的功能。 ## 修改的文件 **文件路径**: `app/core/data_flow/dataflows.py` **函数**: `_save_to_pg_database` ## script_requirement 数据格式 ```json { "code": 28, "rule": "rule", "source_table": [2317, 2307], "target_table": [164] } ``` ## 优化逻辑 ### 1. 提取 rule 字段 从 `script_requirement` 中提取 `rule` 字段的值: ```python # 如果 script_requirement 是字典,直接提取 rule if isinstance(script_requirement_raw, dict): rule_from_requirement = script_requirement_raw.get('rule', '') # 如果 script_requirement 是字符串,先解析再提取 rule elif isinstance(script_requirement_raw, str): try: parsed_req = json.loads(script_requirement) if isinstance(parsed_req, dict): rule_from_requirement = parsed_req.get('rule', '') except (json.JSONDecodeError, TypeError): pass ``` ### 2. 保存到 script_content **优先级规则**: 1. **优先使用前端传入的 `script_content`** 2. **如果 `script_content` 为空,则使用从 `script_requirement` 提取的 `rule`** ```python # 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule script_content = data.get('script_content', '') if not script_content and rule_from_requirement: script_content = rule_from_requirement logger.info(f"script_content为空,使用从script_requirement提取的rule: {rule_from_requirement}") ``` ## 修改详情 **位置**: 第 297-327 行 **修改前**: ```python script_requirement_raw = data.get('script_requirement', None) if script_requirement_raw is not None: if isinstance(script_requirement_raw, (dict, list)): script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False) else: script_requirement = str(script_requirement_raw) else: script_requirement = '' script_content = data.get('script_content', '') ``` **修改后**: ```python script_requirement_raw = data.get('script_requirement', None) rule_from_requirement = '' # 用于保存从 script_requirement 中提取的 rule if script_requirement_raw is not None: # 如果是字典,提取 rule 字段 if isinstance(script_requirement_raw, dict): rule_from_requirement = script_requirement_raw.get('rule', '') script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False) elif isinstance(script_requirement_raw, list): script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False) else: # 如果已经是字符串,尝试解析以提取 rule script_requirement = str(script_requirement_raw) try: parsed_req = json.loads(script_requirement) if isinstance(parsed_req, dict): rule_from_requirement = parsed_req.get('rule', '') except (json.JSONDecodeError, TypeError): pass else: script_requirement = '' # 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule script_content = data.get('script_content', '') if not script_content and rule_from_requirement: script_content = rule_from_requirement logger.info(f"script_content为空,使用从script_requirement提取的rule: {rule_from_requirement}") ``` ## 使用场景 ### 场景 1: 前端提供 script_content ```json { "name_zh": "数据流1", "script_content": "自定义脚本内容", "script_requirement": { "code": 28, "rule": "mapping_rule", "source_table": [2317, 2307], "target_table": [164] } } ``` **结果**: - `script_content` = `"自定义脚本内容"` ✅ - 使用前端提供的值,不使用 rule ### 场景 2: 前端未提供 script_content ```json { "name_zh": "数据流2", "script_requirement": { "code": 28, "rule": "mapping_rule", "source_table": [2317, 2307], "target_table": [164] } } ``` **结果**: - `script_content` = `"mapping_rule"` ✅ - 自动从 script_requirement 提取 rule ### 场景 3: script_content 和 rule 都为空 ```json { "name_zh": "数据流3", "script_requirement": { "code": 28, "source_table": [2317, 2307], "target_table": [164] } } ``` **结果**: - `script_content` = `""` - 保持为空字符串 ### 场景 4: script_requirement 为字符串格式 ```json { "name_zh": "数据流4", "script_requirement": "{\"code\": 28, \"rule\": \"string_rule\", \"source_table\": [100], \"target_table\": [200]}" } ``` **结果**: - 先解析 JSON 字符串 - 提取 rule = `"string_rule"` - `script_content` = `"string_rule"` ✅ ## 数据流向 ``` 前端数据 │ ├─→ script_content (如果有值) │ │ │ └─→ 直接使用 ✅ │ └─→ script_requirement │ ├─→ 解析 JSON │ ├─→ 提取 rule 字段 │ └─→ 如果 script_content 为空 │ └─→ 使用 rule 作为 script_content ✅ ``` ## 保存到数据库 ### PostgreSQL **表**: `dags.data_transform_scripts` | 字段 | 值来源 | 说明 | |------|--------|------| | script_requirement | script_requirement (JSON字符串) | 完整的需求配置 | | script_content | script_content 或 rule | 优先使用前端值,否则使用rule | **示例数据**: ```sql INSERT INTO dags.data_transform_scripts (script_name, script_requirement, script_content) VALUES ('数据流示例', '{"code": 28, "rule": "mapping_rule", "source_table": [2317], "target_table": [164]}', 'mapping_rule'); ``` ## 日志输出 当使用 rule 填充 script_content 时,会记录日志: ``` INFO: script_content为空,使用从script_requirement提取的rule: mapping_rule ``` ## 优势 ✅ **自动填充**: 前端不需要同时传递 `script_content` 和 `rule` ✅ **向后兼容**: 如果前端提供 `script_content`,优先使用前端值 ✅ **灵活处理**: 支持 `script_requirement` 为字典或字符串格式 ✅ **错误容忍**: JSON 解析失败不会中断流程 ✅ **日志追踪**: 记录何时使用 rule 填充 script_content ## 注意事项 1. **优先级**: 前端的 `script_content` 优先级最高,不会被 rule 覆盖 2. **空值检查**: 只有当 `script_content` 为空或不存在时,才使用 rule 3. **类型安全**: 支持 `script_requirement` 为字典、字符串或 None 4. **错误处理**: JSON 解析失败会被捕获,不影响主流程 ## 测试用例 ### 测试 1: 有 script_content,有 rule **输入**: ```json { "script_content": "custom_script", "script_requirement": {"code": 1, "rule": "auto_rule"} } ``` **预期**: `script_content = "custom_script"` ### 测试 2: 无 script_content,有 rule **输入**: ```json { "script_requirement": {"code": 1, "rule": "auto_rule"} } ``` **预期**: `script_content = "auto_rule"` ### 测试 3: 无 script_content,无 rule **输入**: ```json { "script_requirement": {"code": 1} } ``` **预期**: `script_content = ""` ### 测试 4: script_requirement 为字符串 **输入**: ```json { "script_requirement": "{\"code\": 1, \"rule\": \"json_rule\"}" } ``` **预期**: `script_content = "json_rule"` ## 相关接口 - `POST /api/dataflow/add-dataflow` - 创建数据流(应用此优化) - `PUT /api/dataflow/update-dataflow/` - 更新数据流 ## 更新历史 - **2024-11-28**: 添加从 script_requirement 提取 rule 并填充 script_content 的功能