simple_file_manager.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. """
  2. Data Pipeline API 简化文件管理器
  3. 提供简单的文件列表和下载功能,无压缩等复杂功能
  4. """
  5. import os
  6. from pathlib import Path
  7. from typing import Dict, Any, List
  8. from datetime import datetime
  9. import logging
  10. class SimpleFileManager:
  11. """简化的文件管理器"""
  12. def __init__(self, base_output_dir: str = "./data_pipeline/training_data/"):
  13. """
  14. 初始化文件管理器
  15. Args:
  16. base_output_dir: 基础输出目录
  17. """
  18. self.base_output_dir = Path(base_output_dir)
  19. # 使用简单的控制台日志,不使用文件日志
  20. self.logger = logging.getLogger("SimpleFileManager")
  21. self.logger.setLevel(logging.INFO)
  22. # 确保基础目录存在
  23. self.base_output_dir.mkdir(parents=True, exist_ok=True)
  24. def get_task_directory(self, task_id: str) -> Path:
  25. """获取任务目录路径"""
  26. return self.base_output_dir / task_id
  27. def create_task_directory(self, task_id: str) -> bool:
  28. """创建任务目录"""
  29. try:
  30. task_dir = self.get_task_directory(task_id)
  31. task_dir.mkdir(parents=True, exist_ok=True)
  32. self.logger.info(f"任务目录已创建: {task_dir}")
  33. return True
  34. except Exception as e:
  35. self.logger.error(f"创建任务目录失败: {e}")
  36. return False
  37. def get_task_files(self, task_id: str) -> List[Dict[str, Any]]:
  38. """获取任务目录下的所有文件信息"""
  39. try:
  40. task_dir = self.get_task_directory(task_id)
  41. if not task_dir.exists():
  42. return []
  43. files_info = []
  44. for file_path in task_dir.iterdir():
  45. if file_path.is_file():
  46. file_info = self._get_file_info(file_path)
  47. files_info.append(file_info)
  48. # 按修改时间排序(最新的在前)
  49. files_info.sort(key=lambda x: x['modified_at'], reverse=True)
  50. return files_info
  51. except Exception as e:
  52. self.logger.error(f"获取任务文件失败: {e}")
  53. return []
  54. def _get_file_info(self, file_path: Path) -> Dict[str, Any]:
  55. """获取单个文件的基本信息"""
  56. try:
  57. stat = file_path.stat()
  58. return {
  59. "file_name": file_path.name,
  60. "file_path": str(file_path),
  61. "file_type": self._determine_file_type(file_path),
  62. "file_size": stat.st_size,
  63. "file_size_formatted": self._format_file_size(stat.st_size),
  64. "created_at": datetime.fromtimestamp(stat.st_ctime),
  65. "modified_at": datetime.fromtimestamp(stat.st_mtime),
  66. "is_readable": os.access(file_path, os.R_OK)
  67. }
  68. except Exception as e:
  69. self.logger.error(f"获取文件信息失败: {e}")
  70. return {
  71. "file_name": file_path.name,
  72. "file_path": str(file_path),
  73. "file_type": "unknown",
  74. "file_size": 0,
  75. "file_size_formatted": "0 B",
  76. "created_at": datetime.now(),
  77. "modified_at": datetime.now(),
  78. "is_readable": False
  79. }
  80. def _determine_file_type(self, file_path: Path) -> str:
  81. """根据文件扩展名确定文件类型"""
  82. suffix = file_path.suffix.lower()
  83. type_mapping = {
  84. '.ddl': 'ddl',
  85. '.sql': 'sql',
  86. '.md': 'markdown',
  87. '.markdown': 'markdown',
  88. '.json': 'json',
  89. '.txt': 'text',
  90. '.log': 'log'
  91. }
  92. return type_mapping.get(suffix, 'other')
  93. def _format_file_size(self, size_bytes: int) -> str:
  94. """格式化文件大小显示"""
  95. if size_bytes == 0:
  96. return "0 B"
  97. size_names = ["B", "KB", "MB", "GB"]
  98. i = 0
  99. size = float(size_bytes)
  100. while size >= 1024.0 and i < len(size_names) - 1:
  101. size /= 1024.0
  102. i += 1
  103. return f"{size:.1f} {size_names[i]}"
  104. def get_file_path(self, task_id: str, file_name: str) -> Path:
  105. """获取文件的完整路径"""
  106. task_dir = self.get_task_directory(task_id)
  107. return task_dir / file_name
  108. def file_exists(self, task_id: str, file_name: str) -> bool:
  109. """检查文件是否存在"""
  110. file_path = self.get_file_path(task_id, file_name)
  111. return file_path.exists() and file_path.is_file()
  112. def is_file_safe(self, task_id: str, file_name: str) -> bool:
  113. """检查文件路径是否安全(防止路径遍历攻击)"""
  114. try:
  115. task_dir = self.get_task_directory(task_id)
  116. file_path = task_dir / file_name
  117. # 确保文件在任务目录内
  118. file_path.resolve().relative_to(task_dir.resolve())
  119. return True
  120. except ValueError:
  121. return False
  122. def get_directory_info(self, task_id: str) -> Dict[str, Any]:
  123. """获取任务目录信息"""
  124. try:
  125. task_dir = self.get_task_directory(task_id)
  126. if not task_dir.exists():
  127. return {
  128. "exists": False,
  129. "directory_path": str(task_dir),
  130. "total_files": 0,
  131. "total_size": 0,
  132. "total_size_formatted": "0 B"
  133. }
  134. files = self.get_task_files(task_id)
  135. total_size = sum(file_info['file_size'] for file_info in files)
  136. return {
  137. "exists": True,
  138. "directory_path": str(task_dir),
  139. "total_files": len(files),
  140. "total_size": total_size,
  141. "total_size_formatted": self._format_file_size(total_size)
  142. }
  143. except Exception as e:
  144. self.logger.error(f"获取目录信息失败: {e}")
  145. return {
  146. "exists": False,
  147. "directory_path": str(self.get_task_directory(task_id)),
  148. "total_files": 0,
  149. "total_size": 0,
  150. "total_size_formatted": "0 B"
  151. }