""" 创建 n8n Cohere API Key 凭证的脚本 注意: n8n 的凭证管理 API 可能有限制,如果 API 不支持,请使用 Web UI 手动配置。 参考文档: docs/n8n_cohere_credential_setup.md """ import os import sys import json from typing import Optional, Dict, Any import requests from loguru import logger # 添加项目根目录到路径 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from app.config.config import BaseConfig def create_cohere_credential( api_url: Optional[str] = None, api_key: Optional[str] = None, cohere_api_key: str = "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C", credential_name: str = "Cohere API Key", ) -> Dict[str, Any]: """ 尝试通过 n8n API 创建 Cohere 凭证 Args: api_url: n8n API 地址 api_key: n8n API Key cohere_api_key: Cohere API Key 值 credential_name: 凭证名称 Returns: 创建结果 Note: n8n 的凭证管理 API 可能不支持直接创建凭证(出于安全考虑)。 如果 API 调用失败,请使用 Web UI 手动配置。 """ # 获取配置 if api_url is None or api_key is None: config = BaseConfig() api_url = api_url or config.N8N_API_URL api_key = api_key or config.N8N_API_KEY base_url = api_url.rstrip("/") headers = { "X-N8N-API-KEY": api_key, "Content-Type": "application/json", "Accept": "application/json", } # n8n 凭证 API 端点(可能不存在或需要特殊权限) # 注意: n8n 的凭证管理通常需要通过 Web UI 完成 credential_data = { "name": credential_name, "type": "cohereApi", # 凭证类型 "data": { "apiKey": cohere_api_key, }, } # 尝试多个可能的 API 端点 endpoints = [ "/api/v1/credentials", # 标准凭证 API "/rest/credentials", # REST API ] for endpoint in endpoints: url = f"{base_url}{endpoint}" logger.info(f"尝试创建凭证: {url}") try: response = requests.post( url, headers=headers, json=credential_data, timeout=30, ) logger.debug(f"响应状态码: {response.status_code}") logger.debug(f"响应内容: {response.text}") if response.status_code == 200 or response.status_code == 201: result = response.json() logger.success(f"✅ 凭证创建成功: {result}") return { "success": True, "message": "凭证创建成功", "data": result, } elif response.status_code == 401: logger.error("❌ API 认证失败,请检查 n8n API Key") return { "success": False, "message": "API 认证失败,请检查 n8n API Key", "error": "Unauthorized", } elif response.status_code == 403: logger.warning("⚠️ API 权限不足,凭证管理可能需要 Owner 权限") return { "success": False, "message": "API 权限不足,请使用 Web UI 手动配置", "error": "Forbidden", } elif response.status_code == 404: logger.warning(f"⚠️ 端点不存在: {endpoint}") continue # 尝试下一个端点 else: logger.warning( f"⚠️ 请求失败: {response.status_code} - {response.text}" ) except requests.exceptions.RequestException as e: logger.error(f"❌ 请求异常: {str(e)}") continue # 所有端点都失败,建议使用 Web UI logger.warning( "⚠️ 无法通过 API 创建凭证。n8n 的凭证管理通常需要通过 Web UI 完成。" ) logger.info("📝 请参考文档手动配置: docs/n8n_cohere_credential_setup.md") return { "success": False, "message": "无法通过 API 创建凭证,请使用 Web UI 手动配置", "manual_setup_url": f"{base_url}/home/credentials", "guide": "docs/n8n_cohere_credential_setup.md", } def main(): """主函数""" logger.info("🚀 开始创建 n8n Cohere API Key 凭证...") logger.info("API Key: 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C") result = create_cohere_credential() print("\n" + "=" * 60) print("执行结果:") print("=" * 60) print(json.dumps(result, indent=2, ensure_ascii=False)) if not result.get("success"): print("\n" + "=" * 60) print("建议:") print("=" * 60) print("由于 n8n 的凭证管理 API 可能有限制,") print("请使用 Web UI 手动配置凭证。") print(f"\n详细步骤请参考: {result.get('guide', 'docs/n8n_cohere_credential_setup.md')}") print(f"凭证管理页面: {result.get('manual_setup_url', 'https://n8n.citupro.com/home/credentials')}") return 0 if result.get("success") else 1 if __name__ == "__main__": sys.exit(main())