n8n_workflow_development_guide.md 17 KB

n8n 工作流开发规范

DataOps Platform 项目 n8n 工作流开发指南

目录


概述

本项目使用 n8n 作为工作流自动化引擎,用于定时执行数据处理任务、触发 Python 脚本、监控执行状态等场景。

技术栈

组件 说明
n8n 服务器 https://n8n.citupro.com (Docker 部署)
应用服务器 company.citupro.com:982 (SSH 端口)
Python 客户端 app/core/data_factory/n8n_client.py
部署脚本 scripts/deploy_n8n_workflow.py
工作流存放 app/core/data_flow/

环境配置

Flask 配置 (config.py)

# n8n API 配置
N8N_API_URL = "https://n8n.citupro.com"
N8N_API_KEY = "your-api-key"  # 从 n8n Settings > API 获取
N8N_API_TIMEOUT = 30

获取 n8n API Key

  1. 登录 n8n 管理界面
  2. 点击右上角用户头像 → Settings
  3. 进入 API 页面
  4. 点击 "Create an API key"
  5. 保存生成的 API Key 到配置文件

工作流架构设计

标准数据处理工作流结构

┌─────────────────┐     ┌──────────────────┐     ┌────────────────┐
│  Schedule       │────▶│  SSH Execute     │────▶│  If (Check     │
│  Trigger        │     │  Command         │     │  Result)       │
└─────────────────┘     └──────────────────┘     └────────────────┘
                                                        │
                              ┌─────────────────────────┼─────────────────────────┐
                              ▼                                                   ▼
                        ┌───────────┐                                     ┌───────────┐
                        │  Success  │                                     │  Error    │
                        │  Response │                                     │  Response │
                        └───────────┘                                     └───────────┘

设计原则

  1. 单一职责: 每个工作流只处理一个数据任务
  2. 可观测性: 必须包含成功/失败响应节点
  3. 幂等性: 脚本应支持重复执行
  4. 错误隔离: 失败不应影响其他工作流

节点类型与配置

1. Schedule Trigger (定时触发器)

{
  "parameters": {
    "rule": {
      "interval": [
        {
          "field": "cronExpression",
          "expression": "0 3 * * *"
        }
      ]
    }
  },
  "id": "schedule-trigger",
  "name": "每日凌晨3点执行",
  "type": "n8n-nodes-base.scheduleTrigger",
  "typeVersion": 1.2,
  "position": [250, 300]
}

常用 Cron 表达式:

表达式 说明
0 3 * * * 每日凌晨 3:00
0 */6 * * * 每 6 小时
0 0 * * 1 每周一 00:00
0 0 1 * * 每月 1 日 00:00

2. SSH 节点 (远程命令执行)

⚠️ 重要: 当 n8n 服务器与应用服务器分离时,必须使用 SSH 节点而非 Execute Command 节点。

{
  "parameters": {
    "resource": "command",
    "operation": "execute",
    "command": "source venv/bin/activate && python app/core/data_flow/script.py --args",
    "cwd": "/opt/dataops-platform"
  },
  "id": "execute-python-script",
  "name": "执行脚本",
  "type": "n8n-nodes-base.ssh",
  "typeVersion": 1,
  "position": [500, 300],
  "credentials": {
    "sshPassword": {
      "id": "credential-id",
      "name": "SSH Password account"
    }
  }
}

SSH 节点 vs Execute Command 节点:

特性 SSH 节点 Execute Command 节点
执行位置 远程服务器 n8n 服务器本地
需要凭证 是 (SSH)
返回码字段 $json.code $json.exitCode
适用场景 跨服务器执行 本地执行

3. If 条件节点 (检查执行结果)

