Procházet zdrojové kódy

修复去重处理的逻辑,不删除记录,设置inactive状态

maxiaolong před 2 týdny
rodič
revize
f82b2e1cad

+ 38 - 50
app/api/data_parse/routes.py

@@ -1113,6 +1113,18 @@ def process_duplicate_record_route(duplicate_id):
     
     返回:
         - JSON: 包含处理结果和状态信息
+        
+    返回格式:
+        {
+            'success': true/false,
+            'message': '处理结果描述'
+        }
+        
+    功能说明:
+        - 接收包含人才数据的请求体
+        - 严格按照样例格式处理 results 数组中的人才数据
+        - 调用 add_single_talent 函数将人才信息写入 business_cards 表
+        - 提供详细的处理统计和结果追踪
     """
     try:
         # 获取请求数据
@@ -1174,7 +1186,10 @@ def process_duplicate_record_route(duplicate_id):
         else:
             status_code = 500  # Internal Server Error
         
-        return jsonify(result), status_code
+        return jsonify({
+            'success': result['success'],
+            'message': result['message']
+        }), status_code
         
     except Exception as e:
         # 处理未预期的异常
@@ -1463,7 +1478,13 @@ def add_parse_task_route():
         - publish_time: 发布时间 (form-data字段,新任命类型必填)
         
     返回:
-        - JSON: 包含任务创建结果和上传摘要
+        - JSON: 包含任务创建结果和状态信息
+        
+    返回格式:
+        {
+            'success': true/false,
+            'message': '处理结果描述'
+        }
         
     功能说明:
         - 根据任务类型处理不同格式的文件
@@ -1489,15 +1510,13 @@ def add_parse_task_route():
         if not task_type:
             return jsonify({
                 'success': False,
-                'message': '缺少task_type参数',
-                'data': None
+                'message': '缺少task_type参数'
             }), 400
         
         if task_type not in ['名片', '简历', '新任命', '招聘', '杂项']:
             return jsonify({
                 'success': False,
-                'message': 'task_type参数必须是以下值之一:名片、简历、新任命、招聘、杂项',
-                'data': None
+                'message': 'task_type参数必须是以下值之一:名片、简历、新任命、招聘、杂项'
             }), 400
         
         # 获取创建者信息(可选参数)
@@ -1513,16 +1532,14 @@ def add_parse_task_route():
             if 'files' in request.files and request.files.getlist('files'):
                 return jsonify({
                     'success': False,
-                    'message': '招聘类型任务不需要上传文件',
-                    'data': None
+                    'message': '招聘类型任务不需要上传文件'
                 }), 400
             
             # 检查data参数是否有内容
             if not data:
                 return jsonify({
                     'success': False,
-                    'message': '招聘类型任务需要提供data参数',
-                    'data': None
+                    'message': '招聘类型任务需要提供data参数'
                 }), 400
             
             # 记录请求日志
@@ -1552,8 +1569,7 @@ def add_parse_task_route():
             if 'files' not in request.files:
                 return jsonify({
                     'success': False,
-                    'message': f'{task_type}任务需要上传文件,请使用files字段上传文件',
-                    'data': None
+                    'message': f'{task_type}任务需要上传文件,请使用files字段上传文件'
                 }), 400
             
             # 获取上传的文件列表
@@ -1563,8 +1579,7 @@ def add_parse_task_route():
             if not uploaded_files or len(uploaded_files) == 0:
                 return jsonify({
                     'success': False,
-                    'message': '文件数组不能为空',
-                    'data': None
+                    'message': '文件数组不能为空'
                 }), 400
             
             # 验证所有文件
@@ -1574,8 +1589,7 @@ def add_parse_task_route():
                 if not file or file.filename == '':
                     return jsonify({
                         'success': False,
-                        'message': f'第{i+1}个文件为空或未选择',
-                        'data': None
+                        'message': f'第{i+1}个文件为空或未选择'
                     }), 400
                 
                 valid_files.append(file)
@@ -1585,8 +1599,7 @@ def add_parse_task_route():
                 if not publish_time:
                     return jsonify({
                         'success': False,
-                        'message': '新任命类型任务需要提供publish_time参数',
-                        'data': None
+                        'message': '新任命类型任务需要提供publish_time参数'
                     }), 400
             
             # 记录请求日志
@@ -1612,8 +1625,7 @@ def add_parse_task_route():
         # 返回结果
         return jsonify({
             'success': result['success'],
-            'message': result['message'],
-            'data': result['data']
+            'message': result['message']
         }), status_code
         
     except Exception as e:
@@ -1624,8 +1636,7 @@ def add_parse_task_route():
         # 返回错误响应
         return jsonify({
             'success': False,
-            'message': error_msg,
-            'data': None
+            'message': error_msg
         }), 500
 
 
@@ -2009,7 +2020,6 @@ def add_parsed_talents_route():
         - 接收包含人才数据的请求体
         - 严格按照样例格式处理 results 数组中的人才数据
         - 调用 add_single_talent 函数将人才信息写入 business_cards 表
-        - 成功处理后,更新对应任务记录状态为"已入库"
         - 提供详细的处理统计和结果追踪
         
     状态码:
@@ -2023,8 +2033,7 @@ def add_parsed_talents_route():
         if not request.is_json:
             return jsonify({
                 'success': False,
-                'message': '请求必须是 JSON 格式',
-                'data': None
+                'message': '请求必须是 JSON 格式'
             }), 400
         
         # 获取请求数据
@@ -2034,16 +2043,14 @@ def add_parsed_talents_route():
         if not api_response_data:
             return jsonify({
                 'success': False,
-                'message': '请求数据不能为空',
-                'data': None
+                'message': '请求数据不能为空'
             }), 400
         
         # 验证数据格式
         if not isinstance(api_response_data, dict):
             return jsonify({
                 'success': False,
-                'message': '请求数据必须是JSON对象格式',
-                'data': None
+                'message': '请求数据必须是JSON对象格式'
             }), 400
         
         # 记录请求日志
@@ -2076,31 +2083,13 @@ def add_parsed_talents_route():
             success_count = data_summary.get('success_count', 0)
             failed_count = data_summary.get('failed_count', 0)
             logger.info(f"处理人才数据完成: 成功 {success_count} 条,失败 {failed_count} 条")
-            
-            # 更新任务状态为"已入库"
-            task_id = api_response_data.get('task_id')
-            if task_id:
-                try:
-                    from app.core.data_parse.parse_system import db, ParseTaskRepository
-                    task_obj = ParseTaskRepository.query.filter_by(id=task_id).first()
-                    if task_obj:
-                        task_obj.task_status = '已入库'
-                        db.session.commit()
-                        logger.info(f"已更新解析任务记录: id={task_id}, 状态=已入库")
-                    else:
-                        logger.warning(f"未找到 ID 为 {task_id} 的任务记录")
-                except Exception as update_error:
-                    logger.error(f"更新任务状态失败: {str(update_error)}", exc_info=True)
-            else:
-                logger.info("请求中未包含 task_id,跳过任务状态更新")
         else:
             logger.error(f"处理人才数据失败: {result.get('message', '未知错误')}")
         
         # 返回结果
         return jsonify({
             'success': result.get('success', False),
-            'message': result.get('message', '处理完成'),
-            'data': result.get('data')
+            'message': result.get('message', '处理完成')
         }), status_code
         
     except Exception as e:
@@ -2111,8 +2100,7 @@ def add_parsed_talents_route():
         # 返回错误响应
         return jsonify({
             'success': False,
-            'message': error_msg,
-            'data': None
+            'message': error_msg
         }), 500
 
 

+ 48 - 33
app/core/data_parse/parse_system.py

@@ -1022,8 +1022,9 @@ def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, p
             }
             target_card.career_path = update_career_path(target_card, new_data)
             
-            db.session.delete(duplicate_record)
-            db.session.delete(main_card)
+            # 将主记录状态设置为inactive,而不是删除
+            main_card.status = 'inactive'
+            main_card.updated_by = processed_by or 'system'
             
             result_data = target_card.to_dict()
             
@@ -1033,11 +1034,11 @@ def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, p
         elif action == 'ignore':
             result_data = main_card.to_dict()
         
-        if action != 'merge_to_suspected':
-            duplicate_record.processing_status = 'processed'
-            duplicate_record.processed_at = datetime.now()
-            duplicate_record.processed_by = processed_by or 'system'
-            duplicate_record.processing_notes = notes or f'执行操作: {action}'
+        # 所有操作都更新duplicate_record的状态为processed
+        duplicate_record.processing_status = 'processed'
+        duplicate_record.processed_at = datetime.now()
+        duplicate_record.processed_by = processed_by or 'system'
+        duplicate_record.processing_notes = notes or f'执行操作: {action}'
         
         db.session.commit()
         
@@ -1046,7 +1047,7 @@ def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, p
             'success': True,
             'message': f'重复记录处理成功,操作: {action}',
             'data': {
-                'duplicate_record': duplicate_record.to_dict() if action != 'merge_to_suspected' else None,
+                'duplicate_record': duplicate_record.to_dict(),
                 'result': result_data
             }
         }
@@ -1662,7 +1663,7 @@ def query_neo4j_graph(query_requirement):
         1. 只输出有效的Cypher查询语句,不要包含任何解释或注释
         2. 确保return语句中包含talent节点属性
         3. 尽量利用图数据库的特性来优化查询效率
-        4. 使用WITH子句和COLLECT函数收集标签,确保查询到同时拥有所有标签的人才
+        4. 使用WITH子句和COLLECT函数收集标签,确保查询到至少拥有一个标签的人才
         
         注意:请直接返回Cypher查询语句,无需任何其他文本。
         
@@ -1673,7 +1674,7 @@ def query_neo4j_graph(query_requirement):
         MATCH (t:Talent)-[:BELONGS_TO]->(dl:DataLabel)  
         WHERE dl.name IN ['五星级酒店', '新开酒店经验', '总经理']  
         WITH t, COLLECT(DISTINCT dl.name) AS labels  
-        WHERE size(labels) = 3  
+        WHERE size(labels) >= 1  
         RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at
         """
         
