فهرست منبع

修正数据订单和自动执行脚本生成,以及数据生产线执行中的bug。

maxiaolong 3 روز پیش
والد
کامیت
34605d4619

+ 2 - 0
app/api/data_service/routes.py

@@ -413,6 +413,7 @@ def create_order():
         title: 订单标题(必填)
         description: 需求描述(必填)
         created_by: 创建人(可选,默认user)
+        data_source: 指定的数据源节点ID(可选)
     """
     try:
         data = request.get_json()
@@ -431,6 +432,7 @@ def create_order():
             title=data["title"],
             description=data["description"],
             created_by=data.get("created_by", "user"),
+            data_source=data.get("data_source"),
         )
 
         res = success(order.to_dict(), "创建数据订单成功")

+ 5 - 0
app/config/config.py

@@ -96,6 +96,11 @@ class BaseConfig:
     )
     N8N_API_TIMEOUT = int(os.environ.get("N8N_API_TIMEOUT", "30"))
 
+    # DataOps 平台 API 基础 URL(用于 n8n 工作流回调等)
+    API_BASE_URL = os.environ.get(
+        "API_BASE_URL", "https://company.citupro.com:18183/api"
+    )
+
 
 class DevelopmentConfig(BaseConfig):
     """Windows 开发环境配置"""

+ 94 - 19
app/core/data_flow/dataflows.py

@@ -544,9 +544,9 @@ class DataFlowService:
                     )
                     task_description_md = script_requirement
 
-                # 设置code_path和code_name
+                # 设置 code_path(不包含文件名)
+                # code_name 需要在获取 task_id 后生成
                 code_path = "datafactory/scripts"
-                code_name = script_name
 
                 task_insert_sql = text(
                     "INSERT INTO public.task_list\n"
@@ -554,24 +554,45 @@ class DataFlowService:
                     "code_path, create_by, create_time, update_time)\n"
                     "VALUES\n"
                     "(:task_name, :task_description, :status, :code_name, "
-                    ":code_path, :create_by, :create_time, :update_time)"
+                    ":code_path, :create_by, :create_time, :update_time)\n"
+                    "RETURNING task_id"
                 )
 
                 task_params = {
                     "task_name": script_name,
                     "task_description": task_description_md,
                     "status": "pending",
-                    "code_name": code_name,
+                    "code_name": "",  # 暂时为空,等获取 task_id 后更新
                     "code_path": code_path,
                     "create_by": "cursor",
                     "create_time": current_time,
                     "update_time": current_time,
                 }
 
-                db.session.execute(task_insert_sql, task_params)
+                result = db.session.execute(task_insert_sql, task_params)
+                row = result.fetchone()
+                task_id = row[0] if row else None
+
+                # 根据 task_id 生成脚本文件名
+                # 格式: task_{task_id}_{task_name}.py(与 auto_execute_tasks 生成的一致)
+                code_name = f"task_{task_id}_{script_name}.py"
+
+                # 更新 code_name 字段
+                if task_id:
+                    update_sql = text(
+                        "UPDATE public.task_list SET code_name = :code_name "
+                        "WHERE task_id = :task_id"
+                    )
+                    db.session.execute(
+                        update_sql, {"code_name": code_name, "task_id": task_id}
+                    )
+
                 db.session.commit()
 
-                logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
+                logger.info(
+                    f"成功将任务信息写入task_list表: "
+                    f"task_id={task_id}, task_name={script_name}, code_name={code_name}"
+                )
 
                 # 自动生成 n8n 工作流 JSON 文件
                 try:
@@ -580,6 +601,7 @@ class DataFlowService:
                         code_name=code_name,
                         code_path=code_path,
                         update_mode=update_mode,
+                        task_id=task_id,
                     )
                 except Exception as wf_error:
                     logger.warning(f"生成n8n工作流文件失败: {str(wf_error)}")
@@ -601,15 +623,17 @@ class DataFlowService:
         code_name: str,
         code_path: str,
         update_mode: str = "append",
+        task_id: Optional[int] = None,
     ) -> Optional[str]:
         """
         自动生成 n8n 工作流 JSON 文件
 
         Args:
             script_name: 脚本/任务名称
-            code_name: 代码文件名
-            code_path: 代码路径
+            code_name: 代码文件名(如 task_42_DF_DO202601210001.py)
+            code_path: 代码路径(如 datafactory/scripts)
             update_mode: 更新模式
+            task_id: 关联的任务 ID
 
         Returns:
             生成的工作流文件路径,失败返回 None
@@ -619,8 +643,11 @@ class DataFlowService:
             workflows_dir = PROJECT_ROOT / "datafactory" / "workflows"
             workflows_dir.mkdir(parents=True, exist_ok=True)
 
-            # 生成工作流文件名
-            workflow_filename = f"{script_name}_workflow.json"
+            # 生成工作流文件名(使用任务ID以便于关联)
+            if task_id:
+                workflow_filename = f"task_{task_id}_{script_name}_workflow.json"
+            else:
+                workflow_filename = f"{script_name}_workflow.json"
             workflow_path = workflows_dir / workflow_filename
 
             # 生成唯一ID
@@ -629,6 +656,7 @@ class DataFlowService:
 
             # 构建完整的 SSH 命令,包含激活 venv
             # 注意:由于 n8n 服务器与应用服务器分离,必须使用 SSH 节点
+            # code_name 已经包含 .py 后缀(如 task_42_DF_DO202601210001.py)
             ssh_command = (
                 f"cd /opt/dataops-platform && source venv/bin/activate && "
                 f"python {code_path}/{code_name}"
@@ -1392,6 +1420,15 @@ class DataFlowService:
             logger.error(f"获取数据流日志失败: {str(e)}")
             raise e
 
+    # 默认生产环境数据源配置
+    DEFAULT_DATA_SOURCE = {
+        "type": "postgresql",
+        "host": "192.168.3.143",
+        "port": 5432,
+        "database": "dataops",
+        "schema": "dags",
+    }
+
     @staticmethod
     def _generate_businessdomain_ddl(
         session,
@@ -1410,6 +1447,7 @@ class DataFlowService:
 
         Returns:
             包含ddl和data_source信息的字典,如果节点不存在则返回None
+            data_source始终返回,如果没有COME_FROM关系则使用默认生产环境配置
         """
         try:
             # 查询BusinessDomain节点、元数据、标签关系和数据源关系
@@ -1440,10 +1478,6 @@ class DataFlowService:
 
             node = result["bd"]
             metadata = result["metadata"]
-            # 处理标签数组,获取第一个有效标签名称用于判断
-            labels = result.get("labels", [])
-            valid_labels = [label for label in labels if label.get("id") is not None]
-            label_name = valid_labels[0].get("name_zh") if valid_labels else None
 
             # 生成DDL
             node_props = dict(node)
@@ -1475,7 +1509,7 @@ class DataFlowService:
             if not column_definitions:
                 column_definitions.append("    id BIGINT PRIMARY KEY COMMENT '主键ID'")
 
-            # 5. 如果是目标表,添加create_time字段
+            # 如果是目标表,添加create_time字段
             if is_target:
                 column_definitions.append(
                     "    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
@@ -1494,9 +1528,10 @@ class DataFlowService:
 
             ddl_content = "\n".join(ddl_lines)
 
-            # 检查BELONGS_TO关系是否连接"数据资源",如果是则返回数据源信息
-            data_source = None
-            if label_name == "数据资源" and result["ds_type"]:
+            # 始终返回数据源信息
+            # 如果通过COME_FROM关系找到了数据源,使用该数据源
+            # 否则使用默认的生产环境数据库配置
+            if result["ds_type"]:
                 data_source = {
                     "type": result["ds_type"],
                     "host": result["ds_host"],
@@ -1504,7 +1539,47 @@ class DataFlowService:
                     "database": result["ds_database"],
                     "schema": result["ds_schema"],
                 }
-                logger.info(f"获取到数据源信息: {data_source}")
+
+                # 端口验证:确保数据库类型使用正确的端口
+                # PostgreSQL 默认端口 5432,MySQL 默认端口 3306
+                # 5678 是 n8n 服务端口,不是数据库端口
+                ds_type_lower = (result["ds_type"] or "").lower()
+                current_port = data_source.get("port")
+
+                # 定义数据库类型与默认端口的映射
+                db_default_ports = {
+                    "postgresql": 5432,
+                    "postgres": 5432,
+                    "mysql": 3306,
+                    "mariadb": 3306,
+                    "sqlserver": 1433,
+                    "mssql": 1433,
+                    "oracle": 1521,
+                }
+
+                # 常见的非数据库端口(需要修正)
+                invalid_db_ports = {5678, 8080, 80, 443, 8000, 3000}
+
+                if ds_type_lower in db_default_ports:
+                    expected_port = db_default_ports[ds_type_lower]
+                    if current_port in invalid_db_ports:
+                        logger.warning(
+                            f"检测到数据源端口配置异常: type={ds_type_lower}, "
+                            f"port={current_port}(疑似非数据库端口),"
+                            f"已自动修正为默认端口 {expected_port}"
+                        )
+                        data_source["port"] = expected_port
+                    elif current_port is None:
+                        logger.info(f"数据源端口为空,使用默认端口: {expected_port}")
+                        data_source["port"] = expected_port
+
+                logger.info(f"通过COME_FROM关系获取到数据源信息: {data_source}")
+            else:
+                # 使用默认生产环境数据源配置
+                data_source = DataFlowService.DEFAULT_DATA_SOURCE.copy()
+                logger.info(
+                    f"未找到COME_FROM关系,使用默认生产环境数据源: {data_source}"
+                )
 
             logger.debug(
                 f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}"

+ 252 - 39
app/core/data_service/data_product_service.py

@@ -979,8 +979,8 @@ class DataProductService:
                 rel_key = f"OUTPUT_{df_id}_{current_bd_id}"
                 if rel_key not in lines_dict:
                     lines_dict[rel_key] = {
-                        "from": df_id,
-                        "to": current_bd_id,
+                        "from": str(df_id),
+                        "to": str(current_bd_id),
                         "text": "OUTPUT",
                     }
 
@@ -999,8 +999,8 @@ class DataProductService:
                     input_rel_key = f"INPUT_{source_id}_{df_id}"
                     if input_rel_key not in lines_dict:
                         lines_dict[input_rel_key] = {
-                            "from": source_id,
-                            "to": df_id,
+                            "from": str(source_id),
+                            "to": str(df_id),
                             "text": "INPUT",
                         }
 
@@ -1130,6 +1130,7 @@ class DataOrderService:
         title: str,
         description: str,
         created_by: str = "user",
+        data_source: int | None = None,
     ) -> DataOrder:
         """
         创建数据订单
@@ -1138,6 +1139,7 @@ class DataOrderService:
             title: 订单标题
             description: 需求描述
             created_by: 创建人
+            data_source: 指定的数据源节点ID(可选)
 
         Returns:
             创建的数据订单对象
@@ -1151,6 +1153,7 @@ class DataOrderService:
                 description=description,  # type: ignore[arg-type]
                 status=DataOrder.STATUS_PENDING,  # type: ignore[arg-type]
                 created_by=created_by,  # type: ignore[arg-type]
+                data_source=data_source,  # type: ignore[arg-type]
             )
 
             db.session.add(order)
@@ -1167,13 +1170,13 @@ class DataOrderService:
     @staticmethod
     def extract_entities(description: str) -> dict[str, Any]:
         """
-        使用 LLM 从描述中提取业务领域和数据字段
+        使用 LLM 从描述中提取业务领域、数据字段和标签信息
 
         Args:
             description: 需求描述
 
         Returns:
-            提取结果,包含 business_domains, data_fields, purpose
+            提取结果,包含 business_domains, data_fields, purpose, tags
         """
         try:
             client = OpenAI(
@@ -1183,7 +1186,7 @@ class DataOrderService:
 
             model = current_app.config.get("LLM_MODEL_NAME")
 
-            prompt = f"""分析以下数据需求描述,提取其中涉及的业务领域和数据字段
+            prompt = f"""分析以下数据需求描述,提取其中涉及的业务领域、数据字段和标签信息
 
 需求描述:{description}
 
@@ -1191,13 +1194,15 @@ class DataOrderService:
 {{
     "business_domains": ["业务领域名称1", "业务领域名称2"],
     "data_fields": ["字段名称1", "字段名称2"],
-    "purpose": "数据用途简述"
+    "purpose": "数据用途简述",
+    "tags": ["标签1", "标签2"]
 }}
 
 注意:
 1. business_domains 应该是可能存在的数据表或业务实体名称,如"人员信息"、"薪资数据"、"销售记录"等
 2. data_fields 应该是具体的数据字段名称,如"姓名"、"年龄"、"薪资"、"销售额"等
 3. purpose 简要描述数据的使用目的
+4. tags 是从需求描述中提取的标签信息,如"财务"、"销售"、"客户"、"订单"等,用于过滤匹配的业务领域。如果需求中没有明确提到标签,可以返回空数组 []
 """
 
             completion = client.chat.completions.create(
@@ -1231,6 +1236,10 @@ class DataOrderService:
 
             result = json.loads(response_text)
 
+            # 确保 tags 字段存在
+            if "tags" not in result:
+                result["tags"] = []
+
             logger.info(f"LLM 实体提取成功: {result}")
             return result
 
@@ -1240,6 +1249,7 @@ class DataOrderService:
                 "business_domains": [],
                 "data_fields": [],
                 "purpose": "",
+                "tags": [],
                 "error": "解析失败",
             }
         except Exception as e:
@@ -1248,6 +1258,7 @@ class DataOrderService:
                 "business_domains": [],
                 "data_fields": [],
                 "purpose": "",
+                "tags": [],
                 "error": str(e),
             }
 
@@ -1269,6 +1280,7 @@ class DataOrderService:
                 - name_zh: 中文名称
                 - name_en: 英文名称
                 - describe: 描述
+                - fields: 输出字段列表,每个字段包含 name_zh, name_en, data_type
             - processing_logic: 数据加工处理逻辑描述
         """
         try:
@@ -1297,7 +1309,11 @@ class DataOrderService:
     "output_domain": {{
         "name_zh": "输出数据产品的中文名称",
         "name_en": "output_product_english_name",
-        "describe": "输出数据产品的描述,说明这个数据产品包含什么内容"
+        "describe": "输出数据产品的描述,说明这个数据产品包含什么内容",
+        "fields": [
+            {{"name_zh": "字段中文名1", "name_en": "field_english_name1", "data_type": "varchar(255)"}},
+            {{"name_zh": "字段中文名2", "name_en": "field_english_name2", "data_type": "integer"}}
+        ]
     }},
     "processing_logic": "详细的数据加工处理逻辑,包括:1.需要从哪些源数据中提取什么字段;2.需要进行什么样的数据转换或计算;3.数据的过滤条件或筛选规则;4.最终输出数据的格式和字段"
 }}
@@ -1305,7 +1321,11 @@ class DataOrderService:
 注意:
 1. output_domain.name_zh 应该是一个简洁明了的数据产品名称,如"会员消费分析报表"、"销售业绩汇总表"等
 2. output_domain.name_en 应该是英文名称,使用下划线连接,如"member_consumption_analysis"
