operate_graph.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. from configs.connections import connect_graph
  2. from functions.solve_graph import create_job_dataList, create_enterprise_dataList, create_seeker_dataList, \
  3. add_seeker_dataList
  4. import logging
  5. from flask import current_app
  6. logger = logging.getLogger(__name__)
  7. # 图谱节点类型:job(招聘职位) enterprise(企业) person-seeker(求职者) 给他们打标签建立关系 label
  8. # 针对招聘职位的操作
  9. def operate_job(operate, dataList):
  10. try:
  11. if operate == 'sync_all':
  12. sync_all_jobs(dataList)
  13. elif operate == 'add':
  14. add_jobs(dataList)
  15. elif operate == 'update':
  16. update_jobs(dataList)
  17. elif operate == 'delete':
  18. delete_jobs(dataList)
  19. else:
  20. current_app.logger.error(f'operate_job: Invalid operation')
  21. return True
  22. except Exception as e:
  23. current_app.logger.error(f'operate_job error: {e}')
  24. def sync_all_jobs(dataList):
  25. try:
  26. # 创建新的job数据
  27. create_job_dataList(dataList)
  28. except Exception as e:
  29. current_app.logger.error(f'sync_all_jobs error: {e}')
  30. def add_jobs(dataList):
  31. try:
  32. create_job_dataList(dataList)
  33. except Exception as e:
  34. current_app.logger.error(f'add_jobs error: {e}')
  35. def update_jobs(dataList):
  36. try:
  37. # 删除旧的job和jobLabel节点及关系
  38. delete_old_jobs(dataList)
  39. # 创建新的job数据
  40. create_job_dataList(dataList)
  41. except Exception as e:
  42. current_app.logger.error(f'update_jobs error: {e}')
  43. def delete_jobs(dataList):
  44. try:
  45. # 删除指定的job节点
  46. query = """
  47. UNWIND $Ids AS id
  48. MATCH (n:job {uniqueId: id})
  49. DETACH DELETE n
  50. """
  51. ids = [item['id'] for item in dataList]
  52. connect_graph.run(query, Ids=ids)
  53. except Exception as e:
  54. current_app.logger.error(f'delete_jobs error: {e}')
  55. def delete_old_jobs(dataList):
  56. try:
  57. # 删除与job节点相连的jobLabel节点和关系
  58. query = """
  59. WITH $Ids AS ids_list
  60. UNWIND ids_list AS Id
  61. MATCH (n:job {uniqueId: Id})
  62. WITH n
  63. MATCH (n)-[r:connection]-(m:jobLabel)
  64. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  65. UNWIND relationships AS rel
  66. WITH n, m, connection_count, rel
  67. // 删除连接数为1的jobLabel节点
  68. WHERE connection_count = 1
  69. DETACH DELETE m
  70. WITH n, rel
  71. // 删除连接数大于1的connection关系
  72. WHERE connection_count > 1
  73. DELETE rel"""
  74. ids = [item['id'] for item in dataList]
  75. connect_graph.run(query, Ids=ids)
  76. cql ="""// 第二步:删除指定的job节点
  77. WITH $Ids AS ids_list
  78. UNWIND ids_list AS Id
  79. MATCH (n:job {uniqueId: Id})
  80. DETACH DELETE n
  81. """
  82. connect_graph.run(cql, Ids=ids)
  83. except Exception as e:
  84. current_app.logger.error(f'delete_old_jobs error: {e}')
  85. # 企业
  86. def operate_enterprise(operate, dataList):
  87. try:
  88. if operate == 'sync_all':
  89. sync_all_enterprise(dataList)
  90. elif operate == 'add':
  91. add_enterprise(dataList)
  92. elif operate == 'update':
  93. update_enterprise(dataList)
  94. elif operate == 'delete':
  95. delete_enterprise(dataList)
  96. else:
  97. current_app.logger.error(f'operate_enterprise: Invalid operation')
  98. return True
  99. except Exception as e:
  100. current_app.logger.error(f'operate_enterprise error: {e}')
  101. def sync_all_enterprise(dataList):
  102. try:
  103. create_enterprise_dataList(dataList)
  104. except Exception as e:
  105. current_app.logger.error(f'sync_all_enterprise error:{e}')
  106. def add_enterprise(dataList):
  107. try:
  108. create_enterprise_dataList(dataList)
  109. except Exception as e:
  110. current_app.logger.error(f'add_enterprise error:{e}')
  111. def update_enterprise(dataList):
  112. try:
  113. delete_old_enterprise(dataList)
  114. create_enterprise_dataList(dataList)
  115. except Exception as e:
  116. current_app.logger.error(f'add_enterprise error:{e}')
  117. def delete_enterprise(dataList):
  118. try:
  119. query = """
  120. UNWIND $Ids AS id
  121. MATCH (n:enterprise) WHERE n.uniqueId = id
  122. DETACH DELETE n"""
  123. # 提取所有 id
  124. ids = [item['id'] for item in dataList]
  125. connect_graph.run(query, Ids=ids)
  126. except Exception as e:
  127. current_app.logger.error(f'delete_enterprise error:{e}')
  128. def delete_old_enterprise(dataList):
  129. try:
  130. # 先删除后新建
  131. query = """
  132. // 第一步:删除与enterprise节点相连的enterpriseLabel节点和关系
  133. WITH $Ids AS ids_list
  134. UNWIND ids_list AS Id
  135. MATCH (n:enterprise {uniqueId: Id})
  136. WITH n
  137. MATCH (n)-[r:connection]-(m:enterpriseLabel)
  138. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  139. UNWIND relationships AS rel
  140. WITH n, m, connection_count, rel
  141. // 删除连接数为1的enterpriseLabel节点
  142. WHERE connection_count = 1
  143. DETACH DELETE m
  144. WITH n, rel
  145. // 删除连接数大于1的connection关系
  146. WHERE connection_count > 1
  147. DELETE rel"""
  148. # 提取所有 id (批量删除多个)
  149. ids = [item['id'] for item in dataList]
  150. connect_graph.run(query, Ids=ids)
  151. cql ="""// 第二步:删除指定的enterprise节点
  152. WITH $Ids AS ids_list
  153. UNWIND ids_list AS Id
  154. MATCH (n:enterprise {uniqueId: Id})
  155. DETACH DELETE n"""
  156. connect_graph.run(cql, Ids=ids)
  157. except Exception as e:
  158. current_app.logger.error(f'delete_old_enterprise error:{e}')
  159. # 针对求职者的操作
  160. def operate_seeker(operate, dataList):
  161. try:
  162. if operate == 'sync_all':
  163. create_seeker_dataList(dataList)
  164. # 新增 修改基本信息 (单个新增)
  165. elif operate == 'add':
  166. add_seeker_dataList(dataList)
  167. # part_seeker_dataList(dataList)
  168. # create_seeker_dataList(dataList) # 第一次同步大量数据
  169. elif operate == 'update':
  170. # 先删除后新建
  171. query = """
  172. // 第一步:删除与seeker节点相连的seekerLabel节点和关系
  173. WITH $Ids AS ids_list
  174. UNWIND ids_list AS Id
  175. MATCH (n:seeker {uniqueId: Id})
  176. WITH n
  177. MATCH (n)-[r:connection]-(m:seekerLabel)
  178. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  179. UNWIND relationships AS rel
  180. WITH n, m, connection_count, rel
  181. // 删除连接数为1的seekerLabel节点
  182. WHERE connection_count = 1
  183. DETACH DELETE m
  184. WITH n, rel
  185. // 删除连接数大于1的connection关系
  186. WHERE connection_count > 1
  187. DELETE rel"""
  188. # 提取所有 id (批量删除多个)
  189. ids = [item['id'] for item in dataList]
  190. connect_graph.run(query, Ids=ids)
  191. cql = """
  192. // 第二步:删除指定的seeker节点
  193. WITH $Ids AS ids_list
  194. UNWIND ids_list AS Id
  195. MATCH (n:seeker {uniqueId: Id})
  196. DETACH DELETE n"""
  197. connect_graph.run(cql, Ids=ids)
  198. add_seeker_dataList(dataList)
  199. elif operate == 'delete':
  200. # 删除
  201. query = """
  202. // 第一步:删除与seeker节点相连的seekerLabel节点和关系
  203. WITH $Ids AS ids_list
  204. UNWIND ids_list AS Id
  205. MATCH (n:seeker {uniqueId: Id})
  206. WITH n
  207. MATCH (n)-[r:connection]-(m:seekerLabel)
  208. WITH n, m, COUNT(r) AS connection_count, collect(r) AS relationships
  209. UNWIND relationships AS rel
  210. WITH n, m, connection_count, rel
  211. // 删除连接数为1的seekerLabel节点
  212. WHERE connection_count = 1
  213. DETACH DELETE m
  214. WITH n, rel
  215. // 删除连接数大于1的connection关系
  216. WHERE connection_count > 1
  217. DELETE rel"""
  218. connect_graph.run(query, Id=dataList[0]['id'])
  219. cql = """
  220. // 第二步:删除指定的seeker节点
  221. WITH $Ids AS ids_list
  222. UNWIND ids_list AS Id
  223. MATCH (n:seeker {uniqueId: Id})
  224. DETACH DELETE n"""
  225. connect_graph.run(cql, Id=dataList[0]['id'])
  226. else:
  227. current_app.logger.error(f'operate_seeker: Invalid operation')
  228. current_app.logger.error(f'operate_seeker:', dataList)
  229. return True
  230. except Exception as e:
  231. current_app.logger.error(f'operate_seeker error: {e}')