# import_resource_data.py 使用说明 ## 概述 `import_resource_data.py` 是一个通用的数据资源导入工具,用于从远程数据源读取数据,按照指定的更新模式写入到目标数据资源表中。 ## 功能特性 ✅ **灵活的数据源支持** - PostgreSQL - MySQL(需要安装 pymysql) ✅ **灵活的目标配置** - 目标数据库从 `config.py` 的 `SQLALCHEMY_DATABASE_URI` 读取 - 支持任意目标表名(数据资源的英文名) ✅ **两种更新模式** - `append`:追加模式(默认),新数据追加到目标表 - `full`:全量更新,先清空目标表再写入 ✅ **智能列映射** - 自动匹配源表和目标表的列名(不区分大小写) - 自动为目标表添加 `create_time` 时间戳 ✅ **命令行参数支持** - 接收 JSON 配置或配置文件 - 支持限制导入数据行数 ## 安装依赖 ### 必需依赖 ```bash pip install psycopg2-binary sqlalchemy ``` ### 可选依赖(MySQL 支持) ```bash pip install pymysql ``` ## 使用方式 ### 方式 1: 命令行调用 #### 基本用法 ```bash python app/core/data_flow/import_resource_data.py \ --source-config '{"type":"postgresql","host":"10.52.31.104","port":5432,"database":"source_db","username":"user","password":"pass","table_name":"TB_JC_KSDZB"}' \ --target-table TB_JC_KSDZB \ --update-mode append ``` #### 使用配置文件 1. 创建配置文件 `source_config.json`: ```json { "type": "postgresql", "host": "10.52.31.104", "port": 5432, "database": "hospital_his", "username": "his_user", "password": "his_password", "table_name": "TB_JC_KSDZB", "where_clause": "TBRQ >= '2025-01-01'", "order_by": "TBRQ DESC" } ``` 2. 运行命令: ```bash python app/core/data_flow/import_resource_data.py \ --source-config source_config.json \ --target-table TB_JC_KSDZB \ --update-mode append \ --limit 1000 ``` #### 命令行参数说明 | 参数 | 必需 | 说明 | 示例 | |------|------|------|------| | `--source-config` | ✅ | 源数据库配置(JSON字符串或文件路径) | 见上方 | | `--target-table` | ✅ | 目标表名(数据资源的英文名) | `TB_JC_KSDZB` | | `--update-mode` | ❌ | 更新模式:`append` 或 `full`,默认 `append` | `append` | | `--limit` | ❌ | 限制导入的数据行数 | `1000` | ### 方式 2: Python 代码调用 ```python from app.core.data_flow.import_resource_data import import_resource_data # 源数据库配置 source_config = { 'type': 'postgresql', 'host': '10.52.31.104', 'port': 5432, 'database': 'hospital_his', 'username': 'his_user', 'password': 'his_password', 'table_name': 'TB_JC_KSDZB', # 源表名 'where_clause': "TBRQ >= '2025-01-01'", # 可选:WHERE条件 'order_by': 'TBRQ DESC' # 可选:排序 } # 执行导入 result = import_resource_data( source_config=source_config, target_table_name='TB_JC_KSDZB', # 目标表名 update_mode='append', # 或 'full' limit=1000 # 可选:限制行数 ) # 查看结果 print(f"导入成功: {result['success']}") print(f"成功: {result['imported_count']} 条") print(f"失败: {result['error_count']} 条") print(f"消息: {result['message']}") ``` ## 源数据库配置 ### PostgreSQL 配置 ```json { "type": "postgresql", "host": "10.52.31.104", "port": 5432, "database": "hospital_his", "username": "his_user", "password": "his_password", "table_name": "TB_JC_KSDZB", "where_clause": "TBRQ >= '2025-01-01'", "order_by": "TBRQ DESC" } ``` ### MySQL 配置 ```json { "type": "mysql", "host": "10.52.31.105", "port": 3306, "database": "hospital_his", "username": "his_user", "password": "his_password", "table_name": "dept_table", "where_clause": "status = 1", "order_by": "update_time DESC" } ``` ### 配置字段说明 | 字段 | 必需 | 说明 | |------|------|------| | `type` | ✅ | 数据库类型:`postgresql` 或 `mysql` | | `host` | ✅ | 数据库主机地址 | | `port` | ✅ | 数据库端口 | | `database` | ✅ | 数据库名 | | `username` | ✅ | 用户名 | | `password` | ✅ | 密码 | | `table_name` | ✅ | 源表名 | | `where_clause` | ❌ | WHERE 过滤条件 | | `order_by` | ❌ | 排序条件 | ## 更新模式 ### append(追加模式) **特点**: - 新数据追加到目标表 - 不删除现有数据 - 适合增量数据导入 **使用场景**: - 日志数据导入 - 定期增量同步 - 历史数据累积 **示例**: ```bash python app/core/data_flow/import_resource_data.py \ --source-config source_config.json \ --target-table TB_JC_KSDZB \ --update-mode append ``` ### full(全量更新) **特点**: - 先清空目标表 - 再写入新数据 - 目标表数据完全替换 **使用场景**: - 主数据同步 - 配置表更新 - 每日全量刷新 **示例**: ```bash python app/core/data_flow/import_resource_data.py \ --source-config source_config.json \ --target-table TB_JC_KSDZB \ --update-mode full ``` ## 列映射机制 ### 自动映射规则 1. **不区分大小写匹配** ``` 源表:YLJGDM → 目标表:yljgdm ✅ 源表:HisKsDm → 目标表:HISKSDM ✅ ``` 2. **自动添加 create_time** - 目标表自动添加 `create_time` 字段 - 值为当前时间戳 `CURRENT_TIMESTAMP` 3. **未匹配列处理** - 目标表有、源表没有的列 → 设为 `NULL` - 源表有、目标表没有的列 → 忽略 ### 映射示例 **源表结构**: ```sql CREATE TABLE source_table ( YLJGDM VARCHAR(22), HISKSDM CHAR(20), HISKSMC CHAR(20) ); ``` **目标表结构**: ```sql CREATE TABLE target_table ( yljgdm VARCHAR(22), hisksdm CHAR(20), hisksmc CHAR(20), extra_field VARCHAR(50), -- 源表没有 create_time TIMESTAMP -- 自动添加 ); ``` **映射结果**: - `YLJGDM` → `yljgdm` ✅ - `HISKSDM` → `hisksdm` ✅ - `HISKSMC` → `hisksmc` ✅ - `extra_field` → `NULL` (源表没有) - `create_time` → `CURRENT_TIMESTAMP` (自动添加) ## 返回结果 ### 结果结构 ```python { 'success': True, # 是否成功 'imported_count': 1250, # 成功导入行数 'error_count': 5, # 失败行数 'update_mode': 'append', # 更新模式 'message': '导入完成: 成功 1250 条, 失败 5 条' # 详细消息 } ``` ### 成功示例 ```python { 'success': True, 'imported_count': 1250, 'error_count': 0, 'update_mode': 'append', 'message': '导入完成: 成功 1250 条, 失败 0 条' } ``` ### 失败示例 ```python { 'success': False, 'imported_count': 0, 'error_count': 0, 'update_mode': 'append', 'message': '连接源数据库失败: connection refused' } ``` ## 执行流程 ``` 1. 解析命令行参数 ↓ 2. 连接源数据库 ↓ 3. 连接目标数据库(从 config.py) ↓ 4. [full 模式] 清空目标表 ↓ 5. 提取源数据 ↓ 6. 映射列名 ↓ 7. 批量插入目标表(每 100 条提交一次) ↓ 8. 关闭所有连接 ↓ 9. 返回结果 ``` ## 日志输出 ### 日志格式 ``` 2025-11-28 10:30:00 - ResourceDataImporter - INFO - ============================================================ 2025-11-28 10:30:00 - ResourceDataImporter - INFO - 开始数据导入 2025-11-28 10:30:00 - ResourceDataImporter - INFO - 源表: TB_JC_KSDZB 2025-11-28 10:30:00 - ResourceDataImporter - INFO - 目标表: TB_JC_KSDZB 2025-11-28 10:30:00 - ResourceDataImporter - INFO - 更新模式: append 2025-11-28 10:30:00 - ResourceDataImporter - INFO - ============================================================ 2025-11-28 10:30:01 - ResourceDataImporter - INFO - 成功连接源数据库(PostgreSQL): 10.52.31.104:5432/hospital_his 2025-11-28 10:30:01 - ResourceDataImporter - INFO - 成功连接目标数据库: localhost:5432/dataops_platform 2025-11-28 10:30:02 - ResourceDataImporter - INFO - 从源表 TB_JC_KSDZB 提取了 1250 条数据 2025-11-28 10:30:02 - ResourceDataImporter - INFO - 目标表 TB_JC_KSDZB 的列: ['yljgdm', 'hisksdm', 'hisksmc', ...] 2025-11-28 10:30:03 - ResourceDataImporter - INFO - 已插入 100 条数据... 2025-11-28 10:30:04 - ResourceDataImporter - INFO - 已插入 200 条数据... ... 2025-11-28 10:30:15 - ResourceDataImporter - INFO - 数据插入完成: 成功 1250 条, 失败 0 条 2025-11-28 10:30:15 - ResourceDataImporter - INFO - 源数据库连接已关闭 2025-11-28 10:30:15 - ResourceDataImporter - INFO - 目标数据库会话已关闭 2025-11-28 10:30:15 - ResourceDataImporter - INFO - ============================================================ 2025-11-28 10:30:15 - ResourceDataImporter - INFO - 导入结果: 导入完成: 成功 1250 条, 失败 0 条 2025-11-28 10:30:15 - ResourceDataImporter - INFO - ============================================================ ``` ## 使用示例 ### 示例 1: 科室对照表导入(追加模式) ```bash python app/core/data_flow/import_resource_data.py \ --source-config '{ "type": "postgresql", "host": "10.52.31.104", "port": 5432, "database": "hospital_his", "username": "his_user", "password": "his_password", "table_name": "TB_JC_KSDZB", "where_clause": "TO_CHAR(TBRQ, '\''YYYY-MM'\'') = '\''2025-11'\''", "order_by": "TBRQ DESC" }' \ --target-table TB_JC_KSDZB \ --update-mode append ``` ### 示例 2: 患者信息全量更新 ```bash python app/core/data_flow/import_resource_data.py \ --source-config patient_config.json \ --target-table patient_info \ --update-mode full ``` ### 示例 3: 限制导入前 1000 条 ```bash python app/core/data_flow/import_resource_data.py \ --source-config source_config.json \ --target-table TB_JC_KSDZB \ --update-mode append \ --limit 1000 ``` ### 示例 4: Python 代码调用 ```python from app.core.data_flow.import_resource_data import import_resource_data # 配置 source_config = { 'type': 'postgresql', 'host': '10.52.31.104', 'port': 5432, 'database': 'hospital_his', 'username': 'his_user', 'password': 'his_password', 'table_name': 'TB_JC_KSDZB' } # 导入 result = import_resource_data( source_config=source_config, target_table_name='TB_JC_KSDZB', update_mode='append' ) # 处理结果 if result['success']: print(f"✅ 导入成功: {result['imported_count']} 条") else: print(f"❌ 导入失败: {result['message']}") ``` ## 错误处理 ### 常见错误 #### 1. 连接源数据库失败 **错误消息**: ``` 连接源数据库失败: connection refused ``` **原因**: - 数据库地址或端口错误 - 防火墙阻止连接 - 数据库未启动 **解决方案**: - 检查 `host` 和 `port` 配置 - 确认防火墙规则 - 确认数据库服务运行中 #### 2. 连接目标数据库失败 **错误消息**: ``` 连接目标数据库失败: Invalid connection string ``` **原因**: - `config.py` 中的 `SQLALCHEMY_DATABASE_URI` 配置错误 **解决方案**: - 检查 `app/config/config.py` - 确认数据库连接字符串格式正确 #### 3. 表不存在 **错误消息**: ``` 提取源数据失败: relation "TB_JC_KSDZB" does not exist ``` **原因**: - 源表名错误 - 目标表不存在 **解决方案**: - 检查 `table_name` 配置 - 在目标数据库中创建对应表 #### 4. 权限不足 **错误消息**: ``` 插入数据失败: permission denied for table TB_JC_KSDZB ``` **原因**: - 数据库用户没有写入权限 **解决方案**: - 授予用户 INSERT 权限 - 使用有足够权限的用户 ## 性能优化 ### 1. 批量提交 - 每 100 条数据提交一次 - 平衡性能和内存使用 ### 2. 使用 limit 参数 测试时使用小数据量: ```bash --limit 100 ``` ### 3. 添加索引 在目标表的常用查询列上添加索引: ```sql CREATE INDEX idx_tbrq ON TB_JC_KSDZB(tbrq); ``` ### 4. 分批导入 大数据量分批导入: ```python # 第一批 import_resource_data(source_config, 'TB_JC_KSDZB', limit=10000) # 修改 where_clause 后继续 source_config['where_clause'] = "id > 10000 AND id <= 20000" import_resource_data(source_config, 'TB_JC_KSDZB', limit=10000) ``` ## 注意事项 ⚠️ **全量更新模式**: - 使用 `full` 模式会**删除目标表所有数据** - 请确认后再使用 ⚠️ **数据一致性**: - 使用事务保证数据一致性 - 失败会自动回滚 ⚠️ **密码安全**: - 避免在命令行直接暴露密码 - 建议使用配置文件 - 配置文件设置适当的文件权限 ⚠️ **目标数据库配置**: - 目标数据库从 `config.py` 读取 - 不能在运行时指定目标数据库 ## 与 DataFlow 集成 ### 自动调用 DataFlow 创建时会自动生成任务,任务描述包含: - 数据源信息 - 源表 DDL - 目标表 DDL - 更新模式 ### 手动调用 从 DataFlow 任务描述中提取配置: ```python # 1. 从任务描述提取配置 task_description = get_task_description(task_id=7) # 2. 构建 source_config source_config = { 'type': 'postgresql', 'host': extracted_from_task['host'], 'port': extracted_from_task['port'], 'database': extracted_from_task['database'], 'username': extracted_from_task['username'], 'password': extracted_from_task['password'], 'table_name': extracted_from_task['source_table'] } # 3. 执行导入 result = import_resource_data( source_config=source_config, target_table_name=extracted_from_task['target_table'], update_mode=extracted_from_task['update_mode'] ) ``` ## 相关文件 - `app/core/data_flow/import_resource_data.py` - 主程序 - `app/config/config.py` - 目标数据库配置 - `docs/DataFlow_task_list优化说明.md` - 任务描述生成逻辑 - `docs/Task_Manager_MCP_说明.md` - MCP 工作流程 ## 更新历史 - **2025-11-28**: 重构为通用数据资源导入工具,支持命令行参数和灵活配置