import_resource_data.py 是一个通用的数据资源导入工具,用于从远程数据源读取数据,按照指定的更新模式写入到目标数据资源表中。
✅ 灵活的数据源支持
✅ 灵活的目标配置
config.py 的 SQLALCHEMY_DATABASE_URI 读取✅ 两种更新模式
append:追加模式(默认),新数据追加到目标表full:全量更新,先清空目标表再写入✅ 智能列映射
create_time 时间戳✅ 命令行参数支持
pip install psycopg2-binary sqlalchemy
pip install pymysql
python app/core/data_flow/import_resource_data.py \
--source-config '{"type":"postgresql","host":"10.52.31.104","port":5432,"database":"source_db","username":"user","password":"pass","table_name":"TB_JC_KSDZB"}' \
--target-table TB_JC_KSDZB \
--update-mode append
source_config.json:{
"type": "postgresql",
"host": "10.52.31.104",
"port": 5432,
"database": "hospital_his",
"username": "his_user",
"password": "his_password",
"table_name": "TB_JC_KSDZB",
"where_clause": "TBRQ >= '2025-01-01'",
"order_by": "TBRQ DESC"
}
python app/core/data_flow/import_resource_data.py \
--source-config source_config.json \
--target-table TB_JC_KSDZB \
--update-mode append \
--limit 1000
| 参数 | 必需 | 说明 | 示例 |
|---|---|---|---|
--source-config |
✅ | 源数据库配置(JSON字符串或文件路径) | 见上方 |
--target-table |
✅ | 目标表名(数据资源的英文名) | TB_JC_KSDZB |
--update-mode |
❌ | 更新模式:append 或 full,默认 append |
append |
--limit |
❌ | 限制导入的数据行数 | 1000 |
from app.core.data_flow.import_resource_data import import_resource_data
# 源数据库配置
source_config = {
'type': 'postgresql',
'host': '10.52.31.104',
'port': 5432,
'database': 'hospital_his',
'username': 'his_user',
'password': 'his_password',
'table_name': 'TB_JC_KSDZB', # 源表名
'where_clause': "TBRQ >= '2025-01-01'", # 可选:WHERE条件
'order_by': 'TBRQ DESC' # 可选:排序
}
# 执行导入
result = import_resource_data(
source_config=source_config,
target_table_name='TB_JC_KSDZB', # 目标表名
update_mode='append', # 或 'full'
limit=1000 # 可选:限制行数
)
# 查看结果
print(f"导入成功: {result['success']}")
print(f"成功: {result['imported_count']} 条")
print(f"失败: {result['error_count']} 条")
print(f"消息: {result['message']}")
{
"type": "postgresql",
"host": "10.52.31.104",
"port": 5432,
"database": "hospital_his",
"username": "his_user",
"password": "his_password",
"table_name": "TB_JC_KSDZB",
"where_clause": "TBRQ >= '2025-01-01'",
"order_by": "TBRQ DESC"
}
{
"type": "mysql",
"host": "10.52.31.105",
"port": 3306,
"database": "hospital_his",
"username": "his_user",
"password": "his_password",
"table_name": "dept_table",
"where_clause": "status = 1",
"order_by": "update_time DESC"
}
| 字段 | 必需 | 说明 |
|---|---|---|
type |
✅ | 数据库类型:postgresql 或 mysql |
host |
✅ | 数据库主机地址 |
port |
✅ | 数据库端口 |
database |
✅ | 数据库名 |
username |
✅ | 用户名 |
password |
✅ | 密码 |
table_name |
✅ | 源表名 |
where_clause |
❌ | WHERE 过滤条件 |
order_by |
❌ | 排序条件 |
特点:
使用场景:
示例:
python app/core/data_flow/import_resource_data.py \
--source-config source_config.json \
--target-table TB_JC_KSDZB \
--update-mode append
特点:
使用场景:
示例:
python app/core/data_flow/import_resource_data.py \
--source-config source_config.json \
--target-table TB_JC_KSDZB \
--update-mode full
不区分大小写匹配
源表:YLJGDM → 目标表:yljgdm ✅
源表:HisKsDm → 目标表:HISKSDM ✅
自动添加 create_time
create_time 字段CURRENT_TIMESTAMP未匹配列处理
NULL源表结构:
CREATE TABLE source_table (
YLJGDM VARCHAR(22),
HISKSDM CHAR(20),
HISKSMC CHAR(20)
);
目标表结构:
CREATE TABLE target_table (
yljgdm VARCHAR(22),
hisksdm CHAR(20),
hisksmc CHAR(20),
extra_field VARCHAR(50), -- 源表没有
create_time TIMESTAMP -- 自动添加
);
映射结果:
YLJGDM → yljgdm ✅HISKSDM → hisksdm ✅HISKSMC → hisksmc ✅extra_field → NULL (源表没有)create_time → CURRENT_TIMESTAMP (自动添加){
'success': True, # 是否成功
'imported_count': 1250, # 成功导入行数
'error_count': 5, # 失败行数
'update_mode': 'append', # 更新模式
'message': '导入完成: 成功 1250 条, 失败 5 条' # 详细消息
}
{
'success': True,
'imported_count': 1250,
'error_count': 0,
'update_mode': 'append',
'message': '导入完成: 成功 1250 条, 失败 0 条'
}
{
'success': False,
'imported_count': 0,
'error_count': 0,
'update_mode': 'append',
'message': '连接源数据库失败: connection refused'
}
1. 解析命令行参数
↓
2. 连接源数据库
↓
3. 连接目标数据库(从 config.py)
↓
4. [full 模式] 清空目标表
↓
5. 提取源数据
↓
6. 映射列名
↓
7. 批量插入目标表(每 100 条提交一次)
↓
8. 关闭所有连接
↓
9. 返回结果
2025-11-28 10:30:00 - ResourceDataImporter - INFO - ============================================================
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 开始数据导入
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 源表: TB_JC_KSDZB
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 目标表: TB_JC_KSDZB
2025-11-28 10:30:00 - ResourceDataImporter - INFO - 更新模式: append
2025-11-28 10:30:00 - ResourceDataImporter - INFO - ============================================================
2025-11-28 10:30:01 - ResourceDataImporter - INFO - 成功连接源数据库(PostgreSQL): 10.52.31.104:5432/hospital_his
2025-11-28 10:30:01 - ResourceDataImporter - INFO - 成功连接目标数据库: localhost:5432/dataops_platform
2025-11-28 10:30:02 - ResourceDataImporter - INFO - 从源表 TB_JC_KSDZB 提取了 1250 条数据
2025-11-28 10:30:02 - ResourceDataImporter - INFO - 目标表 TB_JC_KSDZB 的列: ['yljgdm', 'hisksdm', 'hisksmc', ...]
2025-11-28 10:30:03 - ResourceDataImporter - INFO - 已插入 100 条数据...
2025-11-28 10:30:04 - ResourceDataImporter - INFO - 已插入 200 条数据...
...
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 数据插入完成: 成功 1250 条, 失败 0 条
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 源数据库连接已关闭
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 目标数据库会话已关闭
2025-11-28 10:30:15 - ResourceDataImporter - INFO - ============================================================
2025-11-28 10:30:15 - ResourceDataImporter - INFO - 导入结果: 导入完成: 成功 1250 条, 失败 0 条
2025-11-28 10:30:15 - ResourceDataImporter - INFO - ============================================================
python app/core/data_flow/import_resource_data.py \
--source-config '{
"type": "postgresql",
"host": "10.52.31.104",
"port": 5432,
"database": "hospital_his",
"username": "his_user",
"password": "his_password",
"table_name": "TB_JC_KSDZB",
"where_clause": "TO_CHAR(TBRQ, '\''YYYY-MM'\'') = '\''2025-11'\''",
"order_by": "TBRQ DESC"
}' \
--target-table TB_JC_KSDZB \
--update-mode append
python app/core/data_flow/import_resource_data.py \
--source-config patient_config.json \
--target-table patient_info \
--update-mode full
python app/core/data_flow/import_resource_data.py \
--source-config source_config.json \
--target-table TB_JC_KSDZB \
--update-mode append \
--limit 1000
from app.core.data_flow.import_resource_data import import_resource_data
# 配置
source_config = {
'type': 'postgresql',
'host': '10.52.31.104',
'port': 5432,
'database': 'hospital_his',
'username': 'his_user',
'password': 'his_password',
'table_name': 'TB_JC_KSDZB'
}
# 导入
result = import_resource_data(
source_config=source_config,
target_table_name='TB_JC_KSDZB',
update_mode='append'
)
# 处理结果
if result['success']:
print(f"✅ 导入成功: {result['imported_count']} 条")
else:
print(f"❌ 导入失败: {result['message']}")
错误消息:
连接源数据库失败: connection refused
原因:
解决方案:
host 和 port 配置错误消息:
连接目标数据库失败: Invalid connection string
原因:
config.py 中的 SQLALCHEMY_DATABASE_URI 配置错误解决方案:
app/config/config.py错误消息:
提取源数据失败: relation "TB_JC_KSDZB" does not exist
原因:
解决方案:
table_name 配置错误消息:
插入数据失败: permission denied for table TB_JC_KSDZB
原因:
解决方案:
测试时使用小数据量:
--limit 100
在目标表的常用查询列上添加索引:
CREATE INDEX idx_tbrq ON TB_JC_KSDZB(tbrq);
大数据量分批导入:
# 第一批
import_resource_data(source_config, 'TB_JC_KSDZB', limit=10000)
# 修改 where_clause 后继续
source_config['where_clause'] = "id > 10000 AND id <= 20000"
import_resource_data(source_config, 'TB_JC_KSDZB', limit=10000)
⚠️ 全量更新模式:
full 模式会删除目标表所有数据⚠️ 数据一致性:
⚠️ 密码安全:
⚠️ 目标数据库配置:
config.py 读取DataFlow 创建时会自动生成任务,任务描述包含:
从 DataFlow 任务描述中提取配置:
# 1. 从任务描述提取配置
task_description = get_task_description(task_id=7)
# 2. 构建 source_config
source_config = {
'type': 'postgresql',
'host': extracted_from_task['host'],
'port': extracted_from_task['port'],
'database': extracted_from_task['database'],
'username': extracted_from_task['username'],
'password': extracted_from_task['password'],
'table_name': extracted_from_task['source_table']
}
# 3. 执行导入
result = import_resource_data(
source_config=source_config,
target_table_name=extracted_from_task['target_table'],
update_mode=extracted_from_task['update_mode']
)
app/core/data_flow/import_resource_data.py - 主程序app/config/config.py - 目标数据库配置docs/DataFlow_task_list优化说明.md - 任务描述生成逻辑docs/Task_Manager_MCP_说明.md - MCP 工作流程