# DataFlow create_dataflow 函数优化说明 ## 优化概述 在 `create_dataflow` 函数中新增了对 `script_requirement` 属性的处理,将其作为 JSON 字符串保存到 Neo4j 的 DataFlow 节点中,同时也保存到 PostgreSQL 数据库中。 ## 修改的文件 **文件路径**: `app/core/data_flow/dataflows.py` ## 前端数据格式 前端上传的数据流配置数据格式示例: ```json { "name_zh": "科室对照表映射到数据模型", "name_en": "deparment_table_mapping", "category": "应用类", "leader": "system", "organization": "citu", "script_type": "python", "update_mode": "append", "frequency": "月", "tag": null, "describe": null, "status": "active", "script_requirement": { "code": 28, "rule": "rule", "source_table": [ 2317, 2307 ], "target_table": [ 164 ] } } ``` ## 具体修改 ### 1. 在 `create_dataflow` 函数中添加 `script_requirement` 处理 **位置**: 第 197-221 行 **修改内容**: ```python # 处理 script_requirement,将其转换为 JSON 字符串 script_requirement = data.get('script_requirement', None) if script_requirement is not None: # 如果是字典或列表,转换为 JSON 字符串 if isinstance(script_requirement, (dict, list)): script_requirement_str = json.dumps(script_requirement, ensure_ascii=False) else: # 如果已经是字符串,直接使用 script_requirement_str = str(script_requirement) else: script_requirement_str = '' # 准备节点数据 node_data = { 'name_zh': dataflow_name, 'name_en': name_en, 'category': data.get('category', ''), 'organization': data.get('organization', ''), 'leader': data.get('leader', ''), 'frequency': data.get('frequency', ''), 'tag': data.get('tag', ''), 'describe': data.get('describe', ''), 'status': data.get('status', 'inactive'), 'update_mode': data.get('update_mode', 'append'), 'script_requirement': script_requirement_str, # 新增的字段 'created_at': get_formatted_time(), 'updated_at': get_formatted_time() } ``` **功能说明**: 1. 从前端数据中获取 `script_requirement` 2. 如果是字典或列表类型,使用 `json.dumps()` 转换为 JSON 字符串 3. 如果已经是字符串,直接使用 4. 如果不存在,设置为空字符串 5. 将 JSON 字符串保存到 Neo4j 节点的 `script_requirement` 属性中 ### 2. 在 `_save_to_pg_database` 函数中添加 `script_requirement` 处理 **位置**: 第 297-310 行 **修改内容**: ```python # 提取脚本相关信息 # 处理 script_requirement,确保保存为 JSON 字符串 script_requirement_raw = data.get('script_requirement', None) if script_requirement_raw is not None: # 如果是字典或列表,转换为 JSON 字符串 if isinstance(script_requirement_raw, (dict, list)): script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False) else: # 如果已经是字符串,直接使用 script_requirement = str(script_requirement_raw) else: script_requirement = '' script_content = data.get('script_content', '') ``` **功能说明**: 1. 与 `create_dataflow` 函数中的处理逻辑一致 2. 确保保存到 PostgreSQL 数据库的 `script_requirement` 字段也是 JSON 字符串格式 3. 保证 Neo4j 和 PostgreSQL 中的数据格式一致 ## script_requirement 数据结构 `script_requirement` 是一个 JSON 对象,包含以下字段: | 字段 | 类型 | 说明 | 示例 | |------|------|------|------| | code | Number | 代码标识 | 28 | | rule | String | 规则名称 | "rule" | | source_table | Array | 源表节点ID数组 | [2317, 2307] | | target_table | Array | 目标表节点ID数组 | [164] | **保存格式**: 在数据库中,`script_requirement` 以 JSON 字符串形式保存: ```json "{\"code\": 28, \"rule\": \"rule\", \"source_table\": [2317, 2307], \"target_table\": [164]}" ``` ## 存储位置 ### 1. Neo4j 图数据库 - **节点类型**: `DataFlow` - **属性名**: `script_requirement` - **数据类型**: String (JSON 格式) ### 2. PostgreSQL 数据库 - **表名**: `dags.data_transform_scripts` - **字段名**: `script_requirement` - **数据类型**: TEXT (JSON 格式) ## 使用示例 ### 创建 DataFlow 时提供 script_requirement ```python dataflow_data = { "name_zh": "科室对照表映射到数据模型", "name_en": "deparment_table_mapping", "category": "应用类", "leader": "system", "organization": "citu", "script_type": "python", "update_mode": "append", "frequency": "月", "status": "active", "describe": "将科室对照表数据映射到数据模型", "script_requirement": { "code": 28, "rule": "mapping_rule", "source_table": [2317, 2307], "target_table": [164] } } # 创建数据流 result = DataFlowService.create_dataflow(dataflow_data) ``` ### 从 Neo4j 查询包含 script_requirement 的 DataFlow ```cypher MATCH (df:DataFlow {name_zh: "科室对照表映射到数据模型"}) RETURN df.script_requirement ``` ### 解析 script_requirement ```python import json # 从数据库获取 script_requirement 字符串 script_requirement_str = dataflow_node['script_requirement'] # 解析 JSON 字符串 if script_requirement_str: script_requirement = json.loads(script_requirement_str) code = script_requirement.get('code') rule = script_requirement.get('rule') source_tables = script_requirement.get('source_table', []) target_tables = script_requirement.get('target_table', []) print(f"Code: {code}") print(f"Rule: {rule}") print(f"Source tables: {source_tables}") print(f"Target tables: {target_tables}") ``` ## 兼容性说明 1. **向后兼容**: 如果前端不提供 `script_requirement`,默认保存为空字符串,不影响现有功能 2. **格式灵活**: 支持前端传递字典、列表或字符串格式,都会正确转换为 JSON 字符串 3. **数据一致性**: Neo4j 和 PostgreSQL 中的 `script_requirement` 格式保持一致 ## 注意事项 1. **JSON 格式**: `script_requirement` 在数据库中以 JSON 字符串形式保存,使用时需要解析 2. **编码问题**: 使用 `ensure_ascii=False` 确保中文字符正确保存 3. **空值处理**: 如果 `script_requirement` 为 `None` 或不存在,保存为空字符串 `''` 4. **类型检查**: 代码会检查 `script_requirement` 的类型,确保正确转换 ## 测试建议 ### 测试用例 1: 完整的 script_requirement ```json { "name_zh": "测试数据流1", "describe": "测试描述", "script_requirement": { "code": 1, "rule": "test_rule", "source_table": [100, 101], "target_table": [200] } } ``` ### 测试用例 2: 空的 script_requirement ```json { "name_zh": "测试数据流2", "describe": "测试描述", "script_requirement": null } ``` ### 测试用例 3: script_requirement 为字符串 ```json { "name_zh": "测试数据流3", "describe": "测试描述", "script_requirement": "{\"code\": 3, \"rule\": \"string_rule\"}" } ``` ## 相关接口 - `POST /api/dataflow/add-dataflow` - 创建数据流 - `GET /api/dataflow/get-dataflow/` - 获取数据流详情(包含 script_requirement) - `PUT /api/dataflow/update-dataflow/` - 更新数据流(可更新 script_requirement) ## 更新历史 - **2024-11-28**: 初始版本,添加 `script_requirement` 字段支持