DataFlow_task_list优化说明.md 14 KB

DataFlow task_list 写入优化说明

优化概述

优化了 create_dataflow 函数中写入 task_list 表的代码操作,根据 script_requirement 的内容智能生成详细的任务描述,包括源表和目标表的 DDL、数据源信息、更新模式等。

修改的文件

文件路径: app/core/data_flow/dataflows.py

优化要求实现

✅ 1. 从 script_requirement 中提取 rule 字段作为 request_content_str

# 1. 从script_requirement中提取rule字段作为request_content_str
request_content_str = req_json.get('rule', '')

功能: 将 rule 字段的值提取出来,作为任务的核心需求内容。

✅ 2. 从 script_requirement 提取 source_table 和 target_table,生成 DDL

# 2. 从script_requirement中提取source_table和target_table字段信息
source_table_ids = req_json.get('source_table', [])
target_table_ids = req_json.get('target_table', [])

# 确保是列表格式
if not isinstance(source_table_ids, list):
    source_table_ids = [source_table_ids] if source_table_ids else []
if not isinstance(target_table_ids, list):
    target_table_ids = [target_table_ids] if target_table_ids else []

# 处理source tables
for bd_id in source_table_ids:
    ddl_info = DataFlowService._generate_businessdomain_ddl(
        session, bd_id, is_target=False
    )
    if ddl_info:
        source_ddls.append(ddl_info['ddl'])

# 处理target tables
for bd_id in target_table_ids:
    ddl_info = DataFlowService._generate_businessdomain_ddl(
        session, bd_id, is_target=True, update_mode=update_mode
    )
    if ddl_info:
        target_ddls.append(ddl_info['ddl'])

功能:

  • 根据节点 ID 列表查询 BusinessDomain 节点
  • 通过 INCLUDES 关系获取 DataMeta 元数据
  • 生成完整的 CREATE TABLE DDL 语句

✅ 3. 如果 BELONGS_TO 关系连接"数据资源",获取数据源信息

# 查询时包含BELONGS_TO和COME_FROM关系
cypher = """
MATCH (bd:BusinessDomain)
WHERE id(bd) = $bd_id
OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
RETURN bd, 
       collect(DISTINCT m) as metadata,
       label.name_zh as label_name,
       ds.type as ds_type,
       ds.host as ds_host,
       ds.port as ds_port,
       ds.database as ds_database
"""

# 如果标签是"数据资源"且有数据源,提取信息
if label_name == '数据资源' and result['ds_type']:
    data_source = {
        'type': result['ds_type'],
        'host': result['ds_host'],
        'port': result['ds_port'],
        'database': result['ds_database']
    }

功能:

  • 检查 BELONGS_TO 关系指向的标签是否为"数据资源"
  • 如果是,通过 COME_FROM 关系获取数据源信息
  • 将数据源信息添加到任务描述中

✅ 4. 从 data 参数提取 update_mode,决定更新方式

# 4. 从data参数中提取update_mode
update_mode = data.get('update_mode', 'append')

# 在任务描述中说明更新模式
if update_mode == 'append':
    task_desc_parts.append("- **Mode**: Append (追加模式)")
    task_desc_parts.append("- **Description**: 新数据将追加到目标表,不删除现有数据")
else:
    task_desc_parts.append("- **Mode**: Full Refresh (全量更新)")
    task_desc_parts.append("- **Description**: 目标表将被清空后重新写入数据")

