优化了 create_dataflow 函数中写入 task_list 表的代码操作,根据 script_requirement 的内容智能生成详细的任务描述,包括源表和目标表的 DDL、数据源信息、更新模式等。
文件路径: app/core/data_flow/dataflows.py
# 1. 从script_requirement中提取rule字段作为request_content_str
request_content_str = req_json.get('rule', '')
功能: 将 rule 字段的值提取出来,作为任务的核心需求内容。
# 2. 从script_requirement中提取source_table和target_table字段信息
source_table_ids = req_json.get('source_table', [])
target_table_ids = req_json.get('target_table', [])
# 确保是列表格式
if not isinstance(source_table_ids, list):
source_table_ids = [source_table_ids] if source_table_ids else []
if not isinstance(target_table_ids, list):
target_table_ids = [target_table_ids] if target_table_ids else []
# 处理source tables
for bd_id in source_table_ids:
ddl_info = DataFlowService._generate_businessdomain_ddl(
session, bd_id, is_target=False
)
if ddl_info:
source_ddls.append(ddl_info['ddl'])
# 处理target tables
for bd_id in target_table_ids:
ddl_info = DataFlowService._generate_businessdomain_ddl(
session, bd_id, is_target=True, update_mode=update_mode
)
if ddl_info:
target_ddls.append(ddl_info['ddl'])
功能:
# 查询时包含BELONGS_TO和COME_FROM关系
cypher = """
MATCH (bd:BusinessDomain)
WHERE id(bd) = $bd_id
OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
RETURN bd,
collect(DISTINCT m) as metadata,
label.name_zh as label_name,
ds.type as ds_type,
ds.host as ds_host,
ds.port as ds_port,
ds.database as ds_database
"""
# 如果标签是"数据资源"且有数据源,提取信息
if label_name == '数据资源' and result['ds_type']:
data_source = {
'type': result['ds_type'],
'host': result['ds_host'],
'port': result['ds_port'],
'database': result['ds_database']
}
功能:
# 4. 从data参数中提取update_mode
update_mode = data.get('update_mode', 'append')
# 在任务描述中说明更新模式
if update_mode == 'append':
task_desc_parts.append("- **Mode**: Append (追加模式)")
task_desc_parts.append("- **Description**: 新数据将追加到目标表,不删除现有数据")
else:
task_desc_parts.append("- **Mode**: Full Refresh (全量更新)")
task_desc_parts.append("- **Description**: 目标表将被清空后重新写入数据")
功能:
update_mode 字段(append 或 full)# 5. 如果是目标表,添加create_time字段
if is_target:
column_definitions.append(" create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'")
功能:
create_time 字段TIMESTAMP_generate_businessdomain_ddl位置: 第 1025-1115 行
签名:
@staticmethod
def _generate_businessdomain_ddl(
session,
bd_id: int,
is_target: bool = False,
update_mode: str = 'append'
) -> Optional[Dict[str, Any]]
参数:
session: Neo4j session 对象bd_id: BusinessDomain 节点 IDis_target: 是否为目标表(目标表需添加 create_time)update_mode: 更新模式(append/full)返回:
{
'ddl': 'CREATE TABLE ...',
'table_name': 'TB_JC_KSDZB',
'data_source': {
'type': 'postgresql',
'host': '10.52.31.104',
'port': 5432,
'database': 'mydatabase'
}
}
查询逻辑:
MATCH (bd:BusinessDomain)
WHERE id(bd) = $bd_id
OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
RETURN bd,
collect(DISTINCT m) as metadata,
label.name_zh as label_name,
ds.type, ds.host, ds.port, ds.database
{
"code": 28,
"rule": "将科室对照表的数据映射到数据模型中",
"source_table": [2317, 2307],
"target_table": [164]
}
# Task: 科室对照表映射到数据模型
## Data Source
- **Type**: postgresql
- **Host**: 10.52.31.104
- **Port**: 5432
- **Database**: hospital_db
## Source Tables (DDL)
```sql
CREATE TABLE TB_JC_KSDZB (
YLJGDM VARCHAR(22) COMMENT '医疗机构代码',
HISKSDM CHAR(20) COMMENT 'HIS科室代码',
HISKSMC CHAR(20) COMMENT 'HIS科室名称',
BAKSDM CHAR(20) COMMENT '病案科室代码',
BAKSMC CHAR(20) COMMENT '病案科室名称'
);
COMMENT ON TABLE TB_JC_KSDZB IS '科室对照表';
CREATE TABLE TB_DEPT_INFO (
DEPT_ID INTEGER COMMENT '科室ID',
DEPT_NAME VARCHAR(100) COMMENT '科室名称',
DEPT_TYPE CHAR(10) COMMENT '科室类型'
);
COMMENT ON TABLE TB_DEPT_INFO IS '科室信息表';
CREATE TABLE DM_DEPARTMENT (
dept_code VARCHAR(50) COMMENT '科室代码',
dept_name VARCHAR(100) COMMENT '科室名称',
dept_category VARCHAR(50) COMMENT '科室分类',
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
);
COMMENT ON TABLE DM_DEPARTMENT IS '科室数据模型';
将科室对照表的数据映射到数据模型中
Generate n8n workflow to schedule and execute the Python program
## 数据流程
前端上传 script_requirement
↓
提取 rule → request_content_str
↓
提取 source_table IDs → [2317, 2307]
↓
提取 target_table IDs → [164]
↓
查询 BusinessDomain 节点
↓
├─→ 获取 INCLUDES 关系 → DataMeta
├─→ 获取 BELONGS_TO 关系 → DataLabel
└─→ 如果是"数据资源" → 获取 COME_FROM 关系 → DataSource
↓
生成源表 DDL(不含 create_time)
↓
生成目标表 DDL(含 create_time)
↓
构建 Markdown 任务描述
↓
写入 task_list 表
## DDL 生成规则
### 源表 DDL
```sql
CREATE TABLE {table_name} (
{column_name} {data_type} COMMENT '{comment}',
...
);
COMMENT ON TABLE {table_name} IS '{table_comment}';
CREATE TABLE {table_name} (
{column_name} {data_type} COMMENT '{comment}',
...,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
);
COMMENT ON TABLE {table_name} IS '{table_comment}';
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
| code | Number | 代码标识 | 28 |
| rule | String | 转换规则/需求描述 | "将科室对照表的数据映射到数据模型中" |
| source_table | Array | 源表的 BusinessDomain 节点 ID 列表 | [2317, 2307] |
| target_table | Array | 目标表的 BusinessDomain 节点 ID 列表 | [164] |
| 字段 | 说明 |
|---|---|
| type | 数据库类型(postgresql/mysql等) |
| host | 数据库主机地址 |
| port | 数据库端口 |
| database | 数据库名称 |
✅ 智能提取: 自动从 script_requirement 提取所有必要信息
✅ 完整 DDL: 包含所有字段、类型、注释信息
✅ 数据源感知: 自动识别并提取数据源信息
✅ 更新模式: 明确说明追加或全量更新
✅ 时间戳记录: 目标表自动添加 create_time 字段
✅ 结构清晰: Markdown 格式,易于阅读
✅ 错误容忍: 完善的异常处理,不影响主流程
{
"name_zh": "科室对照表映射到数据模型",
"name_en": "deparment_table_mapping",
"category": "应用类",
"leader": "system",
"organization": "citu",
"script_type": "python",
"update_mode": "append",
"frequency": "月",
"describe": "数据映射任务",
"status": "active",
"script_requirement": {
"code": 28,
"rule": "将科室对照表的数据映射到数据模型中,保留所有字段",
"source_table": [2317, 2307],
"target_table": [164]
}
}
保存到 task_list.task_description 字段,包含:
源表示例:
CREATE TABLE TB_JC_KSDZB (
YLJGDM VARCHAR(22) COMMENT '医疗机构代码',
HISKSDM CHAR(20) COMMENT 'HIS科室代码',
HISKSMC CHAR(20) COMMENT 'HIS科室名称'
);
COMMENT ON TABLE TB_JC_KSDZB IS '科室对照表';
目标表示例(自动添加 create_time):
CREATE TABLE DM_DEPARTMENT (
dept_code VARCHAR(50) COMMENT '科室代码',
dept_name VARCHAR(100) COMMENT '科室名称',
dept_category VARCHAR(50) COMMENT '科室分类',
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
);
COMMENT ON TABLE DM_DEPARTMENT IS '科室数据模型';
只有当 BusinessDomain 的 BELONGS_TO 关系指向"数据资源"标签时,才会查询并返回数据源信息:
BusinessDomain → BELONGS_TO → DataLabel("数据资源")
↓
COME_FROM
↓
DataSource (type, host, port, database)
追加模式 (append):
## Update Mode
- **Mode**: Append (追加模式)
- **Description**: 新数据将追加到目标表,不删除现有数据
全量更新 (full):
## Update Mode
- **Mode**: Full Refresh (全量更新)
- **Description**: 目标表将被清空后重新写入数据
目的: 记录数据写入的时间戳
规则:
TIMESTAMPCURRENT_TIMESTAMP# 1. 提取 rule
request_content_str = req_json.get('rule', '')
# 2. 提取 source_table 和 target_table
source_table_ids = req_json.get('source_table', [])
target_table_ids = req_json.get('target_table', [])
# 4. 提取 update_mode
update_mode = data.get('update_mode', 'append')
# 生成 DDL
for bd_id in source_table_ids:
ddl_info = DataFlowService._generate_businessdomain_ddl(
session, bd_id, is_target=False
)
for bd_id in target_table_ids:
ddl_info = DataFlowService._generate_businessdomain_ddl(
session, bd_id, is_target=True, update_mode=update_mode
)
# 构建任务描述
# ... 组装 Markdown 格式的任务描述
@staticmethod
def _generate_businessdomain_ddl(
session,
bd_id: int,
is_target: bool = False,
update_mode: str = 'append'
) -> Optional[Dict[str, Any]]:
"""
根据 BusinessDomain 节点 ID 生成 DDL
功能:
- 查询 BusinessDomain 节点
- 获取 INCLUDES 关系的 DataMeta 元数据
- 检查 BELONGS_TO 关系(是否为"数据资源")
- 获取 COME_FROM 关系的 DataSource 信息
- 为目标表添加 create_time 字段
- 生成完整的 DDL 语句
"""
POST /api/dataflow/add-dataflow - 创建数据流(使用此优化)GET /api/dataflow/get-BD-list - 获取 BusinessDomain 列表