import_resource_data使用说明.md 14 KB

import_resource_data.py 使用说明

概述

import_resource_data.py 是一个通用的数据资源导入工具,用于从远程数据源读取数据,按照指定的更新模式写入到目标数据资源表中。

功能特性

灵活的数据源支持

  • PostgreSQL
  • MySQL(需要安装 pymysql)

灵活的目标配置

  • 目标数据库从 config.pySQLALCHEMY_DATABASE_URI 读取
  • 支持任意目标表名(数据资源的英文名)

两种更新模式

  • append:追加模式(默认),新数据追加到目标表
  • full:全量更新,先清空目标表再写入

智能列映射

  • 自动匹配源表和目标表的列名(不区分大小写)
  • 自动为目标表添加 create_time 时间戳

命令行参数支持

  • 接收 JSON 配置或配置文件
  • 支持限制导入数据行数

安装依赖

必需依赖

pip install psycopg2-binary sqlalchemy

可选依赖(MySQL 支持)

pip install pymysql

使用方式

方式 1: 命令行调用

基本用法

python app/core/data_flow/import_resource_data.py \
  --source-config '{"type":"postgresql","host":"10.52.31.104","port":5432,"database":"source_db","username":"user","password":"pass","table_name":"TB_JC_KSDZB"}' \
  --target-table TB_JC_KSDZB \
  --update-mode append

使用配置文件

  1. 创建配置文件 source_config.json
{
  "type": "postgresql",
  "host": "10.52.31.104",
  "port": 5432,
  "database": "hospital_his",
  "username": "his_user",
  "password": "his_password",
  "table_name": "TB_JC_KSDZB",
  "where_clause": "TBRQ >= '2025-01-01'",
  "order_by": "TBRQ DESC"
}
  1. 运行命令:
python app/core/data_flow/import_resource_data.py \
  --source-config source_config.json \
  --target-table TB_JC_KSDZB \
  --update-mode append \
  --limit 1000

命令行参数说明

参数 必需 说明 示例
--source-config 源数据库配置(JSON字符串或文件路径) 见上方
--target-table 目标表名(数据资源的英文名) TB_JC_KSDZB
--update-mode 更新模式:appendfull,默认 append append
--limit 限制导入的数据行数 1000

方式 2: Python 代码调用

from app.core.data_flow.import_resource_data import import_resource_data

# 源数据库配置
source_config = {
    'type': 'postgresql',
    'host': '10.52.31.104',
    'port': 5432,
    'database': 'hospital_his',
    'username': 'his_user',
    'password': 'his_password',
    'table_name': 'TB_JC_KSDZB',  # 源表名
    'where_clause': "TBRQ >= '2025-01-01'",  # 可选:WHERE条件
    'order_by': 'TBRQ DESC'  # 可选:排序
}

# 执行导入
result = import_resource_data(
    source_config=source_config,
    target_table_name='TB_JC_KSDZB',  # 目标表名
    update_mode='append',  # 或 'full'
    limit=1000  # 可选:限制行数
)

# 查看结果
print(f"导入成功: {result['success']}")
print(f"成功: {result['imported_count']} 条")
print(f"失败: {result['error_count']} 条")
print(f"消息: {result['message']}")

源数据库配置

PostgreSQL 配置

{
  "type": "postgresql",
  "host": "10.52.31.104",
  "port": 5432,
  "database": "hospital_his",
  "username": "his_user",
  "password": "his_password",
  "table_name": "TB_JC_KSDZB",
  "where_clause": "TBRQ >= '2025-01-01'",
  "order_by": "TBRQ DESC"
}

MySQL 配置

{
  "type": "mysql",
  "host": "10.52.31.105",
  "port": 3306,
  "database": "hospital_his",
  "username": "his_user",
  "password": "his_password",
  "table_name": "dept_table",
  "where_clause": "status = 1",
  "order_by": "update_time DESC"
}

配置字段说明

字段 必需 说明
type 数据库类型:postgresqlmysql
host 数据库主机地址
port 数据库端口
database 数据库名
username 用户名
password 密码
table_name 源表名
where_clause WHERE 过滤条件
order_by 排序条件

更新模式

append(追加模式)

特点

  • 新数据追加到目标表
  • 不删除现有数据
  • 适合增量数据导入

使用场景

  • 日志数据导入
  • 定期增量同步
  • 历史数据累积

示例

python app/core/data_flow/import_resource_data.py \
  --source-config source_config.json \
  --target-table TB_JC_KSDZB \
  --update-mode append

full(全量更新)

特点

  • 先清空目标表
  • 再写入新数据
  • 目标表数据完全替换

