DataOps Platform 项目 n8n 工作流开发指南
本项目使用 n8n 作为工作流自动化引擎,用于定时执行数据处理任务、触发 Python 脚本、监控执行状态等场景。
| 组件 | 说明 |
|---|---|
| n8n 服务器 | https://n8n.citupro.com (Docker 部署) |
| 应用服务器 | company.citupro.com:982 (SSH 端口) |
| Python 客户端 | app/core/data_factory/n8n_client.py |
| 部署脚本 | scripts/deploy_n8n_workflow.py |
| 工作流存放 | app/core/data_flow/ |
# n8n API 配置
N8N_API_URL = "https://n8n.citupro.com"
N8N_API_KEY = "your-api-key" # 从 n8n Settings > API 获取
N8N_API_TIMEOUT = 30
┌─────────────────┐ ┌──────────────────┐ ┌────────────────┐
│ Schedule │────▶│ SSH Execute │────▶│ If (Check │
│ Trigger │ │ Command │ │ Result) │
└─────────────────┘ └──────────────────┘ └────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
▼ ▼
┌───────────┐ ┌───────────┐
│ Success │ │ Error │
│ Response │ │ Response │
└───────────┘ └───────────┘
{
"parameters": {
"rule": {
"interval": [
{
"field": "cronExpression",
"expression": "0 3 * * *"
}
]
}
},
"id": "schedule-trigger",
"name": "每日凌晨3点执行",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1.2,
"position": [250, 300]
}
常用 Cron 表达式:
| 表达式 | 说明 |
|---|---|
0 3 * * * |
每日凌晨 3:00 |
0 */6 * * * |
每 6 小时 |
0 0 * * 1 |
每周一 00:00 |
0 0 1 * * |
每月 1 日 00:00 |
⚠️ 重要: 当 n8n 服务器与应用服务器分离时,必须使用 SSH 节点而非 Execute Command 节点。
{
"parameters": {
"resource": "command",
"operation": "execute",
"command": "source venv/bin/activate && python app/core/data_flow/script.py --args",
"cwd": "/opt/dataops-platform"
},
"id": "execute-python-script",
"name": "执行脚本",
"type": "n8n-nodes-base.ssh",
"typeVersion": 1,
"position": [500, 300],
"credentials": {
"sshPassword": {
"id": "credential-id",
"name": "SSH Password account"
}
}
}
SSH 节点 vs Execute Command 节点:
| 特性 | SSH 节点 | Execute Command 节点 |
|---|---|---|
| 执行位置 | 远程服务器 | n8n 服务器本地 |
| 需要凭证 | 是 (SSH) | 否 |
| 返回码字段 | $json.code |
$json.exitCode |
| 适用场景 | 跨服务器执行 | 本地执行 |
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict"
},
"conditions": [
{
"id": "condition-success",
"leftValue": "={{ $json.code }}",
"rightValue": 0,
"operator": {
"type": "number",
"operation": "equals"
}
}
],
"combinator": "and"
}
},
"id": "check-result",
"name": "检查执行结果",
"type": "n8n-nodes-base.if",
"typeVersion": 2,
"position": [750, 300]
}
⚠️ 关键区别: SSH 节点返回
$json.code,Execute Command 节点返回$json.exitCode
成功响应:
{
"parameters": {
"assignments": {
"assignments": [
{"id": "result-success", "name": "status", "value": "success", "type": "string"},
{"id": "result-message", "name": "message", "value": "执行成功", "type": "string"},
{"id": "result-output", "name": "output", "value": "={{ $json.stdout }}", "type": "string"},
{"id": "result-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
]
}
},
"id": "success-output",
"name": "成功响应",
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [1000, 200]
}
失败响应:
{
"parameters": {
"assignments": {
"assignments": [
{"id": "error-status", "name": "status", "value": "error", "type": "string"},
{"id": "error-message", "name": "message", "value": "执行失败", "type": "string"},
{"id": "error-output", "name": "error", "value": "={{ $json.stderr }}", "type": "string"},
{"id": "error-code", "name": "exitCode", "value": "={{ $json.code }}", "type": "number"},
{"id": "error-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
]
}
},
"id": "error-output",
"name": "失败响应",
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [1000, 400]
}
在 n8n 中创建 SSH 凭证:
company.citupro.com (公网域名或 IP)982 (SSH 端口,默认 22)ubuntu******测试连接: 配置完成后点击 "Test" 验证连接
# 推荐格式:使用 cwd 参数设置工作目录
source venv/bin/activate && python script.py --args
# 不推荐:在命令中 cd
cd /opt/dataops-platform && source venv/bin/activate && python script.py
config = get_config_by_env() database_uri = config.SQLALCHEMY_DATABASE_URI
2. **标准输出格式** (便于 n8n 解析):
```python
print("=" * 60)
print(f"处理结果: {'成功' if success else '失败'}")
print(f"消息: {message}")
print(f"处理数量: {count}")
print("=" * 60)
0: 成功1: 一般错误2: 参数错误其他: 特定错误n8n_workflow_<功能描述>.json
示例:
n8n_workflow_sales_data.jsonn8n_workflow_nursing_project_income.jsonapp/core/data_flow/
├── n8n_workflow_sales_data.json
├── n8n_workflow_nursing_project_income.json
├── sales_data_generator.py
└── nursing_project_income.py
{
"name": "工作流名称",
"nodes": [...],
"connections": {...},
"active": false,
"settings": {
"executionOrder": "v1",
"saveManualExecutions": true
},
"meta": {
"templateCredsSetupCompleted": true,
"description": "工作流描述"
}
}
| 节点类型 | ID 格式 | 示例 |
|---|---|---|
| 触发器 | schedule-trigger, webhook-trigger |
schedule-trigger |
| 执行脚本 | execute-<action> |
execute-python-script |
| 条件判断 | check-<condition> |
check-result |
| 成功响应 | success-output |
success-output |
| 失败响应 | error-output |
error-output |
{
"connections": {
"节点名称A": {
"main": [
[
{
"node": "节点名称B",
"type": "main",
"index": 0
}
]
]
},
"检查执行结果": {
"main": [
[{"node": "成功响应", "type": "main", "index": 0}],
[{"node": "失败响应", "type": "main", "index": 0}]
]
}
}
}
# 部署工作流
python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_sales_data.json
# 部署并激活
python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_sales_data.json --activate
import json
import requests
# 读取工作流 JSON
with open('workflow.json', 'r', encoding='utf-8') as f:
workflow = json.load(f)
# 准备更新数据 (只包含允许的字段)
update_data = {
'name': workflow['name'],
'nodes': workflow['nodes'],
'connections': workflow['connections'],
'settings': workflow['settings']
}
# 调用 API
response = requests.put(
f'https://n8n.citupro.com/api/v1/workflows/{workflow_id}',
headers={
'X-N8N-API-KEY': 'your-api-key',
'Content-Type': 'application/json'
},
json=update_data
)
创建工作流时不支持的字段:
active (只读)id (自动生成)createdAt, updatedAt (自动生成)tags (需要单独 API 处理)settings 中不支持的属性:
errorWorkflowcallerPolicyfrom app.core.data_factory.n8n_client import N8nClient
client = N8nClient()
# 获取工作流列表
workflows = client.list_workflows(active=True)
# 获取单个工作流
workflow = client.get_workflow('workflow-id')
# 创建工作流
result = client.create_workflow(workflow_data)
# 更新工作流
result = client.update_workflow('workflow-id', workflow_data)
# 激活/停用工作流
client.activate_workflow('workflow-id')
client.deactivate_workflow('workflow-id')
# 获取执行记录
executions = client.list_executions(workflow_id='workflow-id')
{
"credentials": {
"sshPassword": {
"id": "pYTwwuyC15caQe6y",
"name": "SSH Password account"
}
}
}
/home/credentials/pYTwwuyC15caQe6yfrom loguru import logger
# 配置日志
logger.add(
"logs/data_flow.log",
rotation="1 day",
retention="7 days",
level="INFO"
)
# 使用日志
logger.info("开始处理数据")
logger.error(f"处理失败: {error}")
可以在失败分支添加通知节点:
失败响应 → Slack/Email 通知
问题: Timed out while waiting for handshake
解决方案:
问题: can't cd to /opt/dataops-platform: No such file or directory
解决方案:
cwd 参数设置工作目录问题: 执行成功但走了失败分支
解决方案:
$json.code,Execute Command 使用 $json.exitCode问题: request/body/settings must NOT have additional properties
解决方案:
errorWorkflow、callerPolicy 等不支持的属性executionOrder、saveManualExecutions 等基础属性问题: role "user" does not exist 或连接被拒绝
解决方案:
确保脚本正确加载环境配置:
from app.config.config import get_config_by_env
config = get_config_by_env()
{
"name": "{{工作流名称}}",
"nodes": [
{
"parameters": {
"rule": {
"interval": [
{
"field": "cronExpression",
"expression": "{{cron表达式}}"
}
]
}
},
"id": "schedule-trigger",
"name": "{{触发器名称}}",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1.2,
"position": [250, 300]
},
{
"parameters": {
"resource": "command",
"operation": "execute",
"command": "source venv/bin/activate && python {{脚本路径}} {{参数}}",
"cwd": "/opt/dataops-platform"
},
"id": "execute-python-script",
"name": "{{执行节点名称}}",
"type": "n8n-nodes-base.ssh",
"typeVersion": 1,
"position": [500, 300],
"credentials": {
"sshPassword": {
"id": "{{凭证ID}}",
"name": "SSH Password account"
}
}
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict"
},
"conditions": [
{
"id": "condition-success",
"leftValue": "={{ $json.code }}",
"rightValue": 0,
"operator": {
"type": "number",
"operation": "equals"
}
}
],
"combinator": "and"
}
},
"id": "check-result",
"name": "检查执行结果",
"type": "n8n-nodes-base.if",
"typeVersion": 2,
"position": [750, 300]
},
{
"parameters": {
"assignments": {
"assignments": [
{"id": "result-success", "name": "status", "value": "success", "type": "string"},
{"id": "result-message", "name": "message", "value": "{{成功消息}}", "type": "string"},
{"id": "result-output", "name": "output", "value": "={{ $json.stdout }}", "type": "string"},
{"id": "result-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
]
}
},
"id": "success-output",
"name": "成功响应",
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [1000, 200]
},
{
"parameters": {
"assignments": {
"assignments": [
{"id": "error-status", "name": "status", "value": "error", "type": "string"},
{"id": "error-message", "name": "message", "value": "{{失败消息}}", "type": "string"},
{"id": "error-output", "name": "error", "value": "={{ $json.stderr }}", "type": "string"},
{"id": "error-code", "name": "exitCode", "value": "={{ $json.code }}", "type": "number"},
{"id": "error-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string"}
]
}
},
"id": "error-output",
"name": "失败响应",
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [1000, 400]
}
],
"connections": {
"{{触发器名称}}": {
"main": [[{"node": "{{执行节点名称}}", "type": "main", "index": 0}]]
},
"{{执行节点名称}}": {
"main": [[{"node": "检查执行结果", "type": "main", "index": 0}]]
},
"检查执行结果": {
"main": [
[{"node": "成功响应", "type": "main", "index": 0}],
[{"node": "失败响应", "type": "main", "index": 0}]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1",
"saveManualExecutions": true
},
"meta": {
"templateCredsSetupCompleted": true,
"description": "{{工作流描述}}"
}
}
| 日期 | 版本 | 更新内容 |
|---|---|---|
| 2025-12-31 | 1.0.0 | 初始版本,包含 SSH 远程执行、条件判断、API 部署等核心内容 |