-3. processing_logic 应该详细描述数据加工的完整流程,便于后续生成数据处理脚本
+3. output_domain.fields 必须列出输出数据产品的所有字段,每个字段包含:
+   - name_zh: 字段中文名称
+   - name_en: 字段英文名称,使用下划线连接
+   - data_type: 数据类型,如 varchar(255)、integer、decimal(10,2)、date、timestamp 等
+4. processing_logic 应该详细描述数据加工的完整流程,便于后续生成数据处理脚本
 """
 
             completion = client.chat.completions.create(
@@ -1345,7 +1365,11 @@ class DataOrderService:
                     "name_zh": "数据产品",
                     "name_en": "data_product",
                     "describe": description[:200] if description else "",
+                    "fields": [],
                 }
+            # 确保 fields 字段存在
+            if "fields" not in result["output_domain"]:
+                result["output_domain"]["fields"] = []
             if "processing_logic" not in result:
                 result["processing_logic"] = description
 
@@ -1360,6 +1384,7 @@ class DataOrderService:
                     "name_zh": "数据产品",
                     "name_en": "data_product",
                     "describe": description[:200] if description else "",
+                    "fields": [],
                 },
                 "processing_logic": description,
                 "error": "解析失败",
@@ -1371,34 +1396,58 @@ class DataOrderService:
                     "name_zh": "数据产品",
                     "name_en": "data_product",
                     "describe": description[:200] if description else "",
+                    "fields": [],
                 },
                 "processing_logic": description,
                 "error": str(e),
             }
 
     @staticmethod
-    def find_matching_domains(domain_names: list[str]) -> list[dict[str, Any]]:
+    def find_matching_domains(
+        domain_names: list[str], tags: list[str] | None = None
+    ) -> list[dict[str, Any]]:
         """
         在 Neo4j 中查找匹配的 BusinessDomain 节点
 
         Args:
             domain_names: 业务领域名称列表
+            tags: 标签名称列表(可选),如果提供,则只返回包含这些标签的业务领域
 
         Returns:
             匹配的 BusinessDomain 节点列表
         """
         try:
             with neo4j_driver.get_session() as session:
-                # 使用模糊匹配查找 BusinessDomain
-                cypher = """
-                UNWIND $domain_names AS name
-                MATCH (bd:BusinessDomain)
-                WHERE bd.name_zh CONTAINS name OR name CONTAINS bd.name_zh
-                   OR bd.name_en CONTAINS name OR name CONTAINS bd.name_en
-                RETURN DISTINCT id(bd) as id, bd.name_zh as name_zh,
-                       bd.name_en as name_en, bd.describe as describe
-                """
-                result = session.run(cypher, {"domain_names": domain_names})
+                # 构建基础查询:使用模糊匹配查找 BusinessDomain
+                if tags and len(tags) > 0:
+                    # 如果有标签过滤条件,添加标签匹配
+                    cypher = """
+                    UNWIND $domain_names AS name
+                    MATCH (bd:BusinessDomain)
+                    WHERE (bd.name_zh CONTAINS name OR name CONTAINS bd.name_zh
+                           OR bd.name_en CONTAINS name OR name CONTAINS bd.name_en)
+                    WITH DISTINCT bd
+                    OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
+                    WITH bd, collect(DISTINCT label.name_zh) as bd_tags,
+                         collect(DISTINCT label.name_en) as bd_tags_en
+                    WHERE ANY(tag IN $tags WHERE tag IN bd_tags OR tag IN bd_tags_en)
+                    RETURN DISTINCT id(bd) as id, bd.name_zh as name_zh,
+                           bd.name_en as name_en, bd.describe as describe
+                    """
+                    result = session.run(
+                        cypher, {"domain_names": domain_names, "tags": tags}
+                    )
+                else:
+                    # 没有标签过滤条件,使用原来的查询
+                    cypher = """
+                    UNWIND $domain_names AS name
+                    MATCH (bd:BusinessDomain)
+                    WHERE bd.name_zh CONTAINS name OR name CONTAINS bd.name_zh
+                       OR bd.name_en CONTAINS name OR name CONTAINS bd.name_en
+                    RETURN DISTINCT id(bd) as id, bd.name_zh as name_zh,
+                           bd.name_en as name_en, bd.describe as describe
+                    """
+                    result = session.run(cypher, {"domain_names": domain_names})
 
                 domains = []
                 for record in result:
@@ -1411,7 +1460,8 @@ class DataOrderService:
                         }
                     )
 
-                logger.info(f"找到 {len(domains)} 个匹配的 BusinessDomain")
+                tag_info = f",标签过滤: {tags}" if tags else ""
+                logger.info(f"找到 {len(domains)} 个匹配的 BusinessDomain{tag_info}")
                 return domains
 
         except Exception as e:
@@ -1597,6 +1647,7 @@ class DataOrderService:
             domains = extraction_result.get("business_domains", [])
             fields = extraction_result.get("data_fields", [])
             purpose = extraction_result.get("purpose", "")
+            tags = extraction_result.get("tags", [])
 
             order.set_extraction_result(
                 domains=domains,
@@ -1604,8 +1655,10 @@ class DataOrderService:
                 purpose=purpose,
             )
 
-            # 2. 在图谱中查找匹配的节点
-            matched_domains = DataOrderService.find_matching_domains(domains)
+            # 2. 在图谱中查找匹配的节点(如果提取到了标签,使用标签过滤)
+            matched_domains = DataOrderService.find_matching_domains(
+                domains, tags=tags if tags else None
+            )
             matched_fields = DataOrderService.find_matching_fields(fields)
 
             if not matched_domains:
@@ -1917,6 +1970,9 @@ class DataOrderService:
                 "describe", order.extraction_purpose or order.description
             )
 
+            # 获取输出字段列表(用于创建元数据节点)
+            output_fields = output_domain_info.get("fields", [])
+
             with neo4j_driver.get_session() as session:
                 # 2. 创建目标 BusinessDomain 节点(数据产品承载)
                 create_target_bd_query = """
@@ -1925,6 +1981,12 @@ class DataOrderService:
                     name_zh: $name_zh,
                     describe: $describe,
                     type: 'data_product',
+                    category: 'DataOps',
+                    organization: 'system',
+                    leader: 'admin',
+                    frequency: '月',
+                    data_sensitivity: '低',
+                    status: true,
                     created_at: datetime(),
                     created_by: $created_by,
                     source_order_id: $order_id
@@ -1966,6 +2028,17 @@ class DataOrderService:
                         f"DataSource:{order.data_source}"
                     )
 
+                # 2.2 为目标 BusinessDomain 创建关联的元数据节点
+                if output_fields:
+                    meta_ids = DataOrderService._create_metadata_for_business_domain(
+                        session=session,
+                        bd_id=target_bd_id,
+                        fields=output_fields,
+                    )
+                    logger.info(
+                        f"为目标 BusinessDomain 创建了 {len(meta_ids)} 个元数据关联"
+                    )
+
                 # 3. 创建 DataFlow 节点
                 dataflow_name_en = f"DF_{order.order_no}"
                 dataflow_name_zh = f"{target_bd_name_zh}_数据流程"
@@ -1998,8 +2071,12 @@ class DataOrderService:
                     script_requirement: $script_requirement,
                     script_type: 'python',
                     script_path: $script_path,
-                    update_mode: 'full',
-                    status: 'inactive',
+                    update_mode: 'append',
+                    status: 'active',
+                    category: 'DataOps',
+                    organization: 'system',
+                    leader: 'admin',
+                    frequency: '月',
                     created_at: datetime(),
                     created_by: $created_by,
                     source_order_id: $order_id
@@ -2023,6 +2100,17 @@ class DataOrderService:
 
                 logger.info(f"创建 DataFlow: id={dataflow_id}, name={dataflow_name_en}")
 
+                # 3.1 建立 DataFlow 与"数据流程"标签的 LABEL 关系
+                create_dataflow_tag_query = """
+                MATCH (df:DataFlow), (label:DataLabel {name_zh: '数据流程'})
+                WHERE id(df) = $df_id
+                CREATE (df)-[:LABEL]->(label)
+                """
+                session.run(create_dataflow_tag_query, {"df_id": dataflow_id})
+                logger.info(
+                    f"建立 DataFlow 标签关系: {dataflow_id} -> DataLabel(数据流程)"
+                )
+
                 # 4. 建立 INPUT 关系(源 BusinessDomain -> DataFlow)
                 for domain_id in input_domain_ids:
                     create_input_rel_query = """
@@ -2076,11 +2164,32 @@ class DataOrderService:
                 dataflow_id=dataflow_id,
                 source_table_ids=input_domain_ids,
                 target_bd_id=target_bd_id,
-                update_mode="full",
+                update_mode="append",
                 processing_logic=processing_logic,
                 product_id=product_id,
             )
 
+            # 8. 任务创建成功后,更新 DataFlow 的 script_path
+            # 脚本命名格式为: task_{task_id}_{task_name}.py
+            if task_id and dataflow_id:
+                script_path = (
+                    f"datafactory/scripts/task_{task_id}_{dataflow_name_en}.py"
+                )
+                with neo4j_driver.get_session() as session:
+                    update_script_path_query = """
+                    MATCH (df:DataFlow)
+                    WHERE id(df) = $df_id
+                    SET df.script_path = $script_path
+                    """
+                    session.run(
+                        update_script_path_query,
+                        {"df_id": dataflow_id, "script_path": script_path},
+                    )
+                    logger.info(
+                        f"更新 DataFlow 脚本路径: "
+                        f"dataflow_id={dataflow_id}, script_path={script_path}"
+                    )
+
             return {
                 "target_business_domain_id": target_bd_id,
                 "target_business_domain_name": target_bd_name_zh,
@@ -2095,6 +2204,84 @@ class DataOrderService:
             logger.error(f"生成订单资源失败: {str(e)}")
             raise
 
+    @staticmethod
+    def _create_metadata_for_business_domain(
+        session,
+        bd_id: int,
+        fields: list[dict[str, Any]],
+    ) -> list[int]:
+        """
+        为 BusinessDomain 创建关联的元数据节点
+
+        对每个字段:
+        1. 检查是否已存在相同 name_zh 的 DataMeta 节点
+        2. 若不存在则创建新节点,若存在则复用
+        3. 建立 BusinessDomain -[:INCLUDES]-> DataMeta 关系
+
+        Args:
+            session: Neo4j session
+            bd_id: BusinessDomain 节点 ID
+            fields: 字段列表,每个字段包含 name_zh, name_en, data_type
+
+        Returns:
+            创建/关联的 DataMeta 节点 ID 列表
+        """
+        from datetime import datetime
+
+        meta_ids = []
+
+        for field in fields:
+            name_zh = field.get("name_zh", "").strip()
+            if not name_zh:
+                continue
+
+            name_en = field.get("name_en", "").strip() or name_zh
+            data_type = field.get("data_type", "varchar(255)").strip()
+
+            # 使用 MERGE 创建或复用 DataMeta 节点
+            meta_merge_query = """
+            MERGE (m:DataMeta {name_zh: $name_zh})
+            ON CREATE SET
+                m.name_en = $name_en,
+                m.data_type = $data_type,
+                m.create_time = $create_time,
+                m.status = true
+            RETURN m, id(m) as meta_id
+            """
+            result = session.run(
+                meta_merge_query,
+                {
+                    "name_zh": name_zh,
+                    "name_en": name_en,
+                    "data_type": data_type,
+                    "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+                },
+            ).single()
+
+            if not result:
+                logger.warning(f"创建/获取 DataMeta 失败: name_zh={name_zh}")
+                continue
+
+            meta_id = result["meta_id"]
+            meta_ids.append(meta_id)
+
+            # 建立 INCLUDES 关系
+            rel_query = """
+            MATCH (bd:BusinessDomain), (m:DataMeta)
+            WHERE id(bd) = $bd_id AND id(m) = $meta_id
+            MERGE (bd)-[:INCLUDES]->(m)
+            """
+            session.run(rel_query, {"bd_id": bd_id, "meta_id": meta_id})
+
+            logger.debug(
+                f"关联元数据: BusinessDomain({bd_id}) -> DataMeta({meta_id}, {name_zh})"
+            )
+
+        logger.info(
+            f"为 BusinessDomain({bd_id}) 创建/关联了 {len(meta_ids)} 个元数据节点"
+        )
+        return meta_ids
+
     @staticmethod
     def _register_order_data_product(
         order: DataOrder,
@@ -2120,7 +2307,7 @@ class DataOrderService:
         """
         try:
             # 从订单的数据源获取 schema
-            target_schema = "public"
+            target_schema = "dags"  # 缺省数据产品都保存在dags schema中
             if order.data_source:
                 with neo4j_driver.get_session() as session:
                     query = """
@@ -2309,9 +2496,10 @@ class DataOrderService:
 
             task_description_md = "\n".join(task_desc_parts)
 
-            # 脚本路径和名称
+            # 脚本路径(不包含文件名)
             code_path = "datafactory/scripts"
-            code_name = dataflow_name_en
+            # code_name 暂时设置为空,等任务创建后根据 task_id 生成
+            # 实际的脚本名称格式为: task_{task_id}_{task_name}.py
 
             # 插入 task_list 表
             task_insert_sql = text(
@@ -2328,7 +2516,7 @@ class DataOrderService:
                 "task_name": dataflow_name_en,
                 "task_description": task_description_md,
                 "status": "pending",
-                "code_name": code_name,
+                "code_name": "",  # 暂时为空,等获取 task_id 后更新
                 "code_path": code_path,
                 "create_by": "system",
                 "create_time": current_time,
@@ -2338,10 +2526,26 @@ class DataOrderService:
             result = db.session.execute(task_insert_sql, task_params)
             row = result.fetchone()
             task_id = row[0] if row else None
+
+            # 根据 task_id 生成脚本文件名(与 auto_execute_tasks.py 生成的脚本名称保持一致)
+            # 格式: task_{task_id}_{task_name}.py
+            code_name = f"task_{task_id}_{dataflow_name_en}.py"
+
+            # 更新 code_name 字段
+            if task_id:
+                update_sql = text(
+                    "UPDATE public.task_list SET code_name = :code_name "
+                    "WHERE task_id = :task_id"
+                )
+                db.session.execute(
+                    update_sql, {"code_name": code_name, "task_id": task_id}
+                )
+
             db.session.commit()
 
             logger.info(
-                f"成功创建任务记录: task_id={task_id}, task_name={dataflow_name_en}"
+                f"成功创建任务记录: task_id={task_id}, "
+                f"task_name={dataflow_name_en}, code_name={code_name}"
             )
 
             # 自动生成 n8n 工作流 JSON 文件
@@ -2354,6 +2558,7 @@ class DataOrderService:
                     order_id=order.id,
                     dataflow_id=dataflow_id,
                     product_id=product_id,
+                    task_id=task_id,
                 )
                 if workflow_path:
                     logger.info(f"成功生成n8n工作流文件: {workflow_path}")
@@ -2378,6 +2583,7 @@ class DataOrderService:
         order_id: int | None = None,
         dataflow_id: int | None = None,
         product_id: int | None = None,
+        task_id: int | None = None,
     ) -> str | None:
         """
         自动生成 n8n 工作流 JSON 文件
@@ -2391,12 +2597,13 @@ class DataOrderService:
 
         Args:
             script_name: 脚本/任务名称
-            code_name: 代码文件名
-            code_path: 代码路径
+            code_name: 代码文件名(如 task_42_DF_DO202601210001.py)
+            code_path: 代码路径(如 datafactory/scripts)
             update_mode: 更新模式
             order_id: 关联的数据订单 ID(用于回调更新状态)
             dataflow_id: 关联的 DataFlow ID
             product_id: 关联的数据产品 ID
+            task_id: 关联的任务 ID
 
         Returns:
             生成的工作流文件路径,失败返回 None
