
Fix response status handling and improve the two query APIs.

wangxq committed 4 days ago
parent commit c793722717
5 changed files with 1043 additions and 40 deletions
  1. citu_app.py (+460, -30)
  2. data_pipeline/api/simple_db_manager.py (+500, -1)
  3. data_pipeline/api/simple_workflow.py (+80, -1)
  4. docs/training_data_webui_design.md (+1, -6)
  5. requirements.txt (+2, -2)

citu_app.py (+460, -30)

@@ -19,13 +19,10 @@ import sqlparse  # 用于SQL语法检查
 from common.redis_conversation_manager import RedisConversationManager  # 添加Redis对话管理器导入
 
 from common.qa_feedback_manager import QAFeedbackManager
-from common.result import success_response, bad_request_response, not_found_response, internal_error_response
-
-
 from common.result import (  # 统一导入所有需要的响应函数
-    bad_request_response, service_unavailable_response, 
+    success_response, bad_request_response, not_found_response, internal_error_response,
+    error_response, service_unavailable_response, 
     agent_success_response, agent_error_response,
-    internal_error_response, success_response,
     validation_failed_response
 )
 from app_config import (  # 添加Redis相关配置导入
@@ -2527,17 +2524,48 @@ def training_data_create():
         # 获取创建后的总记录数
         current_total = get_total_training_count()
         
-        return jsonify(success_response(
-            response_text="训练数据创建完成",
-            data={
-                "total_requested": len(data_list),
-                "successfully_created": successful_count,
-                "failed_count": len(data_list) - successful_count,
-                "results": results,
-                "summary": type_summary,
-                "current_total_count": current_total
-            }
-        ))
+        # 根据实际执行结果决定响应状态
+        failed_count = len(data_list) - successful_count
+        
+        if failed_count == 0:
+            # 全部成功
+            return jsonify(success_response(
+                response_text="训练数据创建完成",
+                data={
+                    "total_requested": len(data_list),
+                    "successfully_created": successful_count,
+                    "failed_count": failed_count,
+                    "results": results,
+                    "summary": type_summary,
+                    "current_total_count": current_total
+                }
+            ))
+        elif successful_count == 0:
+            # 全部失败
+            return jsonify(error_response(
+                response_text="训练数据创建失败",
+                data={
+                    "total_requested": len(data_list),
+                    "successfully_created": successful_count,
+                    "failed_count": failed_count,
+                    "results": results,
+                    "summary": type_summary,
+                    "current_total_count": current_total
+                }
+            )), 400
+        else:
+            # 部分成功,部分失败
+            return jsonify(error_response(
+                response_text=f"训练数据创建部分成功,成功{successful_count}条,失败{failed_count}条",
+                data={
+                    "total_requested": len(data_list),
+                    "successfully_created": successful_count,
+                    "failed_count": failed_count,
+                    "results": results,
+                    "summary": type_summary,
+                    "current_total_count": current_total
+                }
+            )), 207
         
     except Exception as e:
         logger.error(f"training_data_create执行失败: {str(e)}")
@@ -2598,18 +2626,51 @@ def training_data_delete():
         # 获取删除后的总记录数
         current_total = get_total_training_count()
         
-        return jsonify(success_response(
-            response_text="训练数据删除完成",
-            data={
-                "total_requested": len(ids),
-                "successfully_deleted": len(deleted_ids),
-                "failed_count": len(failed_ids),
-                "deleted_ids": deleted_ids,
-                "failed_ids": failed_ids,
-                "failed_details": failed_details,
-                "current_total_count": current_total
-            }
-        ))
+        # 根据实际执行结果决定响应状态
+        failed_count = len(failed_ids)
+        
+        if failed_count == 0:
+            # 全部成功
+            return jsonify(success_response(
+                response_text="训练数据删除完成",
+                data={
+                    "total_requested": len(ids),
+                    "successfully_deleted": len(deleted_ids),
+                    "failed_count": failed_count,
+                    "deleted_ids": deleted_ids,
+                    "failed_ids": failed_ids,
+                    "failed_details": failed_details,
+                    "current_total_count": current_total
+                }
+            ))
+        elif len(deleted_ids) == 0:
+            # 全部失败
+            return jsonify(error_response(
+                response_text="训练数据删除失败",
+                data={
+                    "total_requested": len(ids),
+                    "successfully_deleted": len(deleted_ids),
+                    "failed_count": failed_count,
+                    "deleted_ids": deleted_ids,
+                    "failed_ids": failed_ids,
+                    "failed_details": failed_details,
+                    "current_total_count": current_total
+                }
+            )), 400
+        else:
+            # 部分成功,部分失败
+            return jsonify(error_response(
+                response_text=f"训练数据删除部分成功,成功{len(deleted_ids)}条,失败{failed_count}条",
+                data={
+                    "total_requested": len(ids),
+                    "successfully_deleted": len(deleted_ids),
+                    "failed_count": failed_count,
+                    "deleted_ids": deleted_ids,
+                    "failed_ids": failed_ids,
+                    "failed_details": failed_details,
+                    "current_total_count": current_total
+                }
+            )), 207
         
     except Exception as e:
         logger.error(f"training_data_delete执行失败: {str(e)}")
@@ -3192,6 +3253,189 @@ def list_data_pipeline_tasks():
             response_text="获取任务列表失败,请稍后重试"
         )), 500
 
+@app.flask_app.route('/api/v0/data_pipeline/tasks/query', methods=['POST'])
+def query_data_pipeline_tasks():
+    """
+    高级查询数据管道任务列表
+    
+    支持复杂筛选、排序、分页功能
+    
+    请求体:
+    {
+        "page": 1,                          // 页码,必须大于0,默认1
+        "page_size": 20,                    // 每页大小,1-100之间,默认20
+        "status": "completed",              // 可选,任务状态筛选:"pending"|"running"|"completed"|"failed"|"cancelled"
+        "task_name": "highway",             // 可选,任务名称模糊搜索,最大100字符
+        "created_by": "user123",            // 可选,创建者精确匹配
+        "db_name": "highway_db",            // 可选,数据库名称精确匹配
+        "created_time_start": "2025-01-01T00:00:00",  // 可选,创建时间范围开始
+        "created_time_end": "2025-12-31T23:59:59",    // 可选,创建时间范围结束
+        "started_time_start": "2025-01-01T00:00:00",  // 可选,开始时间范围开始
+        "started_time_end": "2025-12-31T23:59:59",    // 可选,开始时间范围结束
+        "completed_time_start": "2025-01-01T00:00:00", // 可选,完成时间范围开始
+        "completed_time_end": "2025-12-31T23:59:59",   // 可选,完成时间范围结束
+        "sort_by": "created_at",            // 可选,排序字段:"created_at"|"started_at"|"completed_at"|"task_name"|"status",默认"created_at"
+        "sort_order": "desc"                // 可选,排序方向:"asc"|"desc",默认"desc"
+    }
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "查询任务列表成功",
+        "data": {
+            "tasks": [...],
+            "pagination": {
+                "page": 1,
+                "page_size": 20,
+                "total": 150,
+                "total_pages": 8,
+                "has_next": true,
+                "has_prev": false
+            },
+            "filters_applied": {...},
+            "sort_applied": {...},
+            "query_time": "0.045s"
+        }
+    }
+    """
+    try:
+        # 获取请求数据
+        req = request.get_json(force=True) if request.is_json else {}
+        
+        # 解析参数,设置默认值
+        page = req.get('page', 1)
+        page_size = req.get('page_size', 20)
+        status = req.get('status')
+        task_name = req.get('task_name')
+        created_by = req.get('created_by')
+        db_name = req.get('db_name')
+        created_time_start = req.get('created_time_start')
+        created_time_end = req.get('created_time_end')
+        started_time_start = req.get('started_time_start')
+        started_time_end = req.get('started_time_end')
+        completed_time_start = req.get('completed_time_start')
+        completed_time_end = req.get('completed_time_end')
+        sort_by = req.get('sort_by', 'created_at')
+        sort_order = req.get('sort_order', 'desc')
+        
+        # 参数验证
+        # 验证分页参数
+        if page < 1:
+            return jsonify(bad_request_response(
+                response_text="页码必须大于0",
+                invalid_params=['page']
+            )), 400
+        
+        if page_size < 1 or page_size > 100:
+            return jsonify(bad_request_response(
+                response_text="每页大小必须在1-100之间",
+                invalid_params=['page_size']
+            )), 400
+        
+        # 验证任务名称长度
+        if task_name and len(task_name) > 100:
+            return jsonify(bad_request_response(
+                response_text="任务名称搜索关键词最大长度为100字符",
+                invalid_params=['task_name']
+            )), 400
+        
+        # 验证排序参数
+        allowed_sort_fields = ['created_at', 'started_at', 'completed_at', 'task_name', 'status']
+        if sort_by not in allowed_sort_fields:
+            return jsonify(bad_request_response(
+                response_text=f"不支持的排序字段: {sort_by},支持的字段: {', '.join(allowed_sort_fields)}",
+                invalid_params=['sort_by']
+            )), 400
+        
+        if sort_order.lower() not in ['asc', 'desc']:
+            return jsonify(bad_request_response(
+                response_text="排序方向必须是 'asc' 或 'desc'",
+                invalid_params=['sort_order']
+            )), 400
+        
+        # 验证状态筛选
+        if status:
+            allowed_statuses = ['pending', 'running', 'completed', 'failed', 'cancelled']
+            if status not in allowed_statuses:
+                return jsonify(bad_request_response(
+                    response_text=f"不支持的状态值: {status},支持的状态: {', '.join(allowed_statuses)}",
+                    invalid_params=['status']
+                )), 400
+        
+        # 调用管理器执行查询
+        manager = get_data_pipeline_manager()
+        result = manager.query_tasks_advanced(
+            page=page,
+            page_size=page_size,
+            status=status,
+            task_name=task_name,
+            created_by=created_by,
+            db_name=db_name,
+            created_time_start=created_time_start,
+            created_time_end=created_time_end,
+            started_time_start=started_time_start,
+            started_time_end=started_time_end,
+            completed_time_start=completed_time_start,
+            completed_time_end=completed_time_end,
+            sort_by=sort_by,
+            sort_order=sort_order
+        )
+        
+        # 格式化任务列表
+        formatted_tasks = []
+        for task in result['tasks']:
+            formatted_tasks.append({
+                "task_id": task.get('task_id'),
+                "task_name": task.get('task_name'),
+                "status": task.get('status'),
+                "step_status": task.get('step_status'),
+                "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
+                "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
+                "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
+                "created_by": task.get('by_user'),
+                "db_name": task.get('db_name'),
+                "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
+                "directory_exists": task.get('directory_exists', True),
+                "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
+            })
+        
+        # 构建响应数据
+        response_data = {
+            "tasks": formatted_tasks,
+            "pagination": result['pagination'],
+            "filters_applied": {
+                k: v for k, v in {
+                    "status": status,
+                    "task_name": task_name,
+                    "created_by": created_by,
+                    "db_name": db_name,
+                    "created_time_start": created_time_start,
+                    "created_time_end": created_time_end,
+                    "started_time_start": started_time_start,
+                    "started_time_end": started_time_end,
+                    "completed_time_start": completed_time_start,
+                    "completed_time_end": completed_time_end
+                }.items() if v
+            },
+            "sort_applied": {
+                "sort_by": sort_by,
+                "sort_order": sort_order
+            },
+            "query_time": result.get('query_time', '0.000s')
+        }
+        
+        return jsonify(success_response(
+            response_text="查询任务列表成功",
+            data=response_data
+        ))
+        
+    except Exception as e:
+        logger.error(f"查询数据管道任务列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="查询任务列表失败,请稍后重试"
+        )), 500
+
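A usage sketch for the endpoint above, using only parameters documented in its docstring (host and port follow the `app.run` call at the bottom of this file):

```python
import requests

resp = requests.post(
    "http://localhost:8084/api/v0/data_pipeline/tasks/query",
    json={
        "page": 1,
        "page_size": 20,
        "status": "completed",   # optional filter, exact match
        "task_name": "highway",  # optional filter, fuzzy match (ILIKE)
        "sort_by": "created_at",
        "sort_order": "desc",
    },
)
body = resp.json()
print(body["data"]["pagination"])  # page / total / has_next ...
for task in body["data"]["tasks"]:
    print(task["task_id"], task["status"], task["step_status"])
```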
 # ==================== 表检查API端点 ====================
 
 import asyncio
@@ -4046,5 +4290,191 @@ def delete_tasks():
             response_text="删除任务失败,请稍后重试"
         )), 500
 
-logger.info("启动Flask应用: http://localhost:8084")
-app.run(host="0.0.0.0", port=8084, debug=True)
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/logs/query', methods=['POST'])
+def query_data_pipeline_task_logs(task_id):
+    """
+    高级查询数据管道任务日志
+    
+    支持复杂筛选、排序、分页功能
+    
+    请求体:
+    {
+        "page": 1,                          // 页码,必须大于0,默认1
+        "page_size": 50,                    // 每页大小,1-500之间,默认50
+        "level": "ERROR",                   // 可选,日志级别筛选:"DEBUG"|"INFO"|"WARNING"|"ERROR"|"CRITICAL"
+        "start_time": "2025-01-01 00:00:00", // 可选,开始时间范围 (YYYY-MM-DD HH:MM:SS)
+        "end_time": "2025-01-02 23:59:59",   // 可选,结束时间范围 (YYYY-MM-DD HH:MM:SS)
+        "keyword": "failed",                 // 可选,关键字搜索(消息内容模糊匹配)
+        "logger_name": "DDLGenerator",       // 可选,日志记录器名称精确匹配
+        "step_name": "ddl_generation",       // 可选,执行步骤名称精确匹配
+        "sort_by": "timestamp",              // 可选,排序字段:"timestamp"|"level"|"logger"|"step"|"line_number",默认"timestamp"
+        "sort_order": "desc"                 // 可选,排序方向:"asc"|"desc",默认"desc"
+    }
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "查询任务日志成功",
+        "data": {
+            "logs": [
+                {
+                    "timestamp": "2025-07-01 14:30:52",
+                    "level": "INFO",
+                    "logger": "SimpleWorkflowExecutor",
+                    "step": "ddl_generation",
+                    "message": "开始DDL生成",
+                    "line_number": 15
+                }
+            ],
+            "pagination": {
+                "page": 1,
+                "page_size": 50,
+                "total": 1000,
+                "total_pages": 20,
+                "has_next": true,
+                "has_prev": false
+            },
+            "log_file_info": {
+                "exists": true,
+                "file_path": "/path/to/log/file",
+                "file_size": 1024000,
+                "file_size_formatted": "1.0 MB",
+                "last_modified": "2025-07-01T14:30:52",
+                "total_lines": 5000
+            },
+            "query_time": "0.123s"
+        }
+    }
+    """
+    try:
+        # 验证任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 解析请求数据
+        request_data = request.get_json() or {}
+        
+        # 参数验证
+        def _is_valid_time_format(time_str):
+            """验证时间格式是否有效"""
+            if not time_str:
+                return True
+            
+            # 支持的时间格式
+            time_formats = [
+                '%Y-%m-%d %H:%M:%S',     # 2025-01-01 00:00:00
+                '%Y-%m-%d',              # 2025-01-01
+                '%Y-%m-%dT%H:%M:%S',     # 2025-01-01T00:00:00
+                '%Y-%m-%dT%H:%M:%S.%f',  # 2025-01-01T00:00:00.123456
+            ]
+            
+            from datetime import datetime
+            for fmt in time_formats:
+                try:
+                    datetime.strptime(time_str, fmt)
+                    return True
+                except ValueError:
+                    continue
+            return False
+        
+        # 提取和验证参数
+        page = request_data.get('page', 1)
+        page_size = request_data.get('page_size', 50)
+        level = request_data.get('level')
+        start_time = request_data.get('start_time')
+        end_time = request_data.get('end_time')
+        keyword = request_data.get('keyword')
+        logger_name = request_data.get('logger_name')
+        step_name = request_data.get('step_name')
+        sort_by = request_data.get('sort_by', 'timestamp')
+        sort_order = request_data.get('sort_order', 'desc')
+        
+        # 参数验证
+        if not isinstance(page, int) or page < 1:
+            return jsonify(bad_request_response(
+                response_text="页码必须是大于0的整数"
+            )), 400
+        
+        if not isinstance(page_size, int) or page_size < 1 or page_size > 500:
+            return jsonify(bad_request_response(
+                response_text="每页大小必须是1-500之间的整数"
+            )), 400
+        
+        # 验证日志级别
+        if level and level.upper() not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
+            return jsonify(bad_request_response(
+                response_text="日志级别必须是DEBUG、INFO、WARNING、ERROR、CRITICAL之一"
+            )), 400
+        
+        # 验证时间格式
+        if not _is_valid_time_format(start_time):
+            return jsonify(bad_request_response(
+                response_text="开始时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
+            )), 400
+        
+        if not _is_valid_time_format(end_time):
+            return jsonify(bad_request_response(
+                response_text="结束时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
+            )), 400
+        
+        # 验证关键字长度
+        if keyword and len(keyword) > 200:
+            return jsonify(bad_request_response(
+                response_text="关键字长度不能超过200个字符"
+            )), 400
+        
+        # 验证排序字段
+        allowed_sort_fields = ['timestamp', 'level', 'logger', 'step', 'line_number']
+        if sort_by not in allowed_sort_fields:
+            return jsonify(bad_request_response(
+                response_text=f"排序字段必须是以下之一: {', '.join(allowed_sort_fields)}"
+            )), 400
+        
+        # 验证排序方向
+        if sort_order.lower() not in ['asc', 'desc']:
+            return jsonify(bad_request_response(
+                response_text="排序方向必须是asc或desc"
+            )), 400
+        
+        # 创建工作流执行器并查询日志
+        from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
+        executor = SimpleWorkflowExecutor(task_id)
+        
+        try:
+            result = executor.query_logs_advanced(
+                page=page,
+                page_size=page_size,
+                level=level,
+                start_time=start_time,
+                end_time=end_time,
+                keyword=keyword,
+                logger_name=logger_name,
+                step_name=step_name,
+                sort_by=sort_by,
+                sort_order=sort_order
+            )
+            
+            return jsonify(success_response(
+                response_text="查询任务日志成功",
+                data=result
+            ))
+            
+        finally:
+            executor.cleanup()
+        
+    except Exception as e:
+        logger.error(f"查询数据管道任务日志失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="查询任务日志失败,请稍后重试"
+        )), 500
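And a matching sketch for the log-query endpoint above; `task_id` is a hypothetical placeholder for a real task identifier:

```python
import requests

task_id = "task_20250701_143052"  # hypothetical task identifier
resp = requests.post(
    f"http://localhost:8084/api/v0/data_pipeline/tasks/{task_id}/logs/query",
    json={"page": 1, "page_size": 50, "level": "ERROR", "keyword": "failed"},
)
data = resp.json()["data"]
print(data["log_file_info"].get("file_size_formatted"))
for entry in data["logs"]:
    print(entry["timestamp"], entry["level"], entry["logger"], entry["message"])
```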
+
+
+if __name__ == '__main__':
+    logger.info("启动Flask应用: http://localhost:8084")
+    app.run(host="0.0.0.0", port=8084, debug=True)

data_pipeline/api/simple_db_manager.py (+500, -1)

@@ -5,7 +5,9 @@ Data Pipeline API 简化数据库管理器
 """
 
 import json
+import re
 from datetime import datetime
+from pathlib import Path
 from typing import Dict, Any, List, Optional, Tuple
 
 import psycopg2
@@ -393,4 +395,501 @@ class SimpleTaskManager:
             else:
                 return "database"
         except Exception:
-            return "database"
+            return "database"
+
+    def query_tasks_advanced(self, 
+                            page: int = 1,
+                            page_size: int = 20,
+                            status: str = None,
+                            task_name: str = None,
+                            created_by: str = None,
+                            db_name: str = None,
+                            created_time_start: str = None,
+                            created_time_end: str = None,
+                            started_time_start: str = None,
+                            started_time_end: str = None,
+                            completed_time_start: str = None,
+                            completed_time_end: str = None,
+                            sort_by: str = "created_at",
+                            sort_order: str = "desc") -> dict:
+        """
+        高级任务查询,支持复杂筛选、排序、分页
+        
+        Args:
+            page: 页码,必须大于0,默认1
+            page_size: 每页大小,1-100之间,默认20
+            status: 可选,任务状态筛选
+            task_name: 可选,任务名称模糊搜索
+            created_by: 可选,创建者精确匹配
+            db_name: 可选,数据库名称精确匹配
+            created_time_start: 可选,创建时间范围开始
+            created_time_end: 可选,创建时间范围结束
+            started_time_start: 可选,开始时间范围开始
+            started_time_end: 可选,开始时间范围结束
+            completed_time_start: 可选,完成时间范围开始
+            completed_time_end: 可选,完成时间范围结束
+            sort_by: 可选,排序字段,默认"created_at"
+            sort_order: 可选,排序方向,默认"desc"
+        
+        Returns:
+            {
+                "tasks": [...],
+                "pagination": {
+                    "page": 1,
+                    "page_size": 20,
+                    "total": 150,
+                    "total_pages": 8,
+                    "has_next": True,
+                    "has_prev": False
+                }
+            }
+        """
+        try:
+            import time
+            start_time = time.time()
+            
+            # 参数验证和处理
+            page = max(page, 1)
+            page_size = min(max(page_size, 1), 100)  # 限制在1-100之间
+            offset = (page - 1) * page_size
+            
+            # 构建WHERE条件
+            where_conditions = []
+            params = []
+            
+            # 状态筛选
+            if status:
+                where_conditions.append("t.status = %s")
+                params.append(status)
+            
+            # 任务名称模糊搜索
+            if task_name:
+                where_conditions.append("t.task_name ILIKE %s")
+                params.append(f"%{task_name}%")
+            
+            # 创建者精确匹配
+            if created_by:
+                where_conditions.append("t.by_user = %s")
+                params.append(created_by)
+            
+            # 数据库名称精确匹配
+            if db_name:
+                where_conditions.append("t.db_name = %s")
+                params.append(db_name)
+            
+            # 时间范围筛选
+            # 创建时间范围
+            if created_time_start:
+                where_conditions.append("t.created_at >= %s")
+                params.append(created_time_start)
+            if created_time_end:
+                where_conditions.append("t.created_at <= %s")
+                params.append(created_time_end)
+            
+            # 开始时间范围
+            if started_time_start:
+                where_conditions.append("t.started_at >= %s")
+                params.append(started_time_start)
+            if started_time_end:
+                where_conditions.append("t.started_at <= %s")
+                params.append(started_time_end)
+            
+            # 完成时间范围
+            if completed_time_start:
+                where_conditions.append("t.completed_at >= %s")
+                params.append(completed_time_start)
+            if completed_time_end:
+                where_conditions.append("t.completed_at <= %s")
+                params.append(completed_time_end)
+            
+            # 构建WHERE子句
+            where_clause = ""
+            if where_conditions:
+                where_clause = "WHERE " + " AND ".join(where_conditions)
+            
+            # 构建ORDER BY子句
+            # 验证排序字段白名单
+            allowed_sort_fields = ['created_at', 'started_at', 'completed_at', 'task_name', 'status']
+            if sort_by not in allowed_sort_fields:
+                sort_by = 'created_at'
+            
+            # 验证排序方向
+            sort_order_upper = sort_order.upper()
+            if sort_order_upper not in ['ASC', 'DESC']:
+                sort_order_upper = 'DESC'
+            
+            order_clause = f"ORDER BY t.{sort_by} {sort_order_upper}"
+            
+            conn = self._get_connection()
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                # 首先获取总数
+                count_query = f"""
+                    SELECT COUNT(*) as total
+                    FROM data_pipeline_tasks t
+                    {where_clause}
+                """
+                cursor.execute(count_query, params)
+                total_count = cursor.fetchone()['total']
+                
+                # 然后获取分页数据
+                data_params = params + [page_size, offset]
+                data_query = f"""
+                    SELECT 
+                        t.task_id,
+                        t.task_name,
+                        t.task_type,
+                        t.status,
+                        t.parameters,
+                        t.error_message,
+                        t.created_at,
+                        t.started_at,
+                        t.completed_at,
+                        t.created_type,
+                        t.by_user,
+                        t.output_directory,
+                        t.db_name,
+                        COALESCE(t.directory_exists, TRUE) as directory_exists,
+                        t.updated_at,
+                        CASE 
+                            WHEN COUNT(s.step_name) = 0 THEN NULL
+                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'failed') > 0 THEN 'failed'
+                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'running') > 0 THEN 'running'
+                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'completed') = COUNT(s.step_name) THEN 'all_completed'
+                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'completed') > 0 THEN 'partial_completed'
+                            ELSE 'pending'
+                        END as step_status
+                    FROM data_pipeline_tasks t
+                    LEFT JOIN data_pipeline_task_steps s ON t.task_id = s.task_id
+                    {where_clause}
+                    GROUP BY t.task_id, t.task_name, t.task_type, t.status, t.parameters, t.error_message, 
+                             t.created_at, t.started_at, t.completed_at, t.created_type, t.by_user, 
+                             t.output_directory, t.db_name, t.directory_exists, t.updated_at
+                    {order_clause}
+                    LIMIT %s OFFSET %s
+                """
+                
+                cursor.execute(data_query, data_params)
+                tasks = [dict(row) for row in cursor.fetchall()]
+                
+                # 计算分页信息
+                total_pages = (total_count + page_size - 1) // page_size if page_size > 0 else 1
+                has_next = page < total_pages
+                has_prev = page > 1
+                
+                query_time = time.time() - start_time
+                
+                return {
+                    "tasks": tasks,
+                    "pagination": {
+                        "page": page,
+                        "page_size": page_size,
+                        "total": total_count,
+                        "total_pages": total_pages,
+                        "has_next": has_next,
+                        "has_prev": has_prev
+                    },
+                    "query_time": f"{query_time:.3f}s"
+                }
+                
+        except Exception as e:
+            self.logger.error(f"高级任务查询失败: {e}")
+            raise
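For illustration, a call like the following builds the parameterized SQL sketched in the comments below; filter values always travel as psycopg2 parameters, and only the whitelisted sort field is interpolated into the statement:

```python
# Hypothetical call against an existing SimpleTaskManager instance.
result = manager.query_tasks_advanced(
    page=2,
    page_size=20,
    status="completed",
    task_name="highway",
    sort_by="created_at",
    sort_order="desc",
)
# WHERE clause built:  WHERE t.status = %s AND t.task_name ILIKE %s
# params:              ["completed", "%highway%"]
# ORDER BY clause:     ORDER BY t.created_at DESC
# LIMIT %s OFFSET %s   with data_params ending in [20, 20]  (page 2 -> offset 20)
print(result["pagination"], result["query_time"])
```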
+
+    def query_logs_advanced(self,
+                           task_id: str,
+                           page: int = 1,
+                           page_size: int = 50,
+                           level: str = None,
+                           start_time: str = None,
+                           end_time: str = None,
+                           keyword: str = None,
+                           logger_name: str = None,
+                           step_name: str = None,
+                           sort_by: str = "timestamp",
+                           sort_order: str = "desc") -> dict:
+        """
+        高级日志查询,支持复杂筛选、排序、分页
+        
+        Args:
+            task_id: 任务ID
+            page: 页码,必须大于0,默认1
+            page_size: 每页大小,1-500之间,默认50
+            level: 可选,日志级别筛选 (DEBUG, INFO, WARNING, ERROR, CRITICAL)
+            start_time: 可选,开始时间范围 (YYYY-MM-DD HH:MM:SS)
+            end_time: 可选,结束时间范围 (YYYY-MM-DD HH:MM:SS)
+            keyword: 可选,关键字搜索(消息内容模糊匹配)
+            logger_name: 可选,日志记录器名称精确匹配
+            step_name: 可选,执行步骤名称精确匹配
+            sort_by: 可选,排序字段,默认"timestamp"
+            sort_order: 可选,排序方向,默认"desc"
+        
+        Returns:
+            {
+                "logs": [...],
+                "pagination": {
+                    "page": 1,
+                    "page_size": 50,
+                    "total": 1000,
+                    "total_pages": 20,
+                    "has_next": True,
+                    "has_prev": False
+                },
+                "log_file_info": {...}
+            }
+        """
+        try:
+            import time
+            
+            start_query_time = time.time()
+            
+            # 参数验证和处理
+            page = max(page, 1)
+            page_size = min(max(page_size, 1), 500)  # 限制在1-500之间
+            
+            # 获取日志文件路径
+            project_root = Path(__file__).parent.parent.parent
+            task_dir = project_root / "data_pipeline" / "training_data" / task_id
+            log_file = task_dir / "data_pipeline.log"
+            
+            # 检查日志文件是否存在
+            if not log_file.exists():
+                return {
+                    "logs": [],
+                    "pagination": {
+                        "page": page,
+                        "page_size": page_size,
+                        "total": 0,
+                        "total_pages": 0,
+                        "has_next": False,
+                        "has_prev": False
+                    },
+                    "log_file_info": {
+                        "exists": False,
+                        "file_path": str(log_file),
+                        "error": "日志文件不存在"
+                    },
+                    "query_time": f"{time.time() - start_query_time:.3f}s"
+                }
+            
+            # 读取并解析日志文件
+            parsed_logs = self._parse_log_file(log_file)
+            
+            # 应用过滤器
+            filtered_logs = self._filter_logs(
+                parsed_logs,
+                level=level,
+                start_time=start_time,
+                end_time=end_time,
+                keyword=keyword,
+                logger_name=logger_name,
+                step_name=step_name
+            )
+            
+            # 排序
+            sorted_logs = self._sort_logs(filtered_logs, sort_by, sort_order)
+            
+            # 分页
+            total_count = len(sorted_logs)
+            start_index = (page - 1) * page_size
+            end_index = start_index + page_size
+            paginated_logs = sorted_logs[start_index:end_index]
+            
+            # 计算分页信息
+            total_pages = (total_count + page_size - 1) // page_size if page_size > 0 else 1
+            has_next = page < total_pages
+            has_prev = page > 1
+            
+            # 获取文件信息
+            file_stat = log_file.stat()
+            log_file_info = {
+                "exists": True,
+                "file_path": str(log_file),
+                "file_size": file_stat.st_size,
+                "file_size_formatted": self._format_file_size(file_stat.st_size),
+                "last_modified": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
+                "total_lines": len(parsed_logs)
+            }
+            
+            query_time = time.time() - start_query_time
+            
+            return {
+                "logs": paginated_logs,
+                "pagination": {
+                    "page": page,
+                    "page_size": page_size,
+                    "total": total_count,
+                    "total_pages": total_pages,
+                    "has_next": has_next,
+                    "has_prev": has_prev
+                },
+                "log_file_info": log_file_info,
+                "query_time": f"{query_time:.3f}s"
+            }
+            
+        except Exception as e:
+            self.logger.error(f"日志查询失败: {e}")
+            return {
+                "logs": [],
+                "pagination": {
+                    "page": page,
+                    "page_size": page_size,
+                    "total": 0,
+                    "total_pages": 0,
+                    "has_next": False,
+                    "has_prev": False
+                },
+                "log_file_info": {
+                    "exists": False,
+                    "error": str(e)
+                },
+                "query_time": "0.000s"
+            }
+    
+    def _parse_log_file(self, log_file_path: Path) -> List[Dict[str, Any]]:
+        """
+        解析日志文件,提取结构化信息
+        """
+        try:
+            logs = []
+            with open(log_file_path, 'r', encoding='utf-8') as f:
+                lines = f.readlines()
+            
+            # 日志行格式: 2025-07-01 14:30:52 [INFO] SimpleWorkflowExecutor: 任务开始执行
+            log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
+            current_log = None
+            line_number = 0
+            
+            for line in lines:
+                line_number += 1
+                line = line.rstrip('\n\r')
+                
+                if not line.strip():
+                    continue
+                
+                match = re.match(log_pattern, line)
+                if match:
+                    # 如果有之前的日志,先保存
+                    if current_log:
+                        logs.append(current_log)
+                    
+                    # 解析新的日志条目
+                    timestamp, level, logger_name, message = match.groups()
+                    
+                    # 尝试从日志记录器名称中提取步骤信息
+                    step_name = self._extract_step_from_logger(logger_name)
+                    
+                    current_log = {
+                        "timestamp": timestamp,
+                        "level": level,
+                        "logger": logger_name,
+                        "step": step_name,
+                        "message": message,
+                        "line_number": line_number
+                    }
+                else:
+                    # 多行日志(如异常堆栈),追加到当前日志的消息中
+                    if current_log:
+                        current_log["message"] += f"\n{line}"
+            
+            # 保存最后一个日志条目
+            if current_log:
+                logs.append(current_log)
+            
+            return logs
+            
+        except Exception as e:
+            self.logger.error(f"解析日志文件失败: {e}")
+            return []
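The parser's regex can be checked in isolation: it splits a line into timestamp, level, logger, and message, and any non-matching continuation line (such as a traceback) is appended to the previous entry's message:

```python
import re

log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
line = "2025-07-01 14:30:52 [INFO] SimpleWorkflowExecutor: 任务开始执行"
match = re.match(log_pattern, line)
print(match.groups())
# -> ('2025-07-01 14:30:52', 'INFO', 'SimpleWorkflowExecutor', '任务开始执行')
```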
+    
+    def _extract_step_from_logger(self, logger_name: str) -> Optional[str]:
+        """
+        从日志记录器名称中提取步骤信息
+        """
+        # 映射日志记录器名称到步骤名称
+        logger_to_step = {
+            "DDLGenerator": "ddl_generation",
+            "QAGenerator": "qa_generation", 
+            "QSGenerator": "qa_generation",
+            "SQLValidator": "sql_validation",
+            "TrainingDataLoader": "training_load",
+            "VannaTrainer": "training_load",
+            "SchemaWorkflowOrchestrator": None,  # 总体协调器
+            "SimpleWorkflowExecutor": None,      # 工作流执行器
+        }
+        
+        return logger_to_step.get(logger_name)
+    
+    def _filter_logs(self, logs: List[Dict[str, Any]], **filters) -> List[Dict[str, Any]]:
+        """
+        根据条件过滤日志
+        """
+        filtered = logs
+        
+        # 日志级别过滤
+        if filters.get('level'):
+            level = filters['level'].upper()
+            filtered = [log for log in filtered if log.get('level') == level]
+        
+        # 时间范围过滤(日志时间戳为零填充的 "YYYY-MM-DD HH:MM:SS",可直接按字符串比较;
+        # 先将ISO格式中的'T'归一化为空格,避免'T'与空格的字典序差异导致漏匹配)
+        if filters.get('start_time'):
+            start_time = filters['start_time'].replace('T', ' ')
+            filtered = [log for log in filtered if log.get('timestamp', '') >= start_time]
+        
+        if filters.get('end_time'):
+            end_time = filters['end_time'].replace('T', ' ')
+            filtered = [log for log in filtered if log.get('timestamp', '') <= end_time]
+        
+        # 关键字搜索(消息内容模糊匹配)
+        if filters.get('keyword'):
+            keyword = filters['keyword'].lower()
+            filtered = [log for log in filtered 
+                       if keyword in log.get('message', '').lower()]
+        
+        # 日志记录器名称精确匹配
+        if filters.get('logger_name'):
+            logger_name = filters['logger_name']
+            filtered = [log for log in filtered if log.get('logger') == logger_name]
+        
+        # 步骤名称精确匹配
+        if filters.get('step_name'):
+            step_name = filters['step_name']
+            filtered = [log for log in filtered if log.get('step') == step_name]
+        
+        return filtered
+    
+    def _sort_logs(self, logs: List[Dict[str, Any]], sort_by: str, sort_order: str) -> List[Dict[str, Any]]:
+        """
+        对日志进行排序
+        """
+        # 验证排序字段
+        allowed_sort_fields = ['timestamp', 'level', 'logger', 'step', 'line_number']
+        if sort_by not in allowed_sort_fields:
+            sort_by = 'timestamp'
+        
+        # 验证排序方向
+        reverse = sort_order.lower() == 'desc'
+        
+        try:
+            # 特殊处理时间戳排序
+            if sort_by == 'timestamp':
+                return sorted(logs, key=lambda x: x.get('timestamp', ''), reverse=reverse)
+            else:
+                return sorted(logs, key=lambda x: x.get(sort_by, ''), reverse=reverse)
+        except Exception as e:
+            self.logger.error(f"日志排序失败: {e}")
+            return logs
+    
+    def _format_file_size(self, size_bytes: int) -> str:
+        """格式化文件大小显示"""
+        if size_bytes == 0:
+            return "0 B"
+        
+        size_names = ["B", "KB", "MB", "GB"]
+        i = 0
+        size = float(size_bytes)
+        
+        while size >= 1024.0 and i < len(size_names) - 1:
+            size /= 1024.0
+            i += 1
+        
+        return f"{size:.1f} {size_names[i]}"

data_pipeline/api/simple_workflow.py (+80, -1)

@@ -451,7 +451,74 @@ class SimpleWorkflowExecutor:
             except Exception as e:
                 self.logger.error(f"重定向orchestrator日志失败: {e}")
     
-
+    def query_logs_advanced(self,
+                           page: int = 1,
+                           page_size: int = 50,
+                           level: str = None,
+                           start_time: str = None,
+                           end_time: str = None,
+                           keyword: str = None,
+                           logger_name: str = None,
+                           step_name: str = None,
+                           sort_by: str = "timestamp",
+                           sort_order: str = "desc") -> dict:
+        """
+        高级日志查询(工作流层)
+        
+        Args:
+            page: 页码,必须大于0,默认1
+            page_size: 每页大小,1-500之间,默认50
+            level: 可选,日志级别筛选
+            start_time: 可选,开始时间范围
+            end_time: 可选,结束时间范围
+            keyword: 可选,关键字搜索
+            logger_name: 可选,日志记录器名称
+            step_name: 可选,执行步骤名称
+            sort_by: 可选,排序字段
+            sort_order: 可选,排序方向
+            
+        Returns:
+            日志查询结果
+        """
+        try:
+            # 调用数据库层方法
+            result = self.task_manager.query_logs_advanced(
+                task_id=self.task_id,
+                page=page,
+                page_size=page_size,
+                level=level,
+                start_time=start_time,
+                end_time=end_time,
+                keyword=keyword,
+                logger_name=logger_name,
+                step_name=step_name,
+                sort_by=sort_by,
+                sort_order=sort_order
+            )
+            
+            # 记录查询操作
+            self.logger.info(f"日志查询完成: {self.task_id}, 页码: {page}, 结果数: {len(result.get('logs', []))}")
+            
+            return result
+            
+        except Exception as e:
+            self.logger.error(f"日志查询失败: {e}")
+            return {
+                "logs": [],
+                "pagination": {
+                    "page": page,
+                    "page_size": page_size,
+                    "total": 0,
+                    "total_pages": 0,
+                    "has_next": False,
+                    "has_prev": False
+                },
+                "log_file_info": {
+                    "exists": False,
+                    "error": str(e)
+                },
+                "query_time": "0.000s"
+            }
     
     def cleanup(self):
         """清理资源"""
@@ -541,6 +608,18 @@ class SimpleWorkflowManager:
         """获取任务列表"""
         return self.task_manager.get_tasks_list(**kwargs)
     
+    def query_tasks_advanced(self, **kwargs) -> dict:
+        """
+        高级任务查询,支持复杂筛选、排序、分页
+        
+        Args:
+            **kwargs: 传递给数据库层的查询参数
+        
+        Returns:
+            包含任务列表和分页信息的字典
+        """
+        return self.task_manager.query_tasks_advanced(**kwargs)
+    
     def cleanup(self):
         """清理资源"""
         try:

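The workflow-layer methods above are thin delegations to SimpleTaskManager; the API layer in citu_app.py drives them through a try/finally so resources are always released:

```python
from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor

executor = SimpleWorkflowExecutor("task_20250701_143052")  # hypothetical task id
try:
    result = executor.query_logs_advanced(page=1, page_size=50, level="INFO")
    print(result["pagination"])
finally:
    executor.cleanup()  # mirrors the cleanup pattern used in citu_app.py
```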
docs/training_data_webui_design.md (+1, -6)

@@ -22,7 +22,7 @@
 ### 整体布局
 ```
 ┌─────────────────────────────────────────┐
-│                导航栏                    
+│                导航栏                   │
 ├─────────────────────────────────────────┤
 │                                         │
 │              主要内容区                  │
@@ -276,8 +276,6 @@ flowchart TD
     H --> I[等待执行完成]
     I --> J[查看生成文件]
     
-    style A fill:#e1f5fe
-    style J fill:#c8e6c9
 ```
 
 ### 流程2:分步控制执行(高级用户)
@@ -298,9 +296,6 @@ flowchart TD
     K --> H
     J --> L[执行训练加载]
     L --> M[完成]
-    
-    style A fill:#e1f5fe
-    style M fill:#c8e6c9
 ```
 
 ## 响应式设计考虑

requirements.txt (+2, -2)

@@ -1,9 +1,9 @@
 vanna[chromadb,openai,postgres]==0.7.9
 flask==3.1.1
 plotly==5.22.0
-langchain-core==0.3.64
+langchain-core>=0.3.67
 langchain-postgres==0.0.14
 langgraph==0.4.8
-langchain==0.3.23
+langchain>=0.3.23
 redis==5.0.1
 sqlparse==0.4.4