使用场景

  • 主数据同步
  • 配置表更新
  • 每日全量刷新

示例

python app/core/data_flow/import_resource_data.py \
  --source-config source_config.json \
  --target-table TB_JC_KSDZB \
  --update-mode full

列映射机制

自动映射规则

  1. 不区分大小写匹配

    源表:YLJGDM → 目标表:yljgdm ✅
    源表:HisKsDm → 目标表:HISKSDM ✅
    
  2. 自动添加 create_time

    • 目标表自动添加 create_time 字段
    • 值为当前时间戳 CURRENT_TIMESTAMP
  3. 未匹配列处理

    • 目标表有、源表没有的列 → 设为 NULL
    • 源表有、目标表没有的列 → 忽略

映射示例

源表结构

CREATE TABLE source_table (
    YLJGDM VARCHAR(22),
    HISKSDM CHAR(20),
    HISKSMC CHAR(20)
);

目标表结构

CREATE TABLE target_table (
    yljgdm VARCHAR(22),
    hisksdm CHAR(20),
    hisksmc CHAR(20),
    extra_field VARCHAR(50),  -- 源表没有
    create_time TIMESTAMP  -- 自动添加
);

映射结果

  • YLJGDMyljgdm
  • HISKSDMhisksdm
  • HISKSMChisksmc
  • extra_fieldNULL (源表没有)
  • create_timeCURRENT_TIMESTAMP (自动添加)

返回结果

结果结构

{
    'success': True,  # 是否成功
    'imported_count': 1250,  # 成功导入行数
    'error_count': 5,  # 失败行数
    'update_mode': 'append',  # 更新模式
    'message': '导入完成: 成功 1250 条, 失败 5 条'  # 详细消息
}

成功示例

{
    'success': True,
    'imported_count': 1250,
    'error_count': 0,
    'update_mode': 'append',
    'message': '导入完成: 成功 1250 条, 失败 0 条'
}

失败示例

{
    'success': False,
    'imported_count': 0,
    'error_count': 0,
    'update_mode': 'append',
    'message': '连接源数据库失败: connection refused'
}

执行流程

1. 解析命令行参数
   ↓
2. 连接源数据库
   ↓
3. 连接目标数据库(从 config.py)
   ↓
4. [full 模式] 清空目标表
   ↓
5. 提取源数据
   ↓
6. 映射列名
   ↓
7. 批量插入目标表(每 100 条提交一次)
   ↓
8. 关闭所有连接
   ↓
9. 返回结果

日志输出

日志格式

2025-11-28 10:30:00 - ResourceDataImporter - INFO - ============================================================
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 开始数据导入
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 源表: TB_JC_KSDZB
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 目标表: TB_JC_KSDZB
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 更新模式: append
2025-11-28 10:30:00 - ResourceDataImporter - INFO - ============================================================
2025-11-28 10:30:01 - ResourceDataImporter - INFO - 成功连接源数据库(PostgreSQL): 10.52.31.104:5432/hospital_his
2025-11-28 10:30:01 - ResourceDataImporter - INFO - 成功连接目标数据库: localhost:5432/dataops_platform
2025-11-28 10:30:02 - ResourceDataImporter - INFO - 从源表 TB_JC_KSDZB 提取了 1250 条数据
2025-11-28 10:30:02 - ResourceDataImporter - INFO - 目标表 TB_JC_KSDZB 的列: ['yljgdm', 'hisksdm', 'hisksmc', ...]
2025-11-28 10:30:03 - ResourceDataImporter - INFO - 已插入 100 条数据...
2025-11-28 10:30:04 - ResourceDataImporter - INFO - 已插入 200 条数据...
...
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 数据插入完成: 成功 1250 条, 失败 0 条
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 源数据库连接已关闭
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 目标数据库会话已关闭
2025-11-28 10:30:15 - ResourceDataImporter - INFO - ============================================================
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 导入结果: 导入完成: 成功 1250 条, 失败 0 条
2025-11-28 10:30:15 - ResourceDataImporter - INFO - ============================================================

使用示例

示例 1: 科室对照表导入(追加模式)

python app/core/data_flow/import_resource_data.py \
  --source-config '{
    "type": "postgresql",
    "host": "10.52.31.104",
    "port": 5432,
    "database": "hospital_his",
    "username": "his_user",
    "password": "his_password",
    "table_name": "TB_JC_KSDZB",
    "where_clause": "TO_CHAR(TBRQ, '\''YYYY-MM'\'') = '\''2025-11'\''",
    "order_by": "TBRQ DESC"
  }' \
  --target-table TB_JC_KSDZB \
  --update-mode append

