# n8n 工作流开发规范 > DataOps Platform 项目 n8n 工作流开发指南 ## 目录 - [概述](#概述) - [环境配置](#环境配置) - [工作流架构设计](#工作流架构设计) - [节点类型与配置](#节点类型与配置) - [SSH 远程执行最佳实践](#ssh-远程执行最佳实践) - [条件判断与分支逻辑](#条件判断与分支逻辑) - [工作流 JSON 规范](#工作流-json-规范) - [API 部署与管理](#api-部署与管理) - [凭证管理](#凭证管理) - [错误处理与日志](#错误处理与日志) - [常见问题与解决方案](#常见问题与解决方案) - [工作流模板](#工作流模板) --- ## 概述 本项目使用 n8n 作为工作流自动化引擎,用于定时执行数据处理任务、触发 Python 脚本、监控执行状态等场景。 ### 技术栈 | 组件 | 说明 | |------|------| | n8n 服务器 | `https://n8n.citupro.com` (Docker 部署) | | 应用服务器 | `company.citupro.com:982` (SSH 端口) | | Python 客户端 | `app/core/data_factory/n8n_client.py` | | 部署脚本 | `scripts/deploy_n8n_workflow.py` | | 工作流存放 | `app/core/data_flow/` | --- ## 环境配置 ### Flask 配置 (config.py) ```python # n8n API 配置 N8N_API_URL = "https://n8n.citupro.com" N8N_API_KEY = "your-api-key" # 从 n8n Settings > API 获取 N8N_API_TIMEOUT = 30 ``` ### 获取 n8n API Key 1. 登录 n8n 管理界面 2. 点击右上角用户头像 → Settings 3. 进入 API 页面 4. 点击 "Create an API key" 5. 保存生成的 API Key 到配置文件 --- ## 工作流架构设计 ### 标准数据处理工作流结构 ``` ┌─────────────────┐ ┌──────────────────┐ ┌────────────────┐ │ Schedule │────▶│ SSH Execute │────▶│ If (Check │ │ Trigger │ │ Command │ │ Result) │ └─────────────────┘ └──────────────────┘ └────────────────┘ │ ┌─────────────────────────┼─────────────────────────┐ ▼ ▼ ┌───────────┐ ┌───────────┐ │ Success │ │ Error │ │ Response │ │ Response │ └───────────┘ └───────────┘ ``` ### 设计原则 1. **单一职责**: 每个工作流只处理一个数据任务 2. **可观测性**: 必须包含成功/失败响应节点 3. **幂等性**: 脚本应支持重复执行 4. **错误隔离**: 失败不应影响其他工作流 --- ## 节点类型与配置 ### 1. Schedule Trigger (定时触发器) ```json { "parameters": { "rule": { "interval": [ { "field": "cronExpression", "expression": "0 3 * * *" } ] } }, "id": "schedule-trigger", "name": "每日凌晨3点执行", "type": "n8n-nodes-base.scheduleTrigger", "typeVersion": 1.2, "position": [250, 300] } ``` **常用 Cron 表达式**: | 表达式 | 说明 | |--------|------| | `0 3 * * *` | 每日凌晨 3:00 | | `0 */6 * * *` | 每 6 小时 | | `0 0 * * 1` | 每周一 00:00 | | `0 0 1 * *` | 每月 1 日 00:00 | ### 2. SSH 节点 (远程命令执行) > ⚠️ **重要**: 当 n8n 服务器与应用服务器分离时,必须使用 SSH 节点而非 Execute Command 节点。 ```json { "parameters": { "resource": "command", "operation": "execute", "command": "source venv/bin/activate && python app/core/data_flow/script.py --args", "cwd": "/opt/dataops-platform" }, "id": "execute-python-script", "name": "执行脚本", "type": "n8n-nodes-base.ssh", "typeVersion": 1, "position": [500, 300], "credentials": { "sshPassword": { "id": "credential-id", "name": "SSH Password account" } } } ``` **SSH 节点 vs Execute Command 节点**: | 特性 | SSH 节点 | Execute Command 节点 | |------|----------|---------------------| | 执行位置 | 远程服务器 | n8n 服务器本地 | | 需要凭证 | 是 (SSH) | 否 | | 返回码字段 | `$json.code` | `$json.exitCode` | | 适用场景 | 跨服务器执行 | 本地执行 | ### 3. If 条件节点 (检查执行结果) ```json { "parameters": { "conditions": { "options": { "caseSensitive": true, "leftValue": "", "typeValidation": "strict" }, "conditions": [ { "id": "condition-success", "leftValue": "={{ $json.code }}", "rightValue": 0, "operator": { "type": "number", "operation": "equals" } } ], "combinator": "and" } }, "id": "check-result", "name": "检查执行结果", "type": "n8n-nodes-base.if", "typeVersion": 2, "position": [750, 300] } ``` > ⚠️ **关键区别**: SSH 节点返回 `$json.code`,Execute Command 节点返回 `$json.exitCode` ### 4. Set 节点 (响应构建) **成功响应**: ```json { "parameters": { "assignments": { "assignments": [ {"id": "result-success", "name": "status", "value": "success", "type": "string"}, {"id": "result-message", "name": "message", "value": "执行成功", "type": "string"}, {"id": "result-output", "name": "output", "value": "={{ $json.stdout }}", "type": "string"}, {"id": "result-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"} ] } }, "id": "success-output", "name": "成功响应", "type": "n8n-nodes-base.set", "typeVersion": 3.4, "position": [1000, 200] } ``` **失败响应**: ```json { "parameters": { "assignments": { "assignments": [ {"id": "error-status", "name": "status", "value": "error", "type": "string"}, {"id": "error-message", "name": "message", "value": "执行失败", "type": "string"}, {"id": "error-output", "name": "error", "value": "={{ $json.stderr }}", "type": "string"}, {"id": "error-code", "name": "exitCode", "value": "={{ $json.code }}", "type": "number"}, {"id": "error-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"} ] } }, "id": "error-output", "name": "失败响应", "type": "n8n-nodes-base.set", "typeVersion": 3.4, "position": [1000, 400] } ``` --- ## SSH 远程执行最佳实践 ### 凭证配置 1. **在 n8n 中创建 SSH 凭证**: - 进入 Settings → Credentials - 点击 "Add Credential" - 选择 "SSH Password" - 配置: - **Host**: `company.citupro.com` (公网域名或 IP) - **Port**: `982` (SSH 端口,默认 22) - **Username**: `ubuntu` - **Password**: `******` 2. **测试连接**: 配置完成后点击 "Test" 验证连接 ### 命令格式 ```bash # 推荐格式:使用 cwd 参数设置工作目录 source venv/bin/activate && python script.py --args # 不推荐:在命令中 cd cd /opt/dataops-platform && source venv/bin/activate && python script.py ``` ### Python 脚本要求 1. **环境配置加载**: ```python from app.config.config import get_config_by_env config = get_config_by_env() database_uri = config.SQLALCHEMY_DATABASE_URI ``` 2. **标准输出格式** (便于 n8n 解析): ```python print("=" * 60) print(f"处理结果: {'成功' if success else '失败'}") print(f"消息: {message}") print(f"处理数量: {count}") print("=" * 60) ``` 3. **退出码规范**: - `0`: 成功 - `1`: 一般错误 - `2`: 参数错误 - `其他`: 特定错误 --- ## 工作流 JSON 规范 ### 文件命名 ``` n8n_workflow_<功能描述>.json ``` 示例: - `n8n_workflow_sales_data.json` - `n8n_workflow_nursing_project_income.json` ### 存放位置 ``` app/core/data_flow/ ├── n8n_workflow_sales_data.json ├── n8n_workflow_nursing_project_income.json ├── sales_data_generator.py └── nursing_project_income.py ``` ### JSON 结构 ```json { "name": "工作流名称", "nodes": [...], "connections": {...}, "active": false, "settings": { "executionOrder": "v1", "saveManualExecutions": true }, "meta": { "templateCredsSetupCompleted": true, "description": "工作流描述" } } ``` ### 节点 ID 命名规范 | 节点类型 | ID 格式 | 示例 | |----------|---------|------| | 触发器 | `schedule-trigger`, `webhook-trigger` | `schedule-trigger` | | 执行脚本 | `execute-` | `execute-python-script` | | 条件判断 | `check-` | `check-result` | | 成功响应 | `success-output` | `success-output` | | 失败响应 | `error-output` | `error-output` | ### 连接配置 ```json { "connections": { "节点名称A": { "main": [ [ { "node": "节点名称B", "type": "main", "index": 0 } ] ] }, "检查执行结果": { "main": [ [{"node": "成功响应", "type": "main", "index": 0}], [{"node": "失败响应", "type": "main", "index": 0}] ] } } } ``` --- ## API 部署与管理 ### 使用部署脚本 ```bash # 部署工作流 python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_sales_data.json # 部署并激活 python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_sales_data.json --activate ``` ### 通过 API 更新工作流 ```python import json import requests # 读取工作流 JSON with open('workflow.json', 'r', encoding='utf-8') as f: workflow = json.load(f) # 准备更新数据 (只包含允许的字段) update_data = { 'name': workflow['name'], 'nodes': workflow['nodes'], 'connections': workflow['connections'], 'settings': workflow['settings'] } # 调用 API response = requests.put( f'https://n8n.citupro.com/api/v1/workflows/{workflow_id}', headers={ 'X-N8N-API-KEY': 'your-api-key', 'Content-Type': 'application/json' }, json=update_data ) ``` ### API 注意事项 1. **创建工作流时不支持的字段**: - `active` (只读) - `id` (自动生成) - `createdAt`, `updatedAt` (自动生成) - `tags` (需要单独 API 处理) 2. **settings 中不支持的属性**: - `errorWorkflow` - `callerPolicy` ### N8nClient 使用 ```python from app.core.data_factory.n8n_client import N8nClient client = N8nClient() # 获取工作流列表 workflows = client.list_workflows(active=True) # 获取单个工作流 workflow = client.get_workflow('workflow-id') # 创建工作流 result = client.create_workflow(workflow_data) # 更新工作流 result = client.update_workflow('workflow-id', workflow_data) # 激活/停用工作流 client.activate_workflow('workflow-id') client.deactivate_workflow('workflow-id') # 获取执行记录 executions = client.list_executions(workflow_id='workflow-id') ``` --- ## 凭证管理 ### SSH 凭证结构 ```json { "credentials": { "sshPassword": { "id": "pYTwwuyC15caQe6y", "name": "SSH Password account" } } } ``` ### 获取凭证 ID 1. 登录 n8n → Settings → Credentials 2. 点击目标凭证 3. 从 URL 获取 ID: `/home/credentials/pYTwwuyC15caQe6y` ### 凭证使用规范 - 不要在代码中硬编码凭证密码 - 使用凭证 ID 引用,不直接存储密码 - 定期轮换 SSH 密码和 API Key --- ## 错误处理与日志 ### Python 脚本日志规范 ```python from loguru import logger # 配置日志 logger.add( "logs/data_flow.log", rotation="1 day", retention="7 days", level="INFO" ) # 使用日志 logger.info("开始处理数据") logger.error(f"处理失败: {error}") ``` ### 工作流错误通知 可以在失败分支添加通知节点: ``` 失败响应 → Slack/Email 通知 ``` --- ## 常见问题与解决方案 ### 1. SSH 连接超时 **问题**: `Timed out while waiting for handshake` **解决方案**: - 检查网络连接和防火墙规则 - 确认 SSH 端口正确 (可能不是默认的 22) - 如果 n8n 在 Docker 中,检查容器网络配置 ### 2. 命令执行路径错误 **问题**: `can't cd to /opt/dataops-platform: No such file or directory` **解决方案**: - 使用 SSH 节点的 `cwd` 参数设置工作目录 - 确保路径在目标服务器上存在 ### 3. 条件判断不生效 **问题**: 执行成功但走了失败分支 **解决方案**: - SSH 节点使用 `$json.code`,Execute Command 使用 `$json.exitCode` - 检查返回值类型匹配 (number vs string) ### 4. API 部署失败 400 错误 **问题**: `request/body/settings must NOT have additional properties` **解决方案**: - 移除 settings 中的 `errorWorkflow`、`callerPolicy` 等不支持的属性 - 只保留 `executionOrder`、`saveManualExecutions` 等基础属性 ### 5. 数据库连接失败 **问题**: `role "user" does not exist` 或连接被拒绝 **解决方案**: - 确保脚本正确加载环境配置: ```python from app.config.config import get_config_by_env config = get_config_by_env() ``` --- ## 工作流模板 ### 定时数据处理工作流模板 ```json { "name": "{{工作流名称}}", "nodes": [ { "parameters": { "rule": { "interval": [ { "field": "cronExpression", "expression": "{{cron表达式}}" } ] } }, "id": "schedule-trigger", "name": "{{触发器名称}}", "type": "n8n-nodes-base.scheduleTrigger", "typeVersion": 1.2, "position": [250, 300] }, { "parameters": { "resource": "command", "operation": "execute", "command": "source venv/bin/activate && python {{脚本路径}} {{参数}}", "cwd": "/opt/dataops-platform" }, "id": "execute-python-script", "name": "{{执行节点名称}}", "type": "n8n-nodes-base.ssh", "typeVersion": 1, "position": [500, 300], "credentials": { "sshPassword": { "id": "{{凭证ID}}", "name": "SSH Password account" } } }, { "parameters": { "conditions": { "options": { "caseSensitive": true, "leftValue": "", "typeValidation": "strict" }, "conditions": [ { "id": "condition-success", "leftValue": "={{ $json.code }}", "rightValue": 0, "operator": { "type": "number", "operation": "equals" } } ], "combinator": "and" } }, "id": "check-result", "name": "检查执行结果", "type": "n8n-nodes-base.if", "typeVersion": 2, "position": [750, 300] }, { "parameters": { "assignments": { "assignments": [ {"id": "result-success", "name": "status", "value": "success", "type": "string"}, {"id": "result-message", "name": "message", "value": "{{成功消息}}", "type": "string"}, {"id": "result-output", "name": "output", "value": "={{ $json.stdout }}", "type": "string"}, {"id": "result-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"} ] } }, "id": "success-output", "name": "成功响应", "type": "n8n-nodes-base.set", "typeVersion": 3.4, "position": [1000, 200] }, { "parameters": { "assignments": { "assignments": [ {"id": "error-status", "name": "status", "value": "error", "type": "string"}, {"id": "error-message", "name": "message", "value": "{{失败消息}}", "type": "string"}, {"id": "error-output", "name": "error", "value": "={{ $json.stderr }}", "type": "string"}, {"id": "error-code", "name": "exitCode", "value": "={{ $json.code }}", "type": "number"}, {"id": "error-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"} ] } }, "id": "error-output", "name": "失败响应", "type": "n8n-nodes-base.set", "typeVersion": 3.4, "position": [1000, 400] } ], "connections": { "{{触发器名称}}": { "main": [[{"node": "{{执行节点名称}}", "type": "main", "index": 0}]] }, "{{执行节点名称}}": { "main": [[{"node": "检查执行结果", "type": "main", "index": 0}]] }, "检查执行结果": { "main": [ [{"node": "成功响应", "type": "main", "index": 0}], [{"node": "失败响应", "type": "main", "index": 0}] ] } }, "active": false, "settings": { "executionOrder": "v1", "saveManualExecutions": true }, "meta": { "templateCredsSetupCompleted": true, "description": "{{工作流描述}}" } } ``` --- ## 参考资料 - [n8n 官方文档](https://docs.n8n.io/) - [n8n REST API 文档](https://docs.n8n.io/api/api-reference/) - [项目 n8n 客户端](../app/core/data_factory/n8n_client.py) - [部署脚本](../scripts/deploy_n8n_workflow.py) --- ## 更新日志 | 日期 | 版本 | 更新内容 | |------|------|----------| | 2025-12-31 | 1.0.0 | 初始版本,包含 SSH 远程执行、条件判断、API 部署等核心内容 |