在 _save_to_pg_database 函数中添加了从 script_requirement 中提取 rule 字段并保存到 script_content 的功能。
文件路径: app/core/data_flow/dataflows.py
函数: _save_to_pg_database
{
"code": 28,
"rule": "rule",
"source_table": [2317, 2307],
"target_table": [164]
}
从 script_requirement 中提取 rule 字段的值:
# 如果 script_requirement 是字典,直接提取 rule
if isinstance(script_requirement_raw, dict):
rule_from_requirement = script_requirement_raw.get('rule', '')
# 如果 script_requirement 是字符串,先解析再提取 rule
elif isinstance(script_requirement_raw, str):
try:
parsed_req = json.loads(script_requirement)
if isinstance(parsed_req, dict):
rule_from_requirement = parsed_req.get('rule', '')
except (json.JSONDecodeError, TypeError):
pass
优先级规则:
script_contentscript_content 为空,则使用从 script_requirement 提取的 rule# 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule
script_content = data.get('script_content', '')
if not script_content and rule_from_requirement:
script_content = rule_from_requirement
logger.info(f"script_content为空,使用从script_requirement提取的rule: {rule_from_requirement}")
位置: 第 297-327 行
修改前:
script_requirement_raw = data.get('script_requirement', None)
if script_requirement_raw is not None:
if isinstance(script_requirement_raw, (dict, list)):
script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
else:
script_requirement = str(script_requirement_raw)
else:
script_requirement = ''
script_content = data.get('script_content', '')
修改后:
script_requirement_raw = data.get('script_requirement', None)
rule_from_requirement = '' # 用于保存从 script_requirement 中提取的 rule
if script_requirement_raw is not None:
# 如果是字典,提取 rule 字段
if isinstance(script_requirement_raw, dict):
rule_from_requirement = script_requirement_raw.get('rule', '')
script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
elif isinstance(script_requirement_raw, list):
script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
else:
# 如果已经是字符串,尝试解析以提取 rule
script_requirement = str(script_requirement_raw)
try:
parsed_req = json.loads(script_requirement)
if isinstance(parsed_req, dict):
rule_from_requirement = parsed_req.get('rule', '')
except (json.JSONDecodeError, TypeError):
pass
else:
script_requirement = ''
# 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule
script_content = data.get('script_content', '')
if not script_content and rule_from_requirement:
script_content = rule_from_requirement
logger.info(f"script_content为空,使用从script_requirement提取的rule: {rule_from_requirement}")
{
"name_zh": "数据流1",
"script_content": "自定义脚本内容",
"script_requirement": {
"code": 28,
"rule": "mapping_rule",
"source_table": [2317, 2307],
"target_table": [164]
}
}
结果:
script_content = "自定义脚本内容" ✅{
"name_zh": "数据流2",
"script_requirement": {
"code": 28,
"rule": "mapping_rule",
"source_table": [2317, 2307],
"target_table": [164]
}
}
结果:
script_content = "mapping_rule" ✅{
"name_zh": "数据流3",
"script_requirement": {
"code": 28,
"source_table": [2317, 2307],
"target_table": [164]
}
}
结果:
script_content = ""{
"name_zh": "数据流4",
"script_requirement": "{\"code\": 28, \"rule\": \"string_rule\", \"source_table\": [100], \"target_table\": [200]}"
}
结果:
"string_rule"script_content = "string_rule" ✅前端数据
│
├─→ script_content (如果有值)
│ │
│ └─→ 直接使用 ✅
│
└─→ script_requirement
│
├─→ 解析 JSON
│
├─→ 提取 rule 字段
│
└─→ 如果 script_content 为空
│
└─→ 使用 rule 作为 script_content ✅
表: dags.data_transform_scripts
| 字段 | 值来源 | 说明 |
|---|---|---|
| script_requirement | script_requirement (JSON字符串) | 完整的需求配置 |
| script_content | script_content 或 rule | 优先使用前端值,否则使用rule |
示例数据:
INSERT INTO dags.data_transform_scripts
(script_name, script_requirement, script_content)
VALUES
('数据流示例',
'{"code": 28, "rule": "mapping_rule", "source_table": [2317], "target_table": [164]}',
'mapping_rule');
当使用 rule 填充 script_content 时,会记录日志:
INFO: script_content为空,使用从script_requirement提取的rule: mapping_rule
✅ 自动填充: 前端不需要同时传递 script_content 和 rule
✅ 向后兼容: 如果前端提供 script_content,优先使用前端值
✅ 灵活处理: 支持 script_requirement 为字典或字符串格式
✅ 错误容忍: JSON 解析失败不会中断流程
✅ 日志追踪: 记录何时使用 rule 填充 script_content
script_content 优先级最高,不会被 rule 覆盖script_content 为空或不存在时,才使用 rulescript_requirement 为字典、字符串或 None输入:
{
"script_content": "custom_script",
"script_requirement": {"code": 1, "rule": "auto_rule"}
}
预期: script_content = "custom_script"
输入:
{
"script_requirement": {"code": 1, "rule": "auto_rule"}
}
预期: script_content = "auto_rule"
输入:
{
"script_requirement": {"code": 1}
}
预期: script_content = ""
输入:
{
"script_requirement": "{\"code\": 1, \"rule\": \"json_rule\"}"
}
预期: script_content = "json_rule"
POST /api/dataflow/add-dataflow - 创建数据流(应用此优化)PUT /api/dataflow/update-dataflow/<id> - 更新数据流