示例 2: 患者信息全量更新

python app/core/data_flow/import_resource_data.py \
  --source-config patient_config.json \
  --target-table patient_info \
  --update-mode full

示例 3: 限制导入前 1000 条

python app/core/data_flow/import_resource_data.py \
  --source-config source_config.json \
  --target-table TB_JC_KSDZB \
  --update-mode append \
  --limit 1000

示例 4: Python 代码调用

from app.core.data_flow.import_resource_data import import_resource_data

# 配置
source_config = {
    'type': 'postgresql',
    'host': '10.52.31.104',
    'port': 5432,
    'database': 'hospital_his',
    'username': 'his_user',
    'password': 'his_password',
    'table_name': 'TB_JC_KSDZB'
}

# 导入
result = import_resource_data(
    source_config=source_config,
    target_table_name='TB_JC_KSDZB',
    update_mode='append'
)

# 处理结果
if result['success']:
    print(f"✅ 导入成功: {result['imported_count']} 条")
else:
    print(f"❌ 导入失败: {result['message']}")

错误处理

常见错误

1. 连接源数据库失败

错误消息

连接源数据库失败: connection refused

原因

  • 数据库地址或端口错误
  • 防火墙阻止连接
  • 数据库未启动

解决方案

  • 检查 hostport 配置
  • 确认防火墙规则
  • 确认数据库服务运行中

2. 连接目标数据库失败

错误消息

连接目标数据库失败: Invalid connection string

原因

  • config.py 中的 SQLALCHEMY_DATABASE_URI 配置错误

解决方案

  • 检查 app/config/config.py
  • 确认数据库连接字符串格式正确

3. 表不存在

错误消息

提取源数据失败: relation "TB_JC_KSDZB" does not exist

原因

  • 源表名错误
  • 目标表不存在

解决方案

  • 检查 table_name 配置
  • 在目标数据库中创建对应表

4. 权限不足

错误消息

插入数据失败: permission denied for table TB_JC_KSDZB

原因

  • 数据库用户没有写入权限

解决方案

  • 授予用户 INSERT 权限
  • 使用有足够权限的用户

性能优化

1. 批量提交

  • 每 100 条数据提交一次
  • 平衡性能和内存使用

2. 使用 limit 参数

测试时使用小数据量:

--limit 100

3. 添加索引

在目标表的常用查询列上添加索引:

CREATE INDEX idx_tbrq ON TB_JC_KSDZB(tbrq);

4. 分批导入

大数据量分批导入:

# 第一批
import_resource_data(source_config, 'TB_JC_KSDZB', limit=10000)

# 修改 where_clause 后继续
source_config['where_clause'] = "id > 10000 AND id <= 20000"
import_resource_data(source_config, 'TB_JC_KSDZB', limit=10000)

注意事项

⚠️ 全量更新模式

  • 使用 full 模式会删除目标表所有数据
  • 请确认后再使用

⚠️ 数据一致性

  • 使用事务保证数据一致性
  • 失败会自动回滚

⚠️ 密码安全

  • 避免在命令行直接暴露密码
  • 建议使用配置文件
  • 配置文件设置适当的文件权限

⚠️ 目标数据库配置

  • 目标数据库从 config.py 读取
  • 不能在运行时指定目标数据库

与 DataFlow 集成

自动调用

DataFlow 创建时会自动生成任务,任务描述包含:

  • 数据源信息
  • 源表 DDL
  • 目标表 DDL
  • 更新模式

手动调用

从 DataFlow 任务描述中提取配置:

# 1. 从任务描述提取配置
task_description = get_task_description(task_id=7)

# 2. 构建 source_config
source_config = {
    'type': 'postgresql',
    'host': extracted_from_task['host'],
    'port': extracted_from_task['port'],
    'database': extracted_from_task['database'],
    'username': extracted_from_task['username'],
    'password': extracted_from_task['password'],
    'table_name': extracted_from_task['source_table']
}

# 3. 执行导入
result = import_resource_data(
    source_config=source_config,
    target_table_name=extracted_from_task['target_table'],
    update_mode=extracted_from_task['update_mode']
)

相关文件

  • app/core/data_flow/import_resource_data.py - 主程序
  • app/config/config.py - 目标数据库配置
  • docs/DataFlow_task_list优化说明.md - 任务描述生成逻辑
  • docs/Task_Manager_MCP_说明.md - MCP 工作流程

更新历史

  • 2025-11-28: 重构为通用数据资源导入工具,支持命令行参数和灵活配置