瀏覽代碼

es数据新增迭代数据替换法

DESKTOP-VAEGFGM\zqc 2 周之前
父節點
當前提交
3f89e157ab

+ 4 - 0
menduner/menduner-system-api/src/main/java/com/citu/module/menduner/system/enums/sync/SyncConstants.java

@@ -8,6 +8,10 @@ public class SyncConstants {
 
     // ========== 操作类型 ==========
     public final static String SYNC_ALL = "sync_all";
+
+    public final static String VERSION_250627_SYNC_ALL = "sync_all_250627";
+
+
     public final static String ADD = "add";
     public final static String UPDATE = "update";
     public final static String DELETE = "delete";

+ 0 - 1
menduner/menduner-system-biz/src/main/java/com/citu/module/menduner/system/dal/mysql/job/JobAdvertisedMapper.java

@@ -49,7 +49,6 @@ import java.util.Map;
 
 /**
  * 招聘职位 Mapper
- *
  * @author Rayson
  */
 @Mapper

+ 45 - 0
menduner/menduner-system-biz/src/main/java/com/citu/module/menduner/system/mq/consumer/ESConsumer.java

@@ -34,6 +34,7 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static com.citu.module.menduner.system.enums.sync.SyncConstants.*;
 
@@ -76,6 +77,12 @@ public class ESConsumer implements RocketMQListener<ESJobAdvertisedMergeSendMess
         log.info("接收到队列消息[{}]", message);
         try {
             switch (message.getOperate()) {
+
+                case VERSION_250627_SYNC_ALL:
+                    if(JOB.equals(message.getType())){
+                        version250627Sync();
+                    }
+                    break;
                 case SYNC_ALL:
                     if (JOB.equals(message.getType())) {
                         sync();
@@ -221,6 +228,44 @@ public class ESConsumer implements RocketMQListener<ESJobAdvertisedMergeSendMess
         // 企业删除
     }
 
+
+    public void version250627Sync() {
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start("ES 数据同步");
+        List<ESJobAdvertisedMergeDO> saveAll = new ArrayList<>(11);
+        List<Long> idList = jobAdvertisedMapper.selectIdList();
+        for (Long id : idList) {
+            JobAdvertisedDO job = jobAdvertisedMapper.selectById(id);
+            // 判断职位是否过期
+            if (checkJobExpireTimeAndEnable(job)) {
+                continue;
+            }
+            EnterpriseDO enterpriseDO = enterpriseMapper.selectById(job.getEnterpriseId());
+            if (null == enterpriseDO) {
+                log.error("找不到企业信息 {} ", job.getEnterpriseId());
+                continue;
+            }
+            ESJobAdvertisedMergeDO merge = build(job, enterpriseDO);
+            saveAll.add(merge);
+            if(saveAll.size()==10) {
+                repository.deleteAllById(saveAll.stream().map(ESJobAdvertisedMergeDO::getId).collect(Collectors.toList()));
+                repository.saveAll(saveAll);
+
+                saveAll.clear();
+            }
+
+            log.info("ES 保存数据成功 {}",merge);
+        }
+
+        if(!saveAll.isEmpty()){
+            repository.deleteAllById(saveAll.stream().map(ESJobAdvertisedMergeDO::getId).collect(Collectors.toList()));
+            repository.saveAll(saveAll);
+        }
+
+        stopWatch.stop();
+        stopWatch.prettyPrint();
+
+    }
     public void sync() {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start("ES 数据同步");