优化了 DataFlow 创建时 task_list 表中实施步骤的生成逻辑,根据任务类型(远程数据源导入 vs 数据转换)智能生成不同的实施步骤。
通过检查 data_source_info 是否存在来判断任务类型:
data_source_info: 远程数据源导入任务(BusinessDomain 的 BELONGS_TO 关系连接的是"数据资源")data_source_info: 数据转换任务(从内部表到表的数据转换)当检测到 data_source_info 存在时,生成的实施步骤为:
## Implementation Steps
1. Create an n8n workflow to execute the data import task
2. Configure the workflow to call `import_resource_data.py` Python script
3. Pass the following parameters to the Python execution node:
- `--source-config`: JSON configuration for the remote data source
- `--target-table`: Target table name (data resource English name)
- `--update-mode`: append/full
4. The Python script will automatically:
- Connect to the remote data source
- Extract data from the source table
- Write data to target table using {update_mode} mode
特点:
import_resource_data.py 工具当 data_source_info 不存在时,生成的实施步骤为:
## Implementation Steps
1. Extract data from source tables as specified in the DDL
2. Apply transformation logic according to the rule:
- Rule: {request_content_str}
3. Generate Python program to implement the data transformation logic
4. Write transformed data to target table using {update_mode} mode
5. Create an n8n workflow to schedule and execute the Python program
特点:
文件: app/core/data_flow/dataflows.py
函数: DataFlowService._save_to_pg_database()
代码段: 第 464-484 行
场景: 从远程 PostgreSQL 数据库导入科室对照表
输入数据:
{
"name_zh": "科室对照表数据导入",
"name_en": "dept_data_import",
"script_requirement": {
"rule": "从远程数据源导入科室对照表数据",
"source_table": [2317], // BusinessDomain ID (BELONGS_TO "数据资源")
"target_table": [164]
},
"update_mode": "append"
}
生成的任务描述(包含数据源信息):
# Task: dept_data_import
## Data Source
- **Type**: postgresql
- **Host**: 10.52.31.104
- **Port**: 5432
- **Database**: hospital_db
## Source Tables (DDL)
```sql
CREATE TABLE TB_JC_KSDZB (
YLJGDM VARCHAR(22) NOT NULL,
HISKSDM CHAR(20) NOT NULL,
...
);
CREATE TABLE departments (
yljgdm VARCHAR(22),
hisksdm CHAR(20),
...
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
从远程数据源导入科室对照表数据
import_resource_data.py Python script--source-config: JSON configuration for the remote data source--target-table: Target table name (data resource English name)--update-mode: appendThe Python script will automatically:
Write data to target table using append mode
### 示例 2: 数据转换任务
**场景**: 将科室对照表映射到数据模型
**输入数据**:
```json
{
"name_zh": "科室对照表映射到数据模型",
"name_en": "dept_table_mapping",
"script_requirement": {
"rule": "将科室对照表中的 HIS 科室信息映射到标准化的数据模型结构",
"source_table": [2307, 2308], // BusinessDomain IDs (BELONGS_TO "数据模型")
"target_table": [164]
},
"update_mode": "full"
}
生成的任务描述(无数据源信息):
# Task: dept_table_mapping
## Source Tables (DDL)
```sql
CREATE TABLE TB_JC_KSDZB (
YLJGDM VARCHAR(22) NOT NULL,
HISKSDM CHAR(20) NOT NULL,
...
);
CREATE TABLE TB_JC_KSXX (
KSDM CHAR(20) NOT NULL,
KSMC CHAR(50),
...
);
CREATE TABLE departments (
dept_code CHAR(20),
dept_name CHAR(50),
...
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
将科室对照表中的 HIS 科室信息映射到标准化的数据模型结构
Create an n8n workflow to schedule and execute the Python program
## 优化效果
### 1. 任务描述更清晰
- 不同类型的任务有不同的实施步骤
- 远程导入任务不再要求生成复杂的转换逻辑
### 2. 降低实施难度
- 远程导入任务只需配置 n8n 工作流调用现有工具
- 无需为简单的数据导入编写自定义 Python 代码
### 3. 提高可执行性
- 明确列出所需参数和工具
- 步骤更具可操作性
### 4. 区分任务类型
- 系统能够自动识别任务类型
- 为不同类型的任务提供针对性的指导
## 技术细节
### 判断条件
```python
# 判断是否为远程数据源导入任务
if data_source_info:
# 从远程数据源导入数据的简化步骤
# ...
else:
# 数据转换任务的完整步骤
# ...
data_source_info 是通过查询 Neo4j 图数据库获取的:
script_requirement 中提取 source_table ID 列表BELONGS_TO 关系BELONGS_TO 连接的是 "数据资源" DataLabel,则查询 COME_FROM 关系获取 DataSource 信息# 3. 如果BELONGS_TO关系连接的是"数据资源",获取数据源信息
if ddl_info.get('data_source') and not data_source_info:
data_source_info = ddl_info['data_source']
app/core/data_flow/dataflows.py: 实施步骤生成逻辑app/core/data_flow/import_resource_data.py: 远程数据源导入工具docs/import_resource_data使用说明.md: 数据导入工具使用文档{
"name_zh": "测试远程导入",
"name_en": "test_remote_import",
"script_requirement": {
"rule": "测试从远程数据源导入",
"source_table": [<远程数据资源的BD_ID>],
"target_table": [<目标表的BD_ID>]
},
"update_mode": "append"
}
预期结果: 生成简化的实施步骤,包含 import_resource_data.py 调用说明
{
"name_zh": "测试数据转换",
"name_en": "test_transformation",
"script_requirement": {
"rule": "测试数据转换逻辑",
"source_table": [<内部表1的BD_ID>, <内部表2的BD_ID>],
"target_table": [<目标表的BD_ID>]
},
"update_mode": "full"
}
预期结果: 生成完整的实施步骤,包含数据提取、转换、写入和 n8n 工作流创建