maxiaolong d3f48c98ad 优化调整auto_excute_tasks代码功能。 пре 15 часа
..
DF_DO202601130001_workflow.json d3f48c98ad 优化调整auto_excute_tasks代码功能。 пре 15 часа
README_import_product_inventory.md 9e4d968671 bug修复。数据流程优化。 数据任务自动化处理优化。 пре 6 дана
import_product_inventory_workflow.json d3f48c98ad 优化调整auto_excute_tasks代码功能。 пре 15 часа

README_import_product_inventory.md

产品库存表数据导入工作流

📋 概述

这个 n8n 工作流用于从远程 PostgreSQL 数据库导入产品库存数据到本地数据资源表 test_product_inventory

🎯 任务信息

  • 任务ID: 22
  • 任务名称: 导入原始的产品库存表
  • 创建时间: 2026-01-07 10:29:12
  • 创建者: cursor

🔧 工作流配置

数据源配置

  • 类型: PostgreSQL
  • 主机: 192.168.3.143
  • 端口: 5432
  • 数据库: dataops
  • 用户名: postgres
  • 源表: test_product_inventory

目标表配置

  • Schema: dags
  • 表名: test_product_inventory
  • 更新模式: Append (追加模式)

目标表结构

CREATE TABLE test_product_inventory (
    id serial COMMENT '编号',
    sku varchar(50) COMMENT '商品货号',
    category varchar(100) COMMENT '类别',
    brand varchar(100) COMMENT '品牌',
    supplier varchar(200) COMMENT '供应商',
    warehouse varchar(100) COMMENT '仓库',
    current_stock integer COMMENT '当前库存',
    safety_stock integer COMMENT '安全库存',
    max_stock integer COMMENT '最大库存',
    unit_cost numeric(10, 2) COMMENT '单位成本',
    selling_price numeric(10, 2) COMMENT '销售价格',
    stock_status varchar(50) COMMENT '库存状态',
    last_inbound_date date COMMENT '最近入库日期',
    last_outbound_date date COMMENT '最近出库日期',
    inbound_quantity_30d integer COMMENT '30天入库数量',
    outbound_quantity_30d integer COMMENT '30天出库数量',
    turnover_rate numeric(5, 2) COMMENT '周转率',
    is_active boolean COMMENT '是否有效',
    created_at timestamp COMMENT '创建时间',
    updated_at timestamp COMMENT '更新时间',
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
);
COMMENT ON TABLE test_product_inventory IS '产品库存表';

📦 工作流组成

1. Manual Trigger (手动触发器)

  • 通过点击 n8n 界面中的按钮手动触发工作流

2. Execute Import Script (执行导入脚本)

  • 执行 Python 脚本 import_resource_data.py
  • 传递必要的参数进行数据导入

🚀 使用步骤

步骤 1: 导入工作流到 n8n

  1. 打开 n8n 界面
  2. 点击 "Import from File" 或 "Import from URL"
  3. 选择工作流文件: import_product_inventory_workflow.json
  4. 导入成功后,工作流会出现在你的工作流列表中

步骤 2: 配置数据库密码

⚠️ 重要: 在运行工作流之前,必须配置数据库密码!

  1. 打开导入的工作流
  2. 点击 "Execute Import Script" 节点
  3. 在 "Command" 字段中,找到 YOUR_PASSWORD_HERE 占位符
  4. 将其替换为实际的 PostgreSQL 数据库密码

示例:

python "g:\code-lab\DataOps-platform-new\datafactory\scripts\import_resource_data.py" --source-config '{"type":"postgresql","host":"192.168.3.143","port":5432,"database":"dataops","username":"postgres","password":"your_actual_password","table_name":"test_product_inventory"}' --target-table test_product_inventory --update-mode append

步骤 3: 激活并运行工作流

  1. 点击工作流右上角的 "Active" 开关激活工作流
  2. 点击 "Execute Workflow" 按钮手动触发工作流
  3. 查看执行结果

📊 执行结果

工作流执行后,你可以在 n8n 界面中查看:

  • ✅ 成功导入的数据行数
  • ❌ 失败的数据行数
  • 📝 详细的执行日志

🔍 验证数据导入

导入完成后,可以通过以下 SQL 查询验证数据:

-- 查看导入的数据总数
SELECT COUNT(*) FROM test_product_inventory;

-- 查看最近导入的数据
SELECT * FROM test_product_inventory 
ORDER BY create_time DESC 
LIMIT 10;

-- 按类别统计库存
SELECT category, COUNT(*) as count, SUM(current_stock) as total_stock
FROM test_product_inventory
GROUP BY category;

🛠️ 故障排查

问题 1: 无法连接到源数据库

解决方案:

  • 检查网络连接是否正常
  • 验证数据库主机地址、端口是否正确
  • 确认数据库用户名和密码是否正确
  • 检查防火墙设置

问题 2: Python 脚本执行失败

解决方案:

  • 确认 Python 环境已正确安装
  • 检查所需的 Python 包是否已安装 (psycopg2, sqlalchemy, pymysql)
  • 验证脚本路径是否正确

问题 3: 目标表不存在

解决方案:

  • 在目标数据库中创建 test_product_inventory
  • 使用上面提供的 DDL 语句创建表

📝 注意事项

  1. 更新模式: 当前配置为 append (追加模式),新数据会追加到目标表,不会删除现有数据
  2. 数据安全: 请妥善保管数据库密码,不要将包含密码的配置文件提交到版本控制系统
  3. 性能优化: 如果数据量很大,可以考虑添加 --limit 参数限制每次导入的数据量
  4. 定时执行: 如需定时执行,可以将 Manual Trigger 替换为 Schedule Trigger 或 Cron Trigger

🔄 扩展功能

添加数据过滤

如需只导入特定条件的数据,可以在源配置中添加 where_clause:

{
  "type": "postgresql",
  "host": "192.168.3.143",
  "port": 5432,
  "database": "dataops",
  "username": "postgres",
  "password": "your_password",
  "table_name": "test_product_inventory",
  "where_clause": "created_at >= '2026-01-01' AND is_active = true"
}

添加数据排序

如需按特定字段排序,可以添加 order_by:

{
  "type": "postgresql",
  "host": "192.168.3.143",
  "port": 5432,
  "database": "dataops",
  "username": "postgres",
  "password": "your_password",
  "table_name": "test_product_inventory",
  "order_by": "created_at DESC"
}

限制导入数量

如需限制每次导入的数据量,可以添加 --limit 参数:

--limit 1000

📞 支持

如有问题,请联系:

  • 创建者: cursor
  • 创建时间: 2026-01-07

最后更新: 2026-01-07