operate_graph.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. from configs.connections import connect_graph
  2. from functions.solve_graph import create_job_dataList, create_enterprise_dataList, create_seeker_dataList, \
  3. add_seeker_dataList
  4. import logging
  5. from flask import current_app
  6. logger = logging.getLogger(__name__)
  7. # 图谱节点类型:job(招聘职位) enterprise(企业) person-seeker(求职者) 给他们打标签建立关系 label
  8. # 针对招聘职位的操作
  9. def operate_job(operate, dataList):
  10. try:
  11. if operate == 'sync_all':
  12. sync_all_jobs(dataList)
  13. elif operate == 'add':
  14. add_jobs(dataList)
  15. elif operate == 'update':
  16. update_jobs(dataList)
  17. elif operate == 'delete':
  18. delete_jobs(dataList)
  19. else:
  20. current_app.logger.error(f'operate_job: Invalid operation')
  21. return True
  22. except Exception as e:
  23. current_app.logger.error(f'operate_job error: {e}')
  24. def sync_all_jobs(dataList):
  25. try:
  26. # 创建新的job数据
  27. create_job_dataList(dataList)
  28. except Exception as e:
  29. current_app.logger.error(f'sync_all_jobs error: {e}')
  30. def add_jobs(dataList):
  31. try:
  32. create_job_dataList(dataList)
  33. except Exception as e:
  34. current_app.logger.error(f'add_jobs error: {e}')
  35. def update_jobs(dataList):
  36. try:
  37. # 删除旧的job和jobLabel节点及关系
  38. delete_old_jobs(dataList)
  39. # 创建新的job数据
  40. create_job_dataList(dataList)
  41. except Exception as e:
  42. current_app.logger.error(f'update_jobs error: {e}')
  43. def delete_jobs(dataList):
  44. try:
  45. # 删除指定的job节点
  46. query = """
  47. UNWIND $Ids AS id
  48. MATCH (n:job {uniqueId: id})
  49. DETACH DELETE n
  50. """
  51. ids = [item['id'] for item in dataList]
  52. connect_graph.run(query, Ids=ids)
  53. except Exception as e:
  54. current_app.logger.error(f'delete_jobs error: {e}')
  55. def delete_old_jobs(dataList):
  56. try:
  57. # 删除与job节点相连的jobLabel节点和关系
  58. query = """
  59. WITH $Ids AS ids_list
  60. UNWIND ids_list AS Id
  61. MATCH (n:job {uniqueId: Id})
  62. WITH n
  63. MATCH (n)-[r:connection]-(m:jobLabel)
  64. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  65. UNWIND relationships AS rel
  66. WITH n, m, connection_count, rel
  67. // 删除连接数为1的jobLabel节点
  68. WHERE connection_count = 1
  69. DETACH DELETE m
  70. WITH n, rel
  71. // 删除连接数大于1的connection关系
  72. WHERE connection_count > 1
  73. DELETE rel
  74. // 第二步:删除指定的job节点
  75. WITH $Ids AS ids_list
  76. UNWIND ids_list AS Id
  77. MATCH (n:job {uniqueId: Id})
  78. DETACH DELETE n
  79. """
  80. ids = [item['id'] for item in dataList]
  81. connect_graph.run(query, Ids=ids)
  82. except Exception as e:
  83. current_app.logger.error(f'delete_old_jobs error: {e}')
  84. # 企业
  85. def operate_enterprise(operate,dataList):
  86. try:
  87. if operate == 'sync_all':
  88. sync_all_enterprise(dataList)
  89. elif operate == 'add':
  90. add_enterprise(dataList)
  91. elif operate == 'update':
  92. update_enterprise(dataList)
  93. elif operate == 'delete':
  94. delete_enterprise(dataList)
  95. else:
  96. current_app.logger.error(f'operate_enterprise: Invalid operation')
  97. return True
  98. except Exception as e:
  99. current_app.logger.error(f'operate_enterprise error: {e}')
  100. def sync_all_enterprise(dataList):
  101. try:
  102. create_enterprise_dataList(dataList)
  103. except Exception as e:
  104. current_app.logger.error(f'sync_all_enterprise error:{e}')
  105. def add_enterprise(dataList):
  106. try:
  107. create_enterprise_dataList(dataList)
  108. except Exception as e:
  109. current_app.logger.error(f'add_enterprise error:{e}')
  110. def update_enterprise(dataList):
  111. try:
  112. delete_old_enterprise(dataList)
  113. create_enterprise_dataList(dataList)
  114. except Exception as e:
  115. current_app.logger.error(f'add_enterprise error:{e}')
  116. def delete_enterprise(dataList):
  117. try:
  118. query = """
  119. UNWIND $Ids AS id
  120. MATCH (n:enterprise) WHERE n.uniqueId = id
  121. DETACH DELETE n"""
  122. # 提取所有 id
  123. ids = [item['id'] for item in dataList]
  124. connect_graph.run(query, Ids=ids)
  125. except Exception as e:
  126. current_app.logger.error(f'delete_enterprise error:{e}')
  127. def delete_old_enterprise(dataList):
  128. try:
  129. # 先删除后新建
  130. query = """
  131. // 第一步:删除与enterprise节点相连的enterpriseLabel节点和关系
  132. WITH $Ids AS ids_list
  133. UNWIND ids_list AS Id
  134. MATCH (n:enterprise {uniqueId: Id})
  135. WITH n
  136. MATCH (n)-[r:connection]-(m:enterpriseLabel)
  137. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  138. UNWIND relationships AS rel
  139. WITH n, m, connection_count, rel
  140. // 删除连接数为1的enterpriseLabel节点
  141. WHERE connection_count = 1
  142. DETACH DELETE m
  143. WITH n, rel
  144. // 删除连接数大于1的connection关系
  145. WHERE connection_count > 1
  146. DELETE rel
  147. // 第二步:删除指定的enterprise节点
  148. WITH $Ids AS ids_list
  149. UNWIND ids_list AS Id
  150. MATCH (n:enterprise {uniqueId: Id})
  151. DETACH DELETE n"""
  152. # 提取所有 id (批量删除多个)
  153. ids = [item['id'] for item in dataList]
  154. connect_graph.run(query, Ids=ids)
  155. except Exception as e:
  156. current_app.logger.error(f'delete_old_enterprise error:{e}')
  157. # 针对求职者的操作
  158. def operate_seeker(operate, dataList):
  159. try:
  160. if operate == 'sync_all':
  161. create_seeker_dataList(dataList)
  162. # 新增 修改基本信息
  163. elif operate == 'add':
  164. add_seeker_dataList(dataList)
  165. # part_seeker_dataList(dataList)
  166. # create_seeker_dataList(dataList) # 第一次同步大量数据
  167. elif operate == 'update':
  168. # 先删除后新建
  169. query = """
  170. // 第一步:删除与seeker节点相连的seekerLabel节点和关系
  171. WITH $Ids AS ids_list
  172. UNWIND ids_list AS Id
  173. MATCH (n:seeker {uniqueId: Id})
  174. WITH n
  175. MATCH (n)-[r:connection]-(m:seekerLabel)
  176. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  177. UNWIND relationships AS rel
  178. WITH n, m, connection_count, rel
  179. // 删除连接数为1的seekerLabel节点
  180. WHERE connection_count = 1
  181. DETACH DELETE m
  182. WITH n, rel
  183. // 删除连接数大于1的connection关系
  184. WHERE connection_count > 1
  185. DELETE rel"""
  186. # 提取所有 id (批量删除多个)
  187. ids = [item['id'] for item in dataList]
  188. connect_graph.run(query, Ids=ids)
  189. cql = """
  190. // 第二步:删除指定的seeker节点
  191. WITH $Ids AS ids_list
  192. UNWIND ids_list AS Id
  193. MATCH (n:seeker {uniqueId: Id})
  194. DETACH DELETE n"""
  195. connect_graph.run(cql, Ids=ids)
  196. add_seeker_dataList(dataList)
  197. elif operate == 'delete':
  198. # 删除
  199. query = """
  200. // 第一步:删除与seeker节点相连的seekerLabel节点和关系
  201. WITH $Ids AS ids_list
  202. UNWIND ids_list AS Id
  203. MATCH (n:seeker {uniqueId: Id})
  204. WITH n
  205. MATCH (n)-[r:connection]-(m:seekerLabel)
  206. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  207. UNWIND relationships AS rel
  208. WITH n, m, connection_count, rel
  209. // 删除连接数为1的seekerLabel节点
  210. WHERE connection_count = 1
  211. DETACH DELETE m
  212. WITH n, rel
  213. // 删除连接数大于1的connection关系
  214. WHERE connection_count > 1
  215. DELETE rel"""
  216. connect_graph.run(query, Id=dataList[0]['id'])
  217. cql = """
  218. // 第二步:删除指定的seeker节点
  219. WITH $Ids AS ids_list
  220. UNWIND ids_list AS Id
  221. MATCH (n:seeker {uniqueId: Id})
  222. DETACH DELETE n"""
  223. connect_graph.run(cql, Id=dataList[0]['id'])
  224. else:
  225. current_app.logger.error(f'operate_seeker: Invalid operation')
  226. return True
  227. except Exception as e:
  228. current_app.logger.error(f'operate_seeker error: {e}')