@@ -2168,8 +2169,7 @@ def record_parsed_talents(result):
             return {
                 'code': 400,
                 'success': False,
-                'message': '解析任务未成功,无法记录人才数据',
-                'data': None
+                'message': '解析任务未成功,无法记录人才数据'
             }
         
         # 获取解析数据
@@ -2178,8 +2178,7 @@ def record_parsed_talents(result):
             return {
                 'code': 400,
                 'success': False,
-                'message': '解析结果中没有数据',
-                'data': None
+                'message': '解析结果中没有数据'
             }
         
         # 提取任务信息
@@ -2208,8 +2207,7 @@ def record_parsed_talents(result):
             return {
                 'code': 400,
                 'success': False,
-                'message': '未找到有效的人才数据',
-                'data': None
+                'message': '未找到有效的人才数据'
             }
         
         # 批量创建ParsedTalent记录
@@ -2281,15 +2279,13 @@ def record_parsed_talents(result):
             return {
                 'code': 206,  # 部分成功
                 'success': True,
-                'message': f'成功创建 {len(created_records)} 条记录,失败 {len(failed_records)} 条',
-                'data': result_data
+                'message': f'成功创建 {len(created_records)} 条记录,失败 {len(failed_records)} 条'
             }
         else:
             return {
                 'code': 200,
                 'success': True,
-                'message': f'成功创建 {len(created_records)} 条人才记录',
-                'data': result_data
+                'message': f'成功创建 {len(created_records)} 条人才记录'
             }
             
     except Exception as e:
