DataFlow_实施步骤优化说明.md 7.9 KB

DataFlow 实施步骤优化说明

优化概述

优化了 DataFlow 创建时 task_list 表中实施步骤的生成逻辑,根据任务类型(远程数据源导入 vs 数据转换)智能生成不同的实施步骤。

优化内容

1. 判断逻辑

通过检查 data_source_info 是否存在来判断任务类型:

  • data_source_info: 远程数据源导入任务(BusinessDomain 的 BELONGS_TO 关系连接的是"数据资源")
  • data_source_info: 数据转换任务(从内部表到表的数据转换)

2. 远程数据源导入任务(简化步骤)

当检测到 data_source_info 存在时,生成的实施步骤为:

## Implementation Steps
1. Create an n8n workflow to execute the data import task
2. Configure the workflow to call `import_resource_data.py` Python script
3. Pass the following parameters to the Python execution node:
   - `--source-config`: JSON configuration for the remote data source
   - `--target-table`: Target table name (data resource English name)
   - `--update-mode`: append/full
4. The Python script will automatically:
   - Connect to the remote data source
   - Extract data from the source table
   - Write data to target table using {update_mode} mode

特点

  • 只需创建 n8n 工作流
  • 直接调用现成的 import_resource_data.py 工具
  • 明确列出需要传递的参数
  • 无需手动编写 Python 代码

3. 数据转换任务(完整步骤)

data_source_info 不存在时,生成的实施步骤为:

## Implementation Steps
1. Extract data from source tables as specified in the DDL
2. Apply transformation logic according to the rule:
   - Rule: {request_content_str}
3. Generate Python program to implement the data transformation logic
4. Write transformed data to target table using {update_mode} mode
5. Create an n8n workflow to schedule and execute the Python program

特点

  • 需要从 source tables 提取数据
  • 应用指定的计算规则(rule)
  • 需要生成自定义 Python 程序
  • 将转换后的数据写入 target table
  • 创建 n8n 工作流来调度执行

代码位置

文件: app/core/data_flow/dataflows.py

函数: DataFlowService._save_to_pg_database()

代码段: 第 464-484 行

实现示例

示例 1: 远程数据源导入任务

场景: 从远程 PostgreSQL 数据库导入科室对照表

输入数据:

{
  "name_zh": "科室对照表数据导入",
  "name_en": "dept_data_import",
  "script_requirement": {
    "rule": "从远程数据源导入科室对照表数据",
    "source_table": [2317],  // BusinessDomain ID (BELONGS_TO "数据资源")
    "target_table": [164]
  },
  "update_mode": "append"
}

生成的任务描述(包含数据源信息):

# Task: dept_data_import

## Data Source
- **Type**: postgresql
- **Host**: 10.52.31.104
- **Port**: 5432
- **Database**: hospital_db

## Source Tables (DDL)
```sql
CREATE TABLE TB_JC_KSDZB (
  YLJGDM VARCHAR(22) NOT NULL,
  HISKSDM CHAR(20) NOT NULL,
  ...
);

Target Tables (DDL)

CREATE TABLE departments (
  yljgdm VARCHAR(22),
  hisksdm CHAR(20),
  ...
  create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Update Mode

  • Mode: Append (追加模式)
  • Description: 新数据将追加到目标表,不删除现有数据

Request Content

从远程数据源导入科室对照表数据

Implementation Steps

  1. Create an n8n workflow to execute the data import task
  2. Configure the workflow to call import_resource_data.py Python script
  3. Pass the following parameters to the Python execution node:
    • --source-config: JSON configuration for the remote data source
    • --target-table: Target table name (data resource English name)
    • --update-mode: append
  4. The Python script will automatically:

    • Connect to the remote data source
    • Extract data from the source table
    • Write data to target table using append mode

      
      ### 示例 2: 数据转换任务
      
      **场景**: 将科室对照表映射到数据模型
      
      **输入数据**:
      ```json
      {
      "name_zh": "科室对照表映射到数据模型",
      "name_en": "dept_table_mapping",
      "script_requirement": {
      "rule": "将科室对照表中的 HIS 科室信息映射到标准化的数据模型结构",
      "source_table": [2307, 2308],  // BusinessDomain IDs (BELONGS_TO "数据模型")
      "target_table": [164]
      },
      "update_mode": "full"
      }
      

