|
@@ -160,47 +160,44 @@ def _normalize_talent_to_card_format(raw_profile: Dict[str, Any]) -> Dict[str, A
|
|
|
Returns:
|
|
|
Dict[str, Any]: 标准化后的名片格式数据
|
|
|
"""
|
|
|
- # 提取基本信息
|
|
|
- name_zh = raw_profile.get('name', raw_profile.get('name_zh', ''))
|
|
|
- company = raw_profile.get('company', raw_profile.get('hotel_zh', ''))
|
|
|
- position = raw_profile.get('position', raw_profile.get('title_zh', ''))
|
|
|
- mobile = raw_profile.get('phone', raw_profile.get('mobile', ''))
|
|
|
+ import json
|
|
|
+
|
|
|
+ # 从raw_profile中提取基本信息
|
|
|
+ name_zh = raw_profile.get('name_zh', '')
|
|
|
email = raw_profile.get('email', '')
|
|
|
- location = raw_profile.get('location', raw_profile.get('address_zh', ''))
|
|
|
+ mobile = raw_profile.get('mobile', '')
|
|
|
+ birthday = raw_profile.get('birthday', '')
|
|
|
+ age = raw_profile.get('age', '')
|
|
|
+ career_path = raw_profile.get('career_path', [])
|
|
|
+
|
|
|
+ # 从career_path中找到最后一个数组元素,提取hotel_zh和title_zh
|
|
|
+ hotel_zh = ''
|
|
|
+ title_zh = ''
|
|
|
+ if career_path and isinstance(career_path, list) and len(career_path) > 0:
|
|
|
+ last_career = career_path[-1]
|
|
|
+ if isinstance(last_career, dict):
|
|
|
+ hotel_zh = last_career.get('hotel_zh', '')
|
|
|
+ title_zh = last_career.get('title_zh', '')
|
|
|
|
|
|
- # 构建隶属关系
|
|
|
- affiliation = []
|
|
|
- if company:
|
|
|
- affiliation.append({
|
|
|
- "company": company,
|
|
|
- "group": raw_profile.get('group', '')
|
|
|
- })
|
|
|
+ # 从id和userId组合成JSON字符串
|
|
|
+ id_value = raw_profile.get('id', '')
|
|
|
+ userId_value = raw_profile.get('userId', '')
|
|
|
+ id_json = json.dumps({"id": id_value, "userId": userId_value}, ensure_ascii=False)
|
|
|
|
|
|
- # 构建职业轨迹
|
|
|
- career_path = []
|
|
|
- if position and company:
|
|
|
- career_path.append({
|
|
|
- "date": datetime.now().strftime('%Y-%m-%d'),
|
|
|
- "hotel_en": raw_profile.get('hotel_en', ''),
|
|
|
- "hotel_zh": company,
|
|
|
- "image_path": raw_profile.get('image_path', ''),
|
|
|
- "source": "menduner_data_creation",
|
|
|
- "title_en": raw_profile.get('title_en', ''),
|
|
|
- "title_zh": position
|
|
|
- })
|
|
|
+ # 直接使用原始career_path
|
|
|
|
|
|
# 按照任务解析结果.txt的data字段格式组装数据
|
|
|
normalized = {
|
|
|
"address_en": raw_profile.get('address_en', ''),
|
|
|
- "address_zh": location,
|
|
|
- "affiliation": affiliation,
|
|
|
- "age": raw_profile.get('age', 0),
|
|
|
- "birthday": raw_profile.get('birthday', ''),
|
|
|
+ "address_zh": raw_profile.get('address_zh', ''),
|
|
|
+ "affiliation": raw_profile.get('affiliation', []),
|
|
|
+ "age": age,
|
|
|
+ "birthday": birthday,
|
|
|
"brand_group": raw_profile.get('brand_group', ''),
|
|
|
"career_path": career_path,
|
|
|
"email": _normalize_email(email),
|
|
|
"hotel_en": raw_profile.get('hotel_en', ''),
|
|
|
- "hotel_zh": company,
|
|
|
+ "hotel_zh": hotel_zh,
|
|
|
"mobile": _normalize_phone(mobile),
|
|
|
"name_en": raw_profile.get('name_en', ''),
|
|
|
"name_zh": name_zh,
|
|
@@ -210,11 +207,11 @@ def _normalize_talent_to_card_format(raw_profile: Dict[str, Any]) -> Dict[str, A
|
|
|
"postal_code_zh": raw_profile.get('postal_code_zh', ''),
|
|
|
"residence": raw_profile.get('residence', ''),
|
|
|
"title_en": raw_profile.get('title_en', ''),
|
|
|
- "title_zh": position,
|
|
|
- "image_path": raw_profile.get('id', ''),
|
|
|
+ "title_zh": title_zh,
|
|
|
+ "image_path": id_json,
|
|
|
"origin_source": [{
|
|
|
"task_type": "招聘",
|
|
|
- "minio_path": raw_profile.get('id', ''),
|
|
|
+ "minio_path": id_json,
|
|
|
"source_date": datetime.now().strftime('%Y-%m-%d')
|
|
|
}]
|
|
|
}
|
|
@@ -377,28 +374,34 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
|
|
|
批量处理门墩儿人才数据
|
|
|
|
|
|
Args:
|
|
|
- data_list (List[Dict[str, Any]]): 待处理的人才数据列表
|
|
|
- task_id (str, optional): 任务ID
|
|
|
+ data_list (List[Dict[str, Any]]): 待处理的人才数据列表(已废弃,现在从数据库读取)
|
|
|
+ task_id (str, optional): 任务ID,用于从数据库读取task_source
|
|
|
task_type (str, optional): 任务类型
|
|
|
|
|
|
Returns:
|
|
|
Dict[str, Any]: 批量处理结果,格式与parse_result保持一致
|
|
|
"""
|
|
|
try:
|
|
|
- # 验证参数
|
|
|
- if not data_list or not isinstance(data_list, list):
|
|
|
+ # 根据task_id从parse_task_repository表读取记录
|
|
|
+ if not task_id:
|
|
|
return {
|
|
|
"processed_time": datetime.now().isoformat(),
|
|
|
"results": [],
|
|
|
"summary": {
|
|
|
- "failed_count": len(data_list) if data_list else 0,
|
|
|
+ "failed_count": 0,
|
|
|
"success_count": 0,
|
|
|
"success_rate": 0,
|
|
|
- "total_files": len(data_list) if data_list else 0
|
|
|
- }
|
|
|
+ "total_files": 0
|
|
|
+ },
|
|
|
+ "error": "缺少task_id参数"
|
|
|
}
|
|
|
|
|
|
- if len(data_list) == 0:
|
|
|
+ # 导入数据库模型
|
|
|
+ from app.core.data_parse.parse_system import ParseTaskRepository, db
|
|
|
+
|
|
|
+ # 查询对应的任务记录
|
|
|
+ task_record = ParseTaskRepository.query.get(task_id)
|
|
|
+ if not task_record:
|
|
|
return {
|
|
|
"processed_time": datetime.now().isoformat(),
|
|
|
"results": [],
|
|
@@ -407,7 +410,23 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
|
|
|
"success_count": 0,
|
|
|
"success_rate": 0,
|
|
|
"total_files": 0
|
|
|
- }
|
|
|
+ },
|
|
|
+ "error": f"未找到task_id为{task_id}的任务记录"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 获取task_source作为需要处理的数据列表
|
|
|
+ task_source = task_record.task_source
|
|
|
+ if not task_source or not isinstance(task_source, list):
|
|
|
+ return {
|
|
|
+ "processed_time": datetime.now().isoformat(),
|
|
|
+ "results": [],
|
|
|
+ "summary": {
|
|
|
+ "failed_count": 0,
|
|
|
+ "success_count": 0,
|
|
|
+ "success_rate": 0,
|
|
|
+ "total_files": 0
|
|
|
+ },
|
|
|
+ "error": "task_source为空或格式不正确"
|
|
|
}
|
|
|
|
|
|
results = []
|
|
@@ -415,12 +434,12 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
|
|
|
failed_count = 0
|
|
|
parsed_record_ids = [] # 收集成功解析的记录ID
|
|
|
|
|
|
- logging.info(f"开始批量处理门墩儿人才数据,共 {len(data_list)} 条记录")
|
|
|
+ logging.info(f"开始批量处理门墩儿人才数据,共 {len(task_source)} 条记录")
|
|
|
|
|
|
# 逐一处理每条数据
|
|
|
- for i, data in enumerate(data_list):
|
|
|
+ for i, data in enumerate(task_source):
|
|
|
try:
|
|
|
- logging.debug(f"处理第 {i+1}/{len(data_list)} 条数据")
|
|
|
+ logging.debug(f"处理第 {i+1}/{len(task_source)} 条数据")
|
|
|
|
|
|
# 标准化数据为名片格式
|
|
|
normalized = _normalize_talent_to_card_format(data)
|
|
@@ -439,10 +458,23 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
|
|
|
if parsed_record and 'id' in parsed_record:
|
|
|
parsed_record_ids.append(str(parsed_record['id']))
|
|
|
logging.info(f"成功记录人才信息到parsed_talents表: {normalized.get('name_zh', '')}")
|
|
|
+
|
|
|
+ # 更新task_source中对应记录的parse_flag和status
|
|
|
+ if isinstance(data, dict):
|
|
|
+ data['parse_flag'] = 0
|
|
|
+ data['status'] = '解析成功'
|
|
|
else:
|
|
|
logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
|
|
|
+ # 更新task_source中对应记录的parse_flag和status
|
|
|
+ if isinstance(data, dict):
|
|
|
+ data['parse_flag'] = 1
|
|
|
+ data['status'] = '解析失败'
|
|
|
except Exception as record_error:
|
|
|
logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
|
|
|
+ # 更新task_source中对应记录的parse_flag和status
|
|
|
+ if isinstance(data, dict):
|
|
|
+ data['parse_flag'] = 1
|
|
|
+ data['status'] = '解析失败'
|
|
|
|
|
|
success_count += 1
|
|
|
results.append({
|
|
@@ -459,6 +491,12 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
|
|
|
else:
|
|
|
failed_count += 1
|
|
|
error_messages = validation.get('errors', ['验证失败'])
|
|
|
+
|
|
|
+ # 更新task_source中对应记录的parse_flag和status
|
|
|
+ if isinstance(data, dict):
|
|
|
+ data['parse_flag'] = 1
|
|
|
+ data['status'] = '解析失败'
|
|
|
+
|
|
|
results.append({
|
|
|
"data": None,
|
|
|
"error": '; '.join(error_messages),
|
|
@@ -475,6 +513,12 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
|
|
|
failed_count += 1
|
|
|
error_msg = f"处理门墩儿数据失败: {str(item_error)}"
|
|
|
logging.error(error_msg, exc_info=True)
|
|
|
+
|
|
|
+ # 更新task_source中对应记录的parse_flag和status
|
|
|
+ if isinstance(data, dict):
|
|
|
+ data['parse_flag'] = 1
|
|
|
+ data['status'] = '解析失败'
|
|
|
+
|
|
|
results.append({
|
|
|
"data": None,
|
|
|
"error": error_msg,
|
|
@@ -486,6 +530,31 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
|
|
|
"success": False
|
|
|
})
|
|
|
|
|
|
+ # 根据处理结果更新task_status
|
|
|
+ if failed_count == 0:
|
|
|
+ task_status = '解析成功'
|
|
|
+ elif success_count == 0:
|
|
|
+ task_status = '解析失败'
|
|
|
+ else:
|
|
|
+ task_status = '部分解析成功'
|
|
|
+
|
|
|
+ # 所有task_source记录处理完成后,将更新后的task_source和task_status保存到数据库
|
|
|
+ try:
|
|
|
+ task_record.task_source = task_source
|
|
|
+ task_record.task_status = task_status
|
|
|
+ task_record.parse_count = success_count
|
|
|
+ task_record.parse_result = {
|
|
|
+ 'success_count': success_count,
|
|
|
+ 'failed_count': failed_count,
|
|
|
+ 'parsed_record_ids': parsed_record_ids,
|
|
|
+ 'processed_time': datetime.now().isoformat()
|
|
|
+ }
|
|
|
+ db.session.commit()
|
|
|
+ logging.info(f"成功更新task_id为{task_id}的任务记录,task_status={task_status},处理成功{success_count}条,失败{failed_count}条")
|
|
|
+ except Exception as update_error:
|
|
|
+ logging.error(f"更新任务记录失败: {str(update_error)}")
|
|
|
+ db.session.rollback()
|
|
|
+
|
|
|
# 组装最终结果
|
|
|
if failed_count == 0:
|
|
|
return {
|