@@ -2413,8 +2620,11 @@ class DataOrderService:
             workflows_dir = project_root / "datafactory" / "workflows"
             workflows_dir.mkdir(parents=True, exist_ok=True)
 
-            # 生成工作流文件名
-            workflow_filename = f"{script_name}_workflow.json"
+            # 生成工作流文件名(使用任务ID以便于关联)
+            if task_id:
+                workflow_filename = f"task_{task_id}_{script_name}_workflow.json"
+            else:
+                workflow_filename = f"{script_name}_workflow.json"
             workflow_path = workflows_dir / workflow_filename
 
             # 生成唯一ID
@@ -2423,13 +2633,16 @@ class DataOrderService:
 
             # 构建完整的 SSH 命令,包含激活 venv
             # 注意:由于 n8n 服务器与应用服务器分离,必须使用 SSH 节点
+            # code_name 已经包含 .py 后缀(如 task_42_DF_DO202601210001.py)
             ssh_command = (
                 f"cd /opt/dataops-platform && source venv/bin/activate && "
-                f"python {code_path}/{code_name}.py"
+                f"python {code_path}/{code_name}"
             )
 
-            # API 基础 URL(从配置获取或使用默认值)
-            api_base_url = "http://192.168.3.143:5000"
+            # API 基础 URL(从配置获取)
+            from app.config.config import BaseConfig
+
+            api_base_url = BaseConfig.API_BASE_URL
 
             # 构建节点列表
             nodes = [

+ 0 - 317
datafactory/scripts/task_38_DF_DO202601160001.py

@@ -1,317 +0,0 @@
-#!/usr/bin/env python
-"""
-任务ID: 38
-任务名称: DF_DO202601160001
-任务描述: 仓库库存汇总统计
-  1. 从标签为数据资源的产品库存表中提取字段:仓库编号、库存数量
-  2. 按照仓库进行分组,对库存数量进行求和计算
-  3. 无特殊过滤条件
-  4. 最终输出数据格式包含字段:仓库编号、总库存数量
-
-更新模式: Full Refresh (全量更新)
-
-源表: dags.test_product_inventory (数据资源-产品库存表)
-目标表: dags.warehouse_inventory_summary (仓库库存汇总表)
-
-创建时间: 2026-01-16
-"""
-
-from __future__ import annotations
-
-import os
-import sys
-from datetime import datetime
-from typing import Any
-
-import pandas as pd
-import psycopg2
-from loguru import logger
-
-# 添加项目根目录到Python路径
-PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
-sys.path.insert(0, PROJECT_ROOT)
-
-from app.config.config import config, current_env
-
-# 获取配置
-app_config = config[current_env]
-
-
-def get_database_connection() -> psycopg2.extensions.connection:
-    """
-    获取数据库连接
-
-    根据任务描述,数据库配置:
-    - Host: 192.168.3.143
-    - Port: 5432 (标准 PostgreSQL 端口,任务描述中的 5678 有误)
-    - Database: dataops
-    - Schema: dags (源表 test_product_inventory 和目标表 warehouse_inventory_summary 都在 dags schema)
-
-    Returns:
-        psycopg2 连接对象
-    """
-    conn = psycopg2.connect(
-        host="192.168.3.143",
-        port=5432,
-        database="dataops",
-        user="postgres",
-        password="dataOps",
-        options="-c search_path=dags,public",  # 确保可以访问 dags 和 public schema
-    )
-    logger.info("数据库连接成功: 192.168.3.143:5432/dataops (schema: dags,public)")
-    return conn
-
-
-def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
-    """
-    确保目标表存在,如果不存在则创建
-
-    Args:
-        conn: 数据库连接
-    """
-    cursor = conn.cursor()
-    target_table = "warehouse_inventory_summary"
-    target_schema = "dags"
-
-    try:
-        # 检查表是否存在
-        cursor.execute(
-            """
-            SELECT EXISTS(
-                SELECT 1 FROM information_schema.tables
-                WHERE table_schema = %s
-                AND table_name = %s
-            )
-        """,
-            (target_schema, target_table),
-        )
-        result = cursor.fetchone()
-        exists = result[0] if result else False
-
-        if not exists:
-            logger.info(f"目标表不存在,正在创建 {target_schema}.{target_table}...")
-            # PostgreSQL 不支持在列定义中使用 COMMENT,需要分开
-            create_table_sql = f"""
-            CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
-                id BIGSERIAL PRIMARY KEY,
-                warehouse VARCHAR(100) NOT NULL,
-                total_stock INTEGER NOT NULL DEFAULT 0,
-                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
-            );
-            """
-            cursor.execute(create_table_sql)
-
-            # 添加注释
-            cursor.execute(
-                f"COMMENT ON TABLE {target_schema}.{target_table} IS '仓库库存汇总表'"
-            )
-            cursor.execute(
-                f"COMMENT ON COLUMN {target_schema}.{target_table}.warehouse IS '仓库编号'"
-            )
-            cursor.execute(
-                f"COMMENT ON COLUMN {target_schema}.{target_table}.total_stock IS '总库存数量'"
-            )
-            cursor.execute(
-                f"COMMENT ON COLUMN {target_schema}.{target_table}.create_time IS '数据创建时间'"
-            )
-            conn.commit()
-            logger.info(f"目标表 {target_schema}.{target_table} 创建成功")
-        else:
-            logger.info(f"目标表 {target_schema}.{target_table} 已存在")
-
-    except Exception as e:
-        conn.rollback()
-        logger.error(f"创建目标表失败: {e}")
-        raise
-    finally:
-        cursor.close()
-
-
-def extract_and_transform(conn: psycopg2.extensions.connection) -> pd.DataFrame:
-    """
-    从源表提取数据并进行转换
-
-    根据任务描述:
-    1. 从产品库存表中提取字段:仓库编号(warehouse)、库存数量(current_stock)
-    2. 按照仓库进行分组,对库存数量进行求和计算
-
-    Args:
-        conn: 数据库连接
-
-    Returns:
-        转换后的DataFrame,包含 warehouse 和 total_stock 列
-    """
-    # 源表位于 dags schema(由任务 37 创建)
-    query = """
-    SELECT
-        warehouse,
-        SUM(current_stock) AS total_stock
-    FROM dags.test_product_inventory
-    GROUP BY warehouse
-    ORDER BY warehouse
-    """
-
-    logger.info("正在从源表提取并汇总数据...")
-
-    try:
-        df = pd.read_sql(query, conn)
-        logger.info(f"成功汇总 {len(df)} 个仓库的库存数据")
-        return df
-    except Exception as e:
-        logger.error(f"数据提取转换失败: {e}")
-        raise
-
-
-def load_to_target(
-    df: pd.DataFrame,
-    conn: psycopg2.extensions.connection,
-) -> int:
-    """
-    将数据加载到目标表(全量更新模式)
-
-    Args:
-        df: 要加载的DataFrame
-        conn: 数据库连接
-
-    Returns:
-        插入的记录数
-    """
-    if df.empty:
-        logger.warning("没有数据需要加载")
-        return 0
-
-    cursor = conn.cursor()
-    target_table = "dags.warehouse_inventory_summary"
-
-    try:
-        # 全量更新模式:先清空目标表
-        logger.info("全量更新模式:清空目标表...")
-        cursor.execute(f"TRUNCATE TABLE {target_table}")
-        logger.info("目标表已清空")
-
-        # 插入新数据
-        insert_sql = f"""
-        INSERT INTO {target_table} (warehouse, total_stock, create_time)
-        VALUES (%s, %s, %s)
-        """
-
-        current_time = datetime.now()
-        records = [
-            (row["warehouse"], int(row["total_stock"]), current_time)
-            for _, row in df.iterrows()
-        ]
-
-        cursor.executemany(insert_sql, records)
-        conn.commit()
-
-        inserted_count = len(records)
-        logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
-        return inserted_count
-
-    except Exception as e:
-        conn.rollback()
-        logger.error(f"数据加载失败: {e}")
-        raise
-    finally:
-        cursor.close()
-
-
-def main() -> dict[str, Any]:
-    """
-    主函数:执行ETL流程
-
-    Returns:
-        执行结果字典
-    """
-    result = {
-        "task_id": 38,
-        "task_name": "DF_DO202601160001",
-        "status": "failed",
-        "warehouses_processed": 0,
-        "records_loaded": 0,
-        "error_message": None,
-        "execution_time": None,
-    }
-
-    start_time = datetime.now()
-    conn = None
-
-    try:
-        logger.info("=" * 60)
-        logger.info("任务开始: DF_DO202601160001 - 仓库库存汇总")
-        logger.info("=" * 60)
-
-        # 步骤1: 建立数据库连接
-        logger.info("[Step 1/4] 建立数据库连接...")
-        conn = get_database_connection()
-
-        # 步骤2: 确保目标表存在
-        logger.info("[Step 2/4] 检查/创建目标表...")
-        ensure_target_table_exists(conn)
-
-        # 步骤3: 提取并转换数据
-        logger.info("[Step 3/4] 提取并转换数据...")
-        df = extract_and_transform(conn)
-        result["warehouses_processed"] = len(df)
-
-        # 输出汇总结果预览
-        if not df.empty:
-            logger.info("仓库库存汇总预览:")
-            for _, row in df.iterrows():
-                logger.info(f"  {row['warehouse']}: {row['total_stock']:,} 件")
-
-        # 步骤4: 加载到目标表(全量更新)
-        logger.info("[Step 4/4] 加载数据到目标表...")
-        records_loaded = load_to_target(df, conn)
-        result["records_loaded"] = records_loaded
-
-        result["status"] = "success"
-        logger.info("=" * 60)
-        logger.info(
-            f"任务完成! 处理仓库数: {result['warehouses_processed']}, 加载记录数: {result['records_loaded']}"
-        )
-        logger.info("=" * 60)
-
-    except Exception as e:
-        result["status"] = "failed"
-        result["error_message"] = str(e)
-        logger.error(f"任务执行失败: {e}")
-        raise
-
-    finally:
-        # 关闭数据库连接
-        if conn:
-            conn.close()
-            logger.debug("数据库连接已关闭")
-
-        result["execution_time"] = str(datetime.now() - start_time)
-
-    return result
-
-
-if __name__ == "__main__":
-    # 配置日志
-    # 重要:日志输出到 stdout 而非 stderr,以便 n8n 工作流正确解析输出
-    logger.remove()
-    logger.add(
-        sys.stdout,
-        level="INFO",
-        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
-    )
-    logger.add(
-        os.path.join(PROJECT_ROOT, "logs", "task_38_DF_DO202601160001.log"),
-        level="DEBUG",
-        rotation="10 MB",
-        retention="7 days",
-        encoding="utf-8",
-    )
-
-    try:
-        result = main()
-        if result["status"] == "success":
-            sys.exit(0)
-        else:
-            sys.exit(1)
-    except Exception as e:
-        logger.exception(f"脚本执行异常: {e}")
-        sys.exit(1)

+ 182 - 177
datafactory/scripts/task_37_产品库存表原始数据导入.py → datafactory/scripts/task_43_产品库存表的原始数据导入.py

@@ -1,14 +1,13 @@
 #!/usr/bin/env python
 """
-任务ID: 37
-任务名称: 产品库存表原始数据导入
-任务描述: 把产品库存表的原始数据导入到数据资源的产品库存表中
-更新模式: Append (追加模式)
+数据流任务脚本 - 产品库存表的原始数据导入
 
-源表: product_inventory_table_raw_data (原始数据表)
-目标表: dags.test_product_inventory (数据资源表)
+任务ID: 43
+任务名称: 产品库存表的原始数据导入
+创建时间: 2026-01-21
+更新模式: Append (追加模式)
 
-创建时间: 2026-01-16
+描述: 从源表 public.product_inventory_table_raw_data 导入数据到目标表 dags.test_product_inventory
 """
 
 from __future__ import annotations
@@ -26,144 +25,71 @@ from loguru import logger
 PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
 sys.path.insert(0, PROJECT_ROOT)
 
-from app.config.config import config, current_env
-
-# 获取配置
-app_config = config[current_env]
+# 任务配置
+TASK_ID = 43
+TASK_NAME = "产品库存表的原始数据导入"
+UPDATE_MODE = "append"
+
+# 源数据库配置
+SOURCE_CONFIG = {
+    "host": "192.168.3.143",
+    "port": 5432,
+    "database": "dataops",
+    "user": "postgres",
+    "password": "dataOps",
+}
+
+# 目标数据库配置
+TARGET_CONFIG = {
+    "host": "192.168.3.143",
+    "port": 5432,  # PostgreSQL 默认端口
+    "database": "dataops",
+    "user": "postgres",
+    "password": "dataOps",
+}
+
+# 目标表配置
+TARGET_SCHEMA = "dags"
+TARGET_TABLE = "test_product_inventory"
 
 
 def get_source_connection() -> psycopg2.extensions.connection:
     """
-    获取源数据库连接(原始数据库)
+    获取源数据库连接
 
     Returns:
         psycopg2 连接对象
     """
-    # 解析应用数据库URI作为源数据库
-    # 实际使用时可根据任务描述中的数据源配置调整
-    db_uri = app_config.SQLALCHEMY_DATABASE_URI
-
-    # 解析连接字符串
-    # 格式: postgresql://user:password@host:port/database
-    if db_uri.startswith("postgresql://"):
-        db_uri = db_uri[13:]  # 移除 postgresql://
-
-    # 分割用户信息和主机信息
-    user_part, host_part = db_uri.split("@")
-    username, password = user_part.split(":")
-    host_db = host_part.split("/")
-    host_port = host_db[0].split(":")
-    host = host_port[0]
-    port = int(host_port[1]) if len(host_port) > 1 else 5432
-    database = host_db[1] if len(host_db) > 1 else "dataops"
-
     conn = psycopg2.connect(
-        host=host,
-        port=port,
-        database=database,
-        user=username,
-        password=password,
+        host=SOURCE_CONFIG["host"],
+        port=SOURCE_CONFIG["port"],
+        database=SOURCE_CONFIG["database"],
+        user=SOURCE_CONFIG["user"],
+        password=SOURCE_CONFIG["password"],
     )
-    logger.info(f"源数据库连接成功: {host}:{port}/{database}")
+    logger.info("源数据库连接成功")
     return conn
 
 
 def get_target_connection() -> psycopg2.extensions.connection:
     """
-    获取目标数据库连接(数据资源数据库)
-
-    根据任务描述,目标数据库:
-    - Host: 192.168.3.143
-    - Port: 5432 (标准 PostgreSQL 端口,任务描述中的 5678 有误)
-    - Database: dataops
-    - Schema: dags
+    获取目标数据库连接
 
     Returns:
         psycopg2 连接对象
     """
     conn = psycopg2.connect(
-        host="192.168.3.143",
-        port=5432,
-        database="dataops",
-        user="postgres",
-        password="dataOps",
-        options="-c search_path=dags,public",
+        host=TARGET_CONFIG["host"],
+        port=TARGET_CONFIG["port"],
+        database=TARGET_CONFIG["database"],
+        user=TARGET_CONFIG["user"],
+        password=TARGET_CONFIG["password"],
+        options=f"-c search_path={TARGET_SCHEMA},public",
     )
-    logger.info("目标数据库连接成功: 192.168.3.143:5432/dataops (schema: dags)")
+    logger.info("目标数据库连接成功")
     return conn
 
 
-def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
-    """
-    从源表提取数据
-
-    Args:
-        conn: 源数据库连接
-
-    Returns:
-        包含源数据的DataFrame
-    """
-    query = """
-    SELECT
-        sku,
-        product_name,
-        category,
-        brand,
-        supplier,
-        warehouse,
-        current_stock,
-        safety_stock,
-        max_stock,
-        unit_cost,
-        selling_price,
-        stock_status,
-        last_inbound_date,
-        last_outbound_date,
-        inbound_quantity_30d,
-        outbound_quantity_30d,
-        turnover_rate,
-        is_active,
-        created_at,
-        updated_at
-    FROM product_inventory_table_raw_data
-    """
-
-    logger.info("正在从源表提取数据...")
-
-    try:
-        df = pd.read_sql(query, conn)
-        logger.info(f"成功提取 {len(df)} 条记录")
-        return df
-    except Exception as e:
-        logger.error(f"提取源数据失败: {e}")
-        raise
-
-
-def transform_data(df: pd.DataFrame) -> pd.DataFrame:
-    """
-    数据转换处理
-
-    根据任务描述:直接导入,无需特殊转换
-
-    Args:
-        df: 源数据DataFrame
-
-    Returns:
-        转换后的DataFrame
-    """
-    logger.info("正在执行数据转换...")
-
-    # 确保列名与目标表匹配(不区分大小写)
-    df.columns = df.columns.str.lower()
-
-    # 添加 create_time 字段(目标表可能需要)
-    if "create_time" not in df.columns:
-        df["create_time"] = datetime.now()
-
-    logger.info(f"数据转换完成,共 {len(df)} 条记录")
-    return df
-
-
 def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
     """
     确保目标表存在,如果不存在则创建
@@ -172,8 +98,6 @@ def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
         conn: 目标数据库连接
     """
     cursor = conn.cursor()
-    target_table = "test_product_inventory"
-    target_schema = "dags"
 
     try:
         # 检查表是否存在
@@ -185,18 +109,18 @@ def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
                 AND table_name = %s
             )
         """,
-            (target_schema, target_table),
+            (TARGET_SCHEMA, TARGET_TABLE),
         )
         result = cursor.fetchone()
         exists = result[0] if result else False
 
         if not exists:
-            logger.info(f"目标表不存在,正在创建 {target_schema}.{target_table}...")
+            logger.info(f"目标表不存在,正在创建 {TARGET_SCHEMA}.{TARGET_TABLE}...")
 
-            # 创建表 SQL(根据任务描述中的 DDL)
+            # 根据任务描述中的 DDL 创建表
             create_table_sql = f"""
