123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- import json
- import logging
- from flask import request, jsonify
- from app.api.production_line import bp
- from app.models.result import success, failed
- from app.api.graph.routes import MyEncoder, connect_graph
- from app.core.production_line import production_draw_graph
- from app.core.production_line.production_line import execute_production_line
- logger = logging.getLogger(__name__)
- # 生产线列表
- @bp.route('/production/line/list', methods=['POST'])
- def production_line_list():
- """
- 获取生产线列表,支持分页和名称过滤
-
- Args (通过JSON请求体):
- current (int): 当前页码,默认为1
- size (int): 每页大小,默认为10
- name (str, optional): 名称过滤条件
-
- Returns:
- JSON: 包含生产线列表和分页信息的响应
- """
- try:
- receiver = request.get_json()
- page = int(receiver.get('current', 1))
- page_size = int(receiver.get('size', 10))
- name_filter = receiver.get('name', None)
- # 计算跳过的记录的数量
- skip_count = (page - 1) * page_size
- if name_filter:
- where_clause = f"n.name CONTAINS'{name_filter}'"
- else:
- where_clause = "TRUE"
- cql = f"""
- MATCH (n)
- WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}
- WITH id(n) AS id, n.name AS name, labels(n)[0] AS type,n
- RETURN {{
- id: id,
- name: name,
- type: type
- }} AS result,n.time as time
- ORDER BY time desc
- SKIP {skip_count}
- LIMIT {page_size}
- """
-
- # 修复:使用正确的session方式执行查询
- driver = connect_graph()
- if not driver:
- return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
-
- with driver.session() as session:
- result = session.run(cql)
- data = result.data()
- records = []
- for item in data:
- records.append(item['result'])
- # 获取总量
- total_query = f"MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}" \
- f" RETURN COUNT(n) AS total"
- total_result = session.run(total_query).single()["total"]
-
- response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page}
- res = success(response_data, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # 根据生产线列表,传入id,绘制图谱
- @bp.route('/production/line/graph', methods=['POST'])
- def production_line_graph():
- """
- 根据生产线ID绘制关系图谱
-
- Args (通过JSON请求体):
- id (int): 节点ID
-
- Returns:
- JSON: 包含图谱数据的响应
- """
- try:
- # 获取请求参数
- receiver = request.get_json()
- if not receiver or 'id' not in receiver:
- return json.dumps(failed("缺少必要参数: id"), ensure_ascii=False, cls=MyEncoder)
-
- id = receiver['id']
- # 修改: 专门处理ID为0的情况,将ID类型视为整数
- if id is None:
- return json.dumps(failed("节点ID不能为空"), ensure_ascii=False, cls=MyEncoder)
-
- # 确保ID是整数类型
- try:
- id = int(id)
- except (ValueError, TypeError):
- return json.dumps(failed("节点ID必须是整数"), ensure_ascii=False, cls=MyEncoder)
-
- # 修复:使用正确的session方式执行查询
- driver = connect_graph()
- if not driver:
- return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
-
- with driver.session() as session:
- # 检查节点是否存在
- check_query = """
- MATCH (n) WHERE id(n) = $nodeId
- RETURN labels(n)[0] as type, n.name as name
- """
- result = session.run(check_query, nodeId=id)
- record = result.single()
-
- if not record:
- return json.dumps(failed(f"节点不存在: ID={id}"), ensure_ascii=False, cls=MyEncoder)
-
- type = record["type"]
-
- # 生成图谱
- data = production_draw_graph(id, type)
-
- # 检查返回结果是否包含错误
- if "error" in data:
- error_msg = data.pop("error")
- return json.dumps(failed(error_msg, data), ensure_ascii=False, cls=MyEncoder)
-
- return json.dumps(success(data, "success"), ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"生成图谱失败: {str(e)}")
- return json.dumps(failed(str(e)), ensure_ascii=False, cls=MyEncoder)
- """
- Manual execution API endpoint
- Author: paul
- Date: 2024-03-20
- """
- @bp.route('/production/line/execute', methods=['POST'])
- def production_line_execute():
- """
- 手动执行数据资源加载
-
- Args (通过JSON请求体):
- id (int): 数据资源ID
-
- Returns:
- JSON: 执行结果
- """
- try:
- # 获取资源ID
- resource_id = request.json.get('id')
- if resource_id is None: # 修改检查逻辑,只有当ID为None时才报错
- return jsonify(failed("资源ID不能为空"))
-
- # 执行加载
- result = execute_production_line(resource_id)
-
- if result['status'] == 'success':
- return jsonify(success(result))
- else:
- return jsonify(failed(result['message']))
-
- except Exception as e:
- logger.error(f"执行数据资源加载失败: {str(e)}")
- return jsonify(failed(str(e)))
|