@@ -2300,9 +2296,8 @@ def record_parsed_talents(result):
         return {
             'code': 500,
             'success': False,
-            'message': error_msg,
-            'data': None
-        } 
+            'message': error_msg
+        }
 
 
 def get_parsed_talents(status=None):
@@ -2364,7 +2359,7 @@ def create_origin_source_entry(task_type, minio_path):
     return {
         'task_type': task_type,
         'minio_path': minio_path,
-        'source_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        'source_date': datetime.now().strftime('%Y-%m-%d')
     }
 
 
@@ -2378,36 +2373,56 @@ def update_origin_source(existing_origin_source, task_type, minio_path):
         minio_path (str): MinIO路径
         
     Returns:
-        list: 更新后的origin_source JSON数组
+        list: 更新后的origin_source JSON数组,格式为:
+        [
+            {"task_type": "名片", "minio_path": "path1", "source_date": "2025-01-01"},
+            {"task_type": "简历", "minio_path": "path2", "source_date": "2025-01-02"}
+        ]
     """
     try:
         # 解析现有的origin_source
+        origin_list = []
         if existing_origin_source:
             if isinstance(existing_origin_source, str):
-                origin_list = json.loads(existing_origin_source)
+                try:
+                    parsed = json.loads(existing_origin_source)
+                    if isinstance(parsed, list):
+                        origin_list = parsed
+                    elif isinstance(parsed, dict):
+                        origin_list = [parsed]
+                    else:
+                        origin_list = []
+                except (json.JSONDecodeError, TypeError):
+                    origin_list = []
             elif isinstance(existing_origin_source, list):
                 origin_list = existing_origin_source
             elif isinstance(existing_origin_source, dict):
-                # 如果是单个对象,转换为数组
                 origin_list = [existing_origin_source]
             else:
                 origin_list = []
-        else:
-            origin_list = []
         
         # 确保origin_list是列表
         if not isinstance(origin_list, list):
-            origin_list = [origin_list] if origin_list else []
+            origin_list = []
+        
+        # 验证现有记录,确保每个元素都符合格式要求
+        validated_list = []
+        for item in origin_list:
+            if isinstance(item, dict) and 'task_type' in item and 'minio_path' in item:
+                # 确保有source_date字段,如果没有则添加当前日期
+                if 'source_date' not in item:
+                    item['source_date'] = datetime.now().strftime('%Y-%m-%d')
+                validated_list.append(item)
         
         # 创建新的记录
         new_entry = create_origin_source_entry(task_type, minio_path)
         
         # 检查是否已存在相同的minio_path记录
-        existing_paths = [entry.get('minio_path') for entry in origin_list if isinstance(entry, dict)]
+        existing_paths = [entry.get('minio_path') for entry in validated_list if isinstance(entry, dict)]
         if minio_path not in existing_paths:
-            origin_list.append(new_entry)
+            validated_list.append(new_entry)
         
-        return origin_list
+        return validated_list
         
     except Exception as e:
         logging.error(f"更新origin_source失败: {str(e)}")

+ 97 - 103
app/core/data_parse/parse_task.py

@@ -189,8 +189,7 @@ def _validate_files_by_task_type(files, task_type):
             return {
                 'code': 400,
                 'success': False,
-                'message': f'第{i+1}个文件缺少文件名',
-                'data': None
+                'message': f'第{i+1}个文件缺少文件名'
             }
         
         # 杂项类型不验证文件格式
@@ -207,8 +206,7 @@ def _validate_files_by_task_type(files, task_type):
             return {
                 'code': 400,
                 'success': False,
-                'message': f'第{i+1}个文件格式不支持,{task_type}任务只支持{format_desc[task_type]}',
-                'data': None
+                'message': f'第{i+1}个文件格式不支持,{task_type}任务只支持{format_desc[task_type]}'
             }
     
     return {'success': True}
@@ -280,8 +278,7 @@ def _handle_recruitment_task(created_by, data=None):
         return {
             'code': 200,
             'success': True,
-            'message': '招聘任务创建成功',
-            'data': parse_task.to_dict()
+            'message': '招聘任务创建成功'
         }
         
     except Exception as e:
@@ -292,8 +289,7 @@ def _handle_recruitment_task(created_by, data=None):
         return {
             'code': 500,
             'success': False,
-            'message': error_msg,
-            'data': None
+            'message': error_msg
         }
 
 
@@ -386,8 +382,7 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
             return {
                 'code': 400,
                 'success': False,
-                'message': 'task_type参数必须是以下值之一:名片、简历、新任命、招聘、杂项',
-                'data': None
+                'message': 'task_type参数必须是以下值之一:名片、简历、新任命、招聘、杂项'
             }
         
         # 对于招聘类型,不需要文件,直接处理数据库记录
@@ -396,8 +391,7 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
                 return {
                     'code': 400,
                     'success': False,
-                    'message': '招聘类型任务不需要上传文件',
-                    'data': None
+                    'message': '招聘类型任务不需要上传文件'
                 }
             # 招聘任务处理逻辑
             return _handle_recruitment_task(created_by, data)
@@ -407,16 +401,14 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
             return {
                 'code': 400,
                 'success': False,
-                'message': 'files参数必须是非空数组',
-                'data': None
+                'message': 'files参数必须是非空数组'
             }
         
         if len(files) == 0:
             return {
                 'code': 400,
                 'success': False,
-                'message': '文件数组不能为空',
-                'data': None
+                'message': '文件数组不能为空'
             }
         
         # 根据任务类型验证文件格式
@@ -430,8 +422,7 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
             return {
                 'code': 500,
                 'success': False,
-                'message': '无法连接到MinIO服务器',
-                'data': None
+                'message': '无法连接到MinIO服务器'
             }
         
         # 存储上传结果
@@ -504,8 +495,7 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
             return {
                 'code': 500,
                 'success': False,
-                'message': '所有文件上传失败',
-                'data': None
+                'message': '所有文件上传失败'
             }
         
         # 生成任务名称
@@ -580,15 +570,13 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
                 return {
                     'code': 206,  # Partial Content
                     'success': True,
-                    'message': f'解析任务创建成功,但有{len(failed_uploads)}个文件上传失败',
-                    'data': result_data
+                    'message': f'解析任务创建成功,但有{len(failed_uploads)}个文件上传失败'
                 }
             else:
                 return {
                     'code': 200,
                     'success': True,
-                    'message': '解析任务创建成功,所有文件上传完成',
-                    'data': result_data
+                    'message': '解析任务创建成功,所有文件上传完成'
                 }
                 
         except Exception as db_error:
@@ -599,8 +587,7 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
             return {
                 'code': 500,
                 'success': False,
-                'message': error_msg,
-                'data': None
+                'message': error_msg
             }
             
     except Exception as e:
@@ -610,8 +597,7 @@ def add_parse_task(files, task_type, created_by='system', data=None, publish_tim
         return {
             'code': 500,
             'success': False,
-            'message': error_msg,
-            'data': None
+            'message': error_msg
         }
 
 
@@ -624,52 +610,91 @@ def _update_origin_source_with_minio_path(existing_origin_source, talent_data=No
         talent_data: 人才数据,包含origin_source字段
         
     Returns:
-        str: 更新后的origin_source JSON字符串
+        list: 更新后的origin_source JSON数组,格式为:
+        [
+            {"task_type": "名片", "minio_path": "path1", "source_date": "2025-01-01"},
+            {"task_type": "简历", "minio_path": "path2", "source_date": "2025-01-02"}
+        ]
     """
     import json
     
     try:
         # 解析现有的origin_source
