dataflows.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. import logging
  2. from typing import Dict, List, Optional, Any
  3. from datetime import datetime
  4. import json
  5. from app.core.llm.llm_service import llm_client
  6. logger = logging.getLogger(__name__)
  7. class DataFlowService:
  8. """数据流服务类,处理数据流相关的业务逻辑"""
  9. @staticmethod
  10. def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
  11. """
  12. 获取数据流列表
  13. Args:
  14. page: 页码
  15. page_size: 每页大小
  16. search: 搜索关键词
  17. Returns:
  18. 包含数据流列表和分页信息的字典
  19. """
  20. try:
  21. # TODO: 这里应该从数据库查询数据流列表
  22. # 目前返回模拟数据
  23. mock_dataflows = [
  24. {
  25. 'id': i,
  26. 'name': f'数据流_{i}',
  27. 'description': f'这是第{i}个数据流的描述',
  28. 'status': 'active' if i % 2 == 0 else 'inactive',
  29. 'created_at': datetime.now().isoformat(),
  30. 'updated_at': datetime.now().isoformat(),
  31. 'created_by': f'user_{i % 3 + 1}'
  32. }
  33. for i in range(1, 21)
  34. ]
  35. # 简单的搜索过滤
  36. if search:
  37. mock_dataflows = [
  38. df for df in mock_dataflows
  39. if search.lower() in df['name'].lower() or search.lower() in df['description'].lower()
  40. ]
  41. # 分页处理
  42. total = len(mock_dataflows)
  43. start = (page - 1) * page_size
  44. end = start + page_size
  45. dataflows = mock_dataflows[start:end]
  46. return {
  47. 'list': dataflows,
  48. 'pagination': {
  49. 'page': page,
  50. 'page_size': page_size,
  51. 'total': total,
  52. 'total_pages': (total + page_size - 1) // page_size
  53. }
  54. }
  55. except Exception as e:
  56. logger.error(f"获取数据流列表失败: {str(e)}")
  57. raise e
  58. @staticmethod
  59. def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
  60. """
  61. 根据ID获取数据流详情
  62. Args:
  63. dataflow_id: 数据流ID
  64. Returns:
  65. 数据流详情字典,如果不存在则返回None
  66. """
  67. try:
  68. # TODO: 这里应该从数据库查询指定的数据流
  69. # 目前返回模拟数据
  70. if dataflow_id <= 0 or dataflow_id > 20:
  71. return None
  72. return {
  73. 'id': dataflow_id,
  74. 'name': f'数据流_{dataflow_id}',
  75. 'description': f'这是第{dataflow_id}个数据流的详细描述',
  76. 'status': 'active' if dataflow_id % 2 == 0 else 'inactive',
  77. 'created_at': datetime.now().isoformat(),
  78. 'updated_at': datetime.now().isoformat(),
  79. 'created_by': f'user_{dataflow_id % 3 + 1}',
  80. 'config': {
  81. 'source': {
  82. 'type': 'database',
  83. 'connection': 'mysql://localhost:3306/test'
  84. },
  85. 'target': {
  86. 'type': 'file',
  87. 'path': '/data/output/'
  88. },
  89. 'transformations': [
  90. {'type': 'filter', 'condition': 'age > 18'},
  91. {'type': 'aggregate', 'group_by': 'department', 'function': 'count'}
  92. ]
  93. }
  94. }
  95. except Exception as e:
  96. logger.error(f"获取数据流详情失败: {str(e)}")
  97. raise e
  98. @staticmethod
  99. def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
  100. """
  101. 创建新的数据流
  102. Args:
  103. data: 数据流配置数据
  104. Returns:
  105. 创建的数据流信息
  106. """
  107. try:
  108. # TODO: 这里应该验证数据并保存到数据库
  109. # 目前返回模拟数据
  110. required_fields = ['name', 'description']
  111. for field in required_fields:
  112. if field not in data:
  113. raise ValueError(f"缺少必填字段: {field}")
  114. new_dataflow = {
  115. 'id': 21, # 模拟新生成的ID
  116. 'name': data['name'],
  117. 'description': data['description'],
  118. 'status': 'inactive',
  119. 'created_at': datetime.now().isoformat(),
  120. 'updated_at': datetime.now().isoformat(),
  121. 'created_by': 'current_user',
  122. 'config': data.get('config', {})
  123. }
  124. logger.info(f"创建数据流成功: {new_dataflow['name']}")
  125. return new_dataflow
  126. except Exception as e:
  127. logger.error(f"创建数据流失败: {str(e)}")
  128. raise e
  129. @staticmethod
  130. def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
  131. """
  132. 更新数据流
  133. Args:
  134. dataflow_id: 数据流ID
  135. data: 更新的数据
  136. Returns:
  137. 更新后的数据流信息,如果不存在则返回None
  138. """
  139. try:
  140. # TODO: 这里应该更新数据库中的数据流
  141. # 目前返回模拟数据
  142. if dataflow_id <= 0 or dataflow_id > 20:
  143. return None
  144. updated_dataflow = {
  145. 'id': dataflow_id,
  146. 'name': data.get('name', f'数据流_{dataflow_id}'),
  147. 'description': data.get('description', f'这是第{dataflow_id}个数据流的描述'),
  148. 'status': data.get('status', 'active' if dataflow_id % 2 == 0 else 'inactive'),
  149. 'created_at': datetime.now().isoformat(),
  150. 'updated_at': datetime.now().isoformat(),
  151. 'created_by': f'user_{dataflow_id % 3 + 1}',
  152. 'config': data.get('config', {})
  153. }
  154. logger.info(f"更新数据流成功: ID={dataflow_id}")
  155. return updated_dataflow
  156. except Exception as e:
  157. logger.error(f"更新数据流失败: {str(e)}")
  158. raise e
  159. @staticmethod
  160. def delete_dataflow(dataflow_id: int) -> bool:
  161. """
  162. 删除数据流
  163. Args:
  164. dataflow_id: 数据流ID
  165. Returns:
  166. 删除是否成功
  167. """
  168. try:
  169. # TODO: 这里应该从数据库删除数据流
  170. # 目前返回模拟结果
  171. if dataflow_id <= 0 or dataflow_id > 20:
  172. return False
  173. logger.info(f"删除数据流成功: ID={dataflow_id}")
  174. return True
  175. except Exception as e:
  176. logger.error(f"删除数据流失败: {str(e)}")
  177. raise e
  178. @staticmethod
  179. def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]:
  180. """
  181. 执行数据流
  182. Args:
  183. dataflow_id: 数据流ID
  184. params: 执行参数
  185. Returns:
  186. 执行结果信息
  187. """
  188. try:
  189. # TODO: 这里应该实际执行数据流
  190. # 目前返回模拟结果
  191. if dataflow_id <= 0 or dataflow_id > 20:
  192. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  193. execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
  194. result = {
  195. 'execution_id': execution_id,
  196. 'dataflow_id': dataflow_id,
  197. 'status': 'running',
  198. 'started_at': datetime.now().isoformat(),
  199. 'params': params or {},
  200. 'progress': 0
  201. }
  202. logger.info(f"开始执行数据流: ID={dataflow_id}, execution_id={execution_id}")
  203. return result
  204. except Exception as e:
  205. logger.error(f"执行数据流失败: {str(e)}")
  206. raise e
  207. @staticmethod
  208. def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
  209. """
  210. 获取数据流执行状态
  211. Args:
  212. dataflow_id: 数据流ID
  213. Returns:
  214. 执行状态信息
  215. """
  216. try:
  217. # TODO: 这里应该查询实际的执行状态
  218. # 目前返回模拟状态
  219. if dataflow_id <= 0 or dataflow_id > 20:
  220. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  221. status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
  222. return {
  223. 'dataflow_id': dataflow_id,
  224. 'status': status,
  225. 'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
  226. 'started_at': datetime.now().isoformat(),
  227. 'completed_at': datetime.now().isoformat() if status == 'completed' else None,
  228. 'error_message': '执行过程中发生错误' if status == 'failed' else None
  229. }
  230. except Exception as e:
  231. logger.error(f"获取数据流状态失败: {str(e)}")
  232. raise e
  233. @staticmethod
  234. def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
  235. """
  236. 获取数据流执行日志
  237. Args:
  238. dataflow_id: 数据流ID
  239. page: 页码
  240. page_size: 每页大小
  241. Returns:
  242. 执行日志列表和分页信息
  243. """
  244. try:
  245. # TODO: 这里应该查询实际的执行日志
  246. # 目前返回模拟日志
  247. if dataflow_id <= 0 or dataflow_id > 20:
  248. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  249. mock_logs = [
  250. {
  251. 'id': i,
  252. 'timestamp': datetime.now().isoformat(),
  253. 'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
  254. 'message': f'数据流执行日志消息 {i}',
  255. 'component': ['source', 'transform', 'target'][i % 3]
  256. }
  257. for i in range(1, 101)
  258. ]
  259. # 分页处理
  260. total = len(mock_logs)
  261. start = (page - 1) * page_size
  262. end = start + page_size
  263. logs = mock_logs[start:end]
  264. return {
  265. 'logs': logs,
  266. 'pagination': {
  267. 'page': page,
  268. 'page_size': page_size,
  269. 'total': total,
  270. 'total_pages': (total + page_size - 1) // page_size
  271. }
  272. }
  273. except Exception as e:
  274. logger.error(f"获取数据流日志失败: {str(e)}")
  275. raise e
  276. @staticmethod
  277. def create_script(request_data: str) -> str:
  278. """
  279. 使用Deepseek模型生成脚本
  280. Args:
  281. request_data: 请求数据,用户需求的文本描述
  282. Returns:
  283. 生成的脚本内容(TXT格式)
  284. """
  285. try:
  286. # 构建prompt
  287. prompt_parts = []
  288. # 添加系统提示
  289. prompt_parts.append("请根据以下需求生成相应的数据处理脚本:")
  290. # 直接将request_data作为文本描述添加到prompt中
  291. prompt_parts.append(request_data)
  292. # 添加格式要求
  293. prompt_parts.append("\n请生成完整可执行的脚本代码,包含必要的注释和错误处理。")
  294. # 组合prompt
  295. full_prompt = "\n\n".join(prompt_parts)
  296. logger.info(f"开始调用Deepseek模型生成脚本,prompt长度: {len(full_prompt)}")
  297. # 调用LLM服务
  298. script_content = llm_client(full_prompt)
  299. if not script_content:
  300. raise ValueError("Deepseek模型返回空内容")
  301. # 确保返回的是文本格式
  302. if not isinstance(script_content, str):
  303. script_content = str(script_content)
  304. logger.info(f"脚本生成成功,内容长度: {len(script_content)}")
  305. return script_content
  306. except Exception as e:
  307. logger.error(f"生成脚本失败: {str(e)}")
  308. raise e