-            CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
-                id SERIAL PRIMARY KEY,
+            CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (
+                id SERIAL,
                 sku VARCHAR(50),
                 product_name VARCHAR(200),
                 category VARCHAR(100),
@@ -224,11 +148,12 @@ def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
 
             # 添加表注释
             cursor.execute(
-                f"COMMENT ON TABLE {target_schema}.{target_table} IS '产品库存表'"
+                f"COMMENT ON TABLE {TARGET_SCHEMA}.{TARGET_TABLE} IS '产品库存表'"
             )
 
             # 添加列注释
             column_comments = {
+                "id": "ID",
                 "sku": "SKU",
                 "product_name": "产品名称",
                 "category": "类别",
@@ -251,17 +176,19 @@ def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
                 "updated_at": "更新时间",
                 "create_time": "数据创建时间",
             }
-
-            for col_name, col_comment in column_comments.items():
-                cursor.execute(
-                    f"COMMENT ON COLUMN {target_schema}.{target_table}.{col_name} IS %s",
-                    (col_comment,),
-                )
+            for col, comment in column_comments.items():
+                try:
+                    cursor.execute(
+                        f"COMMENT ON COLUMN {TARGET_SCHEMA}.{TARGET_TABLE}.{col} IS %s",
+                        (comment,),
+                    )
+                except Exception as e:
+                    logger.warning(f"添加列注释失败 {col}: {e}")
 
             conn.commit()
-            logger.info(f"目标表 {target_schema}.{target_table} 创建成功")
+            logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 创建成功")
         else:
-            logger.info(f"目标表 {target_schema}.{target_table} 已存在")
+            logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 已存在")
 
     except Exception as e:
         conn.rollback()
@@ -271,17 +198,85 @@ def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
         cursor.close()
 
 
+def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
+    """
+    从源表提取数据
+
+    Args:
+        conn: 源数据库连接
+
+    Returns:
+        包含源数据的DataFrame
+    """
+    query = """
+    SELECT
+        id,
+        sku,
+        product_name,
+        category,
+        brand,
+        supplier,
+        warehouse,
+        current_stock,
+        safety_stock,
+        max_stock,
+        unit_cost,
+        selling_price,
+        stock_status,
+        last_inbound_date,
+        last_outbound_date,
+        inbound_quantity_30d,
+        outbound_quantity_30d,
+        turnover_rate,
+        is_active,
+        created_at,
+        updated_at
+    FROM public.product_inventory_table_raw_data
+    """
+
+    logger.info("正在从源表提取数据...")
+
+    try:
+        df = pd.read_sql(query, conn)
+        logger.info(f"成功提取 {len(df)} 条记录")
+        return df
+    except Exception as e:
+        logger.error(f"提取源数据失败: {e}")
+        raise
+
+
+def transform_data(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    数据转换处理
+
+    Args:
+        df: 源数据DataFrame
+
+    Returns:
+        转换后的DataFrame
+    """
+    logger.info("正在执行数据转换...")
+
+    # 本任务为简单导入,无需复杂转换
+    # 源表和目标表字段基本一致,直接映射
+
+    logger.info(f"数据转换完成,共 {len(df)} 条记录")
+    return df
+
+
 def load_to_target(
     df: pd.DataFrame,
     conn: psycopg2.extensions.connection,
+    update_mode: str = "append",
     batch_size: int = 1000,
 ) -> int:
     """
-    将数据加载到目标表(追加模式)
+    将数据加载到目标表
 
     Args:
         df: 要加载的DataFrame
         conn: 目标数据库连接
+        update_mode: 更新模式(append 或 full)
         batch_size: 批量插入大小
 
     Returns:
@@ -293,47 +288,55 @@ def load_to_target(
 
     logger.info(f"正在将 {len(df)} 条记录加载到目标表...")
 
-    target_table = "dags.test_product_inventory"
-
-    # 准备插入的列
-    columns = [
-        "sku",
-        "product_name",
-        "category",
-        "brand",
-        "supplier",
-        "warehouse",
-        "current_stock",
-        "safety_stock",
-        "max_stock",
-        "unit_cost",
-        "selling_price",
-        "stock_status",
-        "last_inbound_date",
-        "last_outbound_date",
-        "inbound_quantity_30d",
-        "outbound_quantity_30d",
-        "turnover_rate",
-        "is_active",
-        "created_at",
-        "updated_at",
-    ]
-
-    # 构建插入SQL(使用完整的 schema.table 格式)
-    placeholders = ", ".join(["%s"] * len(columns))
-    column_names = ", ".join(columns)
-    insert_sql = f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
-
+    target_table = f"{TARGET_SCHEMA}.{TARGET_TABLE}"
     cursor = conn.cursor()
     inserted_count = 0
 
     try:
+        # 全量更新模式:先清空目标表
+        if update_mode.lower() == "full":
+            logger.info("全量更新模式:清空目标表...")
+            cursor.execute(f"TRUNCATE TABLE {target_table}")
+            logger.info("目标表已清空")
+
+        # 目标表结构准备插入的列(不包含 create_time,由数据库自动设置)
+        columns = [
+            "sku",
+            "product_name",
+            "category",
+            "brand",
+            "supplier",
+            "warehouse",
+            "current_stock",
+            "safety_stock",
+            "max_stock",
+            "unit_cost",
+            "selling_price",
+            "stock_status",
+            "last_inbound_date",
+            "last_outbound_date",
+            "inbound_quantity_30d",
+            "outbound_quantity_30d",
+            "turnover_rate",
+            "is_active",
+            "created_at",
+            "updated_at",
+        ]
+
+        # 构建插入SQL
+        placeholders = ", ".join(["%s"] * len(columns))
+        column_names = ", ".join(columns)
+        insert_sql = (
+            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
+        )
+
+        # 批量插入
         for i in range(0, len(df), batch_size):
             batch_df = df.iloc[i : i + batch_size]
             records = []
             for _, row in batch_df.iterrows():
                 record = tuple(
-                    row[col] if col in row.index else None for col in columns
+                    None if pd.isna(row.get(col)) else row.get(col) for col in columns
                 )
                 records.append(record)
 
@@ -361,8 +364,8 @@ def main() -> dict[str, Any]:
         执行结果字典
     """
     result = {
-        "task_id": 37,
-        "task_name": "产品库存表原始数据导入",
+        "task_id": TASK_ID,
+        "task_name": TASK_NAME,
         "status": "failed",
         "records_extracted": 0,
         "records_loaded": 0,
@@ -376,7 +379,7 @@ def main() -> dict[str, Any]:
 
     try:
         logger.info("=" * 60)
-        logger.info("任务开始: 产品库存表原始数据导入")
+        logger.info(f"任务开始: {TASK_NAME}")
         logger.info("=" * 60)
 
         # 步骤1: 建立数据库连接
@@ -384,7 +387,7 @@ def main() -> dict[str, Any]:
         source_conn = get_source_connection()
         target_conn = get_target_connection()
 
-        # 步骤2: 确保目标表存在
+        # 步骤2: 确保目标表存在(重要:必须在数据加载前执行)
         logger.info("[Step 2/5] 检查/创建目标表...")
         ensure_target_table_exists(target_conn)
 
@@ -397,9 +400,11 @@ def main() -> dict[str, Any]:
         logger.info("[Step 4/5] 数据转换...")
         df_transformed = transform_data(df)
 
-        # 步骤5: 加载到目标表(追加模式)
+        # 步骤5: 加载到目标表
         logger.info("[Step 5/5] 加载数据到目标表...")
-        records_loaded = load_to_target(df_transformed, target_conn)
+        records_loaded = load_to_target(
+            df_transformed, target_conn, update_mode=UPDATE_MODE
+        )
         result["records_loaded"] = records_loaded
 
         result["status"] = "success"
@@ -439,7 +444,7 @@ if __name__ == "__main__":
         format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
     )
     logger.add(
-        os.path.join(PROJECT_ROOT, "logs", "task_37_产品库存表原始数据导入.log"),
+        os.path.join(PROJECT_ROOT, "logs", f"task_{TASK_ID}.log"),
         level="DEBUG",
         rotation="10 MB",
         retention="7 days",

+ 415 - 0
datafactory/scripts/task_44_DF_DO202601210001.py

@@ -0,0 +1,415 @@
+#!/usr/bin/env python
+"""
+数据流任务脚本 - 仓库库存汇总表_数据流程
+
+任务ID: 44
+任务名称: DF_DO202601210001
+创建时间: 2026-01-21
+更新模式: Append (追加模式)
+
+关联信息:
+- Order ID: 26
+- Order No: DO202601210001
+- DataFlow ID: 2291
+- DataFlow Name: 仓库库存汇总表_数据流程
+- Product ID: 23
+
+描述: 从源表 test_product_inventory 提取仓库名称和库存数量,
+按仓库名称分组求和,输出到 warehouse_inventory_summary 表
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+from datetime import datetime
+from typing import Any
+
+import pandas as pd
+import psycopg2
+from loguru import logger
+
+# 添加项目根目录到Python路径
+PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
+sys.path.insert(0, PROJECT_ROOT)
+
+# 任务配置
+TASK_ID = 44
+TASK_NAME = "DF_DO202601210001"
+UPDATE_MODE = "append"
+
+# 源数据库配置(与目标相同)
+SOURCE_CONFIG = {
+    "host": "192.168.3.143",
+    "port": 5432,  # PostgreSQL 默认端口
+    "database": "dataops",
+    "user": "postgres",
+    "password": "dataOps",
+}
+
+# 目标数据库配置
+TARGET_CONFIG = {
+    "host": "192.168.3.143",
+    "port": 5432,  # PostgreSQL 默认端口
+    "database": "dataops",
+    "user": "postgres",
+    "password": "dataOps",
+}
+
+# 源表配置
+SOURCE_SCHEMA = "dags"
+SOURCE_TABLE = "test_product_inventory"
+
+# 目标表配置
+TARGET_SCHEMA = "dags"
+TARGET_TABLE = "warehouse_inventory_summary"
+
+
+def get_source_connection() -> psycopg2.extensions.connection:
+    """
+    获取源数据库连接
+
+    Returns:
+        psycopg2 连接对象
+    """
+    conn = psycopg2.connect(
+        host=SOURCE_CONFIG["host"],
+        port=SOURCE_CONFIG["port"],
+        database=SOURCE_CONFIG["database"],
+        user=SOURCE_CONFIG["user"],
+        password=SOURCE_CONFIG["password"],
+        options=f"-c search_path={SOURCE_SCHEMA},public",
+    )
+    logger.info("源数据库连接成功")
+    return conn
+
+
+def get_target_connection() -> psycopg2.extensions.connection:
+    """
+    获取目标数据库连接
+
+    Returns:
+        psycopg2 连接对象
+    """
+    conn = psycopg2.connect(
+        host=TARGET_CONFIG["host"],
+        port=TARGET_CONFIG["port"],
+        database=TARGET_CONFIG["database"],
+        user=TARGET_CONFIG["user"],
+        password=TARGET_CONFIG["password"],
+        options=f"-c search_path={TARGET_SCHEMA},public",
+    )
+    logger.info("目标数据库连接成功")
+    return conn
+
+
+def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
+    """
+    确保目标表存在,如果不存在则创建
+
+    Args:
+        conn: 目标数据库连接
+    """
+    cursor = conn.cursor()
+
+    try:
+        # 检查表是否存在
+        cursor.execute(
+            """
+            SELECT EXISTS(
+                SELECT 1 FROM information_schema.tables
+                WHERE table_schema = %s
+                AND table_name = %s
+            )
+        """,
+            (TARGET_SCHEMA, TARGET_TABLE),
+        )
+        result = cursor.fetchone()
+        exists = result[0] if result else False
+
+        if not exists:
+            logger.info(f"目标表不存在,正在创建 {TARGET_SCHEMA}.{TARGET_TABLE}...")
+
+            # 根据任务描述中的 DDL 创建表
+            create_table_sql = f"""
+            CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (
+                warehouse_name VARCHAR(255),
+                total_inventory INTEGER,
+                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            );
+            """
+            cursor.execute(create_table_sql)
+
+            # 添加表注释
+            cursor.execute(
+                f"COMMENT ON TABLE {TARGET_SCHEMA}.{TARGET_TABLE} IS '仓库库存汇总表'"
+            )
+
+            # 添加列注释
+            column_comments = {
+                "warehouse_name": "仓库名称",
+                "total_inventory": "库存数量",
+                "create_time": "数据创建时间",
+            }
+            for col, comment in column_comments.items():
+                try:
+                    cursor.execute(
+                        f"COMMENT ON COLUMN {TARGET_SCHEMA}.{TARGET_TABLE}.{col} IS %s",
+                        (comment,),
+                    )
+                except Exception as e:
+                    logger.warning(f"添加列注释失败 {col}: {e}")
+
+            conn.commit()
+            logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 创建成功")
+        else:
+            logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 已存在")
+
+    except Exception as e:
+        conn.rollback()
+        logger.error(f"创建目标表失败: {e}")
+        raise
+    finally:
+        cursor.close()
+
+
+def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
+    """
+    从源表提取数据
+
+    根据任务描述:
+    1. 从源数据'产品库存表'中提取'仓库名称'字段;
+    2. 对'产品库存表'中的'库存数量'字段进行求和计算;
+    3. 按'仓库名称'进行分组;
+    4. 最终输出数据格式包含'仓库名称'和对应的'库存数量'两个字段。
+
+    Args:
+        conn: 源数据库连接
+
+    Returns:
+        包含汇总数据的DataFrame
+    """
+    query = f"""
+    SELECT
+        warehouse AS warehouse_name,
+        SUM(current_stock) AS total_inventory
+    FROM {SOURCE_SCHEMA}.{SOURCE_TABLE}
+    WHERE warehouse IS NOT NULL
+    GROUP BY warehouse
+    ORDER BY warehouse
+    """
+
+    logger.info("正在从源表提取并汇总数据...")
+
+    try:
+        df = pd.read_sql(query, conn)
+        logger.info(f"成功提取 {len(df)} 条汇总记录")
+        return df
+    except Exception as e:
+        logger.error(f"提取源数据失败: {e}")
+        raise
+
+
+def transform_data(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    数据转换处理
+
+    Args:
+        df: 源数据DataFrame
+
+    Returns:
+        转换后的DataFrame
+    """
+    logger.info("正在执行数据转换...")
+
+    # 数据已在SQL中完成汇总,此处仅做数据清洗
+    # 确保字段名称与目标表一致
+    df = df.rename(
+        columns={
+            "warehouse_name": "warehouse_name",
+            "total_inventory": "total_inventory",
+        }
+    )
+
+    # 处理空值:将 None/NaN 的库存数量设为 0
+    df["total_inventory"] = df["total_inventory"].fillna(0).astype(int)
+
+    logger.info(f"数据转换完成,共 {len(df)} 条记录")
+    return df
+
+
+def load_to_target(
+    df: pd.DataFrame,
+    conn: psycopg2.extensions.connection,
+    update_mode: str = "append",
+    batch_size: int = 1000,
+) -> int:
+    """
+    将数据加载到目标表
+
+    Args:
+        df: 要加载的DataFrame
+        conn: 目标数据库连接
+        update_mode: 更新模式(append 或 full)
+        batch_size: 批量插入大小
+
+    Returns:
+        插入的记录数
+    """
+    if df.empty:
+        logger.warning("没有数据需要加载")
+        return 0
+
+    logger.info(f"正在将 {len(df)} 条记录加载到目标表...")
+
+    target_table = f"{TARGET_SCHEMA}.{TARGET_TABLE}"
+    cursor = conn.cursor()
+    inserted_count = 0
+
+    try:
+        # 全量更新模式:先清空目标表
+        if update_mode.lower() == "full":
+            logger.info("全量更新模式:清空目标表...")
+            cursor.execute(f"TRUNCATE TABLE {target_table}")
+            logger.info("目标表已清空")
+
+        # 目标表结构准备插入的列(不包含 create_time,由数据库自动设置)
+        columns = ["warehouse_name", "total_inventory"]
+
+        # 构建插入SQL
+        placeholders = ", ".join(["%s"] * len(columns))
+        column_names = ", ".join(columns)
+        insert_sql = (
+            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
+        )
+
+        # 批量插入
+        for i in range(0, len(df), batch_size):
+            batch_df = df.iloc[i : i + batch_size]
+            records = []
+            for _, row in batch_df.iterrows():
+                record = tuple(
+                    None if pd.isna(row.get(col)) else row.get(col) for col in columns
+                )
+                records.append(record)
+
+            cursor.executemany(insert_sql, records)
+            inserted_count += len(records)
+            logger.debug(f"已插入 {inserted_count}/{len(df)} 条记录")
+
+        conn.commit()
+        logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
+        return inserted_count
+
+    except Exception as e:
+        conn.rollback()
+        logger.error(f"数据加载失败: {e}")
+        raise
+    finally:
+        cursor.close()
+
+
+def main() -> dict[str, Any]:
+    """
+    主函数:执行ETL流程
+
+    Returns:
+        执行结果字典
+    """
+    result = {
+        "task_id": TASK_ID,
+        "task_name": TASK_NAME,
+        "status": "failed",
+        "records_extracted": 0,
+        "records_loaded": 0,
+        "error_message": None,
+        "execution_time": None,
+    }
+
+    start_time = datetime.now()
+    source_conn = None
+    target_conn = None
+
+    try:
+        logger.info("=" * 60)
+        logger.info(f"任务开始: {TASK_NAME}")
+        logger.info("=" * 60)
+
+        # 步骤1: 建立数据库连接
+        logger.info("[Step 1/5] 建立数据库连接...")
+        source_conn = get_source_connection()
+        target_conn = get_target_connection()
+
+        # 步骤2: 确保目标表存在(重要:必须在数据加载前执行)
+        logger.info("[Step 2/5] 检查/创建目标表...")
+        ensure_target_table_exists(target_conn)
+
+        # 步骤3: 从源表提取数据
+        logger.info("[Step 3/5] 提取并汇总源数据...")
+        df = extract_source_data(source_conn)
+        result["records_extracted"] = len(df)
+
+        # 步骤4: 数据转换
+        logger.info("[Step 4/5] 数据转换...")
+        df_transformed = transform_data(df)
+
+        # 步骤5: 加载到目标表
+        logger.info("[Step 5/5] 加载数据到目标表...")
+        records_loaded = load_to_target(
+            df_transformed, target_conn, update_mode=UPDATE_MODE
+        )
+        result["records_loaded"] = records_loaded
+
+        result["status"] = "success"
+        logger.info("=" * 60)
+        logger.info(
+            f"任务完成! 提取: {result['records_extracted']}, 加载: {result['records_loaded']}"
+        )
+        logger.info("=" * 60)
+
+    except Exception as e:
+        result["status"] = "failed"
+        result["error_message"] = str(e)
+        logger.error(f"任务执行失败: {e}")
+        raise
+
+    finally:
+        # 关闭数据库连接
+        if source_conn:
+            source_conn.close()
+            logger.debug("源数据库连接已关闭")
+        if target_conn:
+            target_conn.close()
+            logger.debug("目标数据库连接已关闭")
+
+        result["execution_time"] = str(datetime.now() - start_time)
+
+    return result
+
+
+if __name__ == "__main__":
+    # 配置日志
+    # 重要:日志输出到 stdout 而非 stderr,以便 n8n 工作流正确解析输出
+    logger.remove()
+    logger.add(
+        sys.stdout,
+        level="INFO",
+        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
+    )
+    logger.add(
+        os.path.join(PROJECT_ROOT, "logs", f"task_{TASK_ID}.log"),
+        level="DEBUG",
+        rotation="10 MB",
+        retention="7 days",
+        encoding="utf-8",
+    )
+
+    try:
+        result = main()
+        if result["status"] == "success":
+            sys.exit(0)
+        else:
+            sys.exit(1)
+    except Exception as e:
+        logger.exception(f"脚本执行异常: {e}")
+        sys.exit(1)

+ 270 - 0
docs/cohere_bearer_token_guide.md

@@ -0,0 +1,270 @@
+# Cohere API Key 作为 Bearer Token 使用指南
+
+## 📌 重要说明
+
+**Cohere API Key 本身就是 Bearer Token!**
+
+你不需要"创建"一个新的 Bearer authorization key。Cohere API Key (`4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`) 可以直接用作 Bearer token 在 HTTP 请求中进行身份验证。
+
+---
+
+## 🔑 Bearer Token 格式
+
+Bearer token 的标准格式是:
+
+```
+Authorization: Bearer <API_KEY>
+```
+
+对于 Cohere API,格式为:
+
+```
+Authorization: Bearer 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C
+```
+
+---
+
+## 💻 在不同场景中使用
+
+### 1. Python 代码中使用
+
+```python
+import requests
+
+# Cohere API Key
+api_key = "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C"
+
+# 设置请求头
+headers = {
+    "Authorization": f"Bearer {api_key}",
+    "Content-Type": "application/json",
+}
+
+# 调用 Cohere API
+response = requests.get(
+    "https://api.cohere.ai/v1/models",
+    headers=headers,
+    timeout=10,
+)
+
+print(response.json())
+```
+
+### 2. cURL 命令中使用
+
+```bash
+curl -X GET "https://api.cohere.ai/v1/models" \
+  -H "Authorization: Bearer 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C" \
+  -H "Content-Type: application/json"
+```
+
+### 3. JavaScript/Node.js 中使用
+
+```javascript
+const apiKey = "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C";
+
+fetch("https://api.cohere.ai/v1/models", {
+  method: "GET",
+  headers: {
+    "Authorization": `Bearer ${apiKey}`,
+    "Content-Type": "application/json",
+  },
+})
+  .then((response) => response.json())
+  .then((data) => console.log(data));
+```
+
+### 4. n8n HTTP Request 节点中使用
+
+#### 方法 1: 直接在 Header 中设置
+
+1. 添加 **HTTP Request** 节点
+2. 配置如下:
+   - **Method**: `GET` 或 `POST`
+   - **URL**: `https://api.cohere.ai/v1/models`(或其他 Cohere API 端点)
+   - **Authentication**: `None`
+   - **Send Headers**: ✅ 启用
+   - **Headers**:
+     ```
+     Authorization: Bearer 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C
+     Content-Type: application/json
+     ```
+
+#### 方法 2: 使用表达式(推荐,更安全)
+
+在 n8n 中使用表达式,避免硬编码 API Key:
+
+1. 在 **Headers** 中设置:
+   - **Name**: `Authorization`
+   - **Value**: `Bearer {{ $env.COHERE_API_KEY }}` 或使用凭证
+
+2. 或者使用 n8n 的凭证系统:
+   - 创建 **Generic Credential Type** 凭证
+   - 存储 API Key
+   - 在 HTTP Request 节点中引用
+
+---
+
+## 🔧 在 n8n 中创建通用 Bearer Token 凭证
+
+如果你想在 n8n 中创建一个通用的 Bearer Token 凭证(用于 HTTP Request 节点),可以这样做:
+
+### 步骤 1: 创建 Generic Credential Type
+
+1. 访问: https://n8n.citupro.com/home/credentials
+2. 点击 **"Add Credential"**
+3. 搜索 **"Generic Credential Type"** 或 **"HTTP Header Auth"**
+4. 选择相应的凭证类型
+
+### 步骤 2: 配置凭证
+
+**选项 A: Generic Credential Type**
+- **Name**: `Cohere Bearer Token`
+- **Credential Data**:
+  ```json
+  {
+    "apiKey": "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C"
+  }
+  ```
+
+**选项 B: HTTP Header Auth**(如果可用)
+- **Name**: `Cohere Bearer Token`
+- **Header Name**: `Authorization`
+- **Header Value**: `Bearer 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`
+
+### 步骤 3: 在 HTTP Request 节点中使用
+
+1. 打开 HTTP Request 节点
+2. 在 **Authentication** 字段中选择创建的凭证
+3. 或者手动在 **Headers** 中添加:
+   ```
+   Authorization: Bearer {{ $credentials.cohereBearerToken.apiKey }}
+   ```
+
+---
+
+## 📝 完整示例:调用 Cohere Rerank API
+
+### Python 示例
+
+```python
+import requests
+
+api_key = "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C"
+
+headers = {
+    "Authorization": f"Bearer {api_key}",
+    "Content-Type": "application/json",
+}
+
+# Rerank API 请求
+data = {
+    "model": "rerank-multilingual-v3.0",
+    "query": "什么是人工智能?",
+    "documents": [
+        "人工智能是计算机科学的一个分支",
+        "机器学习是人工智能的核心技术",
+        "深度学习是机器学习的一个子集",
+    ],
+    "top_n": 2,
+}
+
+response = requests.post(
+    "https://api.cohere.ai/v1/rerank",
+    headers=headers,
+    json=data,
+    timeout=30,
+)
+
+print(response.json())
+```
+
+### n8n HTTP Request 节点配置
+
+**节点配置**:
+- **Method**: `POST`
+- **URL**: `https://api.cohere.ai/v1/rerank`
+- **Authentication**: `None`
+- **Send Headers**: ✅
+- **Headers**:
+  ```
+  Authorization: Bearer 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C
+  Content-Type: application/json
+  ```
+- **Send Body**: ✅
+- **Body Content Type**: `JSON`
+- **JSON Body**:
+  ```json
+  {
+    "model": "rerank-multilingual-v3.0",
+    "query": "{{ $json.query }}",
+    "documents": {{ $json.documents }},
+    "top_n": 2
+  }
+  ```
+
+---
+
+## 🛠️ 实用脚本
+
+我已经创建了一个测试脚本 `scripts/test_cohere_api_key.py`,它展示了如何使用 Bearer token:
+
+```bash
+python scripts/test_cohere_api_key.py 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C
+```
+
+这个脚本会:
+1. 使用 Bearer token 格式调用 Cohere API
+2. 验证 API Key 是否有效
+3. 测试 Rerank API 功能
+
+---
+
+## 🔒 安全最佳实践
+
+1. **不要在代码中硬编码 API Key**
+   - 使用环境变量
+   - 使用配置文件(不提交到 Git)
+   - 使用 n8n 凭证系统
+
+2. **使用环境变量**:
+   ```python
+   import os
+   api_key = os.environ.get("COHERE_API_KEY")
+   ```
+
+3. **在 n8n 中使用表达式**:
+   ```
+   Bearer {{ $env.COHERE_API_KEY }}
+   ```
+
+4. **定期轮换 API Key**
+   - 在 Cohere Dashboard 中生成新 Key
+   - 更新所有使用该 Key 的地方
+
+---
+
+## 📚 相关文档
+
+- [Cohere API 文档](https://docs.cohere.com/)
+- [Cohere Rerank API](https://docs.cohere.com/docs/reranking)
+- [n8n HTTP Request 节点文档](https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.httprequest/)
+- [Bearer Token 认证说明](https://oauth.net/2/bearer-tokens/)
+
+---
+
+## ✅ 总结
+
+**你不需要创建新的 Bearer authorization key!**
+
+- Cohere API Key 就是 Bearer token
+- 格式: `Authorization: Bearer <API_KEY>`
+- 直接在 HTTP 请求头中使用即可
+- 在 n8n 中,可以使用 HTTP Request 节点 + Header 配置,或使用 Cohere 专用节点(已配置好凭证)
+
+**你的 Cohere API Key**: `4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`  
+**Bearer Token 格式**: `Bearer 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`
+
+---
+
+**最后更新**: 2026-01-19

+ 160 - 0
docs/n8n_cohere_credential_setup.md

@@ -0,0 +1,160 @@
+# n8n Cohere API Key 凭证配置指南
+
+**目标**: 在 `n8n.citupro.com` 中配置 Cohere API Key 凭证  
+**API Key**: `4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`  
+**日期**: 2025-01-27
+
+---
+
+## ✅ 凭证已创建成功!
+
+**当前凭证 ID**: `Trtn1MtQhDwHYoPO`  
+**凭证名称**: `Cohere API Key`  
+**凭证类型**: `cohereApi`  
+**创建时间**: 2026-01-19T03:55:55.019Z
+
+**注意**: 如果遇到 "Forbidden" 错误,请参考 [故障排除指南](n8n_cohere_credential_troubleshooting.md)
+
+你现在可以在工作流中使用此凭证了!
+
+---
+
+## 📋 方法一:通过 Web UI 手动配置(推荐)
+
+### 步骤 1: 登录 n8n 实例
+
+1. 访问 **https://n8n.citupro.com**
+2. 使用你的账号登录
+
+### 步骤 2: 进入凭证管理页面
+
+1. 点击左侧边栏的 **"Settings"**(设置)图标
+2. 在设置菜单中,点击 **"Credentials"**(凭证)
+3. 或者直接访问:`https://n8n.citupro.com/home/credentials`
+
+### 步骤 3: 创建新的 Cohere 凭证
+
+1. 点击页面右上角的 **"Add Credential"**(添加凭证)按钮
+2. 在搜索框中输入 **"Cohere"**
+3. 选择 **"Cohere API"** 凭证类型
+
+### 步骤 4: 填写凭证信息
+
+在凭证配置表单中填写:
+
+- **Name**(名称): `Cohere API Key` 或 `Cohere - Production`(便于识别的名称)
+- **API Key**: `4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`
+
+### 步骤 5: 保存凭证
+
+1. 点击 **"Save"**(保存)按钮
+2. n8n 会自动验证 API Key 是否有效
+3. 如果验证成功,凭证会出现在凭证列表中
+
+---
+
+## 🔧 方法二:通过 API 脚本配置(实验性)
+
+如果 n8n API 支持凭证管理,可以使用以下脚本:
+
+### 使用脚本创建凭证
+
+```bash
+python scripts/create_n8n_cohere_credential.py
+```
+
+**注意**: n8n 的凭证管理 API 可能有限制,如果 API 不支持,请使用方法一(Web UI)。
+
+---
+
+## ✅ 验证配置
+
+### 在工作流中使用凭证
+
+1. 打开任意包含 **Reranker Cohere** 节点的工作流
+2. 点击 **Reranker Cohere** 节点
+3. 在右侧配置面板中,找到 **"Credential to connect with"** 字段
+4. 从下拉菜单中选择刚才创建的 **"Cohere API Key"** 凭证
+5. 保存节点配置
+
+### 测试凭证是否有效
+
+1. 创建一个简单的测试工作流:
+   - 添加 **Manual Trigger** 节点
+   - 添加 **Reranker Cohere** 节点
+   - 配置模型为 `rerank-multilingual-v3.0`
+   - 连接两个节点
+2. 点击 **"Execute Workflow"** 测试
+3. 如果执行成功,说明凭证配置正确
+
+---
+
+## 📝 凭证使用示例
+
+### 在工作流 JSON 中引用凭证
+
+```json
+{
+  "parameters": {
+    "modelName": "rerank-multilingual-v3.0",
+    "topN": 5
+  },
+  "type": "@n8n/n8n-nodes-langchain.rerankerCohere",
+  "name": "Reranker Cohere",
+  "credentials": {
+    "cohereApi": {
+      "id": "凭证ID",
+      "name": "Cohere API Key"
+    }
+  }
+}
+```
+
+**获取凭证 ID**:
+1. 在凭证列表中点击创建的凭证
+2. 从浏览器地址栏获取 ID:`/home/credentials/{凭证ID}`
+
+---
+
+## 🔒 安全注意事项
+
+1. **不要公开 API Key**: 在分享工作流 JSON 或截图时,不要包含 API Key
+2. **权限管理**: 确保只有授权用户可以访问凭证
+3. **定期轮换**: 建议定期更换 API Key 以提高安全性
+4. **凭证加密**: n8n 会自动加密存储凭证,不会明文显示
+
+---
+
+## 🆘 常见问题
+
+### Q1: 找不到 "Cohere API" 凭证类型?
+
+**A**: 确保你的 n8n 版本支持 Cohere 节点。如果使用的是较旧版本,可能需要:
+- 更新 n8n 到最新版本
+- 或安装 `@n8n/n8n-nodes-langchain` 包
+
+### Q2: API Key 验证失败?
+
+**A**: 检查:
+- API Key 是否正确复制(没有多余空格)
+- Cohere 账户是否有效
+- 网络连接是否正常
+
+### Q3: 如何查看凭证 ID?
+
+**A**: 
+1. 进入 Settings → Credentials
+2. 点击目标凭证
+3. 从浏览器地址栏获取:`/home/credentials/{凭证ID}`
+
+---
+
+## 📚 相关文档
+
+- [n8n Cohere 节点文档](https://docs.n8n.io/integrations/builtin/credentials/cohere/)
+- [n8n 凭证管理指南](https://docs.n8n.io/credentials/add-edit-credentials/)
+- [Cohere API 文档](https://docs.cohere.com/)
+
+---
+
+**配置完成后,你就可以在工作流中使用 Cohere Reranker 节点了!** 🎉

+ 188 - 0
docs/n8n_cohere_credential_troubleshooting.md

@@ -0,0 +1,188 @@
+# n8n Cohere API Key 凭证故障排除指南
+
+## 🔴 错误信息
+
+```
+Couldn't connect with these settings
+Forbidden - perhaps check your credentials?
+```
+
+---
+
+## ✅ 已完成的修复
+
+**新凭证 ID**: `Trtn1MtQhDwHYoPO`  
+**凭证名称**: `Cohere API Key`  
+**创建时间**: 2026-01-19T03:55:55.019Z
+
+旧凭证 `xpFKP9JT4PmYZw61` 已删除,新凭证已创建。
+
+---
+
+## 🔍 问题诊断步骤
+
+### 步骤 1: 验证 API Key 本身是否有效
+
+运行测试脚本:
+
+```bash
+python scripts/test_cohere_api_key.py 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C
+```
+
+**预期结果**: 应该返回 `成功: True` 和 `状态码: 200`
+
+如果测试失败,说明 API Key 本身有问题,需要:
+- 检查 Cohere Dashboard: https://dashboard.cohere.com/
+- 确认 API Key 是否有效
+- 检查账户状态和权限
+
+### 步骤 2: 检查 n8n 凭证配置
+
+1. 访问凭证管理页面: https://n8n.citupro.com/home/credentials
+2. 找到 "Cohere API Key" 凭证(ID: `Trtn1MtQhDwHYoPO`)
+3. 点击打开凭证详情
+4. 检查 API Key 字段:
+   - 确保没有多余的空格
+   - 确保没有换行符
+   - 确保完整复制了所有字符
+
+### 步骤 3: 在工作流中重新选择凭证
+
+1. 打开使用 Cohere Reranker 节点的工作流
+2. 点击 **Reranker Cohere** 节点
+3. 在 **"Credential to connect with"** 字段中:
+   - 先选择 "None"(清除旧凭证)
+   - 保存节点
+   - 再次打开节点
+   - 选择 "Cohere API Key" 凭证
+4. 保存节点配置
+
+### 步骤 4: 测试节点连接
+
+1. 在 Reranker Cohere 节点配置中,点击 **"Test step"** 或 **"Execute Node"**
+2. 观察是否还有 "Forbidden" 错误
+
+---
+
+## 🛠️ 常见原因和解决方案
+
+### 原因 1: API Key 格式问题
+
+**症状**: API Key 在复制时可能包含隐藏字符
+
+**解决方案**:
+1. 在 n8n 凭证编辑页面,完全删除 API Key 字段内容
+2. 重新输入 API Key(不要复制粘贴,手动输入)
+3. 或者使用脚本重新创建凭证
+
+### 原因 2: 凭证缓存问题
+
+**症状**: 即使更新了凭证,节点仍然使用旧配置
+
+**解决方案**:
+1. 在工作流中,先断开凭证连接(选择 "None")
+2. 保存工作流
+3. 重新打开节点,选择新凭证
+4. 保存工作流
+
+### 原因 3: Cohere 账户权限问题
+
+**症状**: API Key 有效,但某些功能被限制
+
+**解决方案**:
+1. 检查 Cohere Dashboard 中的账户状态
+2. 确认账户是否有使用 Reranker API 的权限
+3. 检查 API 使用限制和配额
+
+### 原因 4: n8n 版本兼容性问题
+
+**症状**: 凭证类型不被识别
+
+**解决方案**:
+1. 检查 n8n 版本是否支持 Cohere 节点
+2. 确保安装了 `@n8n/n8n-nodes-langchain` 包
+3. 更新 n8n 到最新版本
+
+---
+
+## 🔧 手动修复步骤
+
+如果脚本修复不起作用,请手动修复:
+
+### 方法 1: 通过 Web UI 更新凭证
+
+1. 访问: https://n8n.citupro.com/home/credentials
+2. 找到 "Cohere API Key" 凭证
+3. 点击编辑
+4. 在 API Key 字段中:
+   - 完全删除现有内容
+   - 重新输入: `4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`
+   - **注意**: 确保没有前后空格
+5. 点击 "Test connection" 或 "Save"
+6. 如果测试失败,检查错误信息
+
+### 方法 2: 删除并重新创建凭证
+
+1. 访问凭证管理页面
+2. 删除现有的 "Cohere API Key" 凭证
+3. 创建新凭证:
+   - 类型: Cohere API
+   - 名称: Cohere API Key
+   - API Key: `4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C`
+4. 保存并测试
+
+### 方法 3: 使用脚本重新创建
+
+```bash
+# 删除旧凭证并创建新凭证
+python scripts/update_n8n_cohere_credential.py --recreate
+
+# 或者指定要删除的凭证 ID
+python scripts/update_n8n_cohere_credential.py --recreate --credential-id <旧凭证ID>
+```
+
+---
+
+## 📋 验证清单
+
+完成修复后,请验证以下项目:
+
+- [ ] API Key 测试脚本返回成功
+- [ ] n8n 凭证配置中 API Key 正确(无空格、无换行)
+- [ ] 工作流节点已选择正确的凭证
+- [ ] 节点测试/执行不再报 "Forbidden" 错误
+- [ ] 可以正常使用 Reranker 功能
+
+---
+
+## 🆘 如果问题仍然存在
+
+如果按照以上步骤操作后问题仍然存在,请检查:
+
+1. **网络连接**: 确保 n8n 服务器可以访问 `api.cohere.ai`
+2. **防火墙规则**: 检查是否有防火墙阻止了 API 请求
+3. **Cohere 服务状态**: 检查 Cohere 服务是否正常运行
+4. **n8n 日志**: 查看 n8n 服务器日志获取详细错误信息
+
+### 获取详细错误信息
+
+1. 在 n8n 工作流编辑器中,点击失败的节点
+2. 查看 "Execution Data" 或 "Error" 标签
+3. 复制完整的错误信息
+4. 检查错误堆栈和详细信息
+
+---
+
+## 📞 联系支持
+
+如果问题持续存在,请提供以下信息:
+
+1. n8n 版本号
+2. Cohere API Key 测试结果(使用测试脚本)
+3. 完整的错误信息(从 n8n 节点执行结果中复制)
+4. 凭证配置截图(隐藏 API Key 敏感部分)
+
+---
+
+**最后更新**: 2026-01-19  
+**凭证 ID**: `Trtn1MtQhDwHYoPO`

+ 41 - 21
scripts/auto_execute_tasks.py

@@ -621,20 +621,35 @@ def sync_completed_tasks_to_db() -> int:
             # 使用 code_file 判断是否为 Python 脚本
             is_python_script = code_file and code_file.endswith(".py")
 
+            # 修复路径重复问题:统一处理脚本路径
             if is_python_script:
-                logger.info(f"任务 {task_id} 使用 Python 脚本: {code_path}/{code_file}")
+                if code_file.startswith(code_path):
+                    # code_file 已经是完整路径
+                    full_script_path = code_file
+                    # 提取纯文件名用于数据库存储
+                    code_file_name = Path(code_file).name
+                elif "/" in code_file or "\\" in code_file:
+                    # code_file 包含其他路径,提取文件名
+                    code_file_name = Path(code_file).name
+                    full_script_path = f"{code_path}/{code_file_name}"
+                else:
+                    # code_file 只是文件名
+                    code_file_name = code_file
+                    full_script_path = f"{code_path}/{code_file}"
+                logger.info(f"任务 {task_id} 使用 Python 脚本: {full_script_path}")
             else:
                 logger.info(
                     f"任务 {task_id} 的 code_file ({code_file}) 不是 Python 脚本,跳过 DataFlow 更新"
                 )
+                code_file_name = code_file
+                full_script_path = ""
 
-            if update_task_status(task_id, "completed", code_file, code_path):
+            if update_task_status(task_id, "completed", code_file_name, code_path):
                 updated += 1
                 logger.info(f"已同步任务 {task_id} 为 completed")
 
                 # 只有 Python 脚本才更新 DataFlow 节点的 script_path
                 if task_name and is_python_script:
-                    full_script_path = f"{code_path}/{code_file}"
                     if update_dataflow_script_path(
                         task_name, full_script_path, task_id=task_id
                     ):
@@ -1067,11 +1082,11 @@ def find_remote_workflow_files(task_info: dict[str, Any]) -> list[str]:
                 remote_files.append(remote_file)
                 logger.info(f"  匹配到工作流: {Path(remote_file).name}")
 
-        # 如果没有匹配到任何文件,添加所有文件(让用户决定)
-        # 注意:可以根据业务需求调整此策略
+        # 如果没有匹配到任何文件,不再自动部署所有文件
+        # 这样可以避免误部署其他任务的工作流
         if not remote_files and all_json_files:
-            logger.info("没有精确匹配的工作流,将尝试部署所有远程工作流文件")
-            remote_files = all_json_files
+            logger.info("没有精确匹配的工作流文件,跳过远程工作流部署")
+            # 不再自动部署所有文件,避免重复部署问题
 
         ssh.close()
         return remote_files
@@ -1175,25 +1190,18 @@ def deploy_remote_workflow_to_n8n(remote_file_path: str) -> bool:
                         break
 
                 if existing_wf:
-                    # 更新已存在的工作流
+                    # 已存在同名工作流,跳过创建避免重复
                     workflow_id = existing_wf.get("id")
-                    update_url = f"{api_url.rstrip('/')}/api/v1/workflows/{workflow_id}"
-                    logger.info(f"发现已存在的工作流 (ID: {workflow_id}),将更新...")
-
-                    update_response = requests.patch(
-                        update_url,
-                        headers=headers,
-                        json=workflow_payload,
-                        timeout=timeout,
+                    logger.info(
+                        f"发现已存在的工作流 (ID: {workflow_id}),跳过部署避免重复"
                     )
-                    update_response.raise_for_status()
                     logger.info(
-                        f"✅ 工作流更新成功! ID: {workflow_id}, 名称: {workflow_name}"
+                        "如需更新工作流,请手动在 n8n 控制台操作或删除后重新部署"
                     )
-                    return True
+                    return True  # 返回成功,因为工作流已存在
 
         except requests.exceptions.RequestException as e:
-            logger.debug(f"检查已存在工作流时出错(将尝试创建新工作流): {e}")
+            logger.warning(f"检查已存在工作流时出错: {e}")
 
         # 调用 n8n API 创建工作流
         create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
@@ -1416,7 +1424,19 @@ def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool:
 
     # 1. 部署 Python 脚本
     if actual_script_file.endswith(".py"):
-        script_path = f"{code_path}/{actual_script_file}"
+        # 修复路径重复问题:如果 actual_script_file 已经包含 code_path,则只使用 actual_script_file
+        # 否则拼接 code_path 和 actual_script_file
+        if actual_script_file.startswith(code_path):
+            # actual_script_file 已经是完整路径,如 "datafactory/scripts/task_41_xxx.py"
+            script_path = actual_script_file
+        elif "/" in actual_script_file or "\\" in actual_script_file:
+            # actual_script_file 包含路径分隔符但不以 code_path 开头
+            # 可能是其他格式的路径,提取文件名后拼接
+            script_filename = Path(actual_script_file).name
+            script_path = f"{code_path}/{script_filename}"
+        else:
+            # actual_script_file 只是文件名,正常拼接
+            script_path = f"{code_path}/{actual_script_file}"
         logger.info(f"📦 部署 Python 脚本: {script_path}")
 
         if deploy_script_to_production(script_path):

+ 190 - 0
scripts/cohere_api_example.py

@@ -0,0 +1,190 @@
+"""
+Cohere API 使用示例 - 展示如何使用 Bearer Token
+
+演示如何使用 Cohere API Key 作为 Bearer token 调用 Cohere API
+"""
+
+import os
+import sys
+from typing import Any, Dict, List, Optional
+
+import requests
+from loguru import logger
+
+# 配置日志
+logger.remove()
+logger.add(
+    sys.stdout,
+    level="INFO",
+    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
+)
+
+
+class CohereAPIClient:
+    """Cohere API 客户端 - 使用 Bearer Token 认证"""
+
+    def __init__(self, api_key: Optional[str] = None):
+        """
+        初始化 Cohere API 客户端
+
+        Args:
+            api_key: Cohere API Key(如果不提供,从环境变量读取)
+        """
+        self.api_key = api_key or os.environ.get("COHERE_API_KEY")
+        if not self.api_key:
+            raise ValueError("请提供 Cohere API Key 或设置 COHERE_API_KEY 环境变量")
+
+        self.base_url = "https://api.cohere.ai/v1"
+        self.headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+        }
+
+    def list_models(self) -> Dict[str, Any]:
+        """
+        获取可用的模型列表
+
+        Returns:
+            模型列表
+        """
+        logger.info("获取模型列表...")
+        response = requests.get(
+            f"{self.base_url}/models",
+            headers=self.headers,
+            timeout=10,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    def rerank(
+        self,
+        query: str,
+        documents: List[str],
+        model: str = "rerank-multilingual-v3.0",
+        top_n: int = 3,
+    ) -> Dict[str, Any]:
+        """
+        使用 Rerank API 对文档进行重排序
+
+        Args:
+            query: 查询文本
+            documents: 文档列表
+            model: 使用的模型(默认: rerank-multilingual-v3.0)
+            top_n: 返回前 N 个结果
+
+        Returns:
+            重排序结果
+        """
+        logger.info(f"使用模型 {model} 对 {len(documents)} 个文档进行重排序...")
+
+        data = {
+            "model": model,
+            "query": query,
+            "documents": documents,
+            "top_n": top_n,
+        }
+
+        response = requests.post(
+            f"{self.base_url}/rerank",
+            headers=self.headers,
+            json=data,
+            timeout=30,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    def test_connection(self) -> bool:
+        """
+        测试 API 连接是否正常
+
+        Returns:
+            连接是否成功
+        """
+        try:
+            result = self.list_models()
+            logger.success(f"连接成功!可用模型数量: {len(result.get('models', []))}")
+            return True
+        except Exception as e:
+            logger.error(f"连接失败: {str(e)}")
+            return False
+
+
+def main():
+    """主函数 - 演示如何使用 Cohere API"""
+    # 从命令行参数或环境变量获取 API Key
+    api_key = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("COHERE_API_KEY")
+
+    if not api_key:
+        logger.error("请提供 Cohere API Key")
+        logger.info("使用方法: python cohere_api_example.py <API_KEY>")
+        logger.info("或设置环境变量: COHERE_API_KEY=<API_KEY>")
+        return 1
+
+    # 创建客户端
+    client = CohereAPIClient(api_key=api_key)
+
+    # 测试连接
+    logger.info("=" * 60)
+    logger.info("测试 API 连接...")
+    logger.info("=" * 60)
+    if not client.test_connection():
+        return 1
+
+    # 示例 1: 获取模型列表
+    logger.info("\n" + "=" * 60)
+    logger.info("示例 1: 获取模型列表")
+    logger.info("=" * 60)
+    try:
+        models = client.list_models()
+        logger.info(f"可用模型: {len(models.get('models', []))} 个")
+        # 显示前几个模型
+        for model in models.get("models", [])[:5]:
+            logger.info(f"  - {model.get('name', 'N/A')}")
+    except Exception as e:
+        logger.error(f"获取模型列表失败: {str(e)}")
+
+    # 示例 2: 使用 Rerank API
+    logger.info("\n" + "=" * 60)
+    logger.info("示例 2: 使用 Rerank API 重排序文档")
+    logger.info("=" * 60)
+
+    query = "什么是人工智能?"
+    documents = [
+        "人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。",
+        "机器学习是人工智能的核心技术之一,它使计算机能够从数据中学习。",
+        "深度学习是机器学习的一个子集,使用神经网络来模拟人脑的工作方式。",
+        "自然语言处理是人工智能的一个领域,专注于计算机与人类语言之间的交互。",
+    ]
+
+    try:
+        result = client.rerank(
+            query=query,
+            documents=documents,
+            model="rerank-multilingual-v3.0",
+            top_n=2,
+        )
+
+        logger.success("重排序完成!")
+        logger.info(f"查询: {query}")
+        logger.info(f"返回了 {len(result.get('results', []))} 个结果:\n")
+
+        for idx, item in enumerate(result.get("results", []), 1):
+            doc_index = item.get("index", 0)
+            relevance_score = item.get("relevance_score", 0)
+            document = documents[doc_index] if doc_index < len(documents) else "N/A"
+            logger.info(f"{idx}. 文档 {doc_index + 1} (相关性: {relevance_score:.4f})")
+            logger.info(f"   {document[:80]}...")
+
+    except Exception as e:
+        logger.error(f"Rerank API 调用失败: {str(e)}")
+        return 1
+
+    logger.info("\n" + "=" * 60)
+    logger.success("所有示例执行完成!")
+    logger.info("=" * 60)
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 161 - 0
scripts/create_n8n_cohere_credential.py

@@ -0,0 +1,161 @@
+"""
+创建 n8n Cohere API Key 凭证的脚本
+
+注意: n8n 的凭证管理 API 可能有限制,如果 API 不支持,请使用 Web UI 手动配置。
+参考文档: docs/n8n_cohere_credential_setup.md
+"""
+
+import os
+import sys
+import json
+from typing import Optional, Dict, Any
+
+import requests
+from loguru import logger
+
+# 添加项目根目录到路径
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+from app.config.config import BaseConfig
+
+
+def create_cohere_credential(
+    api_url: Optional[str] = None,
+    api_key: Optional[str] = None,
+    cohere_api_key: str = "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C",
+    credential_name: str = "Cohere API Key",
+) -> Dict[str, Any]:
+    """
+    尝试通过 n8n API 创建 Cohere 凭证
+
+    Args:
+        api_url: n8n API 地址
+        api_key: n8n API Key
+        cohere_api_key: Cohere API Key 值
+        credential_name: 凭证名称
+
+    Returns:
+        创建结果
+
+    Note:
+        n8n 的凭证管理 API 可能不支持直接创建凭证(出于安全考虑)。
+        如果 API 调用失败,请使用 Web UI 手动配置。
+    """
+    # 获取配置
+    if api_url is None or api_key is None:
+        config = BaseConfig()
+        api_url = api_url or config.N8N_API_URL
+        api_key = api_key or config.N8N_API_KEY
+
+    base_url = api_url.rstrip("/")
+    headers = {
+        "X-N8N-API-KEY": api_key,
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+
+    # n8n 凭证 API 端点(可能不存在或需要特殊权限)
+    # 注意: n8n 的凭证管理通常需要通过 Web UI 完成
+    credential_data = {
+        "name": credential_name,
+        "type": "cohereApi",  # 凭证类型
+        "data": {
+            "apiKey": cohere_api_key,
+        },
+    }
+
+    # 尝试多个可能的 API 端点
+    endpoints = [
+        "/api/v1/credentials",  # 标准凭证 API
+        "/rest/credentials",  # REST API
+    ]
+
+    for endpoint in endpoints:
+        url = f"{base_url}{endpoint}"
+        logger.info(f"尝试创建凭证: {url}")
+
+        try:
+            response = requests.post(
+                url,
+                headers=headers,
+                json=credential_data,
+                timeout=30,
+            )
+
+            logger.debug(f"响应状态码: {response.status_code}")
+            logger.debug(f"响应内容: {response.text}")
+
+            if response.status_code == 200 or response.status_code == 201:
+                result = response.json()
+                logger.success(f"✅ 凭证创建成功: {result}")
+                return {
+                    "success": True,
+                    "message": "凭证创建成功",
+                    "data": result,
+                }
+            elif response.status_code == 401:
+                logger.error("❌ API 认证失败,请检查 n8n API Key")
+                return {
+                    "success": False,
+                    "message": "API 认证失败,请检查 n8n API Key",
+                    "error": "Unauthorized",
+                }
+            elif response.status_code == 403:
+                logger.warning("⚠️  API 权限不足,凭证管理可能需要 Owner 权限")
+                return {
+                    "success": False,
+                    "message": "API 权限不足,请使用 Web UI 手动配置",
+                    "error": "Forbidden",
+                }
+            elif response.status_code == 404:
+                logger.warning(f"⚠️  端点不存在: {endpoint}")
+                continue  # 尝试下一个端点
+            else:
+                logger.warning(
+                    f"⚠️  请求失败: {response.status_code} - {response.text}"
+                )
+
+        except requests.exceptions.RequestException as e:
+            logger.error(f"❌ 请求异常: {str(e)}")
+            continue
+
+    # 所有端点都失败,建议使用 Web UI
+    logger.warning(
+        "⚠️  无法通过 API 创建凭证。n8n 的凭证管理通常需要通过 Web UI 完成。"
+    )
+    logger.info("📝 请参考文档手动配置: docs/n8n_cohere_credential_setup.md")
+
+    return {
+        "success": False,
+        "message": "无法通过 API 创建凭证,请使用 Web UI 手动配置",
+        "manual_setup_url": f"{base_url}/home/credentials",
+        "guide": "docs/n8n_cohere_credential_setup.md",
+    }
+
+
+def main():
+    """主函数"""
+    logger.info("🚀 开始创建 n8n Cohere API Key 凭证...")
+    logger.info("API Key: 4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C")
+
+    result = create_cohere_credential()
+
+    print("\n" + "=" * 60)
+    print("执行结果:")
+    print("=" * 60)
+    print(json.dumps(result, indent=2, ensure_ascii=False))
+
+    if not result.get("success"):
+        print("\n" + "=" * 60)
+        print("建议:")
+        print("=" * 60)
+        print("由于 n8n 的凭证管理 API 可能有限制,")
+        print("请使用 Web UI 手动配置凭证。")
+        print(f"\n详细步骤请参考: {result.get('guide', 'docs/n8n_cohere_credential_setup.md')}")
+        print(f"凭证管理页面: {result.get('manual_setup_url', 'https://n8n.citupro.com/home/credentials')}")
+
+    return 0 if result.get("success") else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 186 - 0
scripts/test_cohere_api_key.py

@@ -0,0 +1,186 @@
+"""
+测试 Cohere API Key 是否有效
+
+用于验证 API Key 是否可以正常使用
+"""
+
+import sys
+import os
+from typing import Optional
+
+import requests
+from loguru import logger
+
+# 配置日志
+logger.remove()
+logger.add(
+    sys.stdout,
+    level="INFO",
+    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
+)
+
+
+def test_cohere_api_key(api_key: str) -> dict:
+    """
+    测试 Cohere API Key 是否有效
+
+    Args:
+        api_key: Cohere API Key
+
+    Returns:
+        测试结果
+    """
+    # Cohere API 端点
+    base_url = "https://api.cohere.ai/v1"
+
+    headers = {
+        "Authorization": f"Bearer {api_key}",
+        "Content-Type": "application/json",
+    }
+
+    # 测试 1: 检查 API Key 基本信息
+    logger.info("测试 1: 检查 API Key 基本信息...")
+    try:
+        # 使用 models 端点测试(轻量级)
+        response = requests.get(
+            f"{base_url}/models",
+            headers=headers,
+            timeout=10,
+        )
+
+        logger.debug(f"响应状态码: {response.status_code}")
+        logger.debug(f"响应内容: {response.text[:200]}")
+
+        if response.status_code == 200:
+            logger.success("✅ API Key 验证成功!")
+            models = response.json()
+            return {
+                "success": True,
+                "message": "API Key 有效",
+                "status_code": response.status_code,
+                "models_count": len(models.get("models", [])),
+            }
+        elif response.status_code == 401:
+            logger.error("❌ API Key 认证失败 - 无效的 API Key")
+            return {
+                "success": False,
+                "message": "API Key 认证失败",
+                "error": "Unauthorized",
+                "status_code": 401,
+            }
+        elif response.status_code == 403:
+            logger.error("❌ API Key 权限不足 - 可能是账户权限问题")
+            return {
+                "success": False,
+                "message": "API Key 权限不足",
+                "error": "Forbidden",
+                "status_code": 403,
+            }
+        else:
+            logger.warning(f"⚠️  意外的响应状态码: {response.status_code}")
+            return {
+                "success": False,
+                "message": f"意外的响应: {response.status_code}",
+                "error": response.text[:200],
+                "status_code": response.status_code,
+            }
+
+    except requests.exceptions.RequestException as e:
+        logger.error(f"❌ 请求异常: {str(e)}")
+        return {
+            "success": False,
+            "message": f"请求失败: {str(e)}",
+            "error": str(e),
+        }
+
+    # 测试 2: 尝试使用 rerank 端点(如果测试1失败)
+    logger.info("测试 2: 尝试使用 rerank 端点...")
+    try:
+        rerank_data = {
+            "model": "rerank-multilingual-v3.0",
+            "query": "test query",
+            "documents": ["test document 1", "test document 2"],
+            "top_n": 2,
+        }
+
+        response = requests.post(
+            f"{base_url}/rerank",
+            headers=headers,
+            json=rerank_data,
+            timeout=10,
+        )
+
+        logger.debug(f"Rerank 响应状态码: {response.status_code}")
+        logger.debug(f"Rerank 响应内容: {response.text[:200]}")
+
+        if response.status_code == 200:
+            logger.success("✅ Rerank API 测试成功!")
+            return {
+                "success": True,
+                "message": "Rerank API 可用",
+                "status_code": response.status_code,
+            }
+        else:
+            logger.warning(f"⚠️  Rerank API 测试失败: {response.status_code}")
+            return {
+                "success": False,
+                "message": f"Rerank API 测试失败: {response.status_code}",
+                "error": response.text[:200],
+                "status_code": response.status_code,
+            }
+
+    except requests.exceptions.RequestException as e:
+        logger.error(f"❌ Rerank 请求异常: {str(e)}")
+        return {
+            "success": False,
+            "message": f"Rerank 请求失败: {str(e)}",
+            "error": str(e),
+        }
+
+
+def main():
+    """主函数"""
+    # 从命令行参数或环境变量获取 API Key
+    api_key = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("COHERE_API_KEY")
+
+    if not api_key:
+        logger.error("❌ 请提供 Cohere API Key")
+        logger.info("使用方法: python test_cohere_api_key.py <API_KEY>")
+        logger.info("或设置环境变量: COHERE_API_KEY=<API_KEY>")
+        return 1
+
+    # 隐藏部分 API Key(安全显示)
+    masked_key = api_key[:8] + "..." + api_key[-4:] if len(api_key) > 12 else "***"
+    logger.info(f"🔍 测试 Cohere API Key: {masked_key}")
+
+    result = test_cohere_api_key(api_key)
+
+    print("\n" + "=" * 60)
+    print("测试结果:")
+    print("=" * 60)
+    print(f"成功: {result.get('success', False)}")
+    print(f"消息: {result.get('message', 'N/A')}")
+    if result.get("error"):
+        print(f"错误: {result.get('error')}")
+    if result.get("status_code"):
+        print(f"状态码: {result.get('status_code')}")
+
+    if not result.get("success"):
+        print("\n" + "=" * 60)
+        print("可能的原因:")
+        print("=" * 60)
+        print("1. API Key 无效或已过期")
+        print("2. API Key 格式不正确(应该没有空格或换行)")
+        print("3. Cohere 账户权限不足")
+        print("4. API Key 未激活或账户未验证")
+        print("5. 网络连接问题")
+        print("\n建议:")
+        print("- 检查 Cohere Dashboard: https://dashboard.cohere.com/")
+        print("- 确认 API Key 是否正确复制(无多余空格)")
+        print("- 检查账户状态和权限")
+
+    return 0 if result.get("success") else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 309 - 0
scripts/update_n8n_cohere_credential.py

@@ -0,0 +1,309 @@
+"""
+更新 n8n 中的 Cohere API Key 凭证
+
+用于修复 "Forbidden" 错误,重新配置凭证
+"""
+
+import os
+import sys
+import json
+from typing import Optional, Dict, Any
+
+import requests
+from loguru import logger
+
+# 添加项目根目录到路径
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+from app.config.config import BaseConfig
+
+# 配置日志(避免 emoji 编码问题)
+logger.remove()
+logger.add(
+    sys.stdout,
+    level="INFO",
+    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
+)
+
+
+def update_cohere_credential(
+    credential_id: str,
+    api_url: Optional[str] = None,
+    api_key: Optional[str] = None,
+    cohere_api_key: str = "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C",
+    credential_name: Optional[str] = None,
+) -> Dict[str, Any]:
+    """
+    更新 n8n 中的 Cohere 凭证
+
+    Args:
+        credential_id: 凭证 ID
+        api_url: n8n API 地址
+        api_key: n8n API Key
+        cohere_api_key: Cohere API Key 值
+        credential_name: 凭证名称(可选)
+
+    Returns:
+        更新结果
+    """
+    # 获取配置
+    if api_url is None or api_key is None:
+        config = BaseConfig()
+        api_url = api_url or config.N8N_API_URL
+        api_key = api_key or config.N8N_API_KEY
+
+    base_url = api_url.rstrip("/")
+    headers = {
+        "X-N8N-API-KEY": api_key,
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+
+    # 先获取现有凭证信息
+    logger.info(f"获取凭证信息: {credential_id}")
+    try:
+        get_response = requests.get(
+            f"{base_url}/api/v1/credentials/{credential_id}",
+            headers=headers,
+            timeout=30,
+        )
+
+        if get_response.status_code == 200:
+            existing_credential = get_response.json()
+            logger.info(f"现有凭证名称: {existing_credential.get('name')}")
+            credential_name = credential_name or existing_credential.get("name", "Cohere API Key")
+        else:
+            logger.warning(f"无法获取现有凭证: {get_response.status_code}")
+            credential_name = credential_name or "Cohere API Key"
+
+    except Exception as e:
+        logger.warning(f"获取凭证信息失败: {str(e)}")
+        credential_name = credential_name or "Cohere API Key"
+
+    # 准备更新数据
+    # 注意: n8n 凭证更新可能需要特定的数据格式
+    update_data = {
+        "name": credential_name,
+        "type": "cohereApi",
+        "data": {
+            "apiKey": cohere_api_key,
+        },
+    }
+
+    logger.info(f"更新凭证: {credential_id}")
+    logger.info(f"API Key (前8位): {cohere_api_key[:8]}...")
+
+    try:
+        # 尝试更新凭证
+        response = requests.put(
+            f"{base_url}/api/v1/credentials/{credential_id}",
+            headers=headers,
+            json=update_data,
+            timeout=30,
+        )
+
+        logger.debug(f"响应状态码: {response.status_code}")
+        logger.debug(f"响应内容: {response.text[:500]}")
+
+        if response.status_code == 200:
+            result = response.json()
+            logger.success(f"凭证更新成功: {result.get('name')}")
+            return {
+                "success": True,
+                "message": "凭证更新成功",
+                "data": result,
+            }
+        elif response.status_code == 401:
+            logger.error("API 认证失败,请检查 n8n API Key")
+            return {
+                "success": False,
+                "message": "API 认证失败,请检查 n8n API Key",
+                "error": "Unauthorized",
+            }
+        elif response.status_code == 403:
+            logger.error("API 权限不足,凭证更新可能需要 Owner 权限")
+            return {
+                "success": False,
+                "message": "API 权限不足,请使用 Web UI 手动更新",
+                "error": "Forbidden",
+            }
+        elif response.status_code == 404:
+            logger.error(f"凭证不存在: {credential_id}")
+            return {
+                "success": False,
+                "message": f"凭证不存在: {credential_id}",
+                "error": "Not Found",
+            }
+        else:
+            logger.warning(f"更新失败: {response.status_code} - {response.text[:200]}")
+            return {
+                "success": False,
+                "message": f"更新失败: {response.status_code}",
+                "error": response.text[:200],
+                "status_code": response.status_code,
+            }
+
+    except requests.exceptions.RequestException as e:
+        logger.error(f"请求异常: {str(e)}")
+        return {
+            "success": False,
+            "message": f"请求失败: {str(e)}",
+            "error": str(e),
+        }
+
+
+def delete_and_recreate_credential(
+    credential_id: Optional[str] = None,
+    api_url: Optional[str] = None,
+    api_key: Optional[str] = None,
+    cohere_api_key: str = "4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C",
+) -> Dict[str, Any]:
+    """
+    删除旧凭证并重新创建
+
+    Args:
+        credential_id: 要删除的凭证 ID(如果为 None,则只创建新凭证)
+        api_url: n8n API 地址
+        api_key: n8n API Key
+        cohere_api_key: Cohere API Key 值
+
+    Returns:
+        操作结果
+    """
+    # 获取配置
+    if api_url is None or api_key is None:
+        config = BaseConfig()
+        api_url = api_url or config.N8N_API_URL
+        api_key = api_key or config.N8N_API_KEY
+
+    base_url = api_url.rstrip("/")
+    headers = {
+        "X-N8N-API-KEY": api_key,
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+
+    # 删除旧凭证(如果提供)
+    if credential_id:
+        logger.info(f"删除旧凭证: {credential_id}")
+        try:
+            delete_response = requests.delete(
+                f"{base_url}/api/v1/credentials/{credential_id}",
+                headers=headers,
+                timeout=30,
+            )
+            if delete_response.status_code in [200, 204]:
+                logger.success("旧凭证已删除")
+            else:
+                logger.warning(f"删除凭证失败: {delete_response.status_code}")
+        except Exception as e:
+            logger.warning(f"删除凭证异常: {str(e)}")
+
+    # 创建新凭证
+    logger.info("创建新凭证...")
+    credential_data = {
+        "name": "Cohere API Key",
+        "type": "cohereApi",
+        "data": {
+            "apiKey": cohere_api_key,
+        },
+    }
+
+    try:
+        response = requests.post(
+            f"{base_url}/api/v1/credentials",
+            headers=headers,
+            json=credential_data,
+            timeout=30,
+        )
+
+        if response.status_code in [200, 201]:
+            result = response.json()
+            logger.success(f"新凭证创建成功: {result.get('id')}")
+            return {
+                "success": True,
+                "message": "凭证重新创建成功",
+                "data": result,
+                "credential_id": result.get("id"),
+            }
+        else:
+            logger.error(f"创建凭证失败: {response.status_code} - {response.text[:200]}")
+            return {
+                "success": False,
+                "message": f"创建凭证失败: {response.status_code}",
+                "error": response.text[:200],
+            }
+
+    except requests.exceptions.RequestException as e:
+        logger.error(f"请求异常: {str(e)}")
+        return {
+            "success": False,
+            "message": f"请求失败: {str(e)}",
+            "error": str(e),
+        }
+
+
+def main():
+    """主函数"""
+    import argparse
+
+    parser = argparse.ArgumentParser(description="更新 n8n Cohere API Key 凭证")
+    parser.add_argument(
+        "--credential-id",
+        type=str,
+        help="要更新的凭证 ID(如果不提供,将创建新凭证)",
+    )
+    parser.add_argument(
+        "--recreate",
+        action="store_true",
+        help="删除旧凭证并重新创建",
+    )
+    parser.add_argument(
+        "--api-key",
+        type=str,
+        help="Cohere API Key(默认使用配置中的值)",
+        default="4pLcF0CGE7LeDmAudBQHdvAxGaKwNOKfxUGkHb5C",
+    )
+
+    args = parser.parse_args()
+
+    logger.info("开始更新 n8n Cohere API Key 凭证...")
+
+    if args.recreate:
+        result = delete_and_recreate_credential(
+            credential_id=args.credential_id,
+            cohere_api_key=args.api_key,
+        )
+    elif args.credential_id:
+        result = update_cohere_credential(
+            credential_id=args.credential_id,
+            cohere_api_key=args.api_key,
+        )
+    else:
+        logger.error("请提供 --credential-id 或使用 --recreate 选项")
+        return 1
+
+    print("\n" + "=" * 60)
+    print("执行结果:")
+    print("=" * 60)
+    print(json.dumps(result, indent=2, ensure_ascii=False))
+
+    if result.get("success"):
+        if "credential_id" in result:
+            print(f"\n新凭证 ID: {result.get('credential_id')}")
+        print("\n请在工作流中使用更新后的凭证。")
+    else:
+        print("\n" + "=" * 60)
+        print("建议:")
+        print("=" * 60)
+        print("如果 API 更新失败,请使用 Web UI 手动更新:")
+        print("1. 访问: https://n8n.citupro.com/home/credentials")
+        print("2. 找到 Cohere API Key 凭证")
+        print("3. 点击编辑,重新输入 API Key")
+        print("4. 确保 API Key 没有多余的空格或换行")
+
+    return 0 if result.get("success") else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 40 - 17
tasks/task_execute_instructions.md

@@ -2,7 +2,7 @@
 
 **重要:请立即执行以下任务!**
 
-**生成时间**: 2026-01-16 14:24:07
+**生成时间**: 2026-01-21 18:28:11
 
 **待执行任务数量**: 2
 
@@ -24,18 +24,25 @@
 
 ---
 
-## 任务 1: 产品库存表原始数据导入
+## 任务 1: 产品库存表原始数据导入
 
-- **任务ID**: `37`
-- **创建时间**: 2026-01-16 12:41:09
+- **任务ID**: `43`
+- **创建时间**: 2026-01-21 18:20:44
 - **创建者**: cursor
 
 ### 任务描述
 
-# Task: 产品库存表原始数据导入
+# Task: 产品库存表原始数据导入
 
 ## Source Tables
 ### product_inventory_table_raw_data
+**Data Source**
+- **Type**: RDBMS
+- **Host**: 192.168.3.143
+- **Port**: 5432
+- **Database**: dataops
+- **Schema**: public
+
 **DDL**
 ```sql
 CREATE TABLE product_inventory_table_raw_data (
@@ -107,26 +114,33 @@ COMMENT ON TABLE test_product_inventory IS '产品库存表';
 - **Description**: 新数据将追加到目标表,不删除现有数据
 
 ## Request Content
-把产品库存表的原始数据导入到数据资源的产品库存表中
+从标签为原始数据的产品库存表导入数据到数据资源的产品库存表
 
 ## Implementation Steps
 1. Extract data from source tables as specified in the DDL
 2. Apply transformation logic according to the rule:
-   - Rule: 把产品库存表的原始数据导入到数据资源的产品库存表中
+   - Rule: 从标签为原始数据的产品库存表导入数据到数据资源的产品库存表
 3. Generate Python program to implement the data transformation logic
 4. Write transformed data to target table using append mode
 
 ---
 
-## 任务 2: DF_DO202601160001
+## 任务 2: DF_DO202601210001
 
-- **任务ID**: `38`
-- **创建时间**: 2026-01-16 12:43:52
+- **任务ID**: `44`
+- **创建时间**: 2026-01-21 18:25:47
 - **创建者**: system
 
 ### 任务描述
 
-# Task: DF_DO202601160001
+# Task: DF_DO202601210001
+
+## Related Information
+- **Order ID**: 26
+- **Order No**: DO202601210001
+- **DataFlow ID**: 2291
+- **DataFlow Name**: 仓库库存汇总表_数据流程
+- **Product ID**: 23
 
 ## Source Tables
 ### test_product_inventory
@@ -164,30 +178,39 @@ CREATE TABLE test_product_inventory (
 );
 COMMENT ON TABLE test_product_inventory IS '产品库存表';
 ```
+
 ## Target Tables
 ### warehouse_inventory_summary
+**Data Source**
+- **Type**: postgresql
+- **Host**: 192.168.3.143
+- **Port**: 5678
+- **Database**: dataops
+- **Schema**: dags
+
 **DDL**
 ```sql
 CREATE TABLE warehouse_inventory_summary (
-    id BIGINT PRIMARY KEY COMMENT '主键ID',
+    total_inventory integer COMMENT '库存数量',
+    warehouse_name varchar(255) COMMENT '仓库名称',
     create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
 );
 COMMENT ON TABLE warehouse_inventory_summary IS '仓库库存汇总表';
 ```
 
 ## Update Mode
-- **Mode**: Full Refresh (全量更新)
-- **Description**: 目标表将被清空后重新写入数据
+- **Mode**: Append (追加模式)
+- **Description**: 新数据将追加到目标表,不删除现有数据
 
 ## Request Content
-1.从标签为数据资源的产品库存表中提取字段:仓库编号、库存数量;2.按照仓库进行分组,对库存数量进行求和计算;3.无特殊过滤条件;4.最终输出数据格式包含字段:仓库编号、总库存数量
+1. 从源数据'产品库存表'中提取'仓库名称'字段;2. 对'产品库存表'中的'库存数量'字段进行求和计算;3. 按'仓库名称'进行分组;4. 最终输出数据格式包含'仓库名称'和对应的'库存数量'两个字段。
 
 ## Implementation Steps
 1. Extract data from source tables as specified in the DDL
 2. Apply transformation logic according to the rule:
-   - Rule: 1.从标签为数据资源的产品库存表中提取字段:仓库编号、库存数量;2.按照仓库进行分组,对库存数量进行求和计算;3.无特殊过滤条件;4.最终输出数据格式包含字段:仓库编号、总库存数量
+   - Rule: 1. 从源数据'产品库存表'中提取'仓库名称'字段;2. 对'产品库存表'中的'库存数量'字段进行求和计算;3. 按'仓库名称'进行分组;4. 最终输出数据格式包含'仓库名称'和对应的'库存数量'两个字段。
 3. Generate Python program to implement the data transformation logic
-4. Write transformed data to target table using full mode
+4. Write transformed data to target table using append mode
 
 ---
 

+ 2 - 2
tasks/task_trigger.txt

@@ -1,8 +1,8 @@
 CURSOR_AUTO_EXECUTE_TASK_TRIGGER
-生成时间: 2026-01-16 14:24:07
+生成时间: 2026-01-21 18:28:11
 状态: 有待执行任务
 待处理任务数: 2
-任务ID列表: [37, 38]
+任务ID列表: [43, 44]
 
 此文件用于触发Cursor自动执行任务。