# 封装mysql执行函数、创建节点函数 import logging from flask_sqlalchemy import SQLAlchemy from app.core.graph.graph_operations import connect_graph logger = logging.getLogger(__name__) db = SQLAlchemy() def execute_sql(cur, sql, params): result = db.session.execute(sql, params) return result.fetchall() def sql_commit(sql): try: db.session.execute(sql) db.session.commit() except Exception as e: db.session.rollback() raise e def sql_execute_result(sql): try: result = db.session.execute(sql) return result.fetchall() except Exception as e: raise e # 创建或获取节点 """ def create_or_get_node(label, **properties): node = connect_graph().nodes.match(label, **properties).first() if node is None: node = Node(label, **properties) connect_graph().create(node) return node """ # 查询是否存在节点 """ def get_node(label, **properties): node = connect_graph.nodes.match(label, **properties).first() # 如果没有找到匹配的节点,node 将会是 None return node """ # 关系权重生成 def relation_weights(relation): relation_list = ["父亲", "母亲", "儿子", "女儿"] if relation in relation_list: return 3 else: return 1 def workplace_weights(workplace_list, workplace): if workplace in workplace_list: return 3 else: return 1 def soure_organization_name(workplace): query = ( f"match (n:workplace)<-[r:workin]-(subordinate_person:worker)" f"WHERE n.organization_no = '{workplace}' " f"return subordinate_person.code as code" ) driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(query, workplace=workplace) # type: ignore[arg-type] data = result.data() return data except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return [] finally: if driver: driver.close() # 输入人员编码列表,得到员工与工作单位的关系,并且在此函数内完成员工,亲属,以及人-工作单位关系的创建 def create_person_workplace(code_list, flag, relatives_type): nodes = [] links = [] condition = tuple(map(int, relatives_type.split(","))) relation_dict = { (0, 0, 0, 0): lambda: [], (0, 0, 0, 1): lambda: [], (0, 0, 1, 0): lambda: [], (0, 1, 0, 0): lambda: person_relative(links, code_list, 0), (0, 1, 0, 1): lambda: person_relative(links, code_list, 0), (0, 1, 1, 0): lambda: person_relative(links, code_list, 0), (0, 1, 1, 1): lambda: person_relative(links, code_list, 0), (1, 0, 0, 0): lambda: person_relative(links, code_list, 1), (1, 0, 0, 1): lambda: person_relative(links, code_list, 1), (1, 0, 1, 0): lambda: person_relative(links, code_list, 1), (1, 0, 1, 1): lambda: person_relative(links, code_list, 1), (1, 1, 0, 0): lambda: person_relative(links, code_list, (0, 1)), (1, 1, 0, 1): lambda: person_relative(links, code_list, (0, 1)), (1, 1, 1, 0): lambda: person_relative(links, code_list, (0, 1)), (1, 1, 1, 1): lambda: person_relative(links, code_list, (0, 1)), } query = """ MATCH (n:worker)-[r:relatives]-(m:worker), (n)-[:workin]-(wrk_n:workplace), (m)-[:workin]-(wrk_m:workplace) WHERE n.code IN $codes RETURN n.name as employee, id(n) as id_n, wrk_n.name as employee_workplace, id(wrk_n) as id_wrk_n, m.name as relatives, id(m) as id_m, wrk_m.name as relatives_workplace, id(wrk_m) as id_wrk_m, CASE WHEN exists(wrk_m.organization_no) THEN 1 ELSE 0 END as relatives_status """ result = [] driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(query, codes=code_list).data() except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return nodes, links finally: if driver: driver.close() handle_function = relation_dict.get(condition, []) # type: ignore[arg-type] for row in result: employee = row["employee"] id_employee = row["id_n"] employee_workplace = row["employee_workplace"] id_employee_workplace = row["id_wrk_n"] relatives = row["relatives"] id_relatives = row["id_m"] relatives_workplace = row["relatives_workplace"] id_relatives_workplace = row["id_wrk_m"] relatives_status = row["relatives_status"] nodes.extend(create_node(employee, id_employee, "selected")) nodes.extend( create_node( employee_workplace, id_employee_workplace, "work_place_selected" if flag else "internel_work_place", ) ) links.extend(create_relation(id_employee, id_employee_workplace, "work_in")) temp_node, temp_link = handle_condition( condition, relatives, id_relatives, relatives_workplace, id_relatives_workplace, relatives_status, ) nodes.extend(temp_node) links.extend(temp_link) if condition[0] != 0 or condition[1] != 0: links.extend(handle_function()) return nodes, links # 处理不同筛选条件的节点/关系 def handle_condition( condition, relatives, id_relatives, relatives_workplace, id_relatives_workplace, relatives_status, ): nodes = [] links = [] if condition == (0, 0, 0, 1): nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "" if relatives_status else "externel_work_place", ) ) elif condition == (0, 0, 1, 0): nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "", ) ) elif condition == (0, 0, 1, 1): nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "externel_work_place", ) ) elif condition == (0, 1, 0, 0): nodes.extend( create_node( relatives, id_relatives, "external_relatives" if relatives_status == 0 else "", ) ) elif condition == (0, 1, 0, 1): nodes.extend( create_node( relatives, id_relatives, "" if relatives_status else "external_relatives", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "" if relatives_status else "externel_work_place", ) ) links.extend( create_relation( id_relatives, id_relatives_workplace if relatives_status == 0 else "", "work_in", ) ) elif condition == (0, 1, 1, 0): nodes.extend( create_node( relatives, id_relatives, "" if relatives_status else "external_relatives", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "", ) ) elif condition == (0, 1, 1, 1): nodes.extend( create_node( relatives, id_relatives, "" if relatives_status else "external_relatives", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "externel_work_place", ) ) links.extend( create_relation( id_relatives, id_relatives_workplace if relatives_status == 0 else "", "work_in", ) ) elif condition == (1, 0, 0, 0): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "", ) ) elif condition == (1, 0, 0, 1): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "" if relatives_status else "externel_work_place", ) ) elif condition == (1, 0, 1, 0): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "", ) ) links.extend( create_relation( id_relatives, id_relatives_workplace if relatives_status else "", "work_in", ) ) elif condition == (1, 0, 1, 1): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "externel_work_place", ) ) links.extend( create_relation( id_relatives, id_relatives_workplace if relatives_status else "", "work_in", ) ) elif condition == (1, 1, 0, 0): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "external_relatives", ) ) elif condition == (1, 1, 0, 1): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "external_relatives", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "externel_work_place" if relatives_status == 0 else "", ) ) links.extend( create_relation( id_relatives, id_relatives_workplace if relatives_status == 0 else "", "work_in", ) ) elif condition == (1, 1, 1, 0): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "external_relatives", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "", ) ) links.extend( create_relation( id_relatives, id_relatives_workplace if relatives_status else "", "work_in", ) ) elif condition == (1, 1, 1, 1): nodes.extend( create_node( relatives, id_relatives, "internal_relatives" if relatives_status else "external_relatives", ) ) nodes.extend( create_node( relatives_workplace, id_relatives_workplace, "internel_work_place" if relatives_status else "externel_work_place", ) ) links.extend(create_relation(id_relatives, id_relatives_workplace, "work_in")) return nodes, links # 创建节点 def create_node(name, nodeid, node_type): if name in (None, "无") or node_type == "": return [] return [{"name": name, "id": nodeid, "type": node_type}] # 创建关系 def create_relation(start, end, relation_type): if end in (None, "无", ""): return [] return [{"source": start, "target": end, "type": relation_type}] # 创建员工和亲属的关系 def person_relative(links, code_list, status): query = """ MATCH (n:worker)-[r:relatives]-(m:worker) WHERE n.code IN $codes {} RETURN id(STARTNODE(r)) AS startnode, r.content AS content, id(ENDNODE(r)) AS endnode """.format( "WITH CASE WHEN exists(m.code) THEN 1 ELSE 0 END AS status,r " "WHERE status = $relatives_status" if isinstance(status, int) else "" ) driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(query, codes=code_list, relatives_status=status).data() except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return links finally: if driver: driver.close() for row in result: startnode = row["startnode"] endnode = row["endnode"] content = row["content"] links.extend(create_relation(startnode, endnode, content)) return links