在 create_dataflow 函数中新增了对 script_requirement 属性的处理,将其作为 JSON 字符串保存到 Neo4j 的 DataFlow 节点中,同时也保存到 PostgreSQL 数据库中。
文件路径: app/core/data_flow/dataflows.py
前端上传的数据流配置数据格式示例:
{
"name_zh": "科室对照表映射到数据模型",
"name_en": "deparment_table_mapping",
"category": "应用类",
"leader": "system",
"organization": "citu",
"script_type": "python",
"update_mode": "append",
"frequency": "月",
"tag": null,
"describe": null,
"status": "active",
"script_requirement": {
"code": 28,
"rule": "rule",
"source_table": [
2317,
2307
],
"target_table": [
164
]
}
}
create_dataflow 函数中添加 script_requirement 处理位置: 第 197-221 行
修改内容:
# 处理 script_requirement,将其转换为 JSON 字符串
script_requirement = data.get('script_requirement', None)
if script_requirement is not None:
# 如果是字典或列表,转换为 JSON 字符串
if isinstance(script_requirement, (dict, list)):
script_requirement_str = json.dumps(script_requirement, ensure_ascii=False)
else:
# 如果已经是字符串,直接使用
script_requirement_str = str(script_requirement)
else:
script_requirement_str = ''
# 准备节点数据
node_data = {
'name_zh': dataflow_name,
'name_en': name_en,
'category': data.get('category', ''),
'organization': data.get('organization', ''),
'leader': data.get('leader', ''),
'frequency': data.get('frequency', ''),
'tag': data.get('tag', ''),
'describe': data.get('describe', ''),
'status': data.get('status', 'inactive'),
'update_mode': data.get('update_mode', 'append'),
'script_requirement': script_requirement_str, # 新增的字段
'created_at': get_formatted_time(),
'updated_at': get_formatted_time()
}
功能说明:
script_requirementjson.dumps() 转换为 JSON 字符串script_requirement 属性中_save_to_pg_database 函数中添加 script_requirement 处理位置: 第 297-310 行
修改内容:
# 提取脚本相关信息
# 处理 script_requirement,确保保存为 JSON 字符串
script_requirement_raw = data.get('script_requirement', None)
if script_requirement_raw is not None:
# 如果是字典或列表,转换为 JSON 字符串
if isinstance(script_requirement_raw, (dict, list)):
script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
else:
# 如果已经是字符串,直接使用
script_requirement = str(script_requirement_raw)
else:
script_requirement = ''
script_content = data.get('script_content', '')
功能说明:
create_dataflow 函数中的处理逻辑一致script_requirement 字段也是 JSON 字符串格式script_requirement 是一个 JSON 对象,包含以下字段:
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
| code | Number | 代码标识 | 28 |
| rule | String | 规则名称 | "rule" |
| source_table | Array | 源表节点ID数组 | [2317, 2307] |
| target_table | Array | 目标表节点ID数组 | [164] |
保存格式:
在数据库中,script_requirement 以 JSON 字符串形式保存:
"{\"code\": 28, \"rule\": \"rule\", \"source_table\": [2317, 2307], \"target_table\": [164]}"
DataFlowscript_requirementdags.data_transform_scriptsscript_requirementdataflow_data = {
"name_zh": "科室对照表映射到数据模型",
"name_en": "deparment_table_mapping",
"category": "应用类",
"leader": "system",
"organization": "citu",
"script_type": "python",
"update_mode": "append",
"frequency": "月",
"status": "active",
"describe": "将科室对照表数据映射到数据模型",
"script_requirement": {
"code": 28,
"rule": "mapping_rule",
"source_table": [2317, 2307],
"target_table": [164]
}
}
# 创建数据流
result = DataFlowService.create_dataflow(dataflow_data)
MATCH (df:DataFlow {name_zh: "科室对照表映射到数据模型"})
RETURN df.script_requirement
import json
# 从数据库获取 script_requirement 字符串
script_requirement_str = dataflow_node['script_requirement']
# 解析 JSON 字符串
if script_requirement_str:
script_requirement = json.loads(script_requirement_str)
code = script_requirement.get('code')
rule = script_requirement.get('rule')
source_tables = script_requirement.get('source_table', [])
target_tables = script_requirement.get('target_table', [])
print(f"Code: {code}")
print(f"Rule: {rule}")
print(f"Source tables: {source_tables}")
print(f"Target tables: {target_tables}")
script_requirement,默认保存为空字符串,不影响现有功能script_requirement 格式保持一致script_requirement 在数据库中以 JSON 字符串形式保存,使用时需要解析ensure_ascii=False 确保中文字符正确保存script_requirement 为 None 或不存在,保存为空字符串 ''script_requirement 的类型,确保正确转换{
"name_zh": "测试数据流1",
"describe": "测试描述",
"script_requirement": {
"code": 1,
"rule": "test_rule",
"source_table": [100, 101],
"target_table": [200]
}
}
{
"name_zh": "测试数据流2",
"describe": "测试描述",
"script_requirement": null
}
{
"name_zh": "测试数据流3",
"describe": "测试描述",
"script_requirement": "{\"code\": 3, \"rule\": \"string_rule\"}"
}
POST /api/dataflow/add-dataflow - 创建数据流GET /api/dataflow/get-dataflow/<id> - 获取数据流详情(包含 script_requirement)PUT /api/dataflow/update-dataflow/<id> - 更新数据流(可更新 script_requirement)script_requirement 字段支持