+        origin_list = []
         if existing_origin_source:
-            try:
-                origin_list = json.loads(existing_origin_source)
-                if not isinstance(origin_list, list):
-                    origin_list = [origin_list]
-            except (json.JSONDecodeError, TypeError):
-                # 如果解析失败,将现有内容作为单个元素
-                origin_list = [existing_origin_source] if existing_origin_source else []
-        else:
+            if isinstance(existing_origin_source, str):
+                try:
+                    parsed = json.loads(existing_origin_source)
+                    if isinstance(parsed, list):
+                        origin_list = parsed
+                    elif isinstance(parsed, dict):
+                        origin_list = [parsed]
+                    else:
+                        origin_list = []
+                except (json.JSONDecodeError, TypeError):
+                    origin_list = []
+            elif isinstance(existing_origin_source, list):
+                origin_list = existing_origin_source
+            elif isinstance(existing_origin_source, dict):
+                origin_list = [existing_origin_source]
+            else:
+                origin_list = []
+        
+        # 确保origin_list是列表
+        if not isinstance(origin_list, list):
             origin_list = []
         
         # 处理talent_data提供的origin_source
         if talent_data and talent_data.get('origin_source'):
             talent_origin_source = talent_data.get('origin_source')
+            
             if isinstance(talent_origin_source, list):
-                # 如果是列表,直接合并
+                # 如果是列表,验证每个元素是否符合格式要求
                 for entry in talent_origin_source:
-                    if isinstance(entry, dict) and entry not in origin_list:
-                        origin_list.append(entry)
+                    if isinstance(entry, dict) and 'task_type' in entry and 'minio_path' in entry:
+                        # 检查是否已存在相同的minio_path记录
+                        existing_paths = [item.get('minio_path') for item in origin_list if isinstance(item, dict)]
+                        if entry.get('minio_path') not in existing_paths:
+                            origin_list.append(entry)
             elif isinstance(talent_origin_source, str):
                 # 如果是字符串,尝试解析为JSON
                 try:
                     parsed_talent_origin = json.loads(talent_origin_source)
                     if isinstance(parsed_talent_origin, list):
                         for entry in parsed_talent_origin:
