
Added the missing data_pipeline APIs.

wangxq 1 month ago
parent commit 989b3bf3ca
1 changed file with 2282 additions and 3 deletions
unified_api.py  +2282 -3

@@ -13,6 +13,7 @@ import sys
 from datetime import datetime, timedelta
 from typing import Optional, Dict, Any, TYPE_CHECKING, Union
 import signal
+from threading import Thread
 
 if TYPE_CHECKING:
     from react_agent.agent import CustomReactAgent
@@ -22,7 +23,7 @@ from core.logging import initialize_logging, get_app_logger
 initialize_logging()
 
 # 标准 Flask 导入
-from flask import Flask, request, jsonify, session
+from flask import Flask, request, jsonify, session, send_file
 import redis.asyncio as redis
 
 # 基础依赖
@@ -34,6 +35,10 @@ import sqlparse
 from core.vanna_llm_factory import create_vanna_instance
 from common.redis_conversation_manager import RedisConversationManager
 from common.qa_feedback_manager import QAFeedbackManager
+# Data Pipeline 相关导入 - 从 citu_app.py 迁移
+from data_pipeline.api.simple_workflow import SimpleWorkflowManager, SimpleWorkflowExecutor
+from data_pipeline.api.simple_file_manager import SimpleFileManager
+from data_pipeline.api.table_inspector_api import TableInspectorAPI
 from common.result import (
     success_response, bad_request_response, not_found_response, internal_error_response,
     error_response, service_unavailable_response, 
@@ -54,12 +59,15 @@ logger = get_app_logger("UnifiedApp")
 # React Agent 导入
 try:
     from react_agent.agent import CustomReactAgent
+    from react_agent.enhanced_redis_api import get_conversation_detail_from_redis
 except ImportError:
     try:
         from test.custom_react_agent.agent import CustomReactAgent
+        from test.custom_react_agent.enhanced_redis_api import get_conversation_detail_from_redis
     except ImportError:
         logger.warning("无法导入 CustomReactAgent,React Agent功能将不可用")
         CustomReactAgent = None
+        get_conversation_detail_from_redis = None
 
 # 初始化核心组件
 vn = create_vanna_instance()
@@ -165,6 +173,186 @@ async def ensure_agent_ready() -> bool:
         await get_react_agent()
         return True
 
+def get_user_conversations_simple_sync(user_id: str, limit: int = 10):
+    """直接从Redis获取用户对话,测试版本"""
+    import redis
+    import json
+    
+    try:
+        # 创建Redis连接
+        redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
+        redis_client.ping()
+        
+        # 扫描用户的checkpoint keys
+        pattern = f"checkpoint:{user_id}:*"
+        logger.info(f"🔍 扫描模式: {pattern}")
+        
+        keys = []
+        cursor = 0
+        while True:
+            cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
+            keys.extend(batch)
+            if cursor == 0:
+                break
+        
+        logger.info(f"📋 找到 {len(keys)} 个keys")
+        
+        # 解析thread信息
+        thread_data = {}
+        for key in keys:
+            try:
+                parts = key.split(':')
+                if len(parts) >= 4:
+                    thread_id = f"{parts[1]}:{parts[2]}"  # user_id:timestamp
+                    timestamp = parts[2]
+                    
+                    if thread_id not in thread_data:
+                        thread_data[thread_id] = {
+                            "thread_id": thread_id,
+                            "timestamp": timestamp,
+                            "keys": []
+                        }
+                    thread_data[thread_id]["keys"].append(key)
+            except Exception as e:
+                logger.warning(f"解析key失败 {key}: {e}")
+                continue
+        
+        logger.info(f"📊 找到 {len(thread_data)} 个thread")
+        
+        # 按时间戳排序
+        sorted_threads = sorted(
+            thread_data.values(),
+            key=lambda x: x["timestamp"],
+            reverse=True
+        )[:limit]
+        
+        # 获取每个thread的详细信息
+        conversations = []
+        for thread_info in sorted_threads:
+            try:
+                thread_id = thread_info["thread_id"]
+                
+                # 获取最新的checkpoint数据
+                latest_key = max(thread_info["keys"])
+                
+                # 先检查key的数据类型
+                key_type = redis_client.type(latest_key)
+                logger.info(f"🔍 Key {latest_key} 的类型: {key_type}")
+                
+                data = None
+                if key_type == 'string':
+                    data = redis_client.get(latest_key)
+                elif key_type == 'hash':
+                    # 如果是hash类型,获取所有字段
+                    hash_data = redis_client.hgetall(latest_key)
+                    logger.info(f"🔍 Hash字段: {list(hash_data.keys())}")
+                    # 尝试获取可能的数据字段
+                    for field in ['data', 'state', 'value', 'checkpoint']:
+                        if field in hash_data:
+                            data = hash_data[field]
+                            break
+                    if not data and hash_data:
+                        # 如果没找到预期字段,取第一个值试试
+                        data = list(hash_data.values())[0]
+                elif key_type == 'list':
+                    # 如果是list类型,获取最后一个元素
+                    data = redis_client.lindex(latest_key, -1)
+                elif key_type == 'ReJSON-RL':
+                    # 这是RedisJSON类型,使用JSON.GET命令
+                    logger.info(f"🔍 使用JSON.GET获取RedisJSON数据")
+                    try:
+                        # 使用JSON.GET命令获取整个JSON对象
+                        json_data = redis_client.execute_command('JSON.GET', latest_key)
+                        if json_data:
+                            data = json_data  # JSON.GET返回的就是JSON字符串
+                            logger.info(f"🔍 JSON数据长度: {len(data)} 字符")
+                        else:
+                            logger.warning(f"⚠️ JSON.GET 返回空数据")
+                            continue
+                    except Exception as json_error:
+                        logger.error(f"❌ JSON.GET 失败: {json_error}")
+                        continue
+                else:
+                    logger.warning(f"⚠️ 未知的key类型: {key_type}")
+                    continue
+                
+                if data:
+                    try:
+                        checkpoint_data = json.loads(data)
+                        
+                        # 调试:查看JSON数据结构
+                        logger.info(f"🔍 JSON顶级keys: {list(checkpoint_data.keys())}")
+                        
+                        # 根据您提供的JSON结构,消息在 checkpoint.channel_values.messages
+                        messages = []
+                        
+                        # 首先检查是否有checkpoint字段
+                        if 'checkpoint' in checkpoint_data:
+                            checkpoint = checkpoint_data['checkpoint']
+                            if isinstance(checkpoint, dict) and 'channel_values' in checkpoint:
+                                channel_values = checkpoint['channel_values']
+                                if isinstance(channel_values, dict) and 'messages' in channel_values:
+                                    messages = channel_values['messages']
+                                    logger.info(f"🔍 找到messages: {len(messages)} 条消息")
+                        
+                        # 如果没有checkpoint字段,尝试直接在channel_values
+                        if not messages and 'channel_values' in checkpoint_data:
+                            channel_values = checkpoint_data['channel_values']
+                            if isinstance(channel_values, dict) and 'messages' in channel_values:
+                                messages = channel_values['messages']
+                                logger.info(f"🔍 找到messages(直接路径): {len(messages)} 条消息")
+                        
+                        # 生成对话预览
+                        preview = "空对话"
+                        if messages:
+                            for msg in messages:
+                                # 处理LangChain消息格式:{"lc": 1, "type": "constructor", "id": ["langchain", "schema", "messages", "HumanMessage"], "kwargs": {"content": "...", "type": "human"}}
+                                if isinstance(msg, dict):
+                                    # 检查是否是LangChain格式的HumanMessage
+                                    if (msg.get('lc') == 1 and 
+                                        msg.get('type') == 'constructor' and 
+                                        'id' in msg and 
+                                        isinstance(msg['id'], list) and 
+                                        len(msg['id']) >= 4 and
+                                        msg['id'][3] == 'HumanMessage' and
+                                        'kwargs' in msg):
+                                        
+                                        kwargs = msg['kwargs']
+                                        if kwargs.get('type') == 'human' and 'content' in kwargs:
+                                            content = str(kwargs['content'])
+                                            preview = content[:50] + "..." if len(content) > 50 else content
+                                            break
+                                    # 兼容其他格式
+                                    elif msg.get('type') == 'human' and 'content' in msg:
+                                        content = str(msg['content'])
+                                        preview = content[:50] + "..." if len(content) > 50 else content
+                                        break
+                        
+                        conversations.append({
+                            "thread_id": thread_id,
+                            "user_id": user_id,
+                            "timestamp": thread_info["timestamp"],
+                            "message_count": len(messages),
+                            "conversation_preview": preview
+                        })
+                        
+                    except json.JSONDecodeError:
+                        logger.error(f"❌ JSON解析失败,数据类型: {type(data)}, 长度: {len(str(data))}")
+                        logger.error(f"❌ 数据开头: {str(data)[:200]}...")
+                        continue
+                    
+            except Exception as e:
+                logger.error(f"处理thread {thread_info['thread_id']} 失败: {e}")
+                continue
+        
+        redis_client.close()
+        logger.info(f"✅ 返回 {len(conversations)} 个对话")
+        return conversations
+        
+    except Exception as e:
+        logger.error(f"❌ Redis查询失败: {e}")
+        return []
+
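A minimal usage sketch for the helper above, assuming a local Redis on port 6379 and the checkpoint:<user_id>:<timestamp>:* key layout described in the comments (the user id is illustrative):

conversations = get_user_conversations_simple_sync("wang", limit=5)
for conv in conversations:
    print(conv["thread_id"], conv["message_count"], conv["conversation_preview"])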
 def cleanup_resources():
     """清理资源"""
     global _react_agent_instance, _redis_client
@@ -214,7 +402,7 @@ def health_check():
 
 @app.route("/api/v0/ask_react_agent", methods=["POST"])
 async def ask_react_agent():
-    """异步React Agent智能问答接口"""
+    """异步React Agent智能问答接口(从 custom_react_agent 迁移,原路由:/api/chat)"""
     global _react_agent_instance
     
     # 确保Agent已初始化
@@ -236,7 +424,7 @@ async def ask_react_agent():
                 "code": 400,
                 "message": "请求格式错误",
                 "success": False,
-                "error": "无效的JSON格式",
+                "error": "无效的JSON格式,请检查请求体中是否存在语法错误(如多余的逗号、引号不匹配等)",
                 "details": str(json_error)
             }), 400
         
@@ -1276,6 +1464,459 @@ def training_data_delete():
             response_text="删除训练数据失败,请稍后重试"
         )), 500
 
+# ==================== React Agent 扩展API ====================
+
+@app.route('/api/v0/react/users/<user_id>/conversations', methods=['GET'])
+async def get_user_conversations_react(user_id: str):
+    """异步获取用户的聊天记录列表(从 custom_react_agent 迁移)"""
+    global _react_agent_instance
+    
+    try:
+        # 获取查询参数
+        limit = request.args.get('limit', 10, type=int)
+        
+        # 限制limit的范围
+        limit = max(1, min(limit, 50))  # 限制在1-50之间
+        
+        logger.info(f"📋 异步获取用户 {user_id} 的对话列表,限制 {limit} 条")
+        
+        # 确保Agent可用
+        if not await ensure_agent_ready():
+            return jsonify({
+                "success": False,
+                "error": "Agent 未就绪",
+                "timestamp": datetime.now().isoformat()
+            }), 503
+        
+        # 直接调用异步方法
+        conversations = await _react_agent_instance.get_user_recent_conversations(user_id, limit)
+        
+        return jsonify({
+            "success": True,
+            "data": {
+                "user_id": user_id,
+                "conversations": conversations,
+                "total_count": len(conversations),
+                "limit": limit
+            },
+            "timestamp": datetime.now().isoformat()
+        }), 200
+        
+    except Exception as e:
+        logger.error(f"❌ 异步获取用户 {user_id} 对话列表失败: {e}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "timestamp": datetime.now().isoformat()
+        }), 500
+
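A hedged client-side sketch for the conversation-list endpoint above, assuming the service is reachable at the host/port from the app.run() call at the bottom of the file (the user id is illustrative):

import requests

resp = requests.get(
    "http://localhost:8084/api/v0/react/users/wang/conversations",
    params={"limit": 10},
    timeout=10,
)
print(resp.status_code, resp.json().get("data", {}).get("total_count"))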
+@app.route('/api/v0/react/users/<user_id>/conversations/<thread_id>', methods=['GET'])
+async def get_user_conversation_detail_react(user_id: str, thread_id: str):
+    """异步获取特定对话的详细历史(从 custom_react_agent 迁移)"""
+    global _react_agent_instance
+    
+    try:
+        # 验证thread_id格式是否匹配user_id
+        if not thread_id.startswith(f"{user_id}:"):
+            return jsonify({
+                "success": False,
+                "error": f"Thread ID {thread_id} 不属于用户 {user_id}",
+                "timestamp": datetime.now().isoformat()
+            }), 400
+        
+        logger.info(f"📖 异步获取用户 {user_id} 的对话 {thread_id} 详情")
+        
+        # 确保Agent可用
+        if not await ensure_agent_ready():
+            return jsonify({
+                "success": False,
+                "error": "Agent 未就绪",
+                "timestamp": datetime.now().isoformat()
+            }), 503
+        
+        # 直接调用异步方法
+        history = await _react_agent_instance.get_conversation_history(thread_id)
+        logger.info(f"✅ 异步成功获取对话历史,消息数量: {len(history)}")
+        
+        if not history:
+            return jsonify({
+                "success": False,
+                "error": f"未找到对话 {thread_id}",
+                "timestamp": datetime.now().isoformat()
+            }), 404
+        
+        return jsonify({
+            "success": True,
+            "data": {
+                "user_id": user_id,
+                "thread_id": thread_id,
+                "message_count": len(history),
+                "messages": history
+            },
+            "timestamp": datetime.now().isoformat()
+        }), 200
+        
+    except Exception as e:
+        import traceback
+        logger.error(f"❌ 异步获取对话 {thread_id} 详情失败: {e}")
+        logger.error(f"❌ 详细错误信息: {traceback.format_exc()}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "timestamp": datetime.now().isoformat()
+        }), 500
+
+@app.route('/api/test/redis', methods=['GET'])
+def test_redis_connection():
+    """测试Redis连接和基本查询(从 custom_react_agent 迁移)"""
+    try:
+        import redis
+        
+        # 创建Redis连接
+        r = redis.Redis(host='localhost', port=6379, decode_responses=True)
+        r.ping()
+        
+        # 扫描checkpoint keys
+        pattern = "checkpoint:*"
+        keys = []
+        cursor = 0
+        count = 0
+        
+        while True:
+            cursor, batch = r.scan(cursor=cursor, match=pattern, count=100)
+            keys.extend(batch)
+            count += len(batch)
+            if cursor == 0 or count > 500:  # 限制扫描数量
+                break
+        
+        # 统计用户
+        users = {}
+        for key in keys:
+            try:
+                parts = key.split(':')
+                if len(parts) >= 2:
+                    user_id = parts[1]
+                    users[user_id] = users.get(user_id, 0) + 1
+            except Exception:
+                continue
+        
+        r.close()
+        
+        return jsonify({
+            "success": True,
+            "data": {
+                "redis_connected": True,
+                "total_checkpoint_keys": len(keys),
+                "users_found": list(users.keys()),
+                "user_key_counts": users,
+                "sample_keys": keys[:5] if keys else []
+            },
+            "timestamp": datetime.now().isoformat()
+        }), 200
+        
+    except Exception as e:
+        logger.error(f"❌ Redis测试失败: {e}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "timestamp": datetime.now().isoformat()
+        }), 500
+
+@app.route('/api/v0/react/direct/users/<user_id>/conversations', methods=['GET'])
+def test_get_user_conversations_simple(user_id: str):
+    """测试简单Redis查询获取用户对话列表(从 custom_react_agent 迁移)"""
+    try:
+        limit = request.args.get('limit', 10, type=int)
+        limit = max(1, min(limit, 50))
+        
+        logger.info(f"🧪 测试获取用户 {user_id} 的对话列表(简单Redis方式)")
+        
+        # 使用简单Redis查询
+        conversations = get_user_conversations_simple_sync(user_id, limit)
+        
+        return jsonify({
+            "success": True,
+            "method": "simple_redis_query",
+            "data": {
+                "user_id": user_id,
+                "conversations": conversations,
+                "total_count": len(conversations),
+                "limit": limit
+            },
+            "timestamp": datetime.now().isoformat()
+        }), 200
+        
+    except Exception as e:
+        logger.error(f"❌ 测试简单Redis查询失败: {e}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "timestamp": datetime.now().isoformat()
+        }), 500
+
+@app.route('/api/v0/react/direct/conversations/<thread_id>', methods=['GET'])
+def get_conversation_detail_api(thread_id: str):
+    """
+    获取特定对话的详细信息 - 支持include_tools开关参数(从 custom_react_agent 迁移)
+    
+    Query Parameters:
+        - include_tools: bool, 是否包含工具调用信息,默认false
+                        true: 返回完整对话(human/ai/tool/system)
+                        false: 只返回human/ai消息,清理工具调用信息
+        - user_id: str, 可选的用户ID验证
+        
+    Examples:
+        GET /api/v0/react/direct/conversations/wang:20250709195048728?include_tools=true   # 完整模式
+        GET /api/v0/react/direct/conversations/wang:20250709195048728?include_tools=false  # 简化模式(默认)
+        GET /api/v0/react/direct/conversations/wang:20250709195048728                      # 简化模式(默认)
+    """
+    try:
+        # 获取查询参数
+        include_tools = request.args.get('include_tools', 'false').lower() == 'true'
+        user_id = request.args.get('user_id')
+        
+        # 验证thread_id格式
+        if ':' not in thread_id:
+            return jsonify({
+                "success": False,
+                "error": "Invalid thread_id format. Expected format: user_id:timestamp",
+                "timestamp": datetime.now().isoformat()
+            }), 400
+        
+        # 如果提供了user_id,验证thread_id是否属于该用户
+        thread_user_id = thread_id.split(':')[0]
+        if user_id and thread_user_id != user_id:
+            return jsonify({
+                "success": False,
+                "error": f"Thread ID {thread_id} does not belong to user {user_id}",
+                "timestamp": datetime.now().isoformat()
+            }), 400
+        
+        logger.info(f"📖 获取对话详情 - Thread: {thread_id}, Include Tools: {include_tools}")
+        
+        # 检查enhanced_redis_api是否可用
+        if get_conversation_detail_from_redis is None:
+            return jsonify({
+                "success": False,
+                "error": "enhanced_redis_api 模块不可用",
+                "timestamp": datetime.now().isoformat()
+            }), 503
+        
+        # 从Redis获取对话详情(使用我们的新函数)
+        result = get_conversation_detail_from_redis(thread_id, include_tools)
+        
+        if not result['success']:
+            logger.warning(f"⚠️ 获取对话详情失败: {result['error']}")
+            return jsonify({
+                "success": False,
+                "error": result['error'],
+                "timestamp": datetime.now().isoformat()
+            }), 404
+        
+        # 添加API元数据
+        result['data']['api_metadata'] = {
+            "timestamp": datetime.now().isoformat(),
+            "api_version": "v1",
+            "endpoint": "get_conversation_detail",
+            "query_params": {
+                "include_tools": include_tools,
+                "user_id": user_id
+            }
+        }
+        
+        mode_desc = "完整模式" if include_tools else "简化模式"
+        logger.info(f"✅ 成功获取对话详情 - Messages: {result['data']['message_count']}, Mode: {mode_desc}")
+        
+        return jsonify({
+            "success": True,
+            "data": result['data'],
+            "timestamp": datetime.now().isoformat()
+        }), 200
+        
+    except Exception as e:
+        import traceback
+        logger.error(f"❌ 获取对话详情异常: {e}")
+        logger.error(f"❌ 详细错误信息: {traceback.format_exc()}")
+        
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "timestamp": datetime.now().isoformat()
+        }), 500
+
+@app.route('/api/v0/react/direct/conversations/<thread_id>/compare', methods=['GET'])
+def compare_conversation_modes_api(thread_id: str):
+    """
+    比较完整模式和简化模式的对话内容
+    用于调试和理解两种模式的差异(从 custom_react_agent 迁移)
+    
+    Examples:
+        GET /api/v0/react/direct/conversations/wang:20250709195048728/compare
+    """
+    try:
+        logger.info(f"🔍 比较对话模式 - Thread: {thread_id}")
+        
+        # 检查enhanced_redis_api是否可用
+        if get_conversation_detail_from_redis is None:
+            return jsonify({
+                "success": False,
+                "error": "enhanced_redis_api 模块不可用",
+                "timestamp": datetime.now().isoformat()
+            }), 503
+        
+        # 获取完整模式
+        full_result = get_conversation_detail_from_redis(thread_id, include_tools=True)
+        
+        # 获取简化模式
+        simple_result = get_conversation_detail_from_redis(thread_id, include_tools=False)
+        
+        if not (full_result['success'] and simple_result['success']):
+            return jsonify({
+                "success": False,
+                "error": "无法获取对话数据进行比较",
+                "timestamp": datetime.now().isoformat()
+            }), 404
+        
+        # 构建比较结果
+        comparison = {
+            "thread_id": thread_id,
+            "full_mode": {
+                "message_count": full_result['data']['message_count'],
+                "stats": full_result['data']['stats'],
+                "sample_messages": full_result['data']['messages'][:3]  # 只显示前3条作为示例
+            },
+            "simple_mode": {
+                "message_count": simple_result['data']['message_count'],
+                "stats": simple_result['data']['stats'],
+                "sample_messages": simple_result['data']['messages'][:3]  # 只显示前3条作为示例
+            },
+            "comparison_summary": {
+                "message_count_difference": full_result['data']['message_count'] - simple_result['data']['message_count'],
+                "tools_filtered_out": full_result['data']['stats'].get('tool_messages', 0),
+                "ai_messages_with_tools": full_result['data']['stats'].get('messages_with_tools', 0),
+                "filtering_effectiveness": "有效" if (full_result['data']['message_count'] - simple_result['data']['message_count']) > 0 else "无差异"
+            },
+            "metadata": {
+                "timestamp": datetime.now().isoformat(),
+                "note": "sample_messages 只显示前3条消息作为示例,完整消息请使用相应的详情API"
+            }
+        }
+        
+        logger.info(f"✅ 模式比较完成 - 完整: {comparison['full_mode']['message_count']}, 简化: {comparison['simple_mode']['message_count']}")
+        
+        return jsonify({
+            "success": True,
+            "data": comparison,
+            "timestamp": datetime.now().isoformat()
+        }), 200
+        
+    except Exception as e:
+        logger.error(f"❌ 对话模式比较失败: {e}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "timestamp": datetime.now().isoformat()
+        }), 500
+
+@app.route('/api/v0/react/direct/conversations/<thread_id>/summary', methods=['GET'])
+def get_conversation_summary_api(thread_id: str):
+    """
+    获取对话摘要信息(只包含基本统计,不返回具体消息)(从 custom_react_agent 迁移)
+    
+    Query Parameters:
+        - include_tools: bool, 影响统计信息的计算方式
+        
+    Examples:
+        GET /api/v0/react/direct/conversations/wang:20250709195048728/summary?include_tools=true
+    """
+    try:
+        include_tools = request.args.get('include_tools', 'false').lower() == 'true'
+        
+        # 验证thread_id格式
+        if ':' not in thread_id:
+            return jsonify({
+                "success": False,
+                "error": "Invalid thread_id format. Expected format: user_id:timestamp",
+                "timestamp": datetime.now().isoformat()
+            }), 400
+        
+        logger.info(f"📊 获取对话摘要 - Thread: {thread_id}, Include Tools: {include_tools}")
+        
+        # 检查enhanced_redis_api是否可用
+        if get_conversation_detail_from_redis is None:
+            return jsonify({
+                "success": False,
+                "error": "enhanced_redis_api 模块不可用",
+                "timestamp": datetime.now().isoformat()
+            }), 503
+        
+        # 获取完整对话信息
+        result = get_conversation_detail_from_redis(thread_id, include_tools)
+        
+        if not result['success']:
+            return jsonify({
+                "success": False,
+                "error": result['error'],
+                "timestamp": datetime.now().isoformat()
+            }), 404
+        
+        # 只返回摘要信息,不包含具体消息
+        data = result['data']
+        summary = {
+            "thread_id": data['thread_id'],
+            "user_id": data['user_id'],
+            "include_tools": data['include_tools'],
+            "message_count": data['message_count'],
+            "stats": data['stats'],
+            "metadata": data['metadata'],
+            "first_message_preview": None,
+            "last_message_preview": None,
+            "conversation_preview": None
+        }
+        
+        # 添加消息预览
+        messages = data.get('messages', [])
+        if messages:
+            # 第一条human消息预览
+            for msg in messages:
+                if msg.get('type') == 'human':
+                    content = str(msg['content'])
+                    summary['first_message_preview'] = content[:100] + "..." if len(content) > 100 else content
+                    break
+            
+            # 最后一条ai消息预览
+            for msg in reversed(messages):
+                if msg.get('type') == 'ai' and str(msg.get('content', '')).strip():
+                    content = str(msg['content'])
+                    summary['last_message_preview'] = content[:100] + "..." if len(content) > 100 else content
+                    break
+            
+            # 生成对话预览(第一条human消息)
+            summary['conversation_preview'] = summary['first_message_preview']
+        
+        # 添加API元数据
+        summary['api_metadata'] = {
+            "timestamp": datetime.now().isoformat(),
+            "api_version": "v1",
+            "endpoint": "get_conversation_summary"
+        }
+        
+        logger.info(f"✅ 成功获取对话摘要")
+        
+        return jsonify({
+            "success": True,
+            "data": summary,
+            "timestamp": datetime.now().isoformat()
+        }), 200
+        
+    except Exception as e:
+        logger.error(f"❌ 获取对话摘要失败: {e}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "timestamp": datetime.now().isoformat()
+        }), 500
+
 # ==================== 启动逻辑 ====================
 
 def signal_handler(signum, frame):
@@ -1297,3 +1938,1641 @@ if __name__ == '__main__':
     
     # 启动标准Flask应用(支持异步路由)
     app.run(host="0.0.0.0", port=8084, debug=False, threaded=True)
+
+# Data Pipeline 全局变量 - 从 citu_app.py 迁移
+data_pipeline_manager = None
+data_pipeline_file_manager = None
+
+def get_data_pipeline_manager():
+    """获取Data Pipeline管理器单例(从 citu_app.py 迁移)"""
+    global data_pipeline_manager
+    if data_pipeline_manager is None:
+        data_pipeline_manager = SimpleWorkflowManager()
+    return data_pipeline_manager
+
+def get_data_pipeline_file_manager():
+    """获取Data Pipeline文件管理器单例(从 citu_app.py 迁移)"""
+    global data_pipeline_file_manager
+    if data_pipeline_file_manager is None:
+        data_pipeline_file_manager = SimpleFileManager()
+    return data_pipeline_file_manager
+
+# ==================== QA缓存管理API (从 citu_app.py 迁移) ====================
+
+@app.route('/api/v0/qa_cache_stats', methods=['GET'])
+def qa_cache_stats():
+    """获取问答缓存统计信息(从 citu_app.py 迁移)"""
+    try:
+        stats = redis_conversation_manager.get_qa_cache_stats()
+        
+        return jsonify(success_response(
+            response_text="获取问答缓存统计成功",
+            data=stats
+        ))
+        
+    except Exception as e:
+        logger.error(f"获取问答缓存统计失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取问答缓存统计失败,请稍后重试"
+        )), 500
+
+@app.route('/api/v0/qa_cache_cleanup', methods=['POST'])
+def qa_cache_cleanup():
+    """清空所有问答缓存(从 citu_app.py 迁移)"""
+    try:
+        if not redis_conversation_manager.is_available():
+            return jsonify(internal_error_response(
+                response_text="Redis连接不可用,无法执行清理操作"
+            )), 500
+        
+        deleted_count = redis_conversation_manager.clear_all_qa_cache()
+        
+        return jsonify(success_response(
+            response_text="问答缓存清理完成",
+            data={
+                "deleted_count": deleted_count,
+                "cleared": deleted_count > 0,
+                "cleanup_time": datetime.now().isoformat()
+            }
+        ))
+        
+    except Exception as e:
+        logger.error(f"清空问答缓存失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="清空问答缓存失败,请稍后重试"
+        )), 500
+
+# ==================== Database API (从 citu_app.py 迁移) ====================
+
+@app.route('/api/v0/database/tables', methods=['POST'])
+def get_database_tables():
+    """
+    获取数据库表列表(从 citu_app.py 迁移)
+    
+    请求体:
+    {
+        "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db",  // 可选,不传则使用默认配置
+        "schema": "public,ods",  // 可选,支持多个schema用逗号分隔,默认为public
+        "table_name_pattern": "ods_*"  // 可选,表名模式匹配,支持通配符:ods_*、*_dim、*fact*、ods_%
+    }
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取表列表成功",
+        "data": {
+            "tables": ["public.table1", "public.table2", "ods.table3"],
+            "total": 3,
+            "schemas": ["public", "ods"],
+            "table_name_pattern": "ods_*"
+        }
+    }
+    """
+    try:
+        req = request.get_json(force=True)
+        
+        # 处理数据库连接参数(可选)
+        db_connection = req.get('db_connection')
+        if not db_connection:
+            # 使用app_config的默认数据库配置
+            import app_config
+            db_params = app_config.APP_DB_CONFIG
+            db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
+            logger.info("使用默认数据库配置获取表列表")
+        else:
+            logger.info("使用用户指定的数据库配置获取表列表")
+        
+        # 可选参数
+        schema = req.get('schema', '')
+        table_name_pattern = req.get('table_name_pattern')
+        
+        # 创建表检查API实例
+        table_inspector = TableInspectorAPI()
+        
+        # 使用asyncio运行异步方法
+        async def get_tables():
+            return await table_inspector.get_tables_list(db_connection, schema, table_name_pattern)
+        
+        # 在新的事件循环中运行异步方法
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        try:
+            tables = loop.run_until_complete(get_tables())
+        finally:
+            loop.close()
+        
+        # 解析schema信息
+        parsed_schemas = table_inspector._parse_schemas(schema)
+        
+        response_data = {
+            "tables": tables,
+            "total": len(tables),
+            "schemas": parsed_schemas,
+            "db_connection_info": {
+                "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
+            }
+        }
+        
+        # 如果使用了表名模式,添加到响应中
+        if table_name_pattern:
+            response_data["table_name_pattern"] = table_name_pattern
+        
+        return jsonify(success_response(
+            response_text="获取表列表成功",
+            data=response_data
+        )), 200
+        
+    except Exception as e:
+        logger.error(f"获取数据库表列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text=f"获取表列表失败: {str(e)}"
+        )), 500
+
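A client-side sketch for the table-list endpoint above; db_connection is omitted so the server falls back to its default app_config.APP_DB_CONFIG connection, and the host/port are assumed from app.run():

import requests

payload = {
    # db_connection omitted: the server uses its default APP_DB_CONFIG connection
    "schema": "public,ods",
    "table_name_pattern": "ods_*",
}
resp = requests.post("http://localhost:8084/api/v0/database/tables", json=payload, timeout=30)
print(resp.json()["data"]["total"], resp.json()["data"]["tables"][:3])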
+@app.route('/api/v0/database/table/ddl', methods=['POST'])
+def get_table_ddl():
+    """
+    获取表的DDL语句或MD文档(从 citu_app.py 迁移)
+    
+    请求体:
+    {
+        "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db",  // 可选,不传则使用默认配置
+        "table": "public.test",
+        "business_context": "这是高速公路服务区的相关数据",  // 可选
+        "type": "ddl"  // 可选,支持ddl/md/both,默认为ddl
+    }
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取表DDL成功",
+        "data": {
+            "ddl": "create table public.test (...);",
+            "md": "## test表...",  // 仅当type为md或both时返回
+            "table_info": {
+                "table_name": "test",
+                "schema_name": "public",
+                "full_name": "public.test",
+                "comment": "测试表",
+                "field_count": 10,
+                "row_count": 1000
+            },
+            "fields": [...]
+        }
+    }
+    """
+    try:
+        req = request.get_json(force=True)
+        
+        # 处理参数(table仍为必需,db_connection可选)
+        table = req.get('table')
+        db_connection = req.get('db_connection')
+        
+        if not table:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数:table",
+                missing_params=['table']
+            )), 400
+        
+        if not db_connection:
+            # 使用app_config的默认数据库配置
+            import app_config
+            db_params = app_config.APP_DB_CONFIG
+            db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
+            logger.info("使用默认数据库配置获取表DDL")
+        else:
+            logger.info("使用用户指定的数据库配置获取表DDL")
+        
+        # 可选参数
+        business_context = req.get('business_context', '')
+        output_type = req.get('type', 'ddl')
+        
+        # 验证type参数
+        valid_types = ['ddl', 'md', 'both']
+        if output_type not in valid_types:
+            return jsonify(bad_request_response(
+                response_text=f"无效的type参数: {output_type},支持的值: {valid_types}",
+                invalid_params=['type']
+            )), 400
+        
+        # 创建表检查API实例
+        table_inspector = TableInspectorAPI()
+        
+        # 使用asyncio运行异步方法
+        async def get_ddl():
+            return await table_inspector.get_table_ddl(
+                db_connection=db_connection,
+                table=table,
+                business_context=business_context,
+                output_type=output_type
+            )
+        
+        # 在新的事件循环中运行异步方法
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        try:
+            result = loop.run_until_complete(get_ddl())
+        finally:
+            loop.close()
+        
+        response_data = {
+            **result,
+            "generation_info": {
+                "business_context": business_context,
+                "output_type": output_type,
+                "has_llm_comments": bool(business_context),
+                "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
+            }
+        }
+        
+        return jsonify(success_response(
+            response_text=f"获取表{output_type.upper()}成功",
+            data=response_data
+        )), 200
+        
+    except Exception as e:
+        logger.error(f"获取表DDL失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text=f"获取表{output_type.upper() if 'output_type' in locals() else 'DDL'}失败: {str(e)}"
+        )), 500
+
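A similar sketch for the DDL endpoint above, reusing the sample values from its docstring (host/port assumed from app.run()):

import requests

payload = {
    "table": "public.test",                           # required, schema.table
    "business_context": "这是高速公路服务区的相关数据",   # optional, enables LLM comments
    "type": "both",                                   # ddl / md / both
}
resp = requests.post("http://localhost:8084/api/v0/database/table/ddl", json=payload, timeout=60)
data = resp.json()["data"]
print(data["table_info"]["full_name"], len(data.get("fields", [])))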
+# ==================== Data Pipeline API (从 citu_app.py 迁移) ====================
+
+@app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
+def create_data_pipeline_task():
+    """创建数据管道任务(从 citu_app.py 迁移)"""
+    try:
+        req = request.get_json(force=True)
+        
+        # table_list_file和business_context现在都是可选参数
+        # 如果未提供table_list_file,将使用文件上传模式
+        
+        # 创建任务(支持可选的db_connection参数)
+        manager = get_data_pipeline_manager()
+        task_id = manager.create_task(
+            table_list_file=req.get('table_list_file'),
+            business_context=req.get('business_context'),
+            db_name=req.get('db_name'),  # 可选参数,用于指定特定数据库名称
+            db_connection=req.get('db_connection'),  # 可选参数,用于指定数据库连接字符串
+            task_name=req.get('task_name'),  # 可选参数,用于指定任务名称
+            enable_sql_validation=req.get('enable_sql_validation', True),
+            enable_llm_repair=req.get('enable_llm_repair', True),
+            modify_original_file=req.get('modify_original_file', True),
+            enable_training_data_load=req.get('enable_training_data_load', True)
+        )
+        
+        # 获取任务信息
+        task_info = manager.get_task_status(task_id)
+        
+        response_data = {
+            "task_id": task_id,
+            "task_name": task_info.get('task_name'),
+            "status": task_info.get('status'),
+            "created_at": task_info.get('created_at').isoformat() if task_info.get('created_at') else None
+        }
+        
+        # 检查是否为文件上传模式
+        file_upload_mode = not req.get('table_list_file')
+        response_message = "任务创建成功"
+        
+        if file_upload_mode:
+            response_data["file_upload_mode"] = True
+            response_data["next_step"] = f"POST /api/v0/data_pipeline/tasks/{task_id}/upload-table-list"
+            response_message += ",请上传表清单文件后再执行任务"
+        
+        return jsonify(success_response(
+            response_text=response_message,
+            data=response_data
+        )), 201
+        
+    except Exception as e:
+        logger.error(f"创建数据管道任务失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="创建任务失败,请稍后重试"
+        )), 500
+
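A request sketch for creating a task in file-upload mode (no table_list_file); the parameter values are illustrative and the host/port are assumed from app.run():

import requests

payload = {
    "business_context": "高速公路服务区管理系统",  # illustrative
    "db_name": "highway_db",                      # illustrative, optional
    "enable_sql_validation": True,
}
resp = requests.post("http://localhost:8084/api/v0/data_pipeline/tasks", json=payload, timeout=30)
data = resp.json()["data"]
print(data["task_id"], data.get("next_step"))  # next_step points at the upload-table-list endpoint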
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/execute', methods=['POST'])
+def execute_data_pipeline_task(task_id):
+    """执行数据管道任务(从 citu_app.py 迁移)"""
+    try:
+        req = request.get_json(force=True) if request.is_json else {}
+        execution_mode = req.get('execution_mode', 'complete')
+        step_name = req.get('step_name')
+        
+        # 验证执行模式
+        if execution_mode not in ['complete', 'step']:
+            return jsonify(bad_request_response(
+                response_text="无效的执行模式,必须是 'complete' 或 'step'",
+                invalid_params=['execution_mode']
+            )), 400
+        
+        # 如果是步骤执行模式,验证步骤名称
+        if execution_mode == 'step':
+            if not step_name:
+                return jsonify(bad_request_response(
+                    response_text="步骤执行模式需要指定step_name",
+                    missing_params=['step_name']
+                )), 400
+            
+            valid_steps = ['ddl_generation', 'qa_generation', 'sql_validation', 'training_load']
+            if step_name not in valid_steps:
+                return jsonify(bad_request_response(
+                    response_text=f"无效的步骤名称,支持的步骤: {', '.join(valid_steps)}",
+                    invalid_params=['step_name']
+                )), 400
+        
+        # 检查任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 使用subprocess启动独立进程执行任务
+        def run_task_subprocess():
+            try:
+                import subprocess
+                import sys
+                from pathlib import Path
+                
+                # 构建执行命令
+                python_executable = sys.executable
+                script_path = Path(__file__).parent / "data_pipeline" / "task_executor.py"
+                
+                cmd = [
+                    python_executable,
+                    str(script_path),
+                    "--task-id", task_id,
+                    "--execution-mode", execution_mode
+                ]
+                
+                if step_name:
+                    cmd.extend(["--step-name", step_name])
+                
+                logger.info(f"启动任务进程: {' '.join(cmd)}")
+                
+                # 启动后台进程(不等待完成)
+                process = subprocess.Popen(
+                    cmd,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    text=True,
+                    cwd=Path(__file__).parent
+                )
+                
+                logger.info(f"任务进程已启动: PID={process.pid}, task_id={task_id}")
+                
+            except Exception as e:
+                logger.error(f"启动任务进程失败: {task_id}, 错误: {str(e)}")
+        
+        # 在新线程中启动subprocess(避免阻塞API响应)
+        thread = Thread(target=run_task_subprocess, daemon=True)
+        thread.start()
+        
+        response_data = {
+            "task_id": task_id,
+            "execution_mode": execution_mode,
+            "step_name": step_name if execution_mode == 'step' else None,
+            "message": "任务正在后台执行,请通过状态接口查询进度"
+        }
+        
+        return jsonify(success_response(
+            response_text="任务执行已启动",
+            data=response_data
+        )), 202
+        
+    except Exception as e:
+        logger.error(f"启动数据管道任务执行失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="启动任务执行失败,请稍后重试"
+        )), 500
+
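A sketch of kicking off execution for an existing task; the task id is illustrative (taken from the sample in the status docstring) and the host/port are assumed from app.run():

import requests

task_id = "task_20250627_143052"  # illustrative; use the id returned by the create endpoint
resp = requests.post(
    f"http://localhost:8084/api/v0/data_pipeline/tasks/{task_id}/execute",
    json={"execution_mode": "complete"},  # or {"execution_mode": "step", "step_name": "ddl_generation"}
    timeout=30,
)
print(resp.status_code)  # 202 means the task was started in a background process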
+@app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['GET'])
+def get_data_pipeline_task_status(task_id):
+    """
+    获取数据管道任务状态(从 citu_app.py 迁移)
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取任务状态成功",
+        "data": {
+            "task_id": "task_20250627_143052",
+            "status": "in_progress",
+            "step_status": {
+                "ddl_generation": "completed",
+                "qa_generation": "running",
+                "sql_validation": "pending",
+                "training_load": "pending"
+            },
+            "created_at": "2025-06-27T14:30:52",
+            "started_at": "2025-06-27T14:31:00",
+            "parameters": {...},
+            "current_execution": {...},
+            "total_executions": 2
+        }
+    }
+    """
+    try:
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 获取步骤状态
+        steps = manager.get_task_steps(task_id)
+        current_step = None
+        for step in steps:
+            if step['step_status'] == 'running':
+                current_step = step
+                break
+        
+        # 构建步骤状态摘要
+        step_status_summary = {}
+        for step in steps:
+            step_status_summary[step['step_name']] = step['step_status']
+        
+        response_data = {
+            "task_id": task_info['task_id'],
+            "task_name": task_info.get('task_name'),
+            "status": task_info['status'],
+            "step_status": step_status_summary,
+            "created_at": task_info['created_at'].isoformat() if task_info.get('created_at') else None,
+            "started_at": task_info['started_at'].isoformat() if task_info.get('started_at') else None,
+            "completed_at": task_info['completed_at'].isoformat() if task_info.get('completed_at') else None,
+            "parameters": task_info.get('parameters', {}),
+            "result": task_info.get('result'),
+            "error_message": task_info.get('error_message'),
+            "current_step": {
+                "execution_id": current_step['execution_id'],
+                "step": current_step['step_name'],
+                "status": current_step['step_status'],
+                "started_at": current_step['started_at'].isoformat() if current_step and current_step.get('started_at') else None
+            } if current_step else None,
+            "total_steps": len(steps),
+            "steps": [{
+                "step_name": step['step_name'],
+                "step_status": step['step_status'],
+                "started_at": step['started_at'].isoformat() if step.get('started_at') else None,
+                "completed_at": step['completed_at'].isoformat() if step.get('completed_at') else None,
+                "error_message": step.get('error_message')
+            } for step in steps]
+        }
+        
+        return jsonify(success_response(
+            response_text="获取任务状态成功",
+            data=response_data
+        ))
+        
+    except Exception as e:
+        logger.error(f"获取数据管道任务状态失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务状态失败,请稍后重试"
+        )), 500
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/logs', methods=['GET'])
+def get_data_pipeline_task_logs(task_id):
+    """
+    获取数据管道任务日志(从任务目录文件读取)(从 citu_app.py 迁移)
+    
+    查询参数:
+    - limit: 日志行数限制,默认100
+    - level: 日志级别过滤,可选
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取任务日志成功",
+        "data": {
+            "task_id": "task_20250627_143052",
+            "logs": [
+                {
+                    "timestamp": "2025-06-27 14:30:52",
+                    "level": "INFO",
+                    "message": "任务开始执行"
+                }
+            ],
+            "total": 15,
+            "source": "file"
+        }
+    }
+    """
+    try:
+        limit = request.args.get('limit', 100, type=int)
+        level = request.args.get('level')
+        
+        # 限制最大查询数量
+        limit = min(limit, 1000)
+        
+        manager = get_data_pipeline_manager()
+        
+        # 验证任务是否存在
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 获取任务目录下的日志文件
+        import os
+        from pathlib import Path
+        
+        # 获取项目根目录的绝对路径
+        project_root = Path(__file__).parent.absolute()
+        task_dir = project_root / "data_pipeline" / "training_data" / task_id
+        log_file = task_dir / "data_pipeline.log"
+        
+        logs = []
+        if log_file.exists():
+            try:
+                # 读取日志文件的最后N行
+                with open(log_file, 'r', encoding='utf-8') as f:
+                    lines = f.readlines()
+                    
+                # 取最后limit行
+                recent_lines = lines[-limit:] if len(lines) > limit else lines
+                
+                # 解析日志行
+                import re
+                log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
+                
+                for line in recent_lines:
+                    line = line.strip()
+                    if not line:
+                        continue
+                        
+                    match = re.match(log_pattern, line)
+                    if match:
+                        timestamp, log_level, logger_name, message = match.groups()
+                        
+                        # 级别过滤
+                        if level and log_level != level.upper():
+                            continue
+                            
+                        logs.append({
+                            "timestamp": timestamp,
+                            "level": log_level,
+                            "logger": logger_name,
+                            "message": message
+                        })
+                    else:
+                        # 处理多行日志(如异常堆栈)
+                        if logs:
+                            logs[-1]["message"] += f"\n{line}"
+                        
+            except Exception as e:
+                logger.error(f"读取日志文件失败: {e}")
+        
+        response_data = {
+            "task_id": task_id,
+            "logs": logs,
+            "total": len(logs),
+            "source": "file",
+            "log_file": str(log_file) if log_file.exists() else None
+        }
+        
+        return jsonify(success_response(
+            response_text="获取任务日志成功",
+            data=response_data
+        ))
+        
+    except Exception as e:
+        logger.error(f"获取数据管道任务日志失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务日志失败,请稍后重试"
+        )), 500
+
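The parser above assumes log lines of the form "YYYY-MM-DD HH:MM:SS [LEVEL] logger_name: message"; a quick self-check of that pattern (the sample line and logger name are illustrative):

import re

log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
sample = "2025-06-27 14:30:52 [INFO] DataPipeline: 任务开始执行"
print(re.match(log_pattern, sample).groups())
# ('2025-06-27 14:30:52', 'INFO', 'DataPipeline', '任务开始执行')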
+@app.route('/api/v0/data_pipeline/tasks', methods=['GET'])
+def list_data_pipeline_tasks():
+    """获取数据管道任务列表(从 citu_app.py 迁移)"""
+    try:
+        limit = request.args.get('limit', 50, type=int)
+        offset = request.args.get('offset', 0, type=int)
+        status_filter = request.args.get('status')
+        
+        # 限制查询数量
+        limit = min(limit, 100)
+        
+        manager = get_data_pipeline_manager()
+        tasks = manager.get_tasks_list(
+            limit=limit,
+            offset=offset,
+            status_filter=status_filter
+        )
+        
+        # 格式化任务列表
+        formatted_tasks = []
+        for task in tasks:
+            formatted_tasks.append({
+                "task_id": task.get('task_id'),
+                "task_name": task.get('task_name'),
+                "status": task.get('status'),
+                "step_status": task.get('step_status'),
+                "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
+                "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
+                "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
+                "created_by": task.get('by_user'),
+                "db_name": task.get('db_name'),
+                "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
+                # 新增字段
+                "directory_exists": task.get('directory_exists', True),  # 默认为True,兼容旧数据
+                "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
+            })
+        
+        response_data = {
+            "tasks": formatted_tasks,
+            "total": len(formatted_tasks),
+            "limit": limit,
+            "offset": offset
+        }
+        
+        return jsonify(success_response(
+            response_text="获取任务列表成功",
+            data=response_data
+        ))
+        
+    except Exception as e:
+        logger.error(f"获取数据管道任务列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务列表失败,请稍后重试"
+        )), 500
+
+@app.route('/api/v0/data_pipeline/tasks/query', methods=['POST'])
+def query_data_pipeline_tasks():
+    """
+    高级查询数据管道任务列表(从 citu_app.py 迁移)
+    
+    支持复杂筛选、排序、分页功能
+    
+    请求体:
+    {
+        "page": 1,                          // 页码,必须大于0,默认1
+        "page_size": 20,                    // 每页大小,1-100之间,默认20
+        "status": "completed",              // 可选,任务状态筛选:"pending"|"running"|"completed"|"failed"|"cancelled"
+        "task_name": "highway",             // 可选,任务名称模糊搜索,最大100字符
+        "created_by": "user123",            // 可选,创建者精确匹配
+        "db_name": "highway_db",            // 可选,数据库名称精确匹配
+        "created_time_start": "2025-01-01T00:00:00",  // 可选,创建时间范围开始
+        "created_time_end": "2025-12-31T23:59:59",    // 可选,创建时间范围结束
+        "started_time_start": "2025-01-01T00:00:00",  // 可选,开始时间范围开始
+        "started_time_end": "2025-12-31T23:59:59",    // 可选,开始时间范围结束
+        "completed_time_start": "2025-01-01T00:00:00", // 可选,完成时间范围开始
+        "completed_time_end": "2025-12-31T23:59:59",   // 可选,完成时间范围结束
+        "sort_by": "created_at",            // 可选,排序字段:"created_at"|"started_at"|"completed_at"|"task_name"|"status",默认"created_at"
+        "sort_order": "desc"                // 可选,排序方向:"asc"|"desc",默认"desc"
+    }
+    """
+    try:
+        # 获取请求数据
+        req = request.get_json(force=True) if request.is_json else {}
+        
+        # 解析参数,设置默认值
+        page = req.get('page', 1)
+        page_size = req.get('page_size', 20)
+        status = req.get('status')
+        task_name = req.get('task_name')
+        created_by = req.get('created_by')
+        db_name = req.get('db_name')
+        created_time_start = req.get('created_time_start')
+        created_time_end = req.get('created_time_end')
+        started_time_start = req.get('started_time_start')
+        started_time_end = req.get('started_time_end')
+        completed_time_start = req.get('completed_time_start')
+        completed_time_end = req.get('completed_time_end')
+        sort_by = req.get('sort_by', 'created_at')
+        sort_order = req.get('sort_order', 'desc')
+        
+        # 参数验证
+        # 验证分页参数
+        if page < 1:
+            return jsonify(bad_request_response(
+                response_text="页码必须大于0",
+                invalid_params=['page']
+            )), 400
+        
+        if page_size < 1 or page_size > 100:
+            return jsonify(bad_request_response(
+                response_text="每页大小必须在1-100之间",
+                invalid_params=['page_size']
+            )), 400
+        
+        # 验证任务名称长度
+        if task_name and len(task_name) > 100:
+            return jsonify(bad_request_response(
+                response_text="任务名称搜索关键词最大长度为100字符",
+                invalid_params=['task_name']
+            )), 400
+        
+        # 验证排序参数
+        allowed_sort_fields = ['created_at', 'started_at', 'completed_at', 'task_name', 'status']
+        if sort_by not in allowed_sort_fields:
+            return jsonify(bad_request_response(
+                response_text=f"不支持的排序字段: {sort_by},支持的字段: {', '.join(allowed_sort_fields)}",
+                invalid_params=['sort_by']
+            )), 400
+        
+        if sort_order.lower() not in ['asc', 'desc']:
+            return jsonify(bad_request_response(
+                response_text="排序方向必须是 'asc' 或 'desc'",
+                invalid_params=['sort_order']
+            )), 400
+        
+        # 验证状态筛选
+        if status:
+            allowed_statuses = ['pending', 'running', 'completed', 'failed', 'cancelled']
+            if status not in allowed_statuses:
+                return jsonify(bad_request_response(
+                    response_text=f"不支持的状态值: {status},支持的状态: {', '.join(allowed_statuses)}",
+                    invalid_params=['status']
+                )), 400
+        
+        # 调用管理器执行查询
+        manager = get_data_pipeline_manager()
+        result = manager.query_tasks_advanced(
+            page=page,
+            page_size=page_size,
+            status=status,
+            task_name=task_name,
+            created_by=created_by,
+            db_name=db_name,
+            created_time_start=created_time_start,
+            created_time_end=created_time_end,
+            started_time_start=started_time_start,
+            started_time_end=started_time_end,
+            completed_time_start=completed_time_start,
+            completed_time_end=completed_time_end,
+            sort_by=sort_by,
+            sort_order=sort_order
+        )
+        
+        # 格式化任务列表
+        formatted_tasks = []
+        for task in result['tasks']:
+            formatted_tasks.append({
+                "task_id": task.get('task_id'),
+                "task_name": task.get('task_name'),
+                "status": task.get('status'),
+                "step_status": task.get('step_status'),
+                "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
+                "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
+                "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
+                "created_by": task.get('by_user'),
+                "db_name": task.get('db_name'),
+                "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
+                "directory_exists": task.get('directory_exists', True),
+                "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
+            })
+        
+        # 构建响应数据
+        response_data = {
+            "tasks": formatted_tasks,
+            "pagination": result['pagination'],
+            "filters_applied": {
+                k: v for k, v in {
+                    "status": status,
+                    "task_name": task_name,
+                    "created_by": created_by,
+                    "db_name": db_name,
+                    "created_time_start": created_time_start,
+                    "created_time_end": created_time_end,
+                    "started_time_start": started_time_start,
+                    "started_time_end": started_time_end,
+                    "completed_time_start": completed_time_start,
+                    "completed_time_end": completed_time_end
+                }.items() if v
+            },
+            "sort_applied": {
+                "sort_by": sort_by,
+                "sort_order": sort_order
+            },
+            "query_time": result.get('query_time', '0.000s')
+        }
+        
+        return jsonify(success_response(
+            response_text="查询任务列表成功",
+            data=response_data
+        ))
+        
+    except Exception as e:
+        logger.error(f"查询数据管道任务列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="查询任务列表失败,请稍后重试"
+        )), 500
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['GET'])
+def get_data_pipeline_task_files(task_id):
+    """获取任务文件列表(从 citu_app.py 迁移)"""
+    try:
+        file_manager = get_data_pipeline_file_manager()
+        
+        # 获取任务文件
+        files = file_manager.get_task_files(task_id)
+        directory_info = file_manager.get_directory_info(task_id)
+        
+        # 格式化文件信息
+        formatted_files = []
+        for file_info in files:
+            formatted_files.append({
+                "file_name": file_info['file_name'],
+                "file_type": file_info['file_type'],
+                "file_size": file_info['file_size'],
+                "file_size_formatted": file_info['file_size_formatted'],
+                "created_at": file_info['created_at'].isoformat() if file_info.get('created_at') else None,
+                "modified_at": file_info['modified_at'].isoformat() if file_info.get('modified_at') else None,
+                "is_readable": file_info['is_readable']
+            })
+        
+        response_data = {
+            "task_id": task_id,
+            "files": formatted_files,
+            "directory_info": directory_info
+        }
+        
+        return jsonify(success_response(
+            response_text="获取任务文件列表成功",
+            data=response_data
+        ))
+        
+    except Exception as e:
+        logger.error(f"获取任务文件列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务文件列表失败,请稍后重试"
+        )), 500
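+
+# 调用示例(注释形式,仅作示意,非本接口的正式文档):
+# 假设客户端使用 requests 库,BASE_URL 为假设的服务地址,需按实际部署填写。
+#   import requests
+#   BASE_URL = "http://<host>:<port>"
+#   resp = requests.get(f"{BASE_URL}/api/v0/data_pipeline/tasks/task_20250701_123456/files", timeout=30)
+#   files = resp.json().get("data", {}).get("files", [])  # 字段结构见上方 formatted_files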
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/files/<file_name>', methods=['GET'])
+def download_data_pipeline_task_file(task_id, file_name):
+    """下载任务文件(从 citu_app.py 迁移)"""
+    try:
+        logger.info(f"开始下载文件: task_id={task_id}, file_name={file_name}")
+        
+        # 直接构建文件路径,避免依赖数据库
+        from pathlib import Path
+        import os
+        
+        # 获取项目根目录的绝对路径
+        project_root = Path(__file__).parent.absolute()
+        task_dir = project_root / "data_pipeline" / "training_data" / task_id
+        file_path = task_dir / file_name
+        
+        logger.info(f"文件路径: {file_path}")
+        
+        # 检查文件是否存在
+        if not file_path.exists():
+            logger.warning(f"文件不存在: {file_path}")
+            return jsonify(not_found_response(
+                response_text=f"文件不存在: {file_name}"
+            )), 404
+        
+        # 检查是否为文件(而不是目录)
+        if not file_path.is_file():
+            logger.warning(f"路径不是文件: {file_path}")
+            return jsonify(bad_request_response(
+                response_text=f"路径不是有效文件: {file_name}"
+            )), 400
+        
+        # 安全检查:确保文件在允许的目录内
+        try:
+            file_path.resolve().relative_to(task_dir.resolve())
+        except ValueError:
+            logger.warning(f"文件路径不安全: {file_path}")
+            return jsonify(bad_request_response(
+                response_text="非法的文件路径"
+            )), 400
+        
+        # 检查文件是否可读
+        if not os.access(file_path, os.R_OK):
+            logger.warning(f"文件不可读: {file_path}")
+            return jsonify(bad_request_response(
+                response_text="文件不可读"
+            )), 400
+        
+        logger.info(f"开始发送文件: {file_path}")
+        return send_file(
+            file_path,
+            as_attachment=True,
+            download_name=file_name
+        )
+        
+    except Exception as e:
+        logger.error(f"下载任务文件失败: task_id={task_id}, file_name={file_name}, 错误: {str(e)}", exc_info=True)
+        return jsonify(internal_error_response(
+            response_text="下载文件失败,请稍后重试"
+        )), 500
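+
+# 调用示例(注释形式,仅作示意;requests 与 BASE_URL 均为假设的客户端侧约定):
+#   with requests.get(f"{BASE_URL}/api/v0/data_pipeline/tasks/task_20250701_123456/files/table_list.txt",
+#                     stream=True, timeout=60) as resp:
+#       resp.raise_for_status()
+#       with open("table_list.txt", "wb") as f:
+#           for chunk in resp.iter_content(chunk_size=8192):
+#               f.write(chunk)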
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/upload-table-list', methods=['POST'])
+def upload_table_list_file(task_id):
+    """
+    上传表清单文件(从 citu_app.py 迁移)
+    
+    表单参数:
+    - file: 要上传的表清单文件(multipart/form-data)
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "表清单文件上传成功",
+        "data": {
+            "task_id": "task_20250701_123456",
+            "filename": "table_list.txt",
+            "file_size": 1024,
+            "file_size_formatted": "1.0 KB"
+        }
+    }
+    """
+    try:
+        # 验证任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 检查是否有文件上传
+        if 'file' not in request.files:
+            return jsonify(bad_request_response(
+                response_text="请选择要上传的表清单文件",
+                missing_params=['file']
+            )), 400
+        
+        file = request.files['file']
+        
+        # 验证文件名
+        if file.filename == '':
+            return jsonify(bad_request_response(
+                response_text="请选择有效的文件"
+            )), 400
+        
+        try:
+            # 使用文件管理器上传文件
+            file_manager = get_data_pipeline_file_manager()
+            result = file_manager.upload_table_list_file(task_id, file)
+            
+            response_data = {
+                "task_id": task_id,
+                "filename": result["filename"],
+                "file_size": result["file_size"],
+                "file_size_formatted": result["file_size_formatted"],
+                "upload_time": result["upload_time"].isoformat() if result.get("upload_time") else None
+            }
+            
+            return jsonify(success_response(
+                response_text="表清单文件上传成功",
+                data=response_data
+            )), 200
+            
+        except ValueError as e:
+            # 文件验证错误(如文件太大、空文件等)
+            return jsonify(bad_request_response(
+                response_text=str(e)
+            )), 400
+        except Exception as e:
+            logger.error(f"上传表清单文件失败: {str(e)}")
+            return jsonify(internal_error_response(
+                response_text="文件上传失败,请稍后重试"
+            )), 500
+        
+    except Exception as e:
+        logger.error(f"处理表清单文件上传请求失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="处理上传请求失败,请稍后重试"
+        )), 500
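+
+# 调用示例(注释形式,仅作示意;requests 与 BASE_URL 均为假设):
+#   with open("table_list.txt", "rb") as f:
+#       resp = requests.post(
+#           f"{BASE_URL}/api/v0/data_pipeline/tasks/task_20250701_123456/upload-table-list",
+#           files={"file": f},
+#           timeout=60,
+#       )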
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list-info', methods=['GET'])
+def get_table_list_info(task_id):
+    """
+    获取任务的表清单文件信息(从 citu_app.py 迁移)
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取表清单文件信息成功",
+        "data": {
+            "task_id": "task_20250701_123456",
+            "has_file": true,
+            "filename": "table_list.txt",
+            "file_path": "./data_pipeline/training_data/task_20250701_123456/table_list.txt",
+            "file_size": 1024,
+            "file_size_formatted": "1.0 KB",
+            "uploaded_at": "2025-07-01T12:34:56",
+            "table_count": 5,
+            "is_readable": true
+        }
+    }
+    """
+    try:
+        file_manager = get_data_pipeline_file_manager()
+        
+        # 获取表清单文件信息
+        table_list_info = file_manager.get_table_list_file_info(task_id)
+        
+        response_data = {
+            "task_id": task_id,
+            "has_file": table_list_info.get("exists", False),
+            **table_list_info
+        }
+        
+        return jsonify(success_response(
+            response_text="获取表清单文件信息成功",
+            data=response_data
+        ))
+        
+    except Exception as e:
+        logger.error(f"获取表清单文件信息失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取表清单文件信息失败,请稍后重试"
+        )), 500
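+
+# 调用示例(注释形式,仅作示意;requests 与 BASE_URL 均为假设):
+#   resp = requests.get(f"{BASE_URL}/api/v0/data_pipeline/tasks/task_20250701_123456/table-list-info")
+#   info = resp.json().get("data", {})  # 含 has_file、table_count 等字段,见上方 docstring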
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list', methods=['POST'])
+def create_table_list_from_names(task_id):
+    """
+    通过POST方式提交表名列表并创建table_list.txt文件(从 citu_app.py 迁移)
+    
+    请求体:
+    {
+        "tables": ["table1", "schema.table2", "table3"]
+    }
+    或者:
+    {
+        "tables": "table1,schema.table2,table3"
+    }
+    
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "表清单已成功创建",
+        "data": {
+            "task_id": "task_20250701_123456",
+            "filename": "table_list.txt",
+            "table_count": 3,
+            "file_size": 45,
+            "file_size_formatted": "45 B",
+            "created_time": "2025-07-01T12:34:56"
+        }
+    }
+    """
+    try:
+        # 验证任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 获取请求数据
+        req = request.get_json(force=True)
+        tables_param = req.get('tables')
+        
+        if not tables_param:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数:tables",
+                missing_params=['tables']
+            )), 400
+        
+        # 处理不同格式的表名参数
+        try:
+            if isinstance(tables_param, str):
+                # 逗号分隔的字符串格式
+                table_names = [name.strip() for name in tables_param.split(',') if name.strip()]
+            elif isinstance(tables_param, list):
+                # 数组格式
+                table_names = [str(name).strip() for name in tables_param if str(name).strip()]
+            else:
+                return jsonify(bad_request_response(
+                    response_text="tables参数格式错误,应为字符串(逗号分隔)或数组"
+                )), 400
+            
+            if not table_names:
+                return jsonify(bad_request_response(
+                    response_text="表名列表不能为空"
+                )), 400
+                
+        except Exception as e:
+            return jsonify(bad_request_response(
+                response_text=f"解析tables参数失败: {str(e)}"
+            )), 400
+        
+        try:
+            # 使用文件管理器创建表清单文件
+            file_manager = get_data_pipeline_file_manager()
+            result = file_manager.create_table_list_from_names(task_id, table_names)
+            
+            response_data = {
+                "task_id": task_id,
+                "filename": result["filename"],
+                "table_count": result["table_count"],
+                "unique_table_count": result["unique_table_count"],
+                "file_size": result["file_size"],
+                "file_size_formatted": result["file_size_formatted"],
+                "created_time": result["created_time"].isoformat() if result.get("created_time") else None,
+                "original_count": len(table_names) if isinstance(table_names, list) else len(tables_param.split(','))
+            }
+            
+            return jsonify(success_response(
+                response_text=f"表清单已成功创建,包含 {result['table_count']} 个表",
+                data=response_data
+            )), 200
+            
+        except ValueError as e:
+            # 表名验证错误(如格式错误、数量限制等)
+            return jsonify(bad_request_response(
+                response_text=str(e)
+            )), 400
+        except Exception as e:
+            logger.error(f"创建表清单文件失败: {str(e)}")
+            return jsonify(internal_error_response(
+                response_text="创建表清单文件失败,请稍后重试"
+            )), 500
+        
+    except Exception as e:
+        logger.error(f"处理表清单创建请求失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="处理请求失败,请稍后重试"
+        )), 500
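+
+# 调用示例(注释形式,仅作示意;requests 与 BASE_URL 均为假设,tables 支持数组或逗号分隔字符串):
+#   resp = requests.post(
+#       f"{BASE_URL}/api/v0/data_pipeline/tasks/task_20250701_123456/table-list",
+#       json={"tables": ["table1", "schema.table2", "table3"]},
+#       timeout=30,
+#   )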
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['POST'])
+def upload_file_to_task(task_id):
+    """
+    上传文件到指定任务目录(从 citu_app.py 迁移)
+    
+    表单参数:
+    - file: 要上传的文件(multipart/form-data)
+    - overwrite_mode: 重名处理模式 (backup, replace, skip),默认为backup
+    
+    支持的文件类型:
+    - .ddl: DDL文件
+    - .md: Markdown文档
+    - .txt: 文本文件
+    - .json: JSON文件
+    - .sql: SQL文件
+    - .csv: CSV文件
+    
+    重名处理模式:
+    - backup: 备份原文件(默认)
+    - replace: 直接覆盖
+    - skip: 跳过上传
+    """
+    try:
+        # 验证任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 检查是否有文件上传
+        if 'file' not in request.files:
+            return jsonify(bad_request_response(
+                response_text="请选择要上传的文件",
+                missing_params=['file']
+            )), 400
+        
+        file = request.files['file']
+        
+        # 验证文件名
+        if file.filename == '':
+            return jsonify(bad_request_response(
+                response_text="请选择有效的文件"
+            )), 400
+        
+        # 获取重名处理模式
+        overwrite_mode = request.form.get('overwrite_mode', 'backup')
+        
+        # 验证重名处理模式
+        valid_modes = ['backup', 'replace', 'skip']
+        if overwrite_mode not in valid_modes:
+            return jsonify(bad_request_response(
+                response_text=f"无效的overwrite_mode参数: {overwrite_mode},支持的值: {valid_modes}",
+                invalid_params=['overwrite_mode']
+            )), 400
+        
+        try:
+            # 使用文件管理器上传文件
+            file_manager = get_data_pipeline_file_manager()
+            result = file_manager.upload_file_to_task(task_id, file, file.filename, overwrite_mode)
+            
+            # 检查是否跳过上传
+            if result.get('skipped'):
+                return jsonify(success_response(
+                    response_text=result.get('message', '文件已存在,跳过上传'),
+                    data=result
+                )), 200
+            
+            return jsonify(success_response(
+                response_text="文件上传成功",
+                data=result
+            )), 200
+            
+        except ValueError as e:
+            # 文件验证错误(如文件太大、空文件、不支持的类型等)
+            return jsonify(bad_request_response(
+                response_text=str(e)
+            )), 400
+        except Exception as e:
+            logger.error(f"上传文件失败: {str(e)}")
+            return jsonify(internal_error_response(
+                response_text="文件上传失败,请稍后重试"
+            )), 500
+        
+    except Exception as e:
+        logger.error(f"处理文件上传请求失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="处理上传请求失败,请稍后重试"
+        )), 500
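+
+# 调用示例(注释形式,仅作示意;requests 与 BASE_URL 均为假设,overwrite_mode 取值见上方 docstring):
+#   with open("test.ddl", "rb") as f:  # test.ddl 为假设的示例文件名
+#       resp = requests.post(
+#           f"{BASE_URL}/api/v0/data_pipeline/tasks/task_20250701_123456/files",
+#           files={"file": f},
+#           data={"overwrite_mode": "replace"},
+#           timeout=60,
+#       )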
+
+# 任务目录删除功能(从 citu_app.py 迁移)
+import shutil
+from pathlib import Path
+import psycopg2
+from app_config import PGVECTOR_CONFIG
+
+def delete_task_directory_simple(task_id, delete_database_records=False):
+    """
+    简单的任务目录删除功能(从 citu_app.py 迁移)
+    - 删除 data_pipeline/training_data/{task_id} 目录
+    - 更新数据库中的 directory_exists 字段
+    - 可选:删除数据库记录
+    """
+    try:
+        # 1. 删除目录
+        project_root = Path(__file__).parent.absolute()
+        task_dir = project_root / "data_pipeline" / "training_data" / task_id
+        
+        deleted_files_count = 0
+        deleted_size = 0
+        
+        if task_dir.exists():
+            # 计算删除前的统计信息
+            for file_path in task_dir.rglob('*'):
+                if file_path.is_file():
+                    deleted_files_count += 1
+                    deleted_size += file_path.stat().st_size
+            
+            # 删除目录
+            shutil.rmtree(task_dir)
+            directory_deleted = True
+            operation_message = "目录删除成功"
+        else:
+            directory_deleted = False
+            operation_message = "目录不存在,无需删除"
+        
+        # 2. 更新数据库
+        database_records_deleted = False
+        
+        conn = None
+        try:
+            conn = psycopg2.connect(**PGVECTOR_CONFIG)
+            with conn.cursor() as cur:
+                if delete_database_records:
+                    # 删除任务步骤记录
+                    cur.execute("DELETE FROM data_pipeline_task_steps WHERE task_id = %s", (task_id,))
+                    # 删除任务主记录
+                    cur.execute("DELETE FROM data_pipeline_tasks WHERE task_id = %s", (task_id,))
+                    database_records_deleted = True
+                else:
+                    # 只更新目录状态
+                    cur.execute("""
+                        UPDATE data_pipeline_tasks
+                        SET directory_exists = FALSE, updated_at = CURRENT_TIMESTAMP
+                        WHERE task_id = %s
+                    """, (task_id,))
+            conn.commit()
+        except Exception as db_error:
+            logger.error(f"数据库操作失败: {db_error}")
+            # 数据库失败不影响文件删除的结果
+        finally:
+            # 无论成功与否都关闭连接,避免连接泄漏
+            if conn is not None:
+                conn.close()
+        
+        # 3. 格式化文件大小
+        def format_size(size_bytes):
+            if size_bytes < 1024:
+                return f"{size_bytes} B"
+            elif size_bytes < 1024**2:
+                return f"{size_bytes/1024:.1f} KB"
+            elif size_bytes < 1024**3:
+                return f"{size_bytes/(1024**2):.1f} MB"
+            else:
+                return f"{size_bytes/(1024**3):.1f} GB"
+        
+        return {
+            "success": True,
+            "task_id": task_id,
+            "directory_deleted": directory_deleted,
+            "database_records_deleted": database_records_deleted,
+            "deleted_files_count": deleted_files_count,
+            "deleted_size": format_size(deleted_size),
+            "deleted_at": datetime.now().isoformat(),
+            "operation_message": operation_message  # 新增:具体的操作消息
+        }
+        
+    except Exception as e:
+        logger.error(f"删除任务目录失败: {task_id}, 错误: {str(e)}")
+        return {
+            "success": False,
+            "task_id": task_id,
+            "error": str(e),
+            "error_code": "DELETE_FAILED",
+            "operation_message": f"删除操作失败: {str(e)}"  # 新增:失败消息
+        }
+
+@app.route('/api/v0/data_pipeline/tasks', methods=['DELETE'])
+def delete_tasks():
+    """删除任务目录(支持单个和批量)(从 citu_app.py 迁移)"""
+    try:
+        # 智能获取参数:支持JSON body和URL查询参数两种方式
+        def get_request_parameter(param_name, array_param_name=None):
+            """从JSON body或URL查询参数中获取参数值"""
+            # 1. 优先从JSON body获取
+            if request.is_json:
+                try:
+                    json_data = request.get_json()
+                    if json_data and param_name in json_data:
+                        return json_data[param_name]
+                except Exception:
+                    pass
+            
+            # 2. 从URL查询参数获取
+            if param_name in request.args:
+                value = request.args.get(param_name)
+                # 处理布尔值
+                if value.lower() in ('true', '1', 'yes'):
+                    return True
+                elif value.lower() in ('false', '0', 'no'):
+                    return False
+                return value
+            
+            # 3. 处理数组参数(如 task_ids[])
+            if array_param_name and array_param_name in request.args:
+                return request.args.getlist(array_param_name)
+            
+            return None
+        
+        # 获取参数
+        task_ids = get_request_parameter('task_ids', 'task_ids[]')
+        confirm = get_request_parameter('confirm')
+        
+        if not task_ids:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数: task_ids",
+                missing_params=['task_ids']
+            )), 400
+        
+        if not confirm:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数: confirm",
+                missing_params=['confirm']
+            )), 400
+        
+        if confirm != True:
+            return jsonify(bad_request_response(
+                response_text="confirm参数必须为true以确认删除操作"
+            )), 400
+        
+        if not isinstance(task_ids, list) or len(task_ids) == 0:
+            return jsonify(bad_request_response(
+                response_text="task_ids必须是非空的任务ID列表"
+            )), 400
+        
+        # 获取可选参数
+        delete_database_records = get_request_parameter('delete_database_records') or False
+        continue_on_error = get_request_parameter('continue_on_error')
+        if continue_on_error is None:
+            continue_on_error = True
+        
+        # 执行批量删除操作
+        deleted_tasks = []
+        failed_tasks = []
+        total_size_freed = 0
+        
+        for task_id in task_ids:
+            result = delete_task_directory_simple(task_id, delete_database_records)
+            
+            if result["success"]:
+                deleted_tasks.append(result)
+                # 累计释放的空间大小(这里简化处理,实际应该解析size字符串)
+            else:
+                failed_tasks.append({
+                    "task_id": task_id,
+                    "error": result["error"],
+                    "error_code": result.get("error_code", "UNKNOWN")
+                })
+                
+                if not continue_on_error:
+                    break
+        
+        # 构建响应
+        summary = {
+            "total_requested": len(task_ids),
+            "successfully_deleted": len(deleted_tasks),
+            "failed": len(failed_tasks)
+        }
+        
+        batch_result = {
+            "deleted_tasks": deleted_tasks,
+            "failed_tasks": failed_tasks,
+            "summary": summary,
+            "deleted_at": datetime.now().isoformat()
+        }
+        
+        # 构建智能响应消息
+        if len(task_ids) == 1:
+            # 单个删除:使用具体的操作消息
+            if summary["failed"] == 0:
+                # 从deleted_tasks中获取具体的操作消息
+                operation_msg = deleted_tasks[0].get('operation_message', '任务处理完成')
+                message = operation_msg
+            else:
+                # 从failed_tasks中获取错误消息
+                error_msg = failed_tasks[0].get('error', '删除失败')
+                message = f"任务删除失败: {error_msg}"
+        else:
+            # 批量删除:统计各种操作结果
+            directory_deleted_count = sum(1 for task in deleted_tasks if task.get('directory_deleted', False))
+            directory_not_exist_count = sum(1 for task in deleted_tasks if not task.get('directory_deleted', False))
+            
+            if summary["failed"] == 0:
+                # 全部成功
+                if directory_deleted_count > 0 and directory_not_exist_count > 0:
+                    message = f"批量操作完成:{directory_deleted_count}个目录已删除,{directory_not_exist_count}个目录不存在"
+                elif directory_deleted_count > 0:
+                    message = f"批量删除完成:成功删除{directory_deleted_count}个目录"
+                elif directory_not_exist_count > 0:
+                    message = f"批量操作完成:{directory_not_exist_count}个目录不存在,无需删除"
+                else:
+                    message = "批量操作完成"
+            elif summary["successfully_deleted"] == 0:
+                message = f"批量删除失败:{summary['failed']}个任务处理失败"
+            else:
+                message = f"批量删除部分完成:成功{summary['successfully_deleted']}个,失败{summary['failed']}个"
+        
+        return jsonify(success_response(
+            response_text=message,
+            data=batch_result
+        )), 200
+        
+    except Exception as e:
+        logger.error(f"删除任务失败: 错误: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="删除任务失败,请稍后重试"
+        )), 500
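+
+# 调用示例(注释形式,仅作示意;requests 与 BASE_URL 均为假设,confirm 必须为 true):
+#   resp = requests.delete(
+#       f"{BASE_URL}/api/v0/data_pipeline/tasks",
+#       json={
+#           "task_ids": ["task_20250701_123456"],
+#           "confirm": True,
+#           "delete_database_records": False,
+#       },
+#       timeout=60,
+#   )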
+
+@app.route('/api/v0/data_pipeline/tasks/<task_id>/logs/query', methods=['POST'])
+def query_data_pipeline_task_logs(task_id):
+    """
+    高级查询数据管道任务日志(从 citu_app.py 迁移)
+    
+    支持复杂筛选、排序、分页功能
+    
+    请求体:
+    {
+        "page": 1,                          // 页码,必须大于0,默认1
+        "page_size": 50,                    // 每页大小,1-500之间,默认50
+        "level": "ERROR",                   // 可选,日志级别筛选:"DEBUG"|"INFO"|"WARNING"|"ERROR"|"CRITICAL"
+        "start_time": "2025-01-01 00:00:00", // 可选,开始时间范围 (YYYY-MM-DD HH:MM:SS)
+        "end_time": "2025-01-02 23:59:59",   // 可选,结束时间范围 (YYYY-MM-DD HH:MM:SS)
+        "keyword": "failed",                 // 可选,关键字搜索(消息内容模糊匹配)
+        "logger_name": "DDLGenerator",       // 可选,日志记录器名称精确匹配
+        "step_name": "ddl_generation",       // 可选,执行步骤名称精确匹配
+        "sort_by": "timestamp",              // 可选,排序字段:"timestamp"|"level"|"logger"|"step"|"line_number",默认"timestamp"
+        "sort_order": "desc"                 // 可选,排序方向:"asc"|"desc",默认"desc"
+    }
+    """
+    try:
+        # 验证任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+        
+        # 解析请求数据
+        request_data = request.get_json() or {}
+        
+        # 参数验证
+        def _is_valid_time_format(time_str):
+            """验证时间格式是否有效"""
+            if not time_str:
+                return True
+            
+            # 支持的时间格式
+            time_formats = [
+                '%Y-%m-%d %H:%M:%S',     # 2025-01-01 00:00:00
+                '%Y-%m-%d',              # 2025-01-01
+                '%Y-%m-%dT%H:%M:%S',     # 2025-01-01T00:00:00
+                '%Y-%m-%dT%H:%M:%S.%f',  # 2025-01-01T00:00:00.123456
+            ]
+            
+            for fmt in time_formats:
+                try:
+                    # datetime 已在模块顶部导入,循环内无需重复导入
+                    datetime.strptime(time_str, fmt)
+                    return True
+                except ValueError:
+                    continue
+            return False
+        
+        # 提取和验证参数
+        page = request_data.get('page', 1)
+        page_size = request_data.get('page_size', 50)
+        level = request_data.get('level')
+        start_time = request_data.get('start_time')
+        end_time = request_data.get('end_time')
+        keyword = request_data.get('keyword')
+        logger_name = request_data.get('logger_name')
+        step_name = request_data.get('step_name')
+        sort_by = request_data.get('sort_by', 'timestamp')
+        sort_order = request_data.get('sort_order', 'desc')
+        
+        # 参数验证
+        if not isinstance(page, int) or page < 1:
+            return jsonify(bad_request_response(
+                response_text="页码必须是大于0的整数"
+            )), 400
+        
+        if not isinstance(page_size, int) or page_size < 1 or page_size > 500:
+            return jsonify(bad_request_response(
+                response_text="每页大小必须是1-500之间的整数"
+            )), 400
+        
+        # 验证日志级别
+        if level and level.upper() not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
+            return jsonify(bad_request_response(
+                response_text="日志级别必须是DEBUG、INFO、WARNING、ERROR、CRITICAL之一"
+            )), 400
+        
+        # 验证时间格式
+        if not _is_valid_time_format(start_time):
+            return jsonify(bad_request_response(
+                response_text="开始时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
+            )), 400
+        
+        if not _is_valid_time_format(end_time):
+            return jsonify(bad_request_response(
+                response_text="结束时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
+            )), 400
+        
+        # 验证关键字长度
+        if keyword and len(keyword) > 200:
+            return jsonify(bad_request_response(
+                response_text="关键字长度不能超过200个字符"
+            )), 400
+        
+        # 验证排序字段
+        allowed_sort_fields = ['timestamp', 'level', 'logger', 'step', 'line_number']
+        if sort_by not in allowed_sort_fields:
+            return jsonify(bad_request_response(
+                response_text=f"排序字段必须是以下之一: {', '.join(allowed_sort_fields)}"
+            )), 400
+        
+        # 验证排序方向
+        if sort_order.lower() not in ['asc', 'desc']:
+            return jsonify(bad_request_response(
+                response_text="排序方向必须是asc或desc"
+            )), 400
+        
+        # 创建工作流执行器并查询日志(SimpleWorkflowExecutor 已在模块顶部导入)
+        executor = SimpleWorkflowExecutor(task_id)
+        
+        try:
+            result = executor.query_logs_advanced(
+                page=page,
+                page_size=page_size,
+                level=level,
+                start_time=start_time,
+                end_time=end_time,
+                keyword=keyword,
+                logger_name=logger_name,
+                step_name=step_name,
+                sort_by=sort_by,
+                sort_order=sort_order
+            )
+            
+            return jsonify(success_response(
+                response_text="查询任务日志成功",
+                data=result
+            ))
+            
+        finally:
+            executor.cleanup()
+        
+    except Exception as e:
+        logger.error(f"查询数据管道任务日志失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="查询任务日志失败,请稍后重试"
+        )), 500
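+
+# 调用示例(注释形式,仅作示意;requests 与 BASE_URL 均为假设,可选筛选参数见上方 docstring):
+#   resp = requests.post(
+#       f"{BASE_URL}/api/v0/data_pipeline/tasks/task_20250701_123456/logs/query",
+#       json={"page": 1, "page_size": 50, "level": "ERROR", "keyword": "failed"},
+#       timeout=30,
+#   )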