{
  "parameters": {
    "conditions": {
      "options": {
        "caseSensitive": true,
        "leftValue": "",
        "typeValidation": "strict"
      },
      "conditions": [
        {
          "id": "condition-success",
          "leftValue": "={{ $json.code }}",
          "rightValue": 0,
          "operator": {
            "type": "number",
            "operation": "equals"
          }
        }
      ],
      "combinator": "and"
    }
  },
  "id": "check-result",
  "name": "检查执行结果",
  "type": "n8n-nodes-base.if",
  "typeVersion": 2,
  "position": [750, 300]
}

⚠️ 关键区别: SSH 节点返回 $json.code,Execute Command 节点返回 $json.exitCode

4. Set 节点 (响应构建)

成功响应:

{
  "parameters": {
    "assignments": {
      "assignments": [
        {"id": "result-success", "name": "status", "value": "success", "type": "string"},
        {"id": "result-message", "name": "message", "value": "执行成功", "type": "string"},
        {"id": "result-output", "name": "output", "value": "={{ $json.stdout }}", "type": "string"},
        {"id": "result-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
      ]
    }
  },
  "id": "success-output",
  "name": "成功响应",
  "type": "n8n-nodes-base.set",
  "typeVersion": 3.4,
  "position": [1000, 200]
}

失败响应:

{
  "parameters": {
    "assignments": {
      "assignments": [
        {"id": "error-status", "name": "status", "value": "error", "type": "string"},
        {"id": "error-message", "name": "message", "value": "执行失败", "type": "string"},
        {"id": "error-output", "name": "error", "value": "={{ $json.stderr }}", "type": "string"},
        {"id": "error-code", "name": "exitCode", "value": "={{ $json.code }}", "type": "number"},
        {"id": "error-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
      ]
    }
  },
  "id": "error-output",
  "name": "失败响应",
  "type": "n8n-nodes-base.set",
  "typeVersion": 3.4,
  "position": [1000, 400]
}

SSH 远程执行最佳实践

凭证配置

  1. 在 n8n 中创建 SSH 凭证:

    • 进入 Settings → Credentials
    • 点击 "Add Credential"
    • 选择 "SSH Password"
    • 配置:
      • Host: company.citupro.com (公网域名或 IP)
      • Port: 982 (SSH 端口,默认 22)
      • Username: ubuntu
      • Password: ******
  2. 测试连接: 配置完成后点击 "Test" 验证连接

命令格式

# 推荐格式:使用 cwd 参数设置工作目录
source venv/bin/activate && python script.py --args

# 不推荐:在命令中 cd
cd /opt/dataops-platform && source venv/bin/activate && python script.py

Python 脚本要求

  1. 环境配置加载: ```python from app.config.config import get_config_by_env

config = get_config_by_env() database_uri = config.SQLALCHEMY_DATABASE_URI


2. **标准输出格式** (便于 n8n 解析):
```python
print("=" * 60)
print(f"处理结果: {'成功' if success else '失败'}")
print(f"消息: {message}")
print(f"处理数量: {count}")
print("=" * 60)
  1. 退出码规范:
    • 0: 成功
    • 1: 一般错误
    • 2: 参数错误
    • 其他: 特定错误

工作流 JSON 规范

文件命名

n8n_workflow_<功能描述>.json

示例:

  • n8n_workflow_sales_data.json
  • n8n_workflow_nursing_project_income.json

存放位置

app/core/data_flow/
├── n8n_workflow_sales_data.json
├── n8n_workflow_nursing_project_income.json
├── sales_data_generator.py
└── nursing_project_income.py

JSON 结构

{
  "name": "工作流名称",
  "nodes": [...],
  "connections": {...},
  "active": false,
  "settings": {
    "executionOrder": "v1",
    "saveManualExecutions": true
  },
  "meta": {
    "templateCredsSetupCompleted": true,
    "description": "工作流描述"
  }
}

节点 ID 命名规范

节点类型 ID 格式 示例
触发器 schedule-trigger, webhook-trigger schedule-trigger
执行脚本 execute-<action> execute-python-script
条件判断 check-<condition> check-result
成功响应 success-output success-output
失败响应 error-output error-output

连接配置

{
  "connections": {
    "节点名称A": {
      "main": [
        [
          {
            "node": "节点名称B",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "检查执行结果": {
      "main": [
        [{"node": "成功响应", "type": "main", "index": 0}],
        [{"node": "失败响应", "type": "main", "index": 0}]
      ]
    }
  }
}

API 部署与管理

使用部署脚本

# 部署工作流
python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_sales_data.json

# 部署并激活
python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_sales_data.json --activate

通过 API 更新工作流

import json
import requests

# 读取工作流 JSON
with open('workflow.json', 'r', encoding='utf-8') as f:
    workflow = json.load(f)

# 准备更新数据 (只包含允许的字段)
update_data = {
    'name': workflow['name'],
    'nodes': workflow['nodes'],
    'connections': workflow['connections'],
    'settings': workflow['settings']
}

# 调用 API
response = requests.put(
    f'https://n8n.citupro.com/api/v1/workflows/{workflow_id}',
    headers={
        'X-N8N-API-KEY': 'your-api-key',
        'Content-Type': 'application/json'
    },
    json=update_data
)

API 注意事项

  1. 创建工作流时不支持的字段:

    • active (只读)
    • id (自动生成)
    • createdAt, updatedAt (自动生成)
    • tags (需要单独 API 处理)
  2. settings 中不支持的属性:

    • errorWorkflow
    • callerPolicy

N8nClient 使用

from app.core.data_factory.n8n_client import N8nClient

client = N8nClient()

# 获取工作流列表
workflows = client.list_workflows(active=True)

# 获取单个工作流
workflow = client.get_workflow('workflow-id')

# 创建工作流
result = client.create_workflow(workflow_data)

# 更新工作流
result = client.update_workflow('workflow-id', workflow_data)

# 激活/停用工作流
client.activate_workflow('workflow-id')
client.deactivate_workflow('workflow-id')

# 获取执行记录
executions = client.list_executions(workflow_id='workflow-id')

凭证管理

SSH 凭证结构

{
  "credentials": {
    "sshPassword": {
      "id": "pYTwwuyC15caQe6y",
      "name": "SSH Password account"
    }
  }
}

获取凭证 ID

  1. 登录 n8n → Settings → Credentials
  2. 点击目标凭证
  3. 从 URL 获取 ID: /home/credentials/pYTwwuyC15caQe6y

凭证使用规范

  • 不要在代码中硬编码凭证密码
  • 使用凭证 ID 引用,不直接存储密码
  • 定期轮换 SSH 密码和 API Key

错误处理与日志

Python 脚本日志规范

from loguru import logger

# 配置日志
logger.add(
    "logs/data_flow.log",
    rotation="1 day",
    retention="7 days",
    level="INFO"
)

# 使用日志
logger.info("开始处理数据")
logger.error(f"处理失败: {error}")

工作流错误通知

可以在失败分支添加通知节点:

失败响应 → Slack/Email 通知

常见问题与解决方案

1. SSH 连接超时

问题: Timed out while waiting for handshake

解决方案:

  • 检查网络连接和防火墙规则
  • 确认 SSH 端口正确 (可能不是默认的 22)
  • 如果 n8n 在 Docker 中,检查容器网络配置

2. 命令执行路径错误

问题: can't cd to /opt/dataops-platform: No such file or directory

解决方案:

  • 使用 SSH 节点的 cwd 参数设置工作目录
  • 确保路径在目标服务器上存在

3. 条件判断不生效

问题: 执行成功但走了失败分支

解决方案:

  • SSH 节点使用 $json.code,Execute Command 使用 $json.exitCode
  • 检查返回值类型匹配 (number vs string)

4. API 部署失败 400 错误

问题: request/body/settings must NOT have additional properties

解决方案:

  • 移除 settings 中的 errorWorkflowcallerPolicy 等不支持的属性
  • 只保留 executionOrdersaveManualExecutions 等基础属性

5. 数据库连接失败

问题: role "user" does not exist 或连接被拒绝

解决方案:

  • 确保脚本正确加载环境配置:

    from app.config.config import get_config_by_env
    config = get_config_by_env()
    

    工作流模板

    定时数据处理工作流模板

    {
    "name": "{{工作流名称}}",
    "nodes": [
    {
      "parameters": {
        "rule": {
          "interval": [
            {
              "field": "cronExpression",
              "expression": "{{cron表达式}}"
            }
          ]
        }
      },
      "id": "schedule-trigger",
      "name": "{{触发器名称}}",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1.2,
      "position": [250, 300]
    },
    {
      "parameters": {
        "resource": "command",
        "operation": "execute",
        "command": "source venv/bin/activate && python {{脚本路径}} {{参数}}",
        "cwd": "/opt/dataops-platform"
      },
      "id": "execute-python-script",
      "name": "{{执行节点名称}}",
      "type": "n8n-nodes-base.ssh",
      "typeVersion": 1,
      "position": [500, 300],
      "credentials": {
        "sshPassword": {
          "id": "{{凭证ID}}",
          "name": "SSH Password account"
        }
      }
    },
    {
      "parameters": {
        "conditions": {
          "options": {
            "caseSensitive": true,
            "leftValue": "",
            "typeValidation": "strict"
          },
          "conditions": [
            {
              "id": "condition-success",
              "leftValue": "={{ $json.code }}",
              "rightValue": 0,
              "operator": {
                "type": "number",
                "operation": "equals"
              }
            }
          ],
          "combinator": "and"
        }
      },
      "id": "check-result",
      "name": "检查执行结果",
      "type": "n8n-nodes-base.if",
      "typeVersion": 2,
      "position": [750, 300]
    },
    {
      "parameters": {
        "assignments": {
          "assignments": [
            {"id": "result-success", "name": "status", "value": "success", "type": "string"},
            {"id": "result-message", "name": "message", "value": "{{成功消息}}", "type": "string"},
            {"id": "result-output", "name": "output", "value": "={{ $json.stdout }}", "type": "string"},
            {"id": "result-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
          ]
        }
      },
      "id": "success-output",
      "name": "成功响应",
      "type": "n8n-nodes-base.set",
      "typeVersion": 3.4,
      "position": [1000, 200]
    },
    {
      "parameters": {
        "assignments": {
          "assignments": [
            {"id": "error-status", "name": "status", "value": "error", "type": "string"},
            {"id": "error-message", "name": "message", "value": "{{失败消息}}", "type": "string"},
            {"id": "error-output", "name": "error", "value": "={{ $json.stderr }}", "type": "string"},
            {"id": "error-code", "name": "exitCode", "value": "={{ $json.code }}", "type": "number"},
            {"id": "error-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
          ]
        }
      },
      "id": "error-output",
      "name": "失败响应",
      "type": "n8n-nodes-base.set",
      "typeVersion": 3.4,
      "position": [1000, 400]
    }
    ],
    "connections": {
    "{{触发器名称}}": {
      "main": [[{"node": "{{执行节点名称}}", "type": "main", "index": 0}]]
    },
    "{{执行节点名称}}": {
      "main": [[{"node": "检查执行结果", "type": "main", "index": 0}]]
    },
    "检查执行结果": {
      "main": [
        [{"node": "成功响应", "type": "main", "index": 0}],
        [{"node": "失败响应", "type": "main", "index": 0}]
      ]
    }
    },
    "active": false,
    "settings": {
    "executionOrder": "v1",
    "saveManualExecutions": true
    },
    "meta": {
    "templateCredsSetupCompleted": true,
    "description": "{{工作流描述}}"
    }
    }
    

参考资料


更新日志

日期 版本 更新内容
2025-12-31 1.0.0 初始版本,包含 SSH 远程执行、条件判断、API 部署等核心内容