-                            if isinstance(entry, dict) and entry not in origin_list:
-                                origin_list.append(entry)
-                    elif isinstance(parsed_talent_origin, dict) and parsed_talent_origin not in origin_list:
-                        origin_list.append(parsed_talent_origin)
+                            if isinstance(entry, dict) and 'task_type' in entry and 'minio_path' in entry:
+                                existing_paths = [item.get('minio_path') for item in origin_list if isinstance(item, dict)]
+                                if entry.get('minio_path') not in existing_paths:
+                                    origin_list.append(entry)
+                    elif isinstance(parsed_talent_origin, dict) and 'task_type' in parsed_talent_origin and 'minio_path' in parsed_talent_origin:
+                        existing_paths = [item.get('minio_path') for item in origin_list if isinstance(item, dict)]
+                        if parsed_talent_origin.get('minio_path') not in existing_paths:
+                            origin_list.append(parsed_talent_origin)
                 except (json.JSONDecodeError, TypeError):
                     # 如果解析失败,忽略talent_data的origin_source
                     pass
-        
-        # 返回JSON字符串
-        return json.dumps(origin_list, ensure_ascii=False)
+            elif isinstance(talent_origin_source, dict) and 'task_type' in talent_origin_source and 'minio_path' in talent_origin_source:
+                # 如果是字典,检查是否符合格式要求
+                existing_paths = [item.get('minio_path') for item in origin_list if isinstance(item, dict)]
+                if talent_origin_source.get('minio_path') not in existing_paths:
+                    origin_list.append(talent_origin_source)
+        
+        # 验证最终结果,确保每个元素都符合格式要求
+        validated_list = []
+        for item in origin_list:
+            if isinstance(item, dict) and 'task_type' in item and 'minio_path' in item:
+                # 确保有source_date字段,如果没有则添加当前日期
+                if 'source_date' not in item:
+                    from datetime import datetime
+                    item['source_date'] = datetime.now().strftime('%Y-%m-%d')
+                validated_list.append(item)
+        
+        return validated_list
         
     except Exception as e:
         logging.error(f"更新origin_source失败: {str(e)}")
-        # 如果处理失败,返回原始的origin_source
-        return existing_origin_source
+        # 如果处理失败,返回空数组
+        return []
 
 
 def add_single_talent(talent_data, minio_path=None, task_type=None):
@@ -690,8 +715,7 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
             return {
                 'code': 400,
                 'success': False,
-                'message': '人才数据不能为空',
-                'data': None
+                'message': '人才数据不能为空'
             }
         
         # 检查重复记录
@@ -824,8 +848,7 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                 return {
                     'code': 200,
                     'success': True,
-                    'message': f'人才信息已更新。{duplicate_check["reason"]}',
-                    'data': existing_card.to_dict()
+                    'message': f'人才信息已更新。{duplicate_check["reason"]}'
                 }
                 
             elif duplicate_check['action'] == 'create_with_duplicates':
@@ -880,15 +903,7 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                 return {
                     'code': 202,  # Accepted,表示已接受但需要进一步处理
                     'success': True,
-                    'message': f'创建新记录成功,发现疑似重复记录待处理。{duplicate_check["reason"]}',
-                    'data': {
-                        'main_card': main_card.to_dict(),
-                        'duplicate_record_id': duplicate_record.id,
-                        'suspected_duplicates_count': len(duplicate_check['suspected_duplicates']),
-                        'processing_status': 'pending',
-                        'duplicate_reason': duplicate_record.duplicate_reason,
-                        'created_at': duplicate_record.created_at.strftime('%Y-%m-%d %H:%M:%S')
-                    }
+                    'message': f'创建新记录成功,发现疑似重复记录待处理。{duplicate_check["reason"]}'
                 }
                 
             else:
@@ -986,8 +1001,7 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                 return {
                     'code': 200,
                     'success': True,
-                    'message': f'人才信息保存成功。{duplicate_check["reason"]}',
-                    'data': business_card.to_dict()
+                    'message': f'人才信息保存成功。{duplicate_check["reason"]}'
                 }
                 
         except Exception as e:
@@ -998,8 +1012,7 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
             return {
                 'code': 500,
                 'success': False,
-                'message': error_msg,
-                'data': None
+                'message': error_msg
             }
             
     except Exception as e:
@@ -1010,8 +1023,7 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
         return {
             'code': 500,
             'success': False,
-            'message': error_msg,
-            'data': None
+            'message': error_msg
         }
 
 
@@ -1032,8 +1044,7 @@ def add_parsed_talents(api_response_data):
             return {
                 'code': 400,
                 'success': False,
-                'message': 'api_response_data参数必须是非空字典',
-                'data': None
+                'message': 'api_response_data参数必须是非空字典'
             }
         
         # 获取data字段
@@ -1042,8 +1053,7 @@ def add_parsed_talents(api_response_data):
             return {
                 'code': 400,
                 'success': False,
-                'message': '请求中缺少有效的data字段',
-                'data': None
+                'message': '请求中缺少有效的data字段'
             }
         
         # 获取results数组
