simple_db_manager.py

  1. """
  2. Data Pipeline API 简化数据库管理器
  3. 复用现有的pgvector数据库连接机制,提供Data Pipeline任务的数据库操作功能
  4. """
  5. import json
  6. import re
  7. from datetime import datetime
  8. from pathlib import Path
  9. from typing import Dict, Any, List, Optional, Tuple
  10. import psycopg2
  11. from psycopg2.extras import RealDictCursor, Json
  12. from app_config import PGVECTOR_CONFIG
  13. import logging


class SimpleTaskManager:
    """Simplified task manager that reuses the existing pgvector connection."""

    def __init__(self):
        """Initialize the task manager."""
        # Use simple console logging; no file logging.
        self.logger = logging.getLogger("SimpleTaskManager")
        self.logger.setLevel(logging.INFO)
        self._connection = None

    def _get_connection(self):
        """Get the pgvector database connection, reconnecting if needed."""
        if self._connection is None or self._connection.closed:
            try:
                self._connection = psycopg2.connect(
                    host=PGVECTOR_CONFIG.get('host'),
                    port=PGVECTOR_CONFIG.get('port'),
                    database=PGVECTOR_CONFIG.get('dbname'),
                    user=PGVECTOR_CONFIG.get('user'),
                    password=PGVECTOR_CONFIG.get('password')
                )
                self._connection.autocommit = True
            except Exception as e:
                self.logger.error(f"Failed to connect to pgvector database: {e}")
                raise
        return self._connection

    def close_connection(self):
        """Close the database connection."""
        if self._connection and not self._connection.closed:
            self._connection.close()
        self._connection = None

    def generate_task_id(self) -> str:
        """Generate a task ID in the format task_YYYYMMDD_HHMMSS."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"task_{timestamp}"

    def create_task(self,
                    table_list_file: Optional[str] = None,
                    business_context: Optional[str] = None,
                    db_name: Optional[str] = None,
                    db_connection: Optional[str] = None,
                    task_name: Optional[str] = None,
                    **kwargs) -> str:
        """Create a new task."""
        task_id = self.generate_task_id()

        # Resolve the business database connection and name
        if db_connection:
            # Use the db_connection argument as given
            business_db_connection = db_connection
            # If db_name was not provided, extract it from the connection string
            if not db_name:
                db_name = self._extract_db_name(db_connection)
        else:
            # Fall back to the business database connection info from app_config
            from app_config import APP_DB_CONFIG
            business_db_connection = self._build_db_connection_string(APP_DB_CONFIG)
            # Use the provided db_name, or take it from APP_DB_CONFIG
            if not db_name:
                db_name = APP_DB_CONFIG.get('dbname', 'business_db')

        # Handle the table_list_file parameter.
        # If it is not provided, the task directory is checked for a
        # table_list.txt file at execution time.
        task_table_list_file = table_list_file
        if not task_table_list_file:
            from data_pipeline.config import SCHEMA_TOOLS_CONFIG
            upload_config = SCHEMA_TOOLS_CONFIG.get("file_upload", {})
            target_filename = upload_config.get("target_filename", "table_list.txt")
            # Use a path relative to the task directory
            task_table_list_file = f"{{task_directory}}/{target_filename}"

        # Build the task parameters
        parameters = {
            "db_connection": business_db_connection,  # business DB connection (used by schema_workflow)
            "table_list_file": task_table_list_file,
            "business_context": business_context or "Database management system",
            "file_upload_mode": table_list_file is None,  # flag whether file-upload mode is used
            **kwargs
        }

        try:
            conn = self._get_connection()
            with conn.cursor() as cursor:
                # Create the task record
                cursor.execute("""
                    INSERT INTO data_pipeline_tasks (
                        task_id, task_name, task_type, status, parameters, created_type,
                        by_user, db_name, output_directory
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, (
                    task_id,
                    task_name,
                    'data_workflow',
                    'pending',
                    Json(parameters),
                    'api',
                    'guest',
                    db_name,
                    f"data_pipeline/training_data/{task_id}"
                ))

                # Pre-create all step records (strategy A)
                step_names = ['ddl_generation', 'qa_generation', 'sql_validation', 'training_load']
                for step_name in step_names:
                    cursor.execute("""
                        INSERT INTO data_pipeline_task_steps (
                            task_id, step_name, step_status
                        ) VALUES (%s, %s, %s)
                    """, (task_id, step_name, 'pending'))

            # Create the task directory
            try:
                from data_pipeline.api.simple_file_manager import SimpleFileManager
                file_manager = SimpleFileManager()
                success = file_manager.create_task_directory(task_id)
                if success:
                    self.logger.info(f"Task directory created: {task_id}")
                else:
                    self.logger.warning(f"Failed to create task directory, but the task record was saved: {task_id}")
            except Exception as dir_error:
                self.logger.warning(f"Error creating task directory: {dir_error}; the task record was saved: {task_id}")

            self.logger.info(f"Task created: {task_id}")
            return task_id
        except Exception as e:
            self.logger.error(f"Task creation failed: {e}")
            raise

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Get task information."""
        try:
            conn = self._get_connection()
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute("SELECT * FROM data_pipeline_tasks WHERE task_id = %s", (task_id,))
                result = cursor.fetchone()
                return dict(result) if result else None
        except Exception as e:
            self.logger.error(f"Failed to get task information: {e}")
            raise

    def update_task_status(self, task_id: str, status: str, error_message: Optional[str] = None):
        """Update the task status."""
        try:
            conn = self._get_connection()
            with conn.cursor() as cursor:
                update_fields = ["status = %s"]
                values = [status]

                # Record the start time the first time the task enters in_progress
                if status == 'in_progress' and not self._get_task_started_at(task_id):
                    update_fields.append("started_at = CURRENT_TIMESTAMP")

                # Record the completion time for terminal states
                if status in ['completed', 'failed']:
                    update_fields.append("completed_at = CURRENT_TIMESTAMP")

                if error_message:
                    update_fields.append("error_message = %s")
                    values.append(error_message)

                values.append(task_id)
                cursor.execute(f"""
                    UPDATE data_pipeline_tasks
                    SET {', '.join(update_fields)}
                    WHERE task_id = %s
                """, values)
                self.logger.info(f"Task status updated: {task_id} -> {status}")
        except Exception as e:
            self.logger.error(f"Failed to update task status: {e}")
            raise

    def update_step_status(self, task_id: str, step_name: str, step_status: str, error_message: Optional[str] = None):
        """Update a step's status."""
        try:
            conn = self._get_connection()
            with conn.cursor() as cursor:
                update_fields = ["step_status = %s"]
                values = [step_status]

                # Record the start time when the step enters running
                if step_status == 'running':
                    update_fields.append("started_at = CURRENT_TIMESTAMP")

                # Record the completion time for completed or failed
                if step_status in ['completed', 'failed']:
                    update_fields.append("completed_at = CURRENT_TIMESTAMP")

                # Record the error message if one was given
                if error_message:
                    update_fields.append("error_message = %s")
                    values.append(error_message)

                values.extend([task_id, step_name])
                cursor.execute(f"""
                    UPDATE data_pipeline_task_steps
                    SET {', '.join(update_fields)}
                    WHERE task_id = %s AND step_name = %s
                """, values)
                self.logger.debug(f"Step status updated: {task_id}.{step_name} -> {step_status}")
        except Exception as e:
            self.logger.error(f"Failed to update step status: {e}")
            raise

    def update_step_execution_id(self, task_id: str, step_name: str, execution_id: str):
        """Update a step's execution_id."""
        try:
            conn = self._get_connection()
            with conn.cursor() as cursor:
                cursor.execute("""
                    UPDATE data_pipeline_task_steps
                    SET execution_id = %s
                    WHERE task_id = %s AND step_name = %s
                """, (execution_id, task_id, step_name))
                self.logger.debug(f"Step execution_id updated: {task_id}.{step_name} -> {execution_id}")
        except Exception as e:
            self.logger.error(f"Failed to update step execution_id: {e}")
            raise

    def start_step(self, task_id: str, step_name: str) -> str:
        """Start executing a step and return its execution_id."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        execution_id = f"{task_id}_step_{step_name}_exec_{timestamp}"
        try:
            # Set the step status to running and assign the execution_id
            self.update_step_status(task_id, step_name, 'running')
            self.update_step_execution_id(task_id, step_name, execution_id)
            self.logger.info(f"Step started: {task_id}.{step_name} -> {execution_id}")
            return execution_id
        except Exception as e:
            self.logger.error(f"Failed to start step: {e}")
            raise

    def complete_step(self, task_id: str, step_name: str, status: str, error_message: Optional[str] = None):
        """Finish a step's execution."""
        try:
            self.update_step_status(task_id, step_name, status, error_message)
            self.logger.info(f"Step finished: {task_id}.{step_name} -> {status}")
        except Exception as e:
            self.logger.error(f"Failed to finish step: {e}")
            raise
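
    # Step lifecycle sketch (illustrative, derived from the methods above):
    #   execution_id = manager.start_step(task_id, "qa_generation")    # status -> running
    #   ...run the step's work...
    #   manager.complete_step(task_id, "qa_generation", "completed")   # or "failed", with error_message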

    def get_task_steps(self, task_id: str) -> List[Dict[str, Any]]:
        """Get the status of all steps for a task, in pipeline order."""
        try:
            conn = self._get_connection()
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute("""
                    SELECT * FROM data_pipeline_task_steps
                    WHERE task_id = %s
                    ORDER BY
                        CASE step_name
                            WHEN 'ddl_generation' THEN 1
                            WHEN 'qa_generation' THEN 2
                            WHEN 'sql_validation' THEN 3
                            WHEN 'training_load' THEN 4
                            ELSE 5
                        END
                """, (task_id,))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            self.logger.error(f"Failed to get task step statuses: {e}")
            raise

    def get_step_status(self, task_id: str, step_name: str) -> Optional[Dict[str, Any]]:
        """Get the status of a specific step."""
        try:
            conn = self._get_connection()
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute("""
                    SELECT * FROM data_pipeline_task_steps
                    WHERE task_id = %s AND step_name = %s
                """, (task_id, step_name))
                result = cursor.fetchone()
                return dict(result) if result else None
        except Exception as e:
            self.logger.error(f"Failed to get step status: {e}")
            raise

    def get_tasks_list(self, limit: int = 50, offset: int = 0, status_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get the task list."""
        try:
            conn = self._get_connection()
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                where_clause = ""
                params = []
                if status_filter:
                    where_clause = "WHERE t.status = %s"
                    params.append(status_filter)
                params.extend([limit, offset])

                # Join against the steps table to get a step-status summary
                # (including the newly added fields)
                cursor.execute(f"""
                    SELECT
                        t.task_id,
                        t.task_name,
                        t.task_type,
                        t.status,
                        t.parameters,
                        t.error_message,
                        t.created_at,
                        t.started_at,
                        t.completed_at,
                        t.created_type,
                        t.by_user,
                        t.output_directory,
                        t.db_name,
                        COALESCE(t.directory_exists, TRUE) as directory_exists,
                        t.updated_at,
                        CASE
                            WHEN COUNT(s.step_name) = 0 THEN NULL
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'failed') > 0 THEN 'failed'
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'running') > 0 THEN 'running'
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'completed') = COUNT(s.step_name) THEN 'all_completed'
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'completed') > 0 THEN 'partial_completed'
                            ELSE 'pending'
                        END as step_status
                    FROM data_pipeline_tasks t
                    LEFT JOIN data_pipeline_task_steps s ON t.task_id = s.task_id
                    {where_clause}
                    GROUP BY t.task_id, t.task_name, t.task_type, t.status, t.parameters, t.error_message,
                             t.created_at, t.started_at, t.completed_at, t.created_type, t.by_user,
                             t.output_directory, t.db_name, t.directory_exists, t.updated_at
                    ORDER BY t.created_at DESC
                    LIMIT %s OFFSET %s
                """, params)
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            self.logger.error(f"Failed to get task list: {e}")
            raise

    def _get_task_started_at(self, task_id: str) -> Optional[datetime]:
        """Get the task's start time, or None if not started or on error."""
        try:
            conn = self._get_connection()
            with conn.cursor() as cursor:
                cursor.execute("SELECT started_at FROM data_pipeline_tasks WHERE task_id = %s", (task_id,))
                result = cursor.fetchone()
                return result[0] if result and result[0] else None
        except Exception:
            return None

    def _build_db_connection_string(self, db_config: dict) -> str:
        """Build a PostgreSQL connection string from a config dict."""
        try:
            host = db_config.get('host', 'localhost')
            port = db_config.get('port', 5432)
            dbname = db_config.get('dbname', 'database')
            user = db_config.get('user', 'postgres')
            password = db_config.get('password', '')
            return f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
        except Exception:
            return "postgresql://localhost:5432/database"

    def _extract_db_name(self, connection_string: str) -> str:
        """Extract the database name from a connection string."""
        try:
            if '/' in connection_string:
                db_name = connection_string.split('/')[-1]
                # Strip any query parameters
                if '?' in db_name:
                    db_name = db_name.split('?')[0]
                return db_name if db_name else "database"
            else:
                return "database"
        except Exception:
            return "database"

    def query_tasks_advanced(self,
                             page: int = 1,
                             page_size: int = 20,
                             status: Optional[str] = None,
                             task_name: Optional[str] = None,
                             created_by: Optional[str] = None,
                             db_name: Optional[str] = None,
                             created_time_start: Optional[str] = None,
                             created_time_end: Optional[str] = None,
                             started_time_start: Optional[str] = None,
                             started_time_end: Optional[str] = None,
                             completed_time_start: Optional[str] = None,
                             completed_time_end: Optional[str] = None,
                             sort_by: str = "created_at",
                             sort_order: str = "desc") -> dict:
        """
        Advanced task query with filtering, sorting, and pagination.

        Args:
            page: Page number, must be greater than 0; defaults to 1
            page_size: Page size, between 1 and 100; defaults to 20
            status: Optional task status filter
            task_name: Optional fuzzy match on task name
            created_by: Optional exact match on creator
            db_name: Optional exact match on database name
            created_time_start: Optional start of the created-time range
            created_time_end: Optional end of the created-time range
            started_time_start: Optional start of the started-time range
            started_time_end: Optional end of the started-time range
            completed_time_start: Optional start of the completed-time range
            completed_time_end: Optional end of the completed-time range
            sort_by: Optional sort field; defaults to "created_at"
            sort_order: Optional sort direction; defaults to "desc"

        Returns:
            {
                "tasks": [...],
                "pagination": {
                    "page": 1,
                    "page_size": 20,
                    "total": 150,
                    "total_pages": 8,
                    "has_next": True,
                    "has_prev": False
                }
            }
        """
        try:
            import time
            start_time = time.time()

            # Validate and normalize parameters
            page = max(page, 1)
            page_size = min(max(page_size, 1), 100)  # clamp to 1-100
            offset = (page - 1) * page_size

            # Build the WHERE conditions
            where_conditions = []
            params = []

            # Status filter
            if status:
                where_conditions.append("t.status = %s")
                params.append(status)

            # Fuzzy match on task name
            if task_name:
                where_conditions.append("t.task_name ILIKE %s")
                params.append(f"%{task_name}%")

            # Exact match on creator
            if created_by:
                where_conditions.append("t.by_user = %s")
                params.append(created_by)

            # Exact match on database name
            if db_name:
                where_conditions.append("t.db_name = %s")
                params.append(db_name)

            # Time-range filters
            # Created-time range
            if created_time_start:
                where_conditions.append("t.created_at >= %s")
                params.append(created_time_start)
            if created_time_end:
                where_conditions.append("t.created_at <= %s")
                params.append(created_time_end)
            # Started-time range
            if started_time_start:
                where_conditions.append("t.started_at >= %s")
                params.append(started_time_start)
            if started_time_end:
                where_conditions.append("t.started_at <= %s")
                params.append(started_time_end)
            # Completed-time range
            if completed_time_start:
                where_conditions.append("t.completed_at >= %s")
                params.append(completed_time_start)
            if completed_time_end:
                where_conditions.append("t.completed_at <= %s")
                params.append(completed_time_end)

            # Build the WHERE clause
            where_clause = ""
            if where_conditions:
                where_clause = "WHERE " + " AND ".join(where_conditions)

            # Build the ORDER BY clause.
            # Whitelist the sort field (it is interpolated into the SQL).
            allowed_sort_fields = ['created_at', 'started_at', 'completed_at', 'task_name', 'status']
            if sort_by not in allowed_sort_fields:
                sort_by = 'created_at'

            # Validate the sort direction
            sort_order_upper = sort_order.upper()
            if sort_order_upper not in ['ASC', 'DESC']:
                sort_order_upper = 'DESC'
            order_clause = f"ORDER BY t.{sort_by} {sort_order_upper}"

            conn = self._get_connection()
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                # First get the total count
                count_query = f"""
                    SELECT COUNT(*) as total
                    FROM data_pipeline_tasks t
                    {where_clause}
                """
                cursor.execute(count_query, params)
                total_count = cursor.fetchone()['total']

                # Then fetch the page of data
                data_params = params + [page_size, offset]
                data_query = f"""
                    SELECT
                        t.task_id,
                        t.task_name,
                        t.task_type,
                        t.status,
                        t.parameters,
                        t.error_message,
                        t.created_at,
                        t.started_at,
                        t.completed_at,
                        t.created_type,
                        t.by_user,
                        t.output_directory,
                        t.db_name,
                        COALESCE(t.directory_exists, TRUE) as directory_exists,
                        t.updated_at,
                        CASE
                            WHEN COUNT(s.step_name) = 0 THEN NULL
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'failed') > 0 THEN 'failed'
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'running') > 0 THEN 'running'
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'completed') = COUNT(s.step_name) THEN 'all_completed'
                            WHEN COUNT(s.step_name) FILTER (WHERE s.step_status = 'completed') > 0 THEN 'partial_completed'
                            ELSE 'pending'
                        END as step_status
                    FROM data_pipeline_tasks t
                    LEFT JOIN data_pipeline_task_steps s ON t.task_id = s.task_id
                    {where_clause}
                    GROUP BY t.task_id, t.task_name, t.task_type, t.status, t.parameters, t.error_message,
                             t.created_at, t.started_at, t.completed_at, t.created_type, t.by_user,
                             t.output_directory, t.db_name, t.directory_exists, t.updated_at
                    {order_clause}
                    LIMIT %s OFFSET %s
                """
                cursor.execute(data_query, data_params)
                tasks = [dict(row) for row in cursor.fetchall()]

                # Compute pagination info
                total_pages = (total_count + page_size - 1) // page_size if page_size > 0 else 1
                has_next = page < total_pages
                has_prev = page > 1

                query_time = time.time() - start_time
                return {
                    "tasks": tasks,
                    "pagination": {
                        "page": page,
                        "page_size": page_size,
                        "total": total_count,
                        "total_pages": total_pages,
                        "has_next": has_next,
                        "has_prev": has_prev
                    },
                    "query_time": f"{query_time:.3f}s"
                }
        except Exception as e:
            self.logger.error(f"Advanced task query failed: {e}")
            raise
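
    # Example call (illustrative):
    #   result = manager.query_tasks_advanced(page=1, page_size=20, status="completed",
    #                                         created_time_start="2025-07-01 00:00:00",
    #                                         sort_by="created_at", sort_order="desc")
    #   result["pagination"]["total"]  # total matching tasks across all pages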

    def query_logs_advanced(self,
                            task_id: str,
                            page: int = 1,
                            page_size: int = 50,
                            level: Optional[str] = None,
                            start_time: Optional[str] = None,
                            end_time: Optional[str] = None,
                            keyword: Optional[str] = None,
                            logger_name: Optional[str] = None,
                            step_name: Optional[str] = None,
                            sort_by: str = "timestamp",
                            sort_order: str = "desc") -> dict:
        """
        Advanced log query with filtering, sorting, and pagination.

        Args:
            task_id: Task ID
            page: Page number, must be greater than 0; defaults to 1
            page_size: Page size, between 1 and 500; defaults to 50
            level: Optional log-level filter (DEBUG, INFO, WARNING, ERROR, CRITICAL)
            start_time: Optional start of the time range (YYYY-MM-DD HH:MM:SS)
            end_time: Optional end of the time range (YYYY-MM-DD HH:MM:SS)
            keyword: Optional keyword search (fuzzy match on message content)
            logger_name: Optional exact match on logger name
            step_name: Optional exact match on execution step name
            sort_by: Optional sort field; defaults to "timestamp"
            sort_order: Optional sort direction; defaults to "desc"

        Returns:
            {
                "logs": [...],
                "pagination": {
                    "page": 1,
                    "page_size": 50,
                    "total": 1000,
                    "total_pages": 20,
                    "has_next": True,
                    "has_prev": False
                },
                "log_file_info": {...}
            }
        """
        try:
            import time
            start_query_time = time.time()

            # Validate and normalize parameters
            page = max(page, 1)
            page_size = min(max(page_size, 1), 500)  # clamp to 1-500

            # Locate the log file
            project_root = Path(__file__).parent.parent.parent
            task_dir = project_root / "data_pipeline" / "training_data" / task_id
            log_file = task_dir / "data_pipeline.log"

            # Check whether the log file exists
            if not log_file.exists():
                return {
                    "logs": [],
                    "pagination": {
                        "page": page,
                        "page_size": page_size,
                        "total": 0,
                        "total_pages": 0,
                        "has_next": False,
                        "has_prev": False
                    },
                    "log_file_info": {
                        "exists": False,
                        "file_path": str(log_file),
                        "error": "Log file does not exist"
                    },
                    "query_time": f"{time.time() - start_query_time:.3f}s"
                }

            # Read and parse the log file
            parsed_logs = self._parse_log_file(log_file)

            # Apply the filters
            filtered_logs = self._filter_logs(
                parsed_logs,
                level=level,
                start_time=start_time,
                end_time=end_time,
                keyword=keyword,
                logger_name=logger_name,
                step_name=step_name
            )

            # Sort
            sorted_logs = self._sort_logs(filtered_logs, sort_by, sort_order)

            # Paginate
            total_count = len(sorted_logs)
            start_index = (page - 1) * page_size
            end_index = start_index + page_size
            paginated_logs = sorted_logs[start_index:end_index]

            # Compute pagination info
            total_pages = (total_count + page_size - 1) // page_size if page_size > 0 else 1
            has_next = page < total_pages
            has_prev = page > 1

            # Collect file info
            file_stat = log_file.stat()
            log_file_info = {
                "exists": True,
                "file_path": str(log_file),
                "file_size": file_stat.st_size,
                "file_size_formatted": self._format_file_size(file_stat.st_size),
                "last_modified": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                "total_lines": len(parsed_logs)
            }

            query_time = time.time() - start_query_time
            return {
                "logs": paginated_logs,
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": total_count,
                    "total_pages": total_pages,
                    "has_next": has_next,
                    "has_prev": has_prev
                },
                "log_file_info": log_file_info,
                "query_time": f"{query_time:.3f}s"
            }
        except Exception as e:
            self.logger.error(f"Log query failed: {e}")
            return {
                "logs": [],
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": 0,
                    "total_pages": 0,
                    "has_next": False,
                    "has_prev": False
                },
                "log_file_info": {
                    "exists": False,
                    "error": str(e)
                },
                "query_time": "0.000s"
            }
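
    # Example call (illustrative):
    #   logs = manager.query_logs_advanced(task_id, level="ERROR", keyword="timeout",
    #                                      page=1, page_size=50)
    #   logs["pagination"]["total"], logs["log_file_info"]["file_size_formatted"]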

    def _parse_log_file(self, log_file_path: Path) -> List[Dict[str, Any]]:
        """Parse a log file and extract structured entries."""
        try:
            logs = []
            with open(log_file_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()

            # Log line format: 2025-07-01 14:30:52 [INFO] SimpleWorkflowExecutor: task execution started
            log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
            current_log = None
            line_number = 0

            for line in lines:
                line_number += 1
                line = line.rstrip('\n\r')
                if not line.strip():
                    continue

                match = re.match(log_pattern, line)
                if match:
                    # Save the previous entry, if any
                    if current_log:
                        logs.append(current_log)

                    # Parse the new log entry
                    timestamp, level, logger_name, message = match.groups()
                    # Try to derive the step from the logger name
                    step_name = self._extract_step_from_logger(logger_name)
                    current_log = {
                        "timestamp": timestamp,
                        "level": level,
                        "logger": logger_name,
                        "step": step_name,
                        "message": message,
                        "line_number": line_number
                    }
                else:
                    # Multi-line entry (e.g. an exception traceback);
                    # append to the current entry's message
                    if current_log:
                        current_log["message"] += f"\n{line}"

            # Save the final entry
            if current_log:
                logs.append(current_log)
            return logs
        except Exception as e:
            self.logger.error(f"Failed to parse log file: {e}")
            return []

    def _extract_step_from_logger(self, logger_name: str) -> Optional[str]:
        """Derive the step name from a logger name."""
        # Map logger names to step names
        logger_to_step = {
            "DDLGenerator": "ddl_generation",
            "QAGenerator": "qa_generation",
            "QSGenerator": "qa_generation",
            "SQLValidator": "sql_validation",
            "TrainingDataLoader": "training_load",
            "VannaTrainer": "training_load",
            "SchemaWorkflowOrchestrator": None,  # overall orchestrator
            "SimpleWorkflowExecutor": None,  # workflow executor
        }
        return logger_to_step.get(logger_name)
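
    # Example (illustrative): _extract_step_from_logger("QAGenerator") -> "qa_generation";
    # orchestrator/executor loggers (and unknown names) map to None, i.e. no specific step.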

    def _filter_logs(self, logs: List[Dict[str, Any]], **filters) -> List[Dict[str, Any]]:
        """Filter log entries by the given criteria."""
        filtered = logs

        # Log-level filter
        if filters.get('level'):
            level = filters['level'].upper()
            filtered = [log for log in filtered if log.get('level') == level]

        # Time-range filters (string comparison works because the
        # timestamps use the zero-padded "YYYY-MM-DD HH:MM:SS" format)
        if filters.get('start_time'):
            start_time = filters['start_time']
            filtered = [log for log in filtered if log.get('timestamp', '') >= start_time]
        if filters.get('end_time'):
            end_time = filters['end_time']
            filtered = [log for log in filtered if log.get('timestamp', '') <= end_time]

        # Keyword search (fuzzy match on message content)
        if filters.get('keyword'):
            keyword = filters['keyword'].lower()
            filtered = [log for log in filtered
                        if keyword in log.get('message', '').lower()]

        # Exact match on logger name
        if filters.get('logger_name'):
            logger_name = filters['logger_name']
            filtered = [log for log in filtered if log.get('logger') == logger_name]

        # Exact match on step name
        if filters.get('step_name'):
            step_name = filters['step_name']
            filtered = [log for log in filtered if log.get('step') == step_name]

        return filtered

    def _sort_logs(self, logs: List[Dict[str, Any]], sort_by: str, sort_order: str) -> List[Dict[str, Any]]:
        """Sort log entries."""
        # Whitelist the sort field
        allowed_sort_fields = ['timestamp', 'level', 'logger', 'step', 'line_number']
        if sort_by not in allowed_sort_fields:
            sort_by = 'timestamp'

        # Validate the sort direction
        reverse = sort_order.lower() == 'desc'

        try:
            # Special-case timestamp sorting (string compare on the
            # zero-padded format); use a numeric default for line_number
            # so missing values never mix int and str keys
            if sort_by == 'timestamp':
                return sorted(logs, key=lambda x: x.get('timestamp', ''), reverse=reverse)
            elif sort_by == 'line_number':
                return sorted(logs, key=lambda x: x.get('line_number', 0), reverse=reverse)
            else:
                return sorted(logs, key=lambda x: x.get(sort_by, ''), reverse=reverse)
        except Exception as e:
            self.logger.error(f"Failed to sort logs: {e}")
            return logs

    def _format_file_size(self, size_bytes: int) -> str:
        """Format a file size for display, e.g. 1536 -> "1.5 KB"."""
        if size_bytes == 0:
            return "0 B"
        size_names = ["B", "KB", "MB", "GB"]
        i = 0
        size = float(size_bytes)
        while size >= 1024.0 and i < len(size_names) - 1:
            size /= 1024.0
            i += 1
        return f"{size:.1f} {size_names[i]}"