simple_file_manager.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. """
  2. Data Pipeline API 简化文件管理器
  3. 提供简单的文件列表和下载功能,无压缩等复杂功能
  4. """
  5. import os
  6. from pathlib import Path
  7. from typing import Dict, Any, List
  8. from datetime import datetime
  9. from core.logging import get_data_pipeline_logger
  10. class SimpleFileManager:
  11. """简化的文件管理器"""
  12. def __init__(self, base_output_dir: str = "./data_pipeline/training_data/"):
  13. """
  14. 初始化文件管理器
  15. Args:
  16. base_output_dir: 基础输出目录
  17. """
  18. self.base_output_dir = Path(base_output_dir)
  19. self.logger = get_data_pipeline_logger("SimpleFileManager")
  20. # 确保基础目录存在
  21. self.base_output_dir.mkdir(parents=True, exist_ok=True)
  22. def get_task_directory(self, task_id: str) -> Path:
  23. """获取任务目录路径"""
  24. return self.base_output_dir / task_id
  25. def create_task_directory(self, task_id: str) -> bool:
  26. """创建任务目录"""
  27. try:
  28. task_dir = self.get_task_directory(task_id)
  29. task_dir.mkdir(parents=True, exist_ok=True)
  30. self.logger.info(f"任务目录已创建: {task_dir}")
  31. return True
  32. except Exception as e:
  33. self.logger.error(f"创建任务目录失败: {e}")
  34. return False
  35. def get_task_files(self, task_id: str) -> List[Dict[str, Any]]:
  36. """获取任务目录下的所有文件信息"""
  37. try:
  38. task_dir = self.get_task_directory(task_id)
  39. if not task_dir.exists():
  40. return []
  41. files_info = []
  42. for file_path in task_dir.iterdir():
  43. if file_path.is_file():
  44. file_info = self._get_file_info(file_path)
  45. files_info.append(file_info)
  46. # 按修改时间排序(最新的在前)
  47. files_info.sort(key=lambda x: x['modified_at'], reverse=True)
  48. return files_info
  49. except Exception as e:
  50. self.logger.error(f"获取任务文件失败: {e}")
  51. return []
  52. def _get_file_info(self, file_path: Path) -> Dict[str, Any]:
  53. """获取单个文件的基本信息"""
  54. try:
  55. stat = file_path.stat()
  56. return {
  57. "file_name": file_path.name,
  58. "file_path": str(file_path),
  59. "file_type": self._determine_file_type(file_path),
  60. "file_size": stat.st_size,
  61. "file_size_formatted": self._format_file_size(stat.st_size),
  62. "created_at": datetime.fromtimestamp(stat.st_ctime),
  63. "modified_at": datetime.fromtimestamp(stat.st_mtime),
  64. "is_readable": os.access(file_path, os.R_OK)
  65. }
  66. except Exception as e:
  67. self.logger.error(f"获取文件信息失败: {e}")
  68. return {
  69. "file_name": file_path.name,
  70. "file_path": str(file_path),
  71. "file_type": "unknown",
  72. "file_size": 0,
  73. "file_size_formatted": "0 B",
  74. "created_at": datetime.now(),
  75. "modified_at": datetime.now(),
  76. "is_readable": False
  77. }
  78. def _determine_file_type(self, file_path: Path) -> str:
  79. """根据文件扩展名确定文件类型"""
  80. suffix = file_path.suffix.lower()
  81. type_mapping = {
  82. '.ddl': 'ddl',
  83. '.sql': 'sql',
  84. '.md': 'markdown',
  85. '.markdown': 'markdown',
  86. '.json': 'json',
  87. '.txt': 'text',
  88. '.log': 'log'
  89. }
  90. return type_mapping.get(suffix, 'other')
  91. def _format_file_size(self, size_bytes: int) -> str:
  92. """格式化文件大小显示"""
  93. if size_bytes == 0:
  94. return "0 B"
  95. size_names = ["B", "KB", "MB", "GB"]
  96. i = 0
  97. size = float(size_bytes)
  98. while size >= 1024.0 and i < len(size_names) - 1:
  99. size /= 1024.0
  100. i += 1
  101. return f"{size:.1f} {size_names[i]}"
  102. def get_file_path(self, task_id: str, file_name: str) -> Path:
  103. """获取文件的完整路径"""
  104. task_dir = self.get_task_directory(task_id)
  105. return task_dir / file_name
  106. def file_exists(self, task_id: str, file_name: str) -> bool:
  107. """检查文件是否存在"""
  108. file_path = self.get_file_path(task_id, file_name)
  109. return file_path.exists() and file_path.is_file()
  110. def is_file_safe(self, task_id: str, file_name: str) -> bool:
  111. """检查文件路径是否安全(防止路径遍历攻击)"""
  112. try:
  113. task_dir = self.get_task_directory(task_id)
  114. file_path = task_dir / file_name
  115. # 确保文件在任务目录内
  116. file_path.resolve().relative_to(task_dir.resolve())
  117. return True
  118. except ValueError:
  119. return False
  120. def get_directory_info(self, task_id: str) -> Dict[str, Any]:
  121. """获取任务目录信息"""
  122. try:
  123. task_dir = self.get_task_directory(task_id)
  124. if not task_dir.exists():
  125. return {
  126. "exists": False,
  127. "directory_path": str(task_dir),
  128. "total_files": 0,
  129. "total_size": 0,
  130. "total_size_formatted": "0 B"
  131. }
  132. files = self.get_task_files(task_id)
  133. total_size = sum(file_info['file_size'] for file_info in files)
  134. return {
  135. "exists": True,
  136. "directory_path": str(task_dir),
  137. "total_files": len(files),
  138. "total_size": total_size,
  139. "total_size_formatted": self._format_file_size(total_size)
  140. }
  141. except Exception as e:
  142. self.logger.error(f"获取目录信息失败: {e}")
  143. return {
  144. "exists": False,
  145. "directory_path": str(self.get_task_directory(task_id)),
  146. "total_files": 0,
  147. "total_size": 0,
  148. "total_size_formatted": "0 B"
  149. }