功能:

  • 提取 update_mode 字段(appendfull
  • 在任务描述中明确说明数据写入方式

✅ 5. 目标表缺省添加 create_time 字段

# 5. 如果是目标表,添加create_time字段
if is_target:
    column_definitions.append("    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'")

功能:

  • 为所有目标表自动添加 create_time 字段
  • 数据类型为 TIMESTAMP
  • 默认值为当前时间戳
  • 用于记录数据写入的时间

新增辅助方法

_generate_businessdomain_ddl

位置: 第 1025-1115 行

签名:

@staticmethod
def _generate_businessdomain_ddl(
    session, 
    bd_id: int, 
    is_target: bool = False, 
    update_mode: str = 'append'
) -> Optional[Dict[str, Any]]

参数:

  • session: Neo4j session 对象
  • bd_id: BusinessDomain 节点 ID
  • is_target: 是否为目标表(目标表需添加 create_time)
  • update_mode: 更新模式(append/full)

返回:

{
    'ddl': 'CREATE TABLE ...',
    'table_name': 'TB_JC_KSDZB',
    'data_source': {
        'type': 'postgresql',
        'host': '10.52.31.104',
        'port': 5432,
        'database': 'mydatabase'
    }
}

查询逻辑:

MATCH (bd:BusinessDomain)
WHERE id(bd) = $bd_id
OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
RETURN bd, 
       collect(DISTINCT m) as metadata,
       label.name_zh as label_name,
       ds.type, ds.host, ds.port, ds.database

生成的任务描述示例

script_requirement 输入

{
  "code": 28,
  "rule": "将科室对照表的数据映射到数据模型中",
  "source_table": [2317, 2307],
  "target_table": [164]
}

生成的任务描述(Markdown格式)

# Task: 科室对照表映射到数据模型

## Data Source
- **Type**: postgresql
- **Host**: 10.52.31.104
- **Port**: 5432
- **Database**: hospital_db

## Source Tables (DDL)
```sql
CREATE TABLE TB_JC_KSDZB (
    YLJGDM VARCHAR(22) COMMENT '医疗机构代码',
    HISKSDM CHAR(20) COMMENT 'HIS科室代码',
    HISKSMC CHAR(20) COMMENT 'HIS科室名称',
    BAKSDM CHAR(20) COMMENT '病案科室代码',
    BAKSMC CHAR(20) COMMENT '病案科室名称'
);
COMMENT ON TABLE TB_JC_KSDZB IS '科室对照表';
CREATE TABLE TB_DEPT_INFO (
    DEPT_ID INTEGER COMMENT '科室ID',
    DEPT_NAME VARCHAR(100) COMMENT '科室名称',
    DEPT_TYPE CHAR(10) COMMENT '科室类型'
);
COMMENT ON TABLE TB_DEPT_INFO IS '科室信息表';

Target Tables (DDL)

CREATE TABLE DM_DEPARTMENT (
    dept_code VARCHAR(50) COMMENT '科室代码',
    dept_name VARCHAR(100) COMMENT '科室名称',
    dept_category VARCHAR(50) COMMENT '科室分类',
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
);
COMMENT ON TABLE DM_DEPARTMENT IS '科室数据模型';

Update Mode

  • Mode: Append (追加模式)
  • Description: 新数据将追加到目标表,不删除现有数据

Request Content

将科室对照表的数据映射到数据模型中

Implementation Steps

  1. Extract data from source tables
  2. Apply transformation logic according to the rule
  3. Write data to target table using append mode
  4. Generate Python program to implement the logic
  5. Generate n8n workflow to schedule and execute the Python program

    
    ## 数据流程
    
    

前端上传 script_requirement

提取 rule → request_content_str

提取 source_table IDs → [2317, 2307]

提取 target_table IDs → [164]

查询 BusinessDomain 节点

    ↓
    ├─→ 获取 INCLUDES 关系 → DataMeta
    ├─→ 获取 BELONGS_TO 关系 → DataLabel
    └─→ 如果是"数据资源" → 获取 COME_FROM 关系 → DataSource
    ↓

生成源表 DDL(不含 create_time)

生成目标表 DDL(含 create_time)

构建 Markdown 任务描述

写入 task_list 表


## DDL 生成规则

### 源表 DDL

```sql
CREATE TABLE {table_name} (
    {column_name} {data_type} COMMENT '{comment}',
    ...
);
COMMENT ON TABLE {table_name} IS '{table_comment}';

目标表 DDL(额外添加 create_time)

CREATE TABLE {table_name} (
    {column_name} {data_type} COMMENT '{comment}',
    ...,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
);
COMMENT ON TABLE {table_name} IS '{table_comment}';

字段说明

script_requirement 字段

字段 类型 说明 示例
code Number 代码标识 28
rule String 转换规则/需求描述 "将科室对照表的数据映射到数据模型中"
source_table Array 源表的 BusinessDomain 节点 ID 列表 [2317, 2307]
target_table Array 目标表的 BusinessDomain 节点 ID 列表 [164]

生成的 data_source 信息

字段 说明
type 数据库类型(postgresql/mysql等)
host 数据库主机地址
port 数据库端口
database 数据库名称

优势

智能提取: 自动从 script_requirement 提取所有必要信息
完整 DDL: 包含所有字段、类型、注释信息
数据源感知: 自动识别并提取数据源信息
更新模式: 明确说明追加或全量更新
时间戳记录: 目标表自动添加 create_time 字段
结构清晰: Markdown 格式,易于阅读
错误容忍: 完善的异常处理,不影响主流程

测试示例

输入数据

{
  "name_zh": "科室对照表映射到数据模型",
  "name_en": "deparment_table_mapping",
  "category": "应用类",
  "leader": "system",
  "organization": "citu",
  "script_type": "python",
  "update_mode": "append",
  "frequency": "月",
  "describe": "数据映射任务",
  "status": "active",
  "script_requirement": {
    "code": 28,
    "rule": "将科室对照表的数据映射到数据模型中,保留所有字段",
    "source_table": [2317, 2307],
    "target_table": [164]
  }
}

生成的任务描述

保存到 task_list.task_description 字段,包含:

  1. 任务标题: 数据流名称
  2. 数据源信息: type, host, port, database
  3. 源表 DDL: 完整的建表语句(不含 create_time)
  4. 目标表 DDL: 完整的建表语句(含 create_time)
  5. 更新模式: append 或 full,附带说明
  6. 请求内容: rule 字段的值
  7. 实施步骤: 详细的执行步骤

关键特性

1. 智能 DDL 生成

源表示例:

CREATE TABLE TB_JC_KSDZB (
    YLJGDM VARCHAR(22) COMMENT '医疗机构代码',
    HISKSDM CHAR(20) COMMENT 'HIS科室代码',
    HISKSMC CHAR(20) COMMENT 'HIS科室名称'
);
COMMENT ON TABLE TB_JC_KSDZB IS '科室对照表';

目标表示例(自动添加 create_time):

CREATE TABLE DM_DEPARTMENT (
    dept_code VARCHAR(50) COMMENT '科室代码',
    dept_name VARCHAR(100) COMMENT '科室名称',
    dept_category VARCHAR(50) COMMENT '科室分类',
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
);
COMMENT ON TABLE DM_DEPARTMENT IS '科室数据模型';

2. 数据源自动识别

只有当 BusinessDomain 的 BELONGS_TO 关系指向"数据资源"标签时,才会查询并返回数据源信息:

BusinessDomain → BELONGS_TO → DataLabel("数据资源")
       ↓
   COME_FROM
       ↓
  DataSource (type, host, port, database)

3. 更新模式说明

追加模式 (append):

## Update Mode
- **Mode**: Append (追加模式)
- **Description**: 新数据将追加到目标表,不删除现有数据

全量更新 (full):

## Update Mode
- **Mode**: Full Refresh (全量更新)
- **Description**: 目标表将被清空后重新写入数据

4. create_time 字段

目的: 记录数据写入的时间戳

规则:

  • ✅ 仅目标表添加此字段
  • ✅ 类型: TIMESTAMP
  • ✅ 默认值: CURRENT_TIMESTAMP
  • ✅ 注释: "数据创建时间"

代码结构

主函数优化(第 365-434 行)

# 1. 提取 rule
request_content_str = req_json.get('rule', '')

# 2. 提取 source_table 和 target_table
source_table_ids = req_json.get('source_table', [])
target_table_ids = req_json.get('target_table', [])

# 4. 提取 update_mode
update_mode = data.get('update_mode', 'append')

# 生成 DDL
for bd_id in source_table_ids:
    ddl_info = DataFlowService._generate_businessdomain_ddl(
        session, bd_id, is_target=False
    )

for bd_id in target_table_ids:
    ddl_info = DataFlowService._generate_businessdomain_ddl(
        session, bd_id, is_target=True, update_mode=update_mode
    )

# 构建任务描述
# ... 组装 Markdown 格式的任务描述

辅助方法(第 1025-1115 行)

@staticmethod
def _generate_businessdomain_ddl(
    session, 
    bd_id: int, 
    is_target: bool = False, 
    update_mode: str = 'append'
) -> Optional[Dict[str, Any]]:
    """
    根据 BusinessDomain 节点 ID 生成 DDL
    
    功能:
    - 查询 BusinessDomain 节点
    - 获取 INCLUDES 关系的 DataMeta 元数据
    - 检查 BELONGS_TO 关系(是否为"数据资源")
    - 获取 COME_FROM 关系的 DataSource 信息
    - 为目标表添加 create_time 字段
    - 生成完整的 DDL 语句
    """

验证结果

  • Linter 检查: 无错误
  • Python 编译: 通过
  • 代码结构: 清晰,易维护
  • 错误处理: 完善的异常捕获
  • 日志记录: 详细的操作日志

注意事项

  1. 节点 ID 验证: 确保 source_table 和 target_table 中的 ID 是有效的 BusinessDomain 节点
  2. 数据源可选: 只有标记为"数据资源"的 BusinessDomain 才会有数据源信息
  3. create_time 字段: 只添加到目标表,不添加到源表
  4. 错误不中断: DDL 生成失败不会中断整个数据流创建过程
  5. 默认值: 如果没有元数据,会添加默认的 id 主键列

相关接口

  • POST /api/dataflow/add-dataflow - 创建数据流(使用此优化)
  • GET /api/dataflow/get-BD-list - 获取 BusinessDomain 列表

更新历史

  • 2024-11-28: 优化 task_list 写入逻辑,支持智能 DDL 生成和数据源识别