@@ -1052,16 +1062,14 @@ def add_parsed_talents(api_response_data):
             return {
                 'code': 400,
                 'success': False,
-                'message': '请求中的results字段必须是数组',
-                'data': None
+                'message': '请求中的results字段必须是数组'
             }
         
         if len(results) == 0:
             return {
                 'code': 400,
                 'success': False,
-                'message': '请求中的results数组为空,没有人才数据需要处理',
-                'data': None
+                'message': '请求中的results数组为空,没有人才数据需要处理'
             }
         
         # 从api_response_data中提取task_type
@@ -1084,8 +1092,7 @@ def add_parsed_talents(api_response_data):
                     processed_results.append({
                         'index': i,
                         'success': False,
-                        'error': '人才数据格式无效,必须是字典格式',
-                        'data': None
+                        'message': '人才数据格式无效,必须是字典格式'
                     })
                     logging.warning(f"第 {i+1} 条记录人才数据格式无效")
                     continue
@@ -1096,8 +1103,7 @@ def add_parsed_talents(api_response_data):
                     processed_results.append({
                         'index': i,
                         'success': False,
-                        'error': '人才数据缺少必要字段name_zh',
-                        'data': None
+                        'message': '人才数据缺少必要字段name_zh'
                     })
                     logging.warning(f"第 {i+1} 条记录缺少name_zh字段")
                     continue
@@ -1132,7 +1138,6 @@ def add_parsed_talents(api_response_data):
                             'index': i,
                             'success': True,
                             'error': None,
-                            'data': talent_result.get('data'),
                             'message': f'成功处理人员: {talent_data.get("name_zh", "未知")}'
                         })
                         logging.debug(f"成功处理第 {i+1} 条记录")
@@ -1141,8 +1146,7 @@ def add_parsed_talents(api_response_data):
                         processed_results.append({
                             'index': i,
                             'success': False,
-                            'error': talent_result.get('message', '处理失败'),
-                            'data': None
+                            'message': talent_result.get('message', '处理失败')
                         })
                         logging.error(f"处理第 {i+1} 条记录失败: {talent_result.get('message', '未知错误')}")
                 except Exception as talent_error:
@@ -1151,8 +1155,7 @@ def add_parsed_talents(api_response_data):
                     processed_results.append({
                         'index': i,
                         'success': False,
-                        'error': error_msg,
-                        'data': None
+                        'message': error_msg
                     })
                     logging.error(error_msg, exc_info=True)
                     
@@ -1162,8 +1165,7 @@ def add_parsed_talents(api_response_data):
                 processed_results.append({
                     'index': i,
                     'success': False,
-                    'error': error_msg,
-                    'data': None
+                    'message': error_msg
                 })
                 logging.error(error_msg, exc_info=True)
         
@@ -1184,22 +1186,19 @@ def add_parsed_talents(api_response_data):
             return {
                 'code': 200,
                 'success': True,
-                'message': f'批量处理完成,全部 {success_count} 条记录处理成功',
-                'data': batch_result
+                'message': f'批量处理完成,全部 {success_count} 条记录处理成功'
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
-                'message': f'批量处理失败,全部 {failed_count} 条记录处理失败',
-                'data': batch_result
+                'message': f'批量处理失败,全部 {failed_count} 条记录处理失败'
             }
         else:
             return {
                 'code': 206,  # Partial Content
                 'success': True,
-                'message': f'批量处理部分成功,成功 {success_count} 条,失败 {failed_count} 条',
-                'data': batch_result
+                'message': f'批量处理部分成功,成功 {success_count} 条,失败 {failed_count} 条'
             }
             
     except Exception as e:
@@ -1209,8 +1208,7 @@ def add_parsed_talents(api_response_data):
         return {
             'code': 500,
             'success': False,
-            'message': error_msg,
-            'data': None
+            'message': error_msg
         } 
 
 
@@ -1276,16 +1274,14 @@ def record_parsed_talent(talent_data, task_id=None, task_type=None):
         if not talent_data or not isinstance(talent_data, dict):
             return {
                 'success': False,
-                'message': '人才数据不能为空且必须是字典格式',
-                'data': None
+                'message': '人才数据不能为空且必须是字典格式'
             }
         
         # 检查必要字段
         if not talent_data.get('name_zh'):
             return {
                 'success': False,
-                'message': '人才数据必须包含name_zh字段',
-                'data': None
+                'message': '人才数据必须包含name_zh字段'
             }
         
         # 创建ParsedTalent记录
@@ -1331,8 +1327,7 @@ def record_parsed_talent(talent_data, task_id=None, task_type=None):
         
         return {
             'success': True,
-            'message': '成功记录人才信息',
-            'data': parsed_talent.to_dict()
+            'message': '成功记录人才信息'
         }
         
     except Exception as e:
@@ -1342,8 +1337,7 @@ def record_parsed_talent(talent_data, task_id=None, task_type=None):
         
         return {
             'success': False,
-            'message': error_msg,
-            'data': None
+            'message': error_msg
         } 
 
 

+ 116 - 128
app/core/data_parse/parse_web.py

