routes.py 9.4 KB


  1. import json
  2. import logging
  3. import requests
  4. from flask import request, jsonify, current_app
  5. from app.api.production_line import bp
  6. from app.models.result import success, failed
  7. from app.api.graph.routes import MyEncoder, connect_graph
  8. from app.core.production_line import production_draw_graph
  9. # from app.core.production_line.production_line import execute_production_line # 注释掉原来的导入
  10. logger = logging.getLogger(__name__)
  11. # 生产线列表
  12. @bp.route('/production/line/list', methods=['POST'])
  13. def production_line_list():
  14. """
  15. 获取生产线列表,支持分页和名称过滤
  16. Args (通过JSON请求体):
  17. current (int): 当前页码,默认为1
  18. size (int): 每页大小,默认为10
  19. name (str, optional): 名称过滤条件
  20. Returns:
  21. JSON: 包含生产线列表和分页信息的响应
  22. """
  23. try:
  24. receiver = request.get_json()
  25. page = int(receiver.get('current', 1))
  26. page_size = int(receiver.get('size', 10))
  27. name_filter = receiver.get('name', None)
  28. # 计算跳过的记录的数量
  29. skip_count = (page - 1) * page_size
  30. if name_filter:
  31. where_clause = f"n.name CONTAINS'{name_filter}'"
  32. else:
  33. where_clause = "TRUE"
  34. cql = f"""
  35. MATCH (n)
  36. WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}
  37. WITH id(n) AS id, n.name AS name, labels(n)[0] AS type,n
  38. RETURN {{
  39. id: id,
  40. name: name,
  41. en_name: n.en_name,
  42. type: type
  43. }} AS result,n.time as time
  44. ORDER BY time desc
  45. SKIP {skip_count}
  46. LIMIT {page_size}
  47. """
  48. # 修复:使用正确的session方式执行查询
  49. driver = connect_graph()
  50. if not driver:
  51. return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
  52. with driver.session() as session:
  53. result = session.run(cql)
  54. data = result.data()
  55. records = []
  56. for item in data:
  57. records.append(item['result'])
  58. # 获取总量
  59. total_query = f"MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}" \
  60. f" RETURN COUNT(n) AS total"
  61. total_result = session.run(total_query).single()["total"]
  62. response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page}
  63. res = success(response_data, "success")
  64. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  65. except Exception as e:
  66. res = failed({}, {"error": f"{e}"})
  67. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  68. # 根据生产线列表,传入id,绘制图谱
  69. @bp.route('/production/line/graph', methods=['POST'])
  70. def production_line_graph():
  71. """
  72. 根据生产线ID绘制关系图谱
  73. Args (通过JSON请求体):
  74. id (int): 节点ID
  75. Returns:
  76. JSON: 包含图谱数据的响应
  77. """
  78. try:
  79. # 获取请求参数
  80. receiver = request.get_json()
  81. if not receiver or 'id' not in receiver:
  82. return json.dumps(failed("缺少必要参数: id"), ensure_ascii=False, cls=MyEncoder)
  83. id = receiver['id']
  84. # 修改: 专门处理ID为0的情况,将ID类型视为整数
  85. if id is None:
  86. return json.dumps(failed("节点ID不能为空"), ensure_ascii=False, cls=MyEncoder)
  87. # 确保ID是整数类型
  88. try:
  89. id = int(id)
  90. except (ValueError, TypeError):
  91. return json.dumps(failed("节点ID必须是整数"), ensure_ascii=False, cls=MyEncoder)
  92. # 修复:使用正确的session方式执行查询
  93. driver = connect_graph()
  94. if not driver:
  95. return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
  96. with driver.session() as session:
  97. # 检查节点是否存在
  98. check_query = """
  99. MATCH (n) WHERE id(n) = $nodeId
  100. RETURN labels(n)[0] as type, n.name as name
  101. """
  102. result = session.run(check_query, nodeId=id)
  103. record = result.single()
  104. if not record:
  105. return json.dumps(failed(f"节点不存在: ID={id}"), ensure_ascii=False, cls=MyEncoder)
  106. type = record["type"]
  107. # 生成图谱
  108. data = production_draw_graph(id, type)
  109. # 检查返回结果是否包含错误
  110. if "error" in data:
  111. error_msg = data.pop("error")
  112. return json.dumps(failed(error_msg, data), ensure_ascii=False, cls=MyEncoder)
  113. return json.dumps(success(data, "success"), ensure_ascii=False, cls=MyEncoder)
  114. except Exception as e:
  115. logger.error(f"生成图谱失败: {str(e)}")
  116. return json.dumps(failed(str(e)), ensure_ascii=False, cls=MyEncoder)
  117. """
  118. Manual execution API endpoint
  119. Author: paul
  120. Date: 2024-03-20
  121. """
  122. @bp.route('/production/line/execute', methods=['POST'])
  123. def production_line_execute():
  124. """
  125. 手动执行数据资源加载 - 通过调用 Airflow REST API
  126. Args (通过JSON请求体):
  127. id (int): 数据资源ID
  128. target_table (str): 目标表名
  129. dependency_level (str): 依赖级别,默认为 "resource"
  130. Returns:
  131. JSON: 执行结果
  132. """
  133. try:
  134. # 获取请求参数
  135. receiver = request.get_json()
  136. if not receiver:
  137. return jsonify(failed("请求体不能为空"))
  138. #resource_id = receiver.get('id')
  139. target_table = receiver.get('target_table')
  140. dependency_level = receiver.get('dependency_level', 'resource')
  141. if not target_table:
  142. return jsonify(failed("目标表名不能为空"))
  143. # Airflow API 配置 - 从配置文件获取
  144. AIRFLOW_API_ENDPOINT = '/api/v1/dags/dataops_productline_manual_trigger_dag/dagRuns'
  145. airflow_base_url = current_app.config['AIRFLOW_BASE_URL']
  146. airflow_url = f"{airflow_base_url}{AIRFLOW_API_ENDPOINT}"
  147. # 构建请求数据
  148. payload = {
  149. "conf": {
  150. "target_table": target_table,
  151. "dependency_level": dependency_level
  152. }
  153. }
  154. # 设置认证信息(Basic Auth) - 从配置文件获取
  155. auth = (current_app.config['AIRFLOW_AUTH_USER'], current_app.config['AIRFLOW_AUTH_PASSWORD'])
  156. # 设置请求头
  157. headers = {
  158. 'Content-Type': 'application/json'
  159. }
  160. # 记录请求信息到日志
  161. logger.info(f"准备调用 Airflow API: {airflow_url}")
  162. logger.info(f"请求参数: {json.dumps(payload, ensure_ascii=False, indent=2)}")
  163. # 调用 Airflow API
  164. response = requests.post(
  165. airflow_url,
  166. json=payload,
  167. auth=auth,
  168. headers=headers,
  169. timeout=30
  170. )
  171. # 检查响应状态
  172. if response.status_code == 200:
  173. airflow_result = response.json()
  174. # 打印Airflow API返回的结果到日志
  175. logger.info(f"Airflow API 返回结果: {json.dumps(airflow_result, ensure_ascii=False, indent=2)}")
  176. # 获取状态
  177. state = airflow_result.get("state")
  178. # 根据状态设置消息
  179. if state == "queued":
  180. message = "触发成功,正在执行"
  181. logger.info(f"DAG触发成功,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
  182. # 返回成功响应,包含完整的 Airflow 返回结果
  183. return jsonify(success(airflow_result, message))
  184. else:
  185. message = "无法触发执行,请联系管理员"
  186. logger.warning(f"DAG触发状态异常,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
  187. # 即使状态不是 queued,也返回完整的 Airflow 结果,但使用 failed 状态
  188. return jsonify(failed(message, airflow_result))
  189. else:
  190. # 处理错误响应
  191. error_msg = f"Airflow API 调用失败,状态码: {response.status_code}"
  192. try:
  193. error_detail = response.json()
  194. error_msg += f",错误详情: {error_detail}"
  195. except:
  196. error_msg += f",响应内容: {response.text}"
  197. logger.error(error_msg)
  198. return jsonify(failed(error_msg))
  199. except requests.exceptions.Timeout:
  200. error_msg = "调用 Airflow API 超时"
  201. logger.error(error_msg)
  202. return jsonify(failed(error_msg))
  203. except requests.exceptions.ConnectionError:
  204. error_msg = "无法连接到 Airflow 服务器"
  205. logger.error(error_msg)
  206. return jsonify(failed(error_msg))
  207. except requests.exceptions.RequestException as e:
  208. error_msg = f"请求 Airflow API 时发生错误: {str(e)}"
  209. logger.error(error_msg)
  210. return jsonify(failed(error_msg))
  211. except Exception as e:
  212. error_msg = f"执行数据资源加载失败: {str(e)}"
  213. logger.error(error_msg)
  214. return jsonify(failed(error_msg))