|
@@ -20,91 +20,142 @@ def production_draw_graph(id, type):
|
|
|
Returns:
|
|
|
dict: 包含节点、连线和根节点ID的图谱数据
|
|
|
"""
|
|
|
- # 数据模型
|
|
|
- if type == "data_model":
|
|
|
- cql = """
|
|
|
- MATCH (n)
|
|
|
- WHERE id(n) = $nodeId AND labels(n)[0] = "data_model"
|
|
|
- MATCH (n)-[r:connection]-(m:meta_node)
|
|
|
- OPTIONAL MATCH (n)-[r2:clean_model]-(d:data_standard)
|
|
|
- OPTIONAL MATCH (d)-[r3:clean_model]-(m)
|
|
|
- WITH
|
|
|
- collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS line1,
|
|
|
- collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS line2,
|
|
|
- collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS line3,
|
|
|
- collect({id: toString(id(n)), text: n.name, type: "model"}) AS node1,
|
|
|
- collect({id: toString(id(m)), text: m.name}) AS node2,
|
|
|
- collect({id: toString(id(d)), text: d.name, type: "standard"}) AS node3,n
|
|
|
- WITH apoc.coll.toSet(line1 + line2 + line3) AS lines,
|
|
|
- apoc.coll.toSet(node1 + node2 + node3) AS nodes,
|
|
|
- toString(id(n)) as res
|
|
|
- RETURN lines,nodes,res
|
|
|
- """
|
|
|
- data = connect_graph.run(cql, nodeId=id).data()
|
|
|
- res = {}
|
|
|
- for item in data:
|
|
|
- res = {
|
|
|
- "nodes": [record for record in item['nodes'] if record['id']],
|
|
|
- "lines": [record for record in item['lines'] if record['from'] and record['to']],
|
|
|
- "rootId": item['res'],
|
|
|
- }
|
|
|
-
|
|
|
- return res
|
|
|
- # 数据资源
|
|
|
- elif type == "data_resource":
|
|
|
- cql = """
|
|
|
- MATCH (n)
|
|
|
- WHERE id(n) = $nodeId AND labels(n)[0] = "data_resource"
|
|
|
- MATCH (n)-[r:connection]-(m:meta_node)
|
|
|
- OPTIONAL MATCH (n)-[r2:clean_resource]-(d:data_standard)
|
|
|
- OPTIONAL MATCH (d)-[r3:clean_resource]-(m)
|
|
|
- WITH
|
|
|
- collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS lines1,
|
|
|
- collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS lines2,
|
|
|
- collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS lines3,
|
|
|
- collect({id: toString(id(n)), text: n.name, type: "resource"}) AS nodes1,
|
|
|
- collect({id: toString(id(m)), text: m.name}) AS nodes2,
|
|
|
- collect({id: toString(id(d)), text: d.name, type: "standard"}) AS nodes3,n
|
|
|
- WITH
|
|
|
- apoc.coll.toSet(lines1 + lines2 + lines3) AS lines,
|
|
|
- apoc.coll.toSet(nodes1 + nodes2 + nodes3) AS nodes,
|
|
|
- toString(id(n)) AS res
|
|
|
- RETURN lines, nodes, res
|
|
|
- """
|
|
|
- data = connect_graph.run(cql, nodeId=id).data()
|
|
|
- res = {}
|
|
|
- for item in data:
|
|
|
- res = {
|
|
|
- "nodes": [record for record in item['nodes'] if record['id']],
|
|
|
- "lines": [record for record in item['lines'] if record['from'] and record['to'] ],
|
|
|
- "rootId": item['res'],
|
|
|
- }
|
|
|
-
|
|
|
- return res
|
|
|
- # 数据指标
|
|
|
- elif type == "data_metric":
|
|
|
- cql = """
|
|
|
- MATCH (n)
|
|
|
- WHERE id(n) = $nodeId AND labels(n)[0] = "data_metric"
|
|
|
- MATCH (n)-[r:connection]-(m:meta_node)
|
|
|
- WITH collect({from: toString(id(n)), to: toString(id(m)), text: "处理"}) AS line1,
|
|
|
- collect({id: toString(id(n)), text: n.name, type: "etric"}) AS node1,
|
|
|
- collect({id: toString(id(m)), text: m.name}) AS node2,n
|
|
|
- WITH apoc.coll.toSet(line1) AS lines,
|
|
|
- apoc.coll.toSet(node1 + node2) AS nodes,
|
|
|
- toString(id(n)) as res
|
|
|
- RETURN lines,nodes,res
|
|
|
- """
|
|
|
- data = connect_graph.run(cql, nodeId=id).data()
|
|
|
- res = {}
|
|
|
- for item in data:
|
|
|
- res = {
|
|
|
- "nodes": [record for record in item['nodes'] if record['id']],
|
|
|
- "lines": [record for record in item['lines'] if record['from'] and record['to']],
|
|
|
- "rootId": item['res'],
|
|
|
- }
|
|
|
-
|
|
|
- return res
|
|
|
+ # 获取Neo4j连接
|
|
|
+ driver = connect_graph()
|
|
|
+ if not driver:
|
|
|
+ logger.error("无法连接到数据库")
|
|
|
+ return {"nodes": [], "lines": [], "rootId": "", "error": "无法连接到数据库"}
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 首先验证节点是否存在
|
|
|
+ with driver.session() as session:
|
|
|
+ check_node_query = """
|
|
|
+ MATCH (n)
|
|
|
+ WHERE id(n) = $nodeId
|
|
|
+ RETURN n, labels(n) as labels, n.name as name
|
|
|
+ """
|
|
|
+ check_result = session.run(check_node_query, nodeId=id).single()
|
|
|
+
|
|
|
+ if not check_result:
|
|
|
+ logger.error(f"节点不存在: ID={id}")
|
|
|
+ return {"nodes": [], "lines": [], "rootId": "", "error": "节点不存在"}
|
|
|
+
|
|
|
+ actual_type = check_result["labels"][0] # 获取实际的节点类型
|
|
|
+ node_name = check_result["name"]
|
|
|
+
|
|
|
+ # 如果提供的类型与实际类型不匹配,使用实际类型
|
|
|
+ if type.lower() != actual_type.lower():
|
|
|
+ logger.warning(f"提供的类型({type})与实际类型({actual_type})不匹配,使用实际类型")
|
|
|
+ type = actual_type
|
|
|
+
|
|
|
+ # 数据模型
|
|
|
+ if type.lower() == "data_model":
|
|
|
+ cql = """
|
|
|
+ MATCH (n:data_model)
|
|
|
+ WHERE id(n) = $nodeId
|
|
|
+ OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
|
|
|
+ OPTIONAL MATCH (n)-[r2:clean_model]-(d:data_standard)
|
|
|
+ OPTIONAL MATCH (d)-[r3:clean_model]-(m)
|
|
|
+ WITH
|
|
|
+ collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS line1,
|
|
|
+ collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS line2,
|
|
|
+ collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS line3,
|
|
|
+ collect({id: toString(id(n)), text: n.name, type: "model"}) AS node1,
|
|
|
+ collect({id: toString(id(m)), text: m.name}) AS node2,
|
|
|
+ collect({id: toString(id(d)), text: d.name, type: "standard"}) AS node3,n
|
|
|
+ WITH apoc.coll.toSet(line1 + line2 + line3) AS lines,
|
|
|
+ apoc.coll.toSet(node1 + node2 + node3) AS nodes,
|
|
|
+ toString(id(n)) as res
|
|
|
+ RETURN lines,nodes,res
|
|
|
+ """
|
|
|
+ # 数据资源
|
|
|
+ elif type.lower() == "data_resource":
|
|
|
+ cql = """
|
|
|
+ MATCH (n:data_resource)
|
|
|
+ WHERE id(n) = $nodeId
|
|
|
+ OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
|
|
|
+ OPTIONAL MATCH (n)-[r2:clean_resource]-(d:data_standard)
|
|
|
+ OPTIONAL MATCH (d)-[r3:clean_resource]-(m)
|
|
|
+ WITH
|
|
|
+ collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS lines1,
|
|
|
+ collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS lines2,
|
|
|
+ collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS lines3,
|
|
|
+ collect({id: toString(id(n)), text: n.name, type: "resource"}) AS nodes1,
|
|
|
+ collect({id: toString(id(m)), text: m.name}) AS nodes2,
|
|
|
+ collect({id: toString(id(d)), text: d.name, type: "standard"}) AS nodes3,n
|
|
|
+ WITH
|
|
|
+ apoc.coll.toSet(lines1 + lines2 + lines3) AS lines,
|
|
|
+ apoc.coll.toSet(nodes1 + nodes2 + nodes3) AS nodes,
|
|
|
+ toString(id(n)) AS res
|
|
|
+ RETURN lines, nodes, res
|
|
|
+ """
|
|
|
+ # 数据指标
|
|
|
+ elif type.lower() == "data_metric":
|
|
|
+ cql = """
|
|
|
+ MATCH (n:data_metric)
|
|
|
+ WHERE id(n) = $nodeId
|
|
|
+ OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
|
|
|
+ WITH collect({from: toString(id(n)), to: toString(id(m)), text: "处理"}) AS line1,
|
|
|
+ collect({id: toString(id(n)), text: n.name, type: "metric"}) AS node1,
|
|
|
+ collect({id: toString(id(m)), text: m.name}) AS node2,n
|
|
|
+ WITH apoc.coll.toSet(line1) AS lines,
|
|
|
+ apoc.coll.toSet(node1 + node2) AS nodes,
|
|
|
+ toString(id(n)) as res
|
|
|
+ RETURN lines,nodes,res
|
|
|
+ """
|
|
|
+ else:
|
|
|
+ # 处理未知节点类型
|
|
|
+ cql = """
|
|
|
+ MATCH (n)
|
|
|
+ WHERE id(n) = $nodeId
|
|
|
+ OPTIONAL MATCH (n)-[r]-(m)
|
|
|
+ WITH collect({from: toString(id(n)), to: toString(id(m)), text: type(r)}) AS lines,
|
|
|
+ collect({id: toString(id(n)), text: n.name, type: labels(n)[0]}) AS nodes1,
|
|
|
+ collect({id: toString(id(m)), text: m.name, type: labels(m)[0]}) AS nodes2,
|
|
|
+ toString(id(n)) as res
|
|
|
+ RETURN apoc.coll.toSet(lines) AS lines,
|
|
|
+ apoc.coll.toSet(nodes1 + nodes2) AS nodes,
|
|
|
+ res
|
|
|
+ """
|
|
|
+
|
|
|
+ with driver.session() as session:
|
|
|
+ try:
|
|
|
+ result = session.run(cql, nodeId=id)
|
|
|
+ data = result.data()
|
|
|
+
|
|
|
+ # 如果没有数据,返回节点自身
|
|
|
+ if not data:
|
|
|
+ return {
|
|
|
+ "nodes": [{"id": str(id), "text": node_name, "type": type}],
|
|
|
+ "lines": [],
|
|
|
+ "rootId": str(id)
|
|
|
+ }
|
|
|
+
|
|
|
+ res = {}
|
|
|
+ for item in data:
|
|
|
+ res = {
|
|
|
+ "nodes": [record for record in item['nodes'] if record.get('id')],
|
|
|
+ "lines": [record for record in item['lines'] if record.get('from') and record.get('to')],
|
|
|
+ "rootId": item['res'],
|
|
|
+ }
|
|
|
+
|
|
|
+ # 确保节点列表不为空
|
|
|
+ if not res.get("nodes"):
|
|
|
+ res["nodes"] = [{"id": str(id), "text": node_name, "type": type}]
|
|
|
+
|
|
|
+ return res
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"执行图谱查询失败: {str(e)}")
|
|
|
+ return {
|
|
|
+ "nodes": [{"id": str(id), "text": node_name, "type": type}],
|
|
|
+ "lines": [],
|
|
|
+ "rootId": str(id),
|
|
|
+ "error": f"查询执行失败: {str(e)}"
|
|
|
+ }
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"生成图谱失败: {str(e)}")
|
|
|
+ return {"nodes": [], "lines": [], "rootId": "", "error": str(e)}
|
|
|
|
|
|
"""
|
|
|
Manual execution functions for production line
|