# Graph synchronisation operations for job, enterprise and seeker nodes.
- from configs.connections import connect_graph
- from functions.solve_graph import create_job_dataList, create_enterprise_dataList, create_seeker_dataList, \
- add_seeker_dataList
- import logging
- from flask import current_app
- logger = logging.getLogger(__name__)
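# Graph node types: job (recruitment position), enterprise (company) and
# person-seeker (job seeker). Each is tagged with label nodes and linked to
# them through relationships.
# Operations on recruitment positions (jobs).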
def operate_job(operate, dataList):
    """Dispatch a graph operation for job (recruitment position) nodes.

    Args:
        operate: Operation name. 'sync_all', 'add', 'update' and 'delete'
            are routed to the matching handler; anything else is logged as
            an invalid operation.
        dataList: Job records, forwarded unchanged to the selected handler.

    Returns:
        True once dispatching has finished (also for an invalid operation),
        or None when an exception was raised and logged here.
    """
    try:
        # Ordered (operation name, handler) pairs, checked with `==` in turn.
        routes = (
            ('sync_all', sync_all_jobs),
            ('add', add_jobs),
            ('update', update_jobs),
            ('delete', delete_jobs),
        )
        for action, handler in routes:
            if operate == action:
                handler(dataList)
                break
        else:
            # No route matched the requested operation.
            current_app.logger.error('operate_job: Invalid operation')
        # NOTE(review): assumed to run after the dispatch chain, for every
        # operation value - confirm against the original indentation.
        return True
    except Exception as exc:
        current_app.logger.error(f'operate_job error: {exc}')
def sync_all_jobs(dataList):
    """Create job data in the graph for a full synchronisation.

    Delegates to create_job_dataList. Any exception is logged through the
    Flask application logger and swallowed, so the function returns None
    either way.
    """
    try:
        create_job_dataList(dataList)
    except Exception as exc:
        log_error = current_app.logger.error
        log_error(f'sync_all_jobs error: {exc}')
def add_jobs(dataList):
    """Create the given job records in the graph.

    Delegates to create_job_dataList. Any exception is logged through the
    Flask application logger and swallowed.
    """
    try:
        create_job_dataList(dataList)
    except Exception as exc:
        log_error = current_app.logger.error
        log_error(f'add_jobs error: {exc}')
def update_jobs(dataList):
    """Replace existing job data: remove the old nodes, then recreate them.

    Runs delete_old_jobs followed by create_job_dataList on the same
    records. Any exception raised here is logged and swallowed.
    """
    try:
        # Order matters: the old graph data goes first, the new data second.
        for step in (delete_old_jobs, create_job_dataList):
            step(dataList)
    except Exception as exc:
        log_error = current_app.logger.error
        log_error(f'update_jobs error: {exc}')
def delete_jobs(dataList):
    """Detach-delete the job nodes identified by the given records.

    Args:
        dataList: Iterable of mappings; each item's 'id' value is matched
            against the `uniqueId` property of `job` nodes.

    Any exception (including a missing 'id' key) is logged and swallowed.
    """
    try:
        cypher = """
        UNWIND $Ids AS id
        MATCH (n:job {uniqueId: id})
        DETACH DELETE n
        """
        job_ids = [record['id'] for record in dataList]
        connect_graph.run(cypher, Ids=job_ids)
    except Exception as exc:
        log_error = current_app.logger.error
        log_error(f'delete_jobs error: {exc}')
def delete_old_jobs(dataList):
    """Remove job nodes together with the jobLabel nodes only they use.

    Two statements are sent to the graph:

    1. Delete every `jobLabel` node whose `connection` neighbours are all
       among the jobs being removed (labels shared with other nodes stay).
    2. Detach-delete the job nodes themselves, which also drops their
       remaining `connection` relationships to shared labels.

    The previous single query referenced `connection_count` after it had
    been dropped from scope, and its mutually exclusive filters
    (`= 1` then `> 1`) left no rows for the final job deletion.

    Args:
        dataList: Iterable of mappings; each item's 'id' value is matched
            against the `uniqueId` property of `job` nodes.

    Any exception (including a missing 'id' key) is logged and swallowed.
    """
    try:
        ids = [item['id'] for item in dataList]
        if not ids:
            # Nothing to remove; skip the round trips to the graph.
            return

        # Step 1: a label is exclusively owned when the number of distinct
        # targeted jobs attached to it equals the number of all distinct
        # nodes attached to it through `connection`.
        label_query = """
        MATCH (n:job)-[:connection]-(m:jobLabel)
        WHERE n.uniqueId IN $Ids
        WITH m, count(DISTINCT n) AS targeted
        MATCH (m)-[:connection]-(x)
        WITH m, targeted, count(DISTINCT x) AS total
        WHERE total = targeted
        DETACH DELETE m
        """
        connect_graph.run(label_query, Ids=ids)

        # Step 2: delete the job nodes and whatever relationships remain.
        # NOTE(review): the two run() calls are separate statements; whether
        # they share a transaction depends on connect_graph - confirm.
        job_query = """
        UNWIND $Ids AS id
        MATCH (n:job {uniqueId: id})
        DETACH DELETE n
        """
        connect_graph.run(job_query, Ids=ids)
    except Exception as e:
        current_app.logger.error(f'delete_old_jobs error: {e}')
# Operations on enterprises (companies).
def operate_enterprise(operate, dataList):
    """Dispatch a graph operation for enterprise nodes.

    Args:
        operate: Operation name. 'sync_all', 'add', 'update' and 'delete'
            are routed to the matching handler; anything else is logged as
            an invalid operation.
        dataList: Enterprise records, forwarded unchanged to the handler.

    Returns:
        True once dispatching has finished (also for an invalid operation),
        or None when an exception was raised and logged here.
    """
    try:
        # Ordered (operation name, handler) pairs, checked with `==` in turn.
        routes = (
            ('sync_all', sync_all_enterprise),
            ('add', add_enterprise),
            ('update', update_enterprise),
            ('delete', delete_enterprise),
        )
        for action, handler in routes:
            if operate == action:
                handler(dataList)
                break
        else:
            # No route matched the requested operation.
            current_app.logger.error('operate_enterprise: Invalid operation')
        # NOTE(review): assumed to run after the dispatch chain, for every
        # operation value - confirm against the original indentation.
        return True
    except Exception as exc:
        current_app.logger.error(f'operate_enterprise error: {exc}')
def sync_all_enterprise(dataList):
    """Create enterprise data in the graph for a full synchronisation.

    Delegates to create_enterprise_dataList. Any exception is logged
    through the Flask application logger and swallowed.
    """
    try:
        create_enterprise_dataList(dataList)
    except Exception as exc:
        log_error = current_app.logger.error
        log_error(f'sync_all_enterprise error:{exc}')
def add_enterprise(dataList):
    """Create the given enterprise records in the graph.

    Delegates to create_enterprise_dataList. Any exception is logged
    through the Flask application logger and swallowed.
    """
    try:
        create_enterprise_dataList(dataList)
    except Exception as exc:
        log_error = current_app.logger.error
        log_error(f'add_enterprise error:{exc}')
def update_enterprise(dataList):
    """Replace existing enterprise data: remove old nodes, then recreate.

    Runs delete_old_enterprise followed by create_enterprise_dataList on
    the same records. Any exception raised here is logged and swallowed.
    """
    try:
        # Old graph data must be removed before the fresh data is created.
        delete_old_enterprise(dataList)
        create_enterprise_dataList(dataList)
    except Exception as e:
        # The message previously said 'add_enterprise', which pointed
        # failures at the wrong function.
        current_app.logger.error(f'update_enterprise error:{e}')
def delete_enterprise(dataList):
    """Detach-delete the enterprise nodes identified by the given records.

    Args:
        dataList: Iterable of mappings; each item's 'id' value is matched
            against the `uniqueId` property of `enterprise` nodes.

    Any exception (including a missing 'id' key) is logged and swallowed.
    """
    try:
        cypher = """
        UNWIND $Ids AS id
        MATCH (n:enterprise) WHERE n.uniqueId = id
        DETACH DELETE n"""
        # Collect every id so the whole batch is removed in one statement.
        enterprise_ids = [record['id'] for record in dataList]
        connect_graph.run(cypher, Ids=enterprise_ids)
    except Exception as exc:
        log_error = current_app.logger.error
        log_error(f'delete_enterprise error:{exc}')
def delete_old_enterprise(dataList):
    """Remove enterprise nodes and the enterpriseLabel nodes only they use.

    Two statements are sent to the graph:

    1. Delete every `enterpriseLabel` node whose `connection` neighbours
       are all among the enterprises being removed (shared labels stay).
    2. Detach-delete the enterprise nodes themselves, which also drops
       their remaining `connection` relationships to shared labels.

    The previous single query referenced `connection_count` after it had
    been dropped from scope, and its mutually exclusive filters
    (`= 1` then `> 1`) left no rows for the final enterprise deletion.

    Args:
        dataList: Iterable of mappings; each item's 'id' value is matched
            against the `uniqueId` property of `enterprise` nodes.

    Any exception (including a missing 'id' key) is logged and swallowed.
    """
    try:
        # Collect every id so the whole batch is handled together.
        ids = [item['id'] for item in dataList]
        if not ids:
            # Nothing to remove; skip the round trips to the graph.
            return

        # Step 1: a label is exclusively owned when the number of distinct
        # targeted enterprises attached to it equals the number of all
        # distinct nodes attached to it through `connection`.
        label_query = """
        MATCH (n:enterprise)-[:connection]-(m:enterpriseLabel)
        WHERE n.uniqueId IN $Ids
        WITH m, count(DISTINCT n) AS targeted
        MATCH (m)-[:connection]-(x)
        WITH m, targeted, count(DISTINCT x) AS total
        WHERE total = targeted
        DETACH DELETE m
        """
        connect_graph.run(label_query, Ids=ids)

        # Step 2: delete the enterprise nodes and remaining relationships.
        # NOTE(review): the two run() calls are separate statements; whether
        # they share a transaction depends on connect_graph - confirm.
        enterprise_query = """
        UNWIND $Ids AS id
        MATCH (n:enterprise {uniqueId: id})
        DETACH DELETE n
        """
        connect_graph.run(enterprise_query, Ids=ids)
    except Exception as e:
        current_app.logger.error(f'delete_old_enterprise error:{e}')
# Operations on job seekers.
def operate_seeker(operate, dataList):
    """Dispatch a graph operation for seeker (job seeker) nodes.

    Args:
        operate: Operation name.
            'sync_all' - create seekers via create_seeker_dataList.
            'add'      - add seekers (or changed basic info) via
                         add_seeker_dataList.
            'update'   - remove the existing seekers, then add them again.
            'delete'   - remove the seekers.
            Any other value is logged as an invalid operation.
        dataList: Seeker records. For 'update' and 'delete' each item's
            'id' value is matched against the `uniqueId` of `seeker` nodes.

    Returns:
        True once dispatching has finished (also for an invalid operation),
        or None when an exception was raised and logged here.

    The 'delete' branch used to pass `Id=` while its queries read `$Ids`,
    and only looked at the first record; it now removes every listed id.
    """
    try:
        if operate == 'sync_all':
            create_seeker_dataList(dataList)
        elif operate == 'add':
            add_seeker_dataList(dataList)
        elif operate == 'update':
            # Delete first, then recreate from the incoming records.
            _remove_seekers([item['id'] for item in dataList])
            add_seeker_dataList(dataList)
        elif operate == 'delete':
            _remove_seekers([item['id'] for item in dataList])
        else:
            current_app.logger.error('operate_seeker: Invalid operation')
        return True
    except Exception as e:
        current_app.logger.error(f'operate_seeker error: {e}')


def _remove_seekers(ids):
    """Delete seeker nodes plus the seekerLabel nodes only they use.

    Args:
        ids: List of `uniqueId` values of the seeker nodes to remove.

    Exceptions from the graph client propagate to the caller.
    """
    if not ids:
        # Nothing to remove; skip the round trips to the graph.
        return

    # Step 1: a label is exclusively owned when the number of distinct
    # targeted seekers attached to it equals the number of all distinct
    # nodes attached to it through `connection`. The earlier query used
    # `connection_count` after it had left scope and filtered on `= 1`
    # and then `> 1`, which cannot both hold.
    label_query = """
    MATCH (n:seeker)-[:connection]-(m:seekerLabel)
    WHERE n.uniqueId IN $Ids
    WITH m, count(DISTINCT n) AS targeted
    MATCH (m)-[:connection]-(x)
    WITH m, targeted, count(DISTINCT x) AS total
    WHERE total = targeted
    DETACH DELETE m
    """
    connect_graph.run(label_query, Ids=ids)

    # Step 2: delete the seeker nodes and whatever relationships remain.
    seeker_query = """
    UNWIND $Ids AS id
    MATCH (n:seeker {uniqueId: id})
    DETACH DELETE n
    """
    connect_graph.run(seeker_query, Ids=ids)
|