""" Custom React Agent API 服务 提供RESTful接口用于智能问答 """ import asyncio import logging import atexit import os import sys from datetime import datetime from typing import Optional, Dict, Any # 🔧 修复模块路径问题:添加项目根目录到 sys.path CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, '..', '..')) sys.path.insert(0, CURRENT_DIR) # 当前目录优先 sys.path.insert(1, PROJECT_ROOT) # 项目根目录 from flask import Flask, request, jsonify import redis.asyncio as redis try: # 尝试相对导入(当作为模块导入时) from .agent import CustomReactAgent except ImportError: # 如果相对导入失败,尝试绝对导入(直接运行时) from agent import CustomReactAgent # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 全局Agent实例 _agent_instance: Optional[CustomReactAgent] = None _redis_client: Optional[redis.Redis] = None def validate_request_data(data: Dict[str, Any]) -> Dict[str, Any]: """验证请求数据,并支持从thread_id中推断user_id""" errors = [] # 验证 question(必填) question = data.get('question', '') if not question or not question.strip(): errors.append('问题不能为空') elif len(question) > 2000: errors.append('问题长度不能超过2000字符') # 优先获取 thread_id thread_id = data.get('thread_id') or data.get('conversation_id') # 获取 user_id,但暂不设置默认值 user_id = data.get('user_id') # 如果没有传递 user_id,则尝试从 thread_id 中推断 if not user_id: if thread_id and ':' in thread_id: inferred_user_id = thread_id.split(':', 1)[0] if inferred_user_id: user_id = inferred_user_id logger.info(f"👤 未提供user_id,从 thread_id '{thread_id}' 中推断出: '{user_id}'") else: # 如果拆分结果为空,则使用默认值 user_id = 'guest' else: # 如果 thread_id 不存在或格式不正确,则使用默认值 user_id = 'guest' # 验证 user_id 长度 if user_id and len(user_id) > 50: errors.append('用户ID长度不能超过50字符') # 用户ID与会话ID一致性校验 if thread_id: if ':' not in thread_id: errors.append('会话ID格式无效,期望格式为 user_id:timestamp') else: thread_user_id = thread_id.split(':', 1)[0] if thread_user_id != user_id: errors.append(f'会话归属验证失败:会话ID [{thread_id}] 不属于当前用户 [{user_id}]') if errors: raise ValueError('; '.join(errors)) return { 'question': question.strip(), 'user_id': user_id, 'thread_id': thread_id # 可选,不传则自动生成新会话 } async def initialize_agent(): """异步初始化Agent""" global _agent_instance, _redis_client if _agent_instance is None: logger.info("🚀 正在异步初始化 Custom React Agent...") try: # 设置环境变量(checkpointer内部需要) os.environ['REDIS_URL'] = 'redis://localhost:6379' # 初始化共享的Redis客户端 _redis_client = redis.from_url('redis://localhost:6379', decode_responses=True) await _redis_client.ping() logger.info("✅ Redis客户端连接成功") _agent_instance = await CustomReactAgent.create() logger.info("✅ Agent 异步初始化完成") except Exception as e: logger.error(f"❌ Agent 异步初始化失败: {e}") raise async def ensure_agent_ready(): """异步确保Agent实例可用""" global _agent_instance if _agent_instance is None: await initialize_agent() # 测试Agent是否还可用 try: # 简单测试 - 尝试获取一个不存在用户的对话(应该返回空列表) test_result = await _agent_instance.get_user_recent_conversations("__test__", 1) return True except Exception as e: logger.warning(f"⚠️ Agent实例不可用: {e}") # 重新创建Agent实例 _agent_instance = None await initialize_agent() return True # 删除复杂的事件循环管理函数 - 不再需要 async def cleanup_agent(): """异步清理Agent资源""" global _agent_instance, _redis_client if _agent_instance: await _agent_instance.close() logger.info("✅ Agent 资源已异步清理") _agent_instance = None if _redis_client: await _redis_client.aclose() logger.info("✅ Redis客户端已异步关闭") _redis_client = None # 创建Flask应用 app = Flask(__name__) # 简化的退出处理 def cleanup_on_exit(): """程序退出时的清理函数""" logger.info("程序退出,资源清理将在异步上下文中进行") atexit.register(cleanup_on_exit) @app.route("/") def root(): """健康检查端点""" return jsonify({"message": "Custom React Agent API 服务正在运行"}) @app.route('/health', methods=['GET']) def health_check(): """健康检查端点""" try: health_status = { "status": "healthy", "agent_initialized": _agent_instance is not None, "timestamp": datetime.now().isoformat() } return jsonify(health_status), 200 except Exception as e: logger.error(f"健康检查失败: {e}") return jsonify({"status": "unhealthy", "error": str(e)}), 500 @app.route("/api/chat", methods=["POST"]) async def chat_endpoint(): """异步智能问答接口""" global _agent_instance # 确保Agent已初始化 if not await ensure_agent_ready(): return jsonify({ "code": 503, "message": "服务未就绪", "success": False, "error": "Agent 初始化失败" }), 503 try: # 获取请求数据,处理JSON解析错误 try: data = request.get_json(force=True) except Exception as json_error: logger.warning(f"⚠️ JSON解析失败: {json_error}") return jsonify({ "code": 400, "message": "请求格式错误", "success": False, "error": "无效的JSON格式,请检查请求体中是否存在语法错误(如多余的逗号、引号不匹配等)", "details": str(json_error) }), 400 if not data: return jsonify({ "code": 400, "message": "请求参数错误", "success": False, "error": "请求体不能为空" }), 400 # 验证请求数据 validated_data = validate_request_data(data) logger.info(f"📨 收到请求 - User: {validated_data['user_id']}, Question: {validated_data['question'][:50]}...") # 直接调用异步方法,不需要事件循环包装 agent_result = await _agent_instance.chat( message=validated_data['question'], user_id=validated_data['user_id'], thread_id=validated_data['thread_id'] ) if not agent_result.get("success", False): # Agent处理失败 error_msg = agent_result.get("error", "Agent处理失败") logger.error(f"❌ Agent处理失败: {error_msg}") return jsonify({ "code": 500, "message": "处理失败", "success": False, "error": error_msg, "data": { "conversation_id": agent_result.get("thread_id"), # 新增:conversation_id等于thread_id "user_id": validated_data['user_id'], # 新增:返回用户ID "react_agent_meta": { "thread_id": agent_result.get("thread_id"), "agent_version": "custom_react_v1_async", "execution_path": ["error"] }, "timestamp": datetime.now().isoformat() } }), 500 # Agent处理成功,按照设计文档格式化响应 api_data = agent_result.get("api_data", {}) # 构建符合设计文档的响应数据 response_data = { "response": api_data.get("response", ""), "conversation_id": agent_result.get("thread_id"), # 新增:conversation_id等于thread_id "user_id": validated_data['user_id'], # 新增:返回用户ID "react_agent_meta": api_data.get("react_agent_meta", { "thread_id": agent_result.get("thread_id"), "agent_version": "custom_react_v1" }), "timestamp": datetime.now().isoformat() } # 可选字段:SQL(仅当执行SQL时存在) if "sql" in api_data: response_data["sql"] = api_data["sql"] # 可选字段:records(仅当有查询结果时存在) if "records" in api_data: response_data["records"] = api_data["records"] logger.info(f"✅ 请求处理成功 - Thread: {response_data['react_agent_meta'].get('thread_id')}") return jsonify({ "code": 200, "message": "操作成功", "success": True, "data": response_data }) except ValueError as e: # 参数验证错误 error_msg = str(e) logger.warning(f"⚠️ 参数验证失败: {error_msg}") # 根据错误类型提供更友好的消息 if "会话归属验证失败" in error_msg: message = "会话归属验证失败" elif "会话ID格式无效" in error_msg: message = "会话ID格式无效" elif "JSON格式" in error_msg: message = "请求格式错误" else: message = "请求参数错误" return jsonify({ "code": 400, "message": message, "success": False, "error": error_msg, "error_type": "validation_error" }), 400 except Exception as e: # 其他未预期的错误 logger.error(f"❌ 未预期的错误: {e}", exc_info=True) return jsonify({ "code": 500, "message": "服务器内部错误", "success": False, "error": "系统异常,请稍后重试" }), 500 @app.route('/api/v0/react/users//conversations', methods=['GET']) async def get_user_conversations(user_id: str): """异步获取用户的聊天记录列表""" global _agent_instance try: # 获取查询参数 limit = request.args.get('limit', 10, type=int) # 限制limit的范围 limit = max(1, min(limit, 50)) # 限制在1-50之间 logger.info(f"📋 异步获取用户 {user_id} 的对话列表,限制 {limit} 条") # 确保Agent可用 if not await ensure_agent_ready(): return jsonify({ "success": False, "error": "Agent 未就绪", "timestamp": datetime.now().isoformat() }), 503 # 直接调用异步方法 conversations = await _agent_instance.get_user_recent_conversations(user_id, limit) return jsonify({ "success": True, "data": { "user_id": user_id, "conversations": conversations, "total_count": len(conversations), "limit": limit }, "timestamp": datetime.now().isoformat() }), 200 except Exception as e: logger.error(f"❌ 异步获取用户 {user_id} 对话列表失败: {e}") return jsonify({ "success": False, "error": str(e), "timestamp": datetime.now().isoformat() }), 500 @app.route('/api/v0/react/users//conversations/', methods=['GET']) async def get_user_conversation_detail(user_id: str, thread_id: str): """异步获取特定对话的详细历史""" global _agent_instance try: # 验证thread_id格式是否匹配user_id if not thread_id.startswith(f"{user_id}:"): return jsonify({ "success": False, "error": f"Thread ID {thread_id} 不属于用户 {user_id}", "timestamp": datetime.now().isoformat() }), 400 logger.info(f"📖 异步获取用户 {user_id} 的对话 {thread_id} 详情") # 确保Agent可用 if not await ensure_agent_ready(): return jsonify({ "success": False, "error": "Agent 未就绪", "timestamp": datetime.now().isoformat() }), 503 # 直接调用异步方法 history = await _agent_instance.get_conversation_history(thread_id) logger.info(f"✅ 异步成功获取对话历史,消息数量: {len(history)}") if not history: return jsonify({ "success": False, "error": f"未找到对话 {thread_id}", "timestamp": datetime.now().isoformat() }), 404 return jsonify({ "success": True, "data": { "user_id": user_id, "thread_id": thread_id, "message_count": len(history), "messages": history }, "timestamp": datetime.now().isoformat() }), 200 except Exception as e: import traceback logger.error(f"❌ 异步获取对话 {thread_id} 详情失败: {e}") logger.error(f"❌ 详细错误信息: {traceback.format_exc()}") return jsonify({ "success": False, "error": str(e), "timestamp": datetime.now().isoformat() }), 500 # 简单Redis查询函数(测试用) def get_user_conversations_simple_sync(user_id: str, limit: int = 10): """直接从Redis获取用户对话,测试版本""" import redis import json try: # 创建Redis连接 redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True) redis_client.ping() # 扫描用户的checkpoint keys pattern = f"checkpoint:{user_id}:*" logger.info(f"🔍 扫描模式: {pattern}") keys = [] cursor = 0 while True: cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000) keys.extend(batch) if cursor == 0: break logger.info(f"📋 找到 {len(keys)} 个keys") # 解析thread信息 thread_data = {} for key in keys: try: parts = key.split(':') if len(parts) >= 4: thread_id = f"{parts[1]}:{parts[2]}" # user_id:timestamp timestamp = parts[2] if thread_id not in thread_data: thread_data[thread_id] = { "thread_id": thread_id, "timestamp": timestamp, "keys": [] } thread_data[thread_id]["keys"].append(key) except Exception as e: logger.warning(f"解析key失败 {key}: {e}") continue logger.info(f"📊 找到 {len(thread_data)} 个thread") # 按时间戳排序 sorted_threads = sorted( thread_data.values(), key=lambda x: x["timestamp"], reverse=True )[:limit] # 获取每个thread的详细信息 conversations = [] for thread_info in sorted_threads: try: thread_id = thread_info["thread_id"] # 获取最新的checkpoint数据 latest_key = max(thread_info["keys"]) # 先检查key的数据类型 key_type = redis_client.type(latest_key) logger.info(f"🔍 Key {latest_key} 的类型: {key_type}") data = None if key_type == 'string': data = redis_client.get(latest_key) elif key_type == 'hash': # 如果是hash类型,获取所有字段 hash_data = redis_client.hgetall(latest_key) logger.info(f"🔍 Hash字段: {list(hash_data.keys())}") # 尝试获取可能的数据字段 for field in ['data', 'state', 'value', 'checkpoint']: if field in hash_data: data = hash_data[field] break if not data and hash_data: # 如果没找到预期字段,取第一个值试试 data = list(hash_data.values())[0] elif key_type == 'list': # 如果是list类型,获取最后一个元素 data = redis_client.lindex(latest_key, -1) elif key_type == 'ReJSON-RL': # 这是RedisJSON类型,使用JSON.GET命令 logger.info(f"🔍 使用JSON.GET获取RedisJSON数据") try: # 使用JSON.GET命令获取整个JSON对象 json_data = redis_client.execute_command('JSON.GET', latest_key) if json_data: data = json_data # JSON.GET返回的就是JSON字符串 logger.info(f"🔍 JSON数据长度: {len(data)} 字符") else: logger.warning(f"⚠️ JSON.GET 返回空数据") continue except Exception as json_error: logger.error(f"❌ JSON.GET 失败: {json_error}") continue else: logger.warning(f"⚠️ 未知的key类型: {key_type}") continue if data: try: checkpoint_data = json.loads(data) # 调试:查看JSON数据结构 logger.info(f"🔍 JSON顶级keys: {list(checkpoint_data.keys())}") # 根据您提供的JSON结构,消息在 checkpoint.channel_values.messages messages = [] # 首先检查是否有checkpoint字段 if 'checkpoint' in checkpoint_data: checkpoint = checkpoint_data['checkpoint'] if isinstance(checkpoint, dict) and 'channel_values' in checkpoint: channel_values = checkpoint['channel_values'] if isinstance(channel_values, dict) and 'messages' in channel_values: messages = channel_values['messages'] logger.info(f"🔍 找到messages: {len(messages)} 条消息") # 如果没有checkpoint字段,尝试直接在channel_values if not messages and 'channel_values' in checkpoint_data: channel_values = checkpoint_data['channel_values'] if isinstance(channel_values, dict) and 'messages' in channel_values: messages = channel_values['messages'] logger.info(f"🔍 找到messages(直接路径): {len(messages)} 条消息") # 生成对话预览 preview = "空对话" if messages: for msg in messages: # 处理LangChain消息格式:{"lc": 1, "type": "constructor", "id": ["langchain", "schema", "messages", "HumanMessage"], "kwargs": {"content": "...", "type": "human"}} if isinstance(msg, dict): # 检查是否是LangChain格式的HumanMessage if (msg.get('lc') == 1 and msg.get('type') == 'constructor' and 'id' in msg and isinstance(msg['id'], list) and len(msg['id']) >= 4 and msg['id'][3] == 'HumanMessage' and 'kwargs' in msg): kwargs = msg['kwargs'] if kwargs.get('type') == 'human' and 'content' in kwargs: content = str(kwargs['content']) preview = content[:50] + "..." if len(content) > 50 else content break # 兼容其他格式 elif msg.get('type') == 'human' and 'content' in msg: content = str(msg['content']) preview = content[:50] + "..." if len(content) > 50 else content break conversations.append({ "thread_id": thread_id, "user_id": user_id, "timestamp": thread_info["timestamp"], "message_count": len(messages), "conversation_preview": preview }) except json.JSONDecodeError: logger.error(f"❌ JSON解析失败,数据类型: {type(data)}, 长度: {len(str(data))}") logger.error(f"❌ 数据开头: {str(data)[:200]}...") continue except Exception as e: logger.error(f"处理thread {thread_info['thread_id']} 失败: {e}") continue redis_client.close() logger.info(f"✅ 返回 {len(conversations)} 个对话") return conversations except Exception as e: logger.error(f"❌ Redis查询失败: {e}") return [] @app.route('/api/test/redis', methods=['GET']) def test_redis_connection(): """测试Redis连接和基本查询""" try: import redis # 创建Redis连接 r = redis.Redis(host='localhost', port=6379, decode_responses=True) r.ping() # 扫描checkpoint keys pattern = "checkpoint:*" keys = [] cursor = 0 count = 0 while True: cursor, batch = r.scan(cursor=cursor, match=pattern, count=100) keys.extend(batch) count += len(batch) if cursor == 0 or count > 500: # 限制扫描数量 break # 统计用户 users = {} for key in keys: try: parts = key.split(':') if len(parts) >= 2: user_id = parts[1] users[user_id] = users.get(user_id, 0) + 1 except: continue r.close() return jsonify({ "success": True, "data": { "redis_connected": True, "total_checkpoint_keys": len(keys), "users_found": list(users.keys()), "user_key_counts": users, "sample_keys": keys[:5] if keys else [] }, "timestamp": datetime.now().isoformat() }), 200 except Exception as e: logger.error(f"❌ Redis测试失败: {e}") return jsonify({ "success": False, "error": str(e), "timestamp": datetime.now().isoformat() }), 500 @app.route('/api/v0/react/direct/users//conversations', methods=['GET']) def test_get_user_conversations_simple(user_id: str): """测试简单Redis查询获取用户对话列表""" try: limit = request.args.get('limit', 10, type=int) limit = max(1, min(limit, 50)) logger.info(f"🧪 测试获取用户 {user_id} 的对话列表(简单Redis方式)") # 使用简单Redis查询 conversations = get_user_conversations_simple_sync(user_id, limit) return jsonify({ "success": True, "method": "simple_redis_query", "data": { "user_id": user_id, "conversations": conversations, "total_count": len(conversations), "limit": limit }, "timestamp": datetime.now().isoformat() }), 200 except Exception as e: logger.error(f"❌ 测试简单Redis查询失败: {e}") return jsonify({ "success": False, "error": str(e), "timestamp": datetime.now().isoformat() }), 500 # 在 api.py 文件顶部的导入部分添加: from enhanced_redis_api import get_conversation_detail_from_redis # 在 api.py 文件中添加以下新路由: @app.route('/api/v0/react/direct/conversations/', methods=['GET']) def get_conversation_detail_api(thread_id: str): """ 获取特定对话的详细信息 - 支持include_tools开关参数 Query Parameters: - include_tools: bool, 是否包含工具调用信息,默认false true: 返回完整对话(human/ai/tool/system) false: 只返回human/ai消息,清理工具调用信息 - user_id: str, 可选的用户ID验证 Examples: GET /api/conversations/wang:20250709195048728?include_tools=true # 完整模式 GET /api/conversations/wang:20250709195048728?include_tools=false # 简化模式(默认) GET /api/conversations/wang:20250709195048728 # 简化模式(默认) """ try: # 获取查询参数 include_tools = request.args.get('include_tools', 'false').lower() == 'true' user_id = request.args.get('user_id') # 验证thread_id格式 if ':' not in thread_id: return jsonify({ "success": False, "error": "Invalid thread_id format. Expected format: user_id:timestamp", "timestamp": datetime.now().isoformat() }), 400 # 如果提供了user_id,验证thread_id是否属于该用户 thread_user_id = thread_id.split(':')[0] if user_id and thread_user_id != user_id: return jsonify({ "success": False, "error": f"Thread ID {thread_id} does not belong to user {user_id}", "timestamp": datetime.now().isoformat() }), 400 logger.info(f"📖 获取对话详情 - Thread: {thread_id}, Include Tools: {include_tools}") # 从Redis获取对话详情(使用我们的新函数) result = get_conversation_detail_from_redis(thread_id, include_tools) if not result['success']: logger.warning(f"⚠️ 获取对话详情失败: {result['error']}") return jsonify({ "success": False, "error": result['error'], "timestamp": datetime.now().isoformat() }), 404 # 添加API元数据 result['data']['api_metadata'] = { "timestamp": datetime.now().isoformat(), "api_version": "v1", "endpoint": "get_conversation_detail", "query_params": { "include_tools": include_tools, "user_id": user_id } } mode_desc = "完整模式" if include_tools else "简化模式" logger.info(f"✅ 成功获取对话详情 - Messages: {result['data']['message_count']}, Mode: {mode_desc}") return jsonify({ "success": True, "data": result['data'], "timestamp": datetime.now().isoformat() }), 200 except Exception as e: import traceback logger.error(f"❌ 获取对话详情异常: {e}") logger.error(f"❌ 详细错误信息: {traceback.format_exc()}") return jsonify({ "success": False, "error": str(e), "timestamp": datetime.now().isoformat() }), 500 @app.route('/api/v0/react/direct/conversations//compare', methods=['GET']) def compare_conversation_modes_api(thread_id: str): """ 比较完整模式和简化模式的对话内容 用于调试和理解两种模式的差异 Examples: GET /api/conversations/wang:20250709195048728/compare """ try: logger.info(f"🔍 比较对话模式 - Thread: {thread_id}") # 获取完整模式 full_result = get_conversation_detail_from_redis(thread_id, include_tools=True) # 获取简化模式 simple_result = get_conversation_detail_from_redis(thread_id, include_tools=False) if not (full_result['success'] and simple_result['success']): return jsonify({ "success": False, "error": "无法获取对话数据进行比较", "timestamp": datetime.now().isoformat() }), 404 # 构建比较结果 comparison = { "thread_id": thread_id, "full_mode": { "message_count": full_result['data']['message_count'], "stats": full_result['data']['stats'], "sample_messages": full_result['data']['messages'][:3] # 只显示前3条作为示例 }, "simple_mode": { "message_count": simple_result['data']['message_count'], "stats": simple_result['data']['stats'], "sample_messages": simple_result['data']['messages'][:3] # 只显示前3条作为示例 }, "comparison_summary": { "message_count_difference": full_result['data']['message_count'] - simple_result['data']['message_count'], "tools_filtered_out": full_result['data']['stats'].get('tool_messages', 0), "ai_messages_with_tools": full_result['data']['stats'].get('messages_with_tools', 0), "filtering_effectiveness": "有效" if (full_result['data']['message_count'] - simple_result['data']['message_count']) > 0 else "无差异" }, "metadata": { "timestamp": datetime.now().isoformat(), "note": "sample_messages 只显示前3条消息作为示例,完整消息请使用相应的详情API" } } logger.info(f"✅ 模式比较完成 - 完整: {comparison['full_mode']['message_count']}, 简化: {comparison['simple_mode']['message_count']}") return jsonify({ "success": True, "data": comparison, "timestamp": datetime.now().isoformat() }), 200 except Exception as e: logger.error(f"❌ 对话模式比较失败: {e}") return jsonify({ "success": False, "error": str(e), "timestamp": datetime.now().isoformat() }), 500 @app.route('/api/v0/react/direct/conversations//summary', methods=['GET']) def get_conversation_summary_api(thread_id: str): """ 获取对话摘要信息(只包含基本统计,不返回具体消息) Query Parameters: - include_tools: bool, 影响统计信息的计算方式 Examples: GET /api/conversations/wang:20250709195048728/summary?include_tools=true """ try: include_tools = request.args.get('include_tools', 'false').lower() == 'true' # 验证thread_id格式 if ':' not in thread_id: return jsonify({ "success": False, "error": "Invalid thread_id format. Expected format: user_id:timestamp", "timestamp": datetime.now().isoformat() }), 400 logger.info(f"📊 获取对话摘要 - Thread: {thread_id}, Include Tools: {include_tools}") # 获取完整对话信息 result = get_conversation_detail_from_redis(thread_id, include_tools) if not result['success']: return jsonify({ "success": False, "error": result['error'], "timestamp": datetime.now().isoformat() }), 404 # 只返回摘要信息,不包含具体消息 data = result['data'] summary = { "thread_id": data['thread_id'], "user_id": data['user_id'], "include_tools": data['include_tools'], "message_count": data['message_count'], "stats": data['stats'], "metadata": data['metadata'], "first_message_preview": None, "last_message_preview": None, "conversation_preview": None } # 添加消息预览 messages = data.get('messages', []) if messages: # 第一条human消息预览 for msg in messages: if msg['type'] == 'human': content = str(msg['content']) summary['first_message_preview'] = content[:100] + "..." if len(content) > 100 else content break # 最后一条ai消息预览 for msg in reversed(messages): if msg['type'] == 'ai' and msg.get('content', '').strip(): content = str(msg['content']) summary['last_message_preview'] = content[:100] + "..." if len(content) > 100 else content break # 生成对话预览(第一条human消息) summary['conversation_preview'] = summary['first_message_preview'] # 添加API元数据 summary['api_metadata'] = { "timestamp": datetime.now().isoformat(), "api_version": "v1", "endpoint": "get_conversation_summary" } logger.info(f"✅ 成功获取对话摘要") return jsonify({ "success": True, "data": summary, "timestamp": datetime.now().isoformat() }), 200 except Exception as e: logger.error(f"❌ 获取对话摘要失败: {e}") return jsonify({ "success": False, "error": str(e), "timestamp": datetime.now().isoformat() }), 500 # 为了支持独立运行 if __name__ == "__main__": try: # 尝试使用ASGI模式启动(推荐) import uvicorn from asgiref.wsgi import WsgiToAsgi logger.info("🚀 使用ASGI模式启动异步Flask应用...") logger.info(" 这将解决事件循环冲突问题,支持LangGraph异步checkpoint保存") # 将Flask WSGI应用转换为ASGI应用 asgi_app = WsgiToAsgi(app) # 信号处理 import signal def signal_handler(signum, frame): logger.info("🛑 收到关闭信号,开始清理...") print("正在关闭服务...") exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 使用uvicorn启动ASGI应用 uvicorn.run( asgi_app, host="0.0.0.0", port=8000, log_level="info", access_log=True ) except ImportError as e: # 如果缺少ASGI依赖,fallback到传统Flask模式 logger.warning("⚠️ ASGI依赖缺失,使用传统Flask模式启动") logger.warning(" 建议安装: pip install uvicorn asgiref") logger.warning(" 传统模式可能存在异步事件循环冲突问题") # 信号处理 import signal def signal_handler(signum, frame): logger.info("🛑 收到关闭信号,开始清理...") print("正在关闭服务...") exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 启动传统Flask应用 app.run(host="0.0.0.0", port=8000, debug=False, threaded=True)