生成的任务描述(无数据源信息):

# Task: dept_table_mapping

## Source Tables (DDL)
```sql
CREATE TABLE TB_JC_KSDZB (
  YLJGDM VARCHAR(22) NOT NULL,
  HISKSDM CHAR(20) NOT NULL,
  ...
);

CREATE TABLE TB_JC_KSXX (
  KSDM CHAR(20) NOT NULL,
  KSMC CHAR(50),
  ...
);

Target Tables (DDL)

CREATE TABLE departments (
  dept_code CHAR(20),
  dept_name CHAR(50),
  ...
  create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Update Mode

  • Mode: Full Refresh (全量更新)
  • Description: 目标表将被清空后重新写入数据

Request Content

将科室对照表中的 HIS 科室信息映射到标准化的数据模型结构

Implementation Steps

  1. Extract data from source tables as specified in the DDL
  2. Apply transformation logic according to the rule:
    • Rule: 将科室对照表中的 HIS 科室信息映射到标准化的数据模型结构
  3. Generate Python program to implement the data transformation logic
  4. Write transformed data to target table using full mode
  5. Create an n8n workflow to schedule and execute the Python program

    
    ## 优化效果
    
    ### 1. 任务描述更清晰
    - 不同类型的任务有不同的实施步骤
    - 远程导入任务不再要求生成复杂的转换逻辑
    
    ### 2. 降低实施难度
    - 远程导入任务只需配置 n8n 工作流调用现有工具
    - 无需为简单的数据导入编写自定义 Python 代码
    
    ### 3. 提高可执行性
    - 明确列出所需参数和工具
    - 步骤更具可操作性
    
    ### 4. 区分任务类型
    - 系统能够自动识别任务类型
    - 为不同类型的任务提供针对性的指导
    
    ## 技术细节
    
    ### 判断条件
    
    ```python
    # 判断是否为远程数据源导入任务
    if data_source_info:
    # 从远程数据源导入数据的简化步骤
    # ...
    else:
    # 数据转换任务的完整步骤
    # ...
    

data_source_info 的来源

data_source_info 是通过查询 Neo4j 图数据库获取的:

  1. script_requirement 中提取 source_table ID 列表
  2. 查询 BusinessDomain 节点
  3. 检查 BusinessDomain 的 BELONGS_TO 关系
  4. 如果 BELONGS_TO 连接的是 "数据资源" DataLabel,则查询 COME_FROM 关系获取 DataSource 信息
# 3. 如果BELONGS_TO关系连接的是"数据资源",获取数据源信息
if ddl_info.get('data_source') and not data_source_info:
    data_source_info = ddl_info['data_source']

相关文件

  • app/core/data_flow/dataflows.py: 实施步骤生成逻辑
  • app/core/data_flow/import_resource_data.py: 远程数据源导入工具
  • docs/import_resource_data使用说明.md: 数据导入工具使用文档

测试建议

测试用例 1: 远程数据源导入

{
  "name_zh": "测试远程导入",
  "name_en": "test_remote_import",
  "script_requirement": {
    "rule": "测试从远程数据源导入",
    "source_table": [<远程数据资源的BD_ID>],
    "target_table": [<目标表的BD_ID>]
  },
  "update_mode": "append"
}

预期结果: 生成简化的实施步骤,包含 import_resource_data.py 调用说明

测试用例 2: 数据转换

{
  "name_zh": "测试数据转换",
  "name_en": "test_transformation",
  "script_requirement": {
    "rule": "测试数据转换逻辑",
    "source_table": [<内部表1的BD_ID>, <内部表2的BD_ID>],
    "target_table": [<目标表的BD_ID>]
  },
  "update_mode": "full"
}

预期结果: 生成完整的实施步骤,包含数据提取、转换、写入和 n8n 工作流创建

版本信息

  • 创建时间: 2025-11-29
  • 作者: cursor
  • 版本: 1.0