routes.py 9.7 KB


  1. import json
  2. import logging
  3. import requests
  4. from flask import request, jsonify, current_app
  5. from app.api.production_line import bp
  6. from app.models.result import success, failed
  7. from app.api.graph.routes import MyEncoder, connect_graph
  8. from app.core.production_line import production_draw_graph
  9. # from app.core.production_line.production_line import execute_production_line # 注释掉原来的导入
  10. logger = logging.getLogger(__name__)
  11. # 生产线列表
  12. @bp.route('/production/line/list', methods=['POST'])
  13. def production_line_list():
  14. """
  15. 获取生产线列表,支持分页和名称过滤
  16. Args (通过JSON请求体):
  17. current (int): 当前页码,默认为1
  18. size (int): 每页大小,默认为10
  19. name_zh (str, optional): 名称过滤条件
  20. Returns:
  21. JSON: 包含生产线列表和分页信息的响应
  22. """
  23. try:
  24. receiver = request.get_json()
  25. page = int(receiver.get('current', 1))
  26. page_size = int(receiver.get('size', 10))
  27. name_zh_filter = receiver.get('name_zh', None)
  28. # 计算跳过的记录的数量
  29. skip_count = (page - 1) * page_size
  30. if name_zh_filter:
  31. where_clause = f"n.name_zh CONTAINS'{name_zh_filter}'"
  32. else:
  33. where_clause = "TRUE"
  34. cql = f"""
  35. MATCH (n)
  36. WHERE (n:DataModel OR n:DataResource OR n:DataMetric) AND {where_clause}
  37. WITH id(n) AS id, n.name_zh AS name_zh, labels(n)[0] AS type,n
  38. RETURN {{
  39. id: id,
  40. name_zh: name_zh,
  41. name_en: n.name_en,
  42. type: type
  43. }} AS result,n.create_time as create_time
  44. ORDER BY create_time desc
  45. SKIP {skip_count}
  46. LIMIT {page_size}
  47. """
  48. driver = None
  49. try:
  50. driver = connect_graph()
  51. with driver.session() as session:
  52. result = session.run(cql)
  53. data = result.data()
  54. records = []
  55. for item in data:
  56. records.append(item['result'])
  57. # 获取总量
  58. total_query = f"MATCH (n) WHERE (n:DataModel OR n:DataResource OR n:DataMetric) AND {where_clause}" \
  59. f" RETURN COUNT(n) AS total"
  60. total_result = session.run(total_query).single()["total"]
  61. response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page}
  62. res = success(response_data, "success")
  63. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  64. except (ConnectionError, ValueError) as e:
  65. return json.dumps(failed(f"无法连接到数据库: {str(e)}"), ensure_ascii=False, cls=MyEncoder)
  66. finally:
  67. if driver:
  68. driver.close()
  69. except Exception as e:
  70. res = failed({}, {"error": f"{e}"})
  71. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  72. # 根据生产线列表,传入id,绘制图谱
  73. @bp.route('/production/line/graph', methods=['POST'])
  74. def production_line_graph():
  75. """
  76. 根据生产线ID绘制关系图谱
  77. Args (通过JSON请求体):
  78. id (int): 节点ID
  79. Returns:
  80. JSON: 包含图谱数据的响应
  81. """
  82. try:
  83. # 获取请求参数
  84. receiver = request.get_json()
  85. if not receiver or 'id' not in receiver:
  86. return json.dumps(failed("缺少必要参数: id"), ensure_ascii=False, cls=MyEncoder)
  87. id = receiver['id']
  88. # 修改: 专门处理ID为0的情况,将ID类型视为整数
  89. if id is None:
  90. return json.dumps(failed("节点ID不能为空"), ensure_ascii=False, cls=MyEncoder)
  91. # 确保ID是整数类型
  92. try:
  93. id = int(id)
  94. except (ValueError, TypeError):
  95. return json.dumps(failed("节点ID必须是整数"), ensure_ascii=False, cls=MyEncoder)
  96. driver = None
  97. try:
  98. driver = connect_graph()
  99. with driver.session() as session:
  100. # 检查节点是否存在
  101. check_query = """
  102. MATCH (n) WHERE id(n) = $nodeId
  103. RETURN labels(n)[0] as type, n.name_zh as name_zh
  104. """
  105. result = session.run(check_query, nodeId=id)
  106. record = result.single()
  107. if not record:
  108. return json.dumps(failed(f"节点不存在: ID={id}"), ensure_ascii=False, cls=MyEncoder)
  109. type = record["type"]
  110. except (ConnectionError, ValueError) as e:
  111. return json.dumps(failed(f"无法连接到数据库: {str(e)}"), ensure_ascii=False, cls=MyEncoder)
  112. finally:
  113. if driver:
  114. driver.close()
  115. # 生成图谱
  116. data = production_draw_graph(id, type)
  117. # 检查返回结果是否包含错误
  118. if "error" in data:
  119. error_msg = data.pop("error")
  120. return json.dumps(failed(error_msg, data), ensure_ascii=False, cls=MyEncoder)
  121. return json.dumps(success(data, "success"), ensure_ascii=False, cls=MyEncoder)
  122. except Exception as e:
  123. logger.error(f"生成图谱失败: {str(e)}")
  124. return json.dumps(failed(str(e)), ensure_ascii=False, cls=MyEncoder)
  125. """
  126. Manual execution API endpoint
  127. Author: paul
  128. Date: 2024-03-20
  129. """
  130. @bp.route('/production/line/execute', methods=['POST'])
  131. def production_line_execute():
  132. """
  133. 手动执行数据资源加载 - 通过调用 Airflow REST API
  134. Args (通过JSON请求体):
  135. id (int): 数据资源ID
  136. target_table (str): 目标表名
  137. dependency_level (str): 依赖级别,默认为 "resource"
  138. Returns:
  139. JSON: 执行结果
  140. """
  141. try:
  142. # 获取请求参数
  143. receiver = request.get_json()
  144. if not receiver:
  145. return jsonify(failed("请求体不能为空"))
  146. #resource_id = receiver.get('id')
  147. target_table = receiver.get('target_table')
  148. dependency_level = receiver.get('dependency_level', 'resource')
  149. if not target_table:
  150. return jsonify(failed("目标表名不能为空"))
  151. # Airflow API 配置 - 从配置文件获取
  152. AIRFLOW_API_ENDPOINT = '/api/v1/dags/dataops_productline_manual_trigger_dag/dagRuns'
  153. airflow_base_url = current_app.config['AIRFLOW_BASE_URL']
  154. airflow_url = f"{airflow_base_url}{AIRFLOW_API_ENDPOINT}"
  155. # 构建请求数据
  156. payload = {
  157. "conf": {
  158. "target_table": target_table,
  159. "dependency_level": dependency_level
  160. }
  161. }
  162. # 设置认证信息(Basic Auth) - 从配置文件获取
  163. auth = (current_app.config['AIRFLOW_AUTH_USER'], current_app.config['AIRFLOW_AUTH_PASSWORD'])
  164. # 设置请求头
  165. headers = {
  166. 'Content-Type': 'application/json'
  167. }
  168. # 记录请求信息到日志
  169. logger.info(f"准备调用 Airflow API: {airflow_url}")
  170. logger.info(f"请求参数: {json.dumps(payload, ensure_ascii=False, indent=2)}")
  171. # 调用 Airflow API
  172. response = requests.post(
  173. airflow_url,
  174. json=payload,
  175. auth=auth,
  176. headers=headers,
  177. timeout=30
  178. )
  179. # 检查响应状态
  180. if response.status_code == 200:
  181. airflow_result = response.json()
  182. # 打印Airflow API返回的结果到日志
  183. logger.info(f"Airflow API 返回结果: {json.dumps(airflow_result, ensure_ascii=False, indent=2)}")
  184. # 获取状态
  185. state = airflow_result.get("state")
  186. # 根据状态设置消息
  187. if state == "queued":
  188. message = "触发成功,正在执行"
  189. logger.info(f"DAG触发成功,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
  190. # 返回成功响应,包含完整的 Airflow 返回结果
  191. return jsonify(success(airflow_result, message))
  192. else:
  193. message = "无法触发执行,请联系管理员"
  194. logger.warning(f"DAG触发状态异常,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
  195. # 即使状态不是 queued,也返回完整的 Airflow 结果,但使用 failed 状态
  196. return jsonify(failed(message, airflow_result))
  197. else:
  198. # 处理错误响应
  199. error_msg = f"Airflow API 调用失败,状态码: {response.status_code}"
  200. try:
  201. error_detail = response.json()
  202. error_msg += f",错误详情: {error_detail}"
  203. except:
  204. error_msg += f",响应内容: {response.text}"
  205. logger.error(error_msg)
  206. return jsonify(failed(error_msg))
  207. except requests.exceptions.Timeout:
  208. error_msg = "调用 Airflow API 超时"
  209. logger.error(error_msg)
  210. return jsonify(failed(error_msg))
  211. except requests.exceptions.ConnectionError:
  212. error_msg = "无法连接到 Airflow 服务器"
  213. logger.error(error_msg)
  214. return jsonify(failed(error_msg))
  215. except requests.exceptions.RequestException as e:
  216. error_msg = f"请求 Airflow API 时发生错误: {str(e)}"
  217. logger.error(error_msg)
  218. return jsonify(failed(error_msg))
  219. except Exception as e:
  220. error_msg = f"执行数据资源加载失败: {str(e)}"
  221. logger.error(error_msg)
  222. return jsonify(failed(error_msg))