- import json
- import logging
- import requests
- from flask import request, jsonify, current_app
- from app.api.production_line import bp
- from app.models.result import success, failed
- from app.api.graph.routes import MyEncoder, connect_graph
- from app.core.production_line import production_draw_graph
- # from app.core.production_line.production_line import execute_production_line  # original import, no longer used
- logger = logging.getLogger(__name__)
- # Production line list
- @bp.route('/production/line/list', methods=['POST'])
- def production_line_list():
-     """
-     Return the production line list, with pagination and optional name filtering.
-
-     Args (JSON body):
-         current (int): current page number, defaults to 1
-         size (int): page size, defaults to 10
-         name (str, optional): filter on node name
-
-     Returns:
-         JSON: response containing the matched records and pagination info
-     """
- try:
- receiver = request.get_json()
- page = int(receiver.get('current', 1))
- page_size = int(receiver.get('size', 10))
- name_filter = receiver.get('name', None)
-         # Number of records to skip for pagination
-         skip_count = (page - 1) * page_size
-         # Pass the name filter as a query parameter to avoid Cypher injection
-         params = {}
-         if name_filter:
-             where_clause = "n.name CONTAINS $name_filter"
-             params['name_filter'] = name_filter
-         else:
-             where_clause = "TRUE"
-         cql = f"""
-         MATCH (n)
-         WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}
-         WITH id(n) AS id, n.name AS name, labels(n)[0] AS type, n
-         RETURN {{
-             id: id,
-             name: name,
-             en_name: n.en_name,
-             type: type
-         }} AS result, n.time AS time
-         ORDER BY time DESC
-         SKIP {skip_count}
-         LIMIT {page_size}
-         """
-
-         # Execute the query through a driver session
- driver = connect_graph()
- if not driver:
- return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
-
- with driver.session() as session:
-             result = session.run(cql, params)
- data = result.data()
- records = []
- for item in data:
- records.append(item['result'])
-             # Total count for pagination
-             total_query = f"MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}" \
-                           f" RETURN COUNT(n) AS total"
-             total_result = session.run(total_query, params).single()["total"]
-
- response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page}
- res = success(response_data, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
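-
- # Illustrative client call (not part of this module): assuming the blueprint is
- # registered without a URL prefix and the app listens on localhost:5000, the list
- # endpoint above can be exercised roughly like this; the filter value is made up.
- #
- #   import requests
- #   resp = requests.post(
- #       "http://localhost:5000/production/line/list",
- #       json={"current": 1, "size": 10, "name": "sales"},
- #   )
- #   print(resp.json())  # {"records": [...], "total": ..., "size": 10, "current": 1}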
- # Draw the relationship graph for a node picked from the production line list
- @bp.route('/production/line/graph', methods=['POST'])
- def production_line_graph():
-     """
-     Draw the relationship graph for the given node ID.
-
-     Args (JSON body):
-         id (int): node ID
-
-     Returns:
-         JSON: response containing the graph data
-     """
- try:
-         # Parse request parameters
- receiver = request.get_json()
- if not receiver or 'id' not in receiver:
- return json.dumps(failed("缺少必要参数: id"), ensure_ascii=False, cls=MyEncoder)
-
-         node_id = receiver['id']
-         # Handle ID 0 explicitly and treat the ID as an integer
-         if node_id is None:
-             return json.dumps(failed("节点ID不能为空"), ensure_ascii=False, cls=MyEncoder)
-
-         # Make sure the ID is an integer
-         try:
-             node_id = int(node_id)
-         except (ValueError, TypeError):
-             return json.dumps(failed("节点ID必须是整数"), ensure_ascii=False, cls=MyEncoder)
-
-         # Execute the query through a driver session
- driver = connect_graph()
- if not driver:
- return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
-
- with driver.session() as session:
-             # Check that the node exists
- check_query = """
- MATCH (n) WHERE id(n) = $nodeId
- RETURN labels(n)[0] as type, n.name as name
- """
-             result = session.run(check_query, nodeId=node_id)
- record = result.single()
-
-             if not record:
-                 return json.dumps(failed(f"节点不存在: ID={node_id}"), ensure_ascii=False, cls=MyEncoder)
-
-             node_type = record["type"]
-
-             # Generate the graph data
-             data = production_draw_graph(node_id, node_type)
-
-             # Propagate any error reported by the graph builder
-             if "error" in data:
- error_msg = data.pop("error")
- return json.dumps(failed(error_msg, data), ensure_ascii=False, cls=MyEncoder)
-
- return json.dumps(success(data, "success"), ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"生成图谱失败: {str(e)}")
- return json.dumps(failed(str(e)), ensure_ascii=False, cls=MyEncoder)
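-
- # Illustrative client call (not part of this module): the graph endpoint expects a
- # single integer node id in the JSON body; the host/port and the id value are
- # assumptions for the example.
- #
- #   import requests
- #   resp = requests.post(
- #       "http://localhost:5000/production/line/graph",
- #       json={"id": 123},  # id of a data_model / data_resource / data_metric node
- #   )
- #   print(resp.json())  # graph data wrapped in the project's success() envelope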
- # Manual execution API endpoint
- # Author: paul
- # Date: 2024-03-20
- @bp.route('/production/line/execute', methods=['POST'])
- def production_line_execute():
-     """
-     Manually trigger data resource loading by calling the Airflow REST API.
-
-     Args (JSON body):
-         id (int): data resource ID (currently unused)
-         target_table (str): target table name
-         dependency_level (str): dependency level, defaults to "resource"
-
-     Returns:
-         JSON: execution result
-     """
- try:
-         # Parse request parameters
- receiver = request.get_json()
- if not receiver:
- return jsonify(failed("请求体不能为空"))
-
-         # resource_id = receiver.get('id')  # currently unused
- target_table = receiver.get('target_table')
- dependency_level = receiver.get('dependency_level', 'resource')
-
- if not target_table:
- return jsonify(failed("目标表名不能为空"))
-
-         # Airflow API configuration, read from the app config
- AIRFLOW_API_ENDPOINT = '/api/v1/dags/dataops_productline_manual_trigger_dag/dagRuns'
- airflow_base_url = current_app.config['AIRFLOW_BASE_URL']
- airflow_url = f"{airflow_base_url}{AIRFLOW_API_ENDPOINT}"
-
-         # Build the DAG run payload
- payload = {
- "conf": {
- "target_table": target_table,
- "dependency_level": dependency_level
- }
- }
-
-         # Basic Auth credentials, read from the app config
- auth = (current_app.config['AIRFLOW_AUTH_USER'], current_app.config['AIRFLOW_AUTH_PASSWORD'])
-
-         # Request headers
- headers = {
- 'Content-Type': 'application/json'
- }
-
-         # Log the outgoing request
- logger.info(f"准备调用 Airflow API: {airflow_url}")
- logger.info(f"请求参数: {json.dumps(payload, ensure_ascii=False, indent=2)}")
-
-         # Call the Airflow API
- response = requests.post(
- airflow_url,
- json=payload,
- auth=auth,
- headers=headers,
- timeout=30
- )
-
-         # Check the response status
- if response.status_code == 200:
- airflow_result = response.json()
-
-             # Log the result returned by the Airflow API
- logger.info(f"Airflow API 返回结果: {json.dumps(airflow_result, ensure_ascii=False, indent=2)}")
-
-             # Get the DAG run state
- state = airflow_result.get("state")
-
-             # Set the response message based on the state
- if state == "queued":
- message = "触发成功,正在执行"
- logger.info(f"DAG触发成功,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
-                 # Return success with the full Airflow response
- return jsonify(success(airflow_result, message))
- else:
- message = "无法触发执行,请联系管理员"
- logger.warning(f"DAG触发状态异常,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
-                 # Even if the state is not queued, return the full Airflow result, but as a failure
- return jsonify(failed(message, airflow_result))
-
-         else:
-             # Handle error responses from Airflow
-             error_msg = f"Airflow API 调用失败,状态码: {response.status_code}"
-             try:
-                 error_detail = response.json()
-                 error_msg += f",错误详情: {error_detail}"
-             except ValueError:
-                 error_msg += f",响应内容: {response.text}"
-
- logger.error(error_msg)
- return jsonify(failed(error_msg))
-
- except requests.exceptions.Timeout:
- error_msg = "调用 Airflow API 超时"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
-
- except requests.exceptions.ConnectionError:
- error_msg = "无法连接到 Airflow 服务器"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
-
- except requests.exceptions.RequestException as e:
- error_msg = f"请求 Airflow API 时发生错误: {str(e)}"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
-
- except Exception as e:
- error_msg = f"执行数据资源加载失败: {str(e)}"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
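-
- # Illustrative client call (not part of this module): triggering a manual run of
- # the endpoint above. The table name and host/port are assumptions;
- # dependency_level falls back to "resource" when omitted.
- #
- #   import requests
- #   resp = requests.post(
- #       "http://localhost:5000/production/line/execute",
- #       json={"target_table": "ods_example_table", "dependency_level": "resource"},
- #   )
- #   print(resp.json())  # wraps the Airflow dagRuns response (state, dag_run_id, ...)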
|