@@ -808,6 +808,13 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
     Returns:
         dict: 批量处理结果,格式与parse_result保持一致
     """
+    # 初始化变量
+    task_record = None
+    task_source = []
+    success_count = 0
+    failed_count = 0
+    parsed_record_ids = []
+    
     try:
         # 根据task_id从parse_task_repository表读取记录
         if not task_id:
@@ -841,11 +848,6 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                 'data': None
             }
         
-        results = []
-        success_count = 0
-        failed_count = 0
-        parsed_record_ids = []  # 收集成功解析的记录ID
-        
         logging.info(f"开始批量处理markdown文件,共 {len(task_source)} 条记录")
         
         # 逐一处理每条数据
@@ -868,6 +870,15 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                     # 更新task_source中对应记录的parse_flag和status
                     data['parse_flag'] = 1
                     data['status'] = '解析失败'
+                    # 立即更新数据库记录
+                    try:
+                        task_record.task_source = task_source
+                        task_record.task_status = '解析中'
+                        db.session.commit()
+                        logging.info(f"第 {i+1} 条数据处理失败,已更新数据库记录")
+                    except Exception as update_error:
+                        logging.error(f"更新数据库记录失败: {str(update_error)}")
+                        db.session.rollback()
                     continue
                 
                 # 处理单个markdown文件
@@ -885,28 +896,7 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                     if persons_data and isinstance(persons_data, list):
                         # 为每个人员创建一个结果记录
                         for person_idx, person_data in enumerate(persons_data):
-                            # 转换为标准名片格式
-                            standardized_data = _convert_webpage_to_card_format(person_data, file_publish_time)
-                            
                             success_count += 1
-                            # 构建完整的MinIO URL路径
-                            if minio_path.startswith('http'):
-                                complete_minio_path = minio_path
-                                object_key = _extract_object_key_from_url(minio_path)
-                            else:
-                                complete_minio_path = f"{minio_url}/{minio_bucket}/{minio_path}"
-                                object_key = minio_path
-                            
-                            results.append({
-                                "data": standardized_data,
-                                "error": None,
-                                "filename": minio_path.split('/')[-1] if '/' in minio_path else minio_path,
-                                "index": len(results),  # 使用连续的索引
-                                "message": "网页人才信息解析成功",
-                                "minio_path": complete_minio_path,
-                                "object_key": object_key,
-                                "success": True
-                            })
                             logging.info(f"成功提取人员 {person_idx+1}: {person_data.get('name_zh', 'Unknown')}")
                         
                         # 更新task_source中对应记录的parse_flag和status
@@ -915,24 +905,6 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                     else:
                         # 没有提取到有效数据,这算作一个失败记录
                         failed_count += 1
-                        # 构建完整的MinIO URL路径
-                        if minio_path.startswith('http'):
-                            complete_minio_path = minio_path
-                            object_key = _extract_object_key_from_url(minio_path)
-                        else:
-                            complete_minio_path = f"{minio_url}/{minio_bucket}/{minio_path}"
-                            object_key = minio_path
-                            
-                        results.append({
-                            "data": None,
-                            "error": "未从markdown文件中提取到人员信息",
-                            "filename": minio_path.split('/')[-1] if '/' in minio_path else minio_path,
-                            "index": len(results),
-                            "message": "网页人才信息解析失败",
-                            "minio_path": complete_minio_path,
-                            "object_key": object_key,
-                            "success": False
-                        })
                         logging.warning(f"第 {i+1} 个文件未提取到人员信息")
                         
                         # 更新task_source中对应记录的parse_flag和status
@@ -942,29 +914,21 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                     # 文件处理失败,算作一个失败记录
                     failed_count += 1
                     error_msg = file_result.get('message', '处理失败')
-                    # 构建完整的MinIO URL路径
-                    if minio_path.startswith('http'):
-                        complete_minio_path = minio_path
-                        object_key = _extract_object_key_from_url(minio_path)
-                    else:
-                        complete_minio_path = f"{minio_url}/{minio_bucket}/{minio_path}"
-                        object_key = minio_path
-                        
-                    results.append({
-                        "data": None,
-                        "error": error_msg,
-                        "filename": minio_path.split('/')[-1] if '/' in minio_path else minio_path,
-                        "index": len(results),
-                        "message": "网页人才信息解析失败",
-                        "minio_path": complete_minio_path,
-                        "object_key": object_key,
-                        "success": False
-                    })
                     logging.error(f"处理第 {i+1} 个文件失败: {error_msg}")
                     
                     # 更新task_source中对应记录的parse_flag和status
                     data['parse_flag'] = 1
                     data['status'] = '解析失败'
+                
+                # 立即更新数据库记录
+                try:
+                    task_record.task_source = task_source
+                    task_record.task_status = '解析中'
+                    db.session.commit()
+                    logging.info(f"第 {i+1} 条数据处理完成,已更新数据库记录")
+                except Exception as update_error:
+                    logging.error(f"更新数据库记录失败: {str(update_error)}")
+                    db.session.rollback()
                     
             except Exception as item_error:
                 failed_count += 1
@@ -976,16 +940,15 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                     data['parse_flag'] = 1
                     data['status'] = '解析失败'
                 
-                results.append({
-                    "data": None,
-                    "error": error_msg,
-                    "filename": data.get('filename', f'markdown_record_{i}.md') if isinstance(data, dict) else f'markdown_record_{i}.md',
-                    "index": len(results),
-                    "message": "网页人才信息解析失败",
-                    "minio_path": data.get('minio_path', '') if isinstance(data, dict) else '',
-                    "object_key": data.get('object_key', f'markdown_data/record_{i}.md') if isinstance(data, dict) else f'markdown_data/record_{i}.md',
-                    "success": False
-                })
+                # 立即更新数据库记录
+                try:
+                    task_record.task_source = task_source
+                    task_record.task_status = '解析中'
+                    db.session.commit()
+                    logging.info(f"第 {i+1} 条数据处理异常,已更新数据库记录")
+                except Exception as update_error:
+                    logging.error(f"更新数据库记录失败: {str(update_error)}")
+                    db.session.rollback()
         
         # 根据处理结果更新task_status
         if failed_count == 0:
@@ -995,8 +958,43 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
         else:
             task_status = '部分解析成功'
         
-        # 所有task_source记录处理完成后,将更新后的task_source和task_status保存到数据库
-        try:
+    except Exception as e:
+        error_msg = f"batch_process_md函数执行失败: {str(e)}"
+        logging.error(error_msg, exc_info=True)
+        
+        # 即使出现异常,也要确保更新数据库
+        if task_record and task_source:
+            try:
+                task_record.task_source = task_source
+                task_record.task_status = '解析失败'
+                task_record.parse_count = success_count
+                task_record.parse_result = {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids,
+                    'processed_time': datetime.now().isoformat(),
+                    'error': error_msg
+                }
+                db.session.commit()
+                logging.info(f"异常情况下成功更新task_id为{task_id}的任务记录")
+            except Exception as update_error:
+                logging.error(f"异常情况下更新任务记录失败: {str(update_error)}")
+                db.session.rollback()
+        
+        return {
+            'code': 500,
+            'success': False,
+            'message': error_msg,
+            'data': {
+                'success_count': success_count,
+                'failed_count': failed_count,
+                'parsed_record_ids': parsed_record_ids
+            }
+        }
+    
+    # 无论处理过程如何,都要保证更新数据库
+    try:
+        if task_record and task_source:
             task_record.task_source = task_source
             task_record.task_status = task_status
             task_record.parse_count = success_count
@@ -1008,65 +1006,46 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
             }
             db.session.commit()
             logging.info(f"成功更新task_id为{task_id}的任务记录,task_status={task_status},处理成功{success_count}条,失败{failed_count}条")
-        except Exception as update_error:
-            logging.error(f"更新任务记录失败: {str(update_error)}")
-            db.session.rollback()
-        
-        # 组装最终结果
-        if failed_count == 0:
-            return {
-                'code': 200,
-                'success': True,
-                'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
-                'data': {
-                    'success_count': success_count,
-                    'failed_count': failed_count,
-                    'parsed_record_ids': parsed_record_ids
-                }
-            }
-        elif success_count == 0:
-            return {
-                'code': 500,
-                'success': False,
-                'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
-                'data': {
-                    'success_count': success_count,
-                    'failed_count': failed_count,
-                    'parsed_record_ids': parsed_record_ids
-                }
-            }
         else:
-            return {
-                'code': 206,  # Partial Content
-                'success': True,
-                'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
-                'data': {
-                    'success_count': success_count,
-                    'failed_count': failed_count,
-                    'parsed_record_ids': parsed_record_ids
-                }
+            logging.error("无法更新数据库:task_record或task_source为空")
+    except Exception as update_error:
+        logging.error(f"更新任务记录失败: {str(update_error)}")
+        db.session.rollback()
+        # 即使数据库更新失败,也要返回处理结果
+    
+    # 组装最终结果
+    if failed_count == 0:
+        return {
+            'code': 200,
+            'success': True,
+            'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
+            'data': {
+                'success_count': success_count,
+                'failed_count': failed_count,
+                'parsed_record_ids': parsed_record_ids
             }
-            
-    except Exception as e:
-        error_msg = f"batch_process_md函数执行失败: {str(e)}"
-        logging.error(error_msg, exc_info=True)
-        
-        batch_result = {
-            'summary': {
-                'total_files': len(markdown_file_list) if markdown_file_list else 1,
-                'success_count': 0,
-                'failed_count': len(markdown_file_list) if markdown_file_list else 1,
-                'success_rate': 0
-            },
-            'results': [],
-            'processed_time': datetime.now().isoformat()
         }
-        
+    elif success_count == 0:
         return {
             'code': 500,
             'success': False,
-            'message': error_msg,
-            'data': batch_result
+            'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
+            'data': {
+                'success_count': success_count,
+                'failed_count': failed_count,
+                'parsed_record_ids': parsed_record_ids
+            }
+        }
+    else:
+        return {
+            'code': 206,  # Partial Content
+            'success': True,
+            'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
+            'data': {
+                'success_count': success_count,
+                'failed_count': failed_count,
+                'parsed_record_ids': parsed_record_ids
+            }
         }
 
 
@@ -1263,7 +1242,16 @@ def process_single_markdown_file(minio_path, publish_time, task_id=None, task_ty
                 for person in result:
                     person['pic_url'] = minio_path  # 设置原始文件路径
                     person['image_path'] = minio_path  # 设置image_path
-                    person['origin_source'] = minio_path  # 设置origin_source
+                    
+                    # 设置origin_source为JSON数组格式
+                    current_date = datetime.now().strftime('%Y-%m-%d')
+                    origin_source_entry = {
+                        "task_type": "新任命",
+                        "minio_path": minio_path,
+                        "source_date": current_date
+                    }
+                    person['origin_source'] = [origin_source_entry]
+                    
                     if 'career_path' in person and person['career_path']:
                         for career_entry in person['career_path']:
                             career_entry['image_path'] = minio_path  # 设置原始文件路径