|
@@ -1,5 +1,6 @@
|
|
|
package com.citu.module.menduner.system.mq.consumer;
|
|
|
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
import com.citu.module.menduner.system.convert.JobAdvertisedConvert;
|
|
|
import com.citu.module.menduner.system.dal.dataobject.enterprise.EnterpriseDO;
|
|
|
import com.citu.module.menduner.system.dal.dataobject.job.JobAdvertisedDO;
|
|
@@ -9,19 +10,32 @@ import com.citu.module.menduner.system.dal.mysql.job.JobAdvertisedMapper;
|
|
|
import com.citu.module.menduner.system.dal.repository.ESJobAdvertisedMergeRepository;
|
|
|
import com.citu.module.menduner.system.mq.message.ESJobAdvertisedMergeSendMessage;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.context.event.EventListener;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.data.elasticsearch.annotations.Document;
|
|
|
+import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
|
|
|
+import org.springframework.data.elasticsearch.core.IndexOperations;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.StopWatch;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
-import java.util.Optional;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* es 消费
|
|
|
+ * 不需要强一致性
|
|
|
+ * 通过 双写 + 数据定时同步补偿 来保证一致性
|
|
|
**/
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
-public class ESConsumer {
|
|
|
+@RocketMQMessageListener(
|
|
|
+ topic = ESJobAdvertisedMergeSendMessage.TOPIC,
|
|
|
+ consumerGroup = ESJobAdvertisedMergeSendMessage.TOPIC + "_CONSUMER"
|
|
|
+)
|
|
|
+public class ESConsumer implements RocketMQListener<ESJobAdvertisedMergeSendMessage> {
|
|
|
|
|
|
@Resource
|
|
|
private ESJobAdvertisedMergeRepository repository;
|
|
@@ -32,23 +46,27 @@ public class ESConsumer {
|
|
|
@Resource
|
|
|
private JobAdvertisedMapper jobAdvertisedMapper;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private ElasticsearchRestTemplate template;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
|
|
|
- @Async
|
|
|
- @EventListener
|
|
|
- public synchronized void onMessage(ESJobAdvertisedMergeSendMessage message) {
|
|
|
|
|
|
+ @Override
|
|
|
+ public void onMessage(ESJobAdvertisedMergeSendMessage message) {
|
|
|
switch (message.getOperate()) {
|
|
|
case SYNC_ALL:
|
|
|
+ sync();
|
|
|
break;
|
|
|
case ADD:
|
|
|
-// repository.save(message.getData());
|
|
|
+ jobAdd(message.getId());
|
|
|
break;
|
|
|
case UPDATE:
|
|
|
-// repository.save(message.getData());
|
|
|
jobUpdate(message.getId());
|
|
|
break;
|
|
|
case DELETE:
|
|
|
-// repository.deleteById(message.getId());
|
|
|
+ jobDel(message.getId());
|
|
|
break;
|
|
|
case ENTERPRISE_ADD:
|
|
|
// repository.save(message.getData());
|
|
@@ -62,29 +80,117 @@ public class ESConsumer {
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
|
- // 删除
|
|
|
- Optional<ESJobAdvertisedMergeDO> mergeDO = repository.findById(message.getId());
|
|
|
+
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 招聘职位的修改
|
|
|
+ *
|
|
|
+ * @param id 招聘职位id
|
|
|
+ **/
|
|
|
public void jobUpdate(Long id) {
|
|
|
- Optional<ESJobAdvertisedMergeDO> mergeOptional = repository.findById(id);
|
|
|
- if (!mergeOptional.isPresent()) {
|
|
|
- log.error("ES找不到招聘职位信息 {} ", id);
|
|
|
+ // Optional<ESJobAdvertisedMergeDO> mergeOptional = repository.findById(id);
|
|
|
+ // if (!mergeOptional.isPresent()) {
|
|
|
+ // log.error("ES找不到招聘职位信息 {} ", id);
|
|
|
+ // return;
|
|
|
+ // }
|
|
|
+ // ESJobAdvertisedMergeDO merge = mergeOptional.get();
|
|
|
+ JobAdvertisedDO job = jobAdvertisedMapper.selectById(id);
|
|
|
+ if (null == job) {
|
|
|
+ log.error("找不到招聘职位信息 {} ", id);
|
|
|
return;
|
|
|
}
|
|
|
- ESJobAdvertisedMergeDO merge = mergeOptional.get();
|
|
|
+ EnterpriseDO enterpriseDO = enterpriseMapper.selectById(job.getEnterpriseId());
|
|
|
+
|
|
|
+ repository.save(build(job, enterpriseDO));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 招聘职位的新增
|
|
|
+ *
|
|
|
+ * @param id 招聘职位id
|
|
|
+ **/
|
|
|
+ public void jobAdd(Long id) {
|
|
|
JobAdvertisedDO job = jobAdvertisedMapper.selectById(id);
|
|
|
if (null == job) {
|
|
|
log.error("找不到招聘职位信息 {} ", id);
|
|
|
return;
|
|
|
}
|
|
|
EnterpriseDO enterpriseDO = enterpriseMapper.selectById(job.getEnterpriseId());
|
|
|
-
|
|
|
repository.save(build(job, enterpriseDO));
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * 招聘职位的删除
|
|
|
+ *
|
|
|
+ * @param id 招聘职位的id
|
|
|
+ **/
|
|
|
+ public void jobDel(Long id) {
|
|
|
+ repository.deleteById(id);
|
|
|
}
|
|
|
|
|
|
+ public void sync() {
|
|
|
+ StopWatch stopWatch = new StopWatch();
|
|
|
+ stopWatch.start("ES 数据同步");
|
|
|
+ // 清空数据
|
|
|
+ repository.deleteAll();
|
|
|
+ List<Class<?>> documentClasses = getDocumentAnnotatedClasses();
|
|
|
+ if (CollUtil.isEmpty(documentClasses)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 删除索引
|
|
|
+ deleteAllIndexes(documentClasses);
|
|
|
+ // 新增索引
|
|
|
+ recreateIndexes(documentClasses);
|
|
|
+
|
|
|
+ List<JobAdvertisedDO> jobAdvertisedList = jobAdvertisedMapper.selectList();
|
|
|
+ for (JobAdvertisedDO job : jobAdvertisedList) {
|
|
|
+ EnterpriseDO enterpriseDO = enterpriseMapper.selectById(job.getEnterpriseId());
|
|
|
+ repository.save(build(job, enterpriseDO));
|
|
|
+ }
|
|
|
+ stopWatch.stop();
|
|
|
+ stopWatch.prettyPrint();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取带有 @Document 注解的实体类
|
|
|
+ */
|
|
|
+ public List<Class<?>> getDocumentAnnotatedClasses() {
|
|
|
+ Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Document.class);
|
|
|
+ List<Class<?>> documentClasses = new ArrayList<>();
|
|
|
+ for (Object bean : beansWithAnnotation.values()) {
|
|
|
+ documentClasses.add(bean.getClass());
|
|
|
+ }
|
|
|
+ return documentClasses;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 清空索引
|
|
|
+ **/
|
|
|
+ public void deleteAllIndexes(List<Class<?>> documentClasses) {
|
|
|
+
|
|
|
+ for (Class<?> indexClass : documentClasses) {
|
|
|
+ boolean exists = template.indexOps(indexClass).exists();
|
|
|
+ if (exists) {
|
|
|
+ template.indexOps(indexClass).delete();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 重新增加索引
|
|
|
+ **/
|
|
|
+ public void recreateIndexes(List<Class<?>> documentClasses) {
|
|
|
+
|
|
|
+ for (Class<?> indexClass : documentClasses) {
|
|
|
+ IndexOperations indexOps = template.indexOps(indexClass);
|
|
|
+ indexOps.create();
|
|
|
+ indexOps.putMapping(indexOps.createMapping());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
public ESJobAdvertisedMergeDO build(JobAdvertisedDO job, EnterpriseDO enterpriseDO) {
|
|
|
|
|
|
ESJobAdvertisedMergeDO mergeDO = JobAdvertisedConvert.INSTANCE.convert2(job);
|