Преглед на файлове

修改重复记录的处理逻辑

maxiaolong преди 1 седмица
родител
ревизия
906a1eec5c

+ 41 - 0
app/__init__.py

@@ -50,6 +50,9 @@ def create_app():
     # Configure logging
     configure_logging(app)
     
+    # 添加全局异常处理器
+    configure_error_handlers(app)
+    
     # 输出启动信息
     app.logger.info(f"Starting server in {current_env} mode on port {app.config['PORT']}")
     
@@ -131,3 +134,41 @@ def configure_logging(app):
     
     app.logger.info(f"日志配置完成: 级别={log_level_name}, 文件={log_file}")
     return logger
+
+def configure_error_handlers(app):
+    """Configure global error handlers for the application"""
+    
+    @app.errorhandler(Exception)
+    def handle_exception(e):
+        """全局异常处理器,捕获所有未处理的异常"""
+        # 记录详细的错误信息
+        app.logger.error(f"未处理的异常: {str(e)}", exc_info=True)
+        
+        # 返回标准化的错误响应
+        error_response = {
+            'success': False,
+            'message': f'服务器内部错误: {str(e)}',
+            'data': None
+        }
+        
+        return jsonify(error_response), 500
+    
+    @app.errorhandler(404)
+    def handle_not_found(e):
+        """处理404错误"""
+        app.logger.warning(f"404错误: {str(e)}")
+        return jsonify({
+            'success': False,
+            'message': '请求的资源不存在',
+            'data': None
+        }), 404
+    
+    @app.errorhandler(500)
+    def handle_internal_error(e):
+        """处理500错误"""
+        app.logger.error(f"500错误: {str(e)}", exc_info=True)
+        return jsonify({
+            'success': False,
+            'message': '服务器内部错误',
+            'data': None
+        }), 500

+ 5 - 20
app/api/data_parse/routes.py

@@ -1550,20 +1550,10 @@ def add_parse_task_route():
             
             # 如果任务创建成功,继续执行批量处理
             if result['success']:
-                # 获取任务ID和任务源数据
-                task_data = result['data']
-                task_id = task_data.get('id')
-                
-                # 将task_data中的data赋值给task_source
-                task_source = []
-                if data:
-                    # data是JSON数组格式,直接解析
-                    task_source = json.loads(data)
-                
-                if task_id and task_source:
-                    logger.info(f"招聘任务创建成功,开始执行批量处理: task_id={task_id}")
-                    # 调用招聘数据批量处理函数
-                    batch_process_menduner_data(task_source, task_id, task_type)
+                # 招聘任务创建成功,不需要进一步处理
+                logger.info(f"招聘任务创建成功")
+            else:
+                logger.error(f"招聘任务创建失败: {result.get('message', '未知错误')}")
         else:
             # 其他类型需要文件上传
             if 'files' not in request.files:
@@ -1789,12 +1779,7 @@ def execute_parse_task():
                 result = batch_process_md(task_source, task_id=task_id, task_type=task_type)
                 
             elif task_type == '招聘':
-                # 招聘任务类型已在add-parse-task接口中处理,此处不再处理
-                return jsonify({
-                    'success': False,
-                    'message': '招聘任务类型已在创建时自动处理,无需再次执行',
-                    'data': None
-                }), 400
+                result = batch_process_menduner_data(task_source, task_id, task_type)
                 
             elif task_type == '杂项':
                 # 调用图片批量处理函数(表格类型)

+ 2 - 1
app/core/data_parse/parse_card.py

@@ -302,7 +302,8 @@ def add_business_card(card_data, image_file=None):
                     card_data, 
                     minio_path, 
                     duplicate_check['suspected_duplicates'],
-                    duplicate_check['reason']
+                    duplicate_check['reason'],
+                    task_type='名片'  # 传递task_type参数
                 )
                 
                 return {

+ 13 - 12
app/core/data_parse/parse_menduner.py

@@ -439,6 +439,11 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
         # 逐一处理每条数据
         for i, data in enumerate(task_source):
             try:
+                # 只处理parse_flag为1的记录
+                if not isinstance(data, dict) or data.get('parse_flag') != 1:
+                    logging.debug(f"跳过第 {i+1} 条数据,parse_flag不为1或格式不正确")
+                    continue
+                
                 logging.debug(f"处理第 {i+1}/{len(task_source)} 条数据")
                 
                 # 标准化数据为名片格式
@@ -460,21 +465,18 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
                             logging.info(f"成功记录人才信息到parsed_talents表: {normalized.get('name_zh', '')}")
                             
                             # 更新task_source中对应记录的parse_flag和status
-                            if isinstance(data, dict):
-                                data['parse_flag'] = 0
-                                data['status'] = '解析成功'
+                            data['parse_flag'] = 0
+                            data['status'] = '解析成功'
                         else:
                             logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
                             # 更新task_source中对应记录的parse_flag和status
-                            if isinstance(data, dict):
-                                data['parse_flag'] = 1
-                                data['status'] = '解析失败'
+                            data['parse_flag'] = 1
+                            data['status'] = '解析失败'
                     except Exception as record_error:
                         logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
                         # 更新task_source中对应记录的parse_flag和status
-                        if isinstance(data, dict):
-                            data['parse_flag'] = 1
-                            data['status'] = '解析失败'
+                        data['parse_flag'] = 1
+                        data['status'] = '解析失败'
                     
                     success_count += 1
                     results.append({
@@ -493,9 +495,8 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
                     error_messages = validation.get('errors', ['验证失败'])
                     
                     # 更新task_source中对应记录的parse_flag和status
-                    if isinstance(data, dict):
-                        data['parse_flag'] = 1
-                        data['status'] = '解析失败'
+                    data['parse_flag'] = 1
+                    data['status'] = '解析失败'
                     
                     results.append({
                         "data": None,

+ 19 - 5
app/core/data_parse/parse_system.py

@@ -501,7 +501,7 @@ def update_career_path(existing_card, new_data):
         return existing_card.career_path if existing_card.career_path else []
 
 
-def create_main_card_with_duplicates(extracted_data, minio_path, suspected_duplicates, reason):
+def create_main_card_with_duplicates(extracted_data, minio_path, suspected_duplicates, reason, task_type=None):
     """
     创建主名片记录并标记疑似重复记录
     
@@ -510,6 +510,7 @@ def create_main_card_with_duplicates(extracted_data, minio_path, suspected_dupli
         minio_path (str): MinIO中的图片路径
         suspected_duplicates (list): 疑似重复的名片记录列表
         reason (str): 重复原因描述
+        task_type (str, optional): 任务类型,用于origin_source字段
         
     Returns:
         tuple: (BusinessCard, DuplicateBusinessCard) 创建的主名片记录和重复记录标记
@@ -521,6 +522,10 @@ def create_main_card_with_duplicates(extracted_data, minio_path, suspected_dupli
         # 直接使用extracted_data中的career_path记录
         career_path = extracted_data.get('career_path', [])
         
+        # 确定task_type,如果未提供则从extracted_data中获取,否则使用默认值
+        if not task_type:
+            task_type = extracted_data.get('task_type', '名片')
+        
         # 创建新的主名片记录
         main_card = BusinessCard(
             name_zh=extracted_data.get('name_zh', ''),
@@ -543,10 +548,10 @@ def create_main_card_with_duplicates(extracted_data, minio_path, suspected_dupli
             brand_group=extracted_data.get('brand_group', ''),
             image_path=minio_path,
             career_path=career_path,
-            origin_source=[create_origin_source_entry('manual_upload', minio_path)],
+            origin_source=[create_origin_source_entry(task_type, minio_path)],
             created_at=datetime.now(),
             updated_by='system',
-            status='active'
+            status='duplicate'
         )
         
         # 保存主记录到数据库
@@ -626,13 +631,16 @@ def get_minio_client():
 def get_business_cards():
     """
     获取所有名片记录,并为每个记录添加tag_count字段
+    只返回status为active和inactive的记录
     
     Returns:
         dict: 包含名片记录列表的字典
     """
     try:
-        # 查询所有名片记录,按创建时间倒序排列
-        cards = BusinessCard.query.order_by(BusinessCard.created_at.desc()).all()
+        # 查询所有名片记录,只返回status为active和inactive的记录,按创建时间倒序排列
+        cards = BusinessCard.query.filter(
+            BusinessCard.status.in_(['active', 'inactive'])
+        ).order_by(BusinessCard.created_at.desc()).all()
         
         # 转换为字典格式
         cards_data = [card.to_dict() for card in cards]
@@ -1029,9 +1037,15 @@ def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, p
             result_data = target_card.to_dict()
             
         elif action == 'keep_main':
+            # 保留主记录,将status设置为active
+            main_card.status = 'active'
+            main_card.updated_by = processed_by or 'system'
             result_data = main_card.to_dict()
             
         elif action == 'ignore':
+            # 忽略,将主记录status设置为active
+            main_card.status = 'active'
+            main_card.updated_by = processed_by or 'system'
             result_data = main_card.to_dict()
         
         # 所有操作都更新duplicate_record的状态为processed

+ 34 - 10
app/core/data_parse/parse_task.py

@@ -260,7 +260,7 @@ def _handle_recruitment_task(created_by, data=None):
         # 创建解析任务记录
         parse_task = ParseTaskRepository(
             task_name=task_name,
-            task_status='成功',  # 招聘任务不需要实际解析操作,直接设置为成功
+            task_status='待解析',  # 招聘任务不需要实际解析操作,直接设置为成功
             task_type='招聘',
             task_source=task_source,
             collection_count=len(task_source),  # 招聘任务的数据项数量
@@ -858,7 +858,8 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                     talent_data, 
                     talent_data.get('image_path', ''),  # 从talent_data获取图片路径
                     duplicate_check['suspected_duplicates'],
-                    duplicate_check['reason']
+                    duplicate_check['reason'],
+                    task_type=task_type  # 传递task_type参数
                 )
                 
                 # 更新origin_source字段,将talent_data提供的origin_source与现有的origin_source进行合并
@@ -1080,6 +1081,8 @@ def add_parsed_talents(api_response_data):
         processed_results = []
         success_count = 0
         failed_count = 0
+        success_messages = []  # 收集成功处理的message信息
+        failed_messages = []   # 收集失败处理的message信息
         
         # 逐一处理每个结果项
         for i, talent_data in enumerate(results):
@@ -1089,22 +1092,26 @@ def add_parsed_talents(api_response_data):
                 # 验证人才数据格式
                 if not talent_data or not isinstance(talent_data, dict):
                     failed_count += 1
+                    error_msg = '人才数据格式无效,必须是字典格式'
                     processed_results.append({
                         'index': i,
                         'success': False,
-                        'message': '人才数据格式无效,必须是字典格式'
+                        'message': error_msg
                     })
+                    failed_messages.append(f"第{i+1}条记录: {error_msg}")
                     logging.warning(f"第 {i+1} 条记录人才数据格式无效")
                     continue
                 
                 # 检查必要字段
                 if not talent_data.get('name_zh'):
                     failed_count += 1
+                    error_msg = '人才数据缺少必要字段name_zh'
                     processed_results.append({
                         'index': i,
                         'success': False,
-                        'message': '人才数据缺少必要字段name_zh'
+                        'message': error_msg
                     })
+                    failed_messages.append(f"第{i+1}条记录: {error_msg}")
                     logging.warning(f"第 {i+1} 条记录缺少name_zh字段")
                     continue
                 
@@ -1134,21 +1141,25 @@ def add_parsed_talents(api_response_data):
                                 logging.error(f"更新parsed_talents记录状态失败: {str(update_error)}")
                         
                         success_count += 1
+                        talent_message = talent_result.get('message', f'成功处理人员: {talent_data.get("name_zh", "未知")}')
+                        success_messages.append(f"第{i+1}条记录: {talent_message}")
                         processed_results.append({
                             'index': i,
                             'success': True,
                             'error': None,
-                            'message': f'成功处理人员: {talent_data.get("name_zh", "未知")}'
+                            'message': talent_message
                         })
                         logging.debug(f"成功处理第 {i+1} 条记录")
                     else:
                         failed_count += 1
+                        error_msg = talent_result.get('message', '处理失败')
                         processed_results.append({
                             'index': i,
                             'success': False,
-                            'message': talent_result.get('message', '处理失败')
+                            'message': error_msg
                         })
-                        logging.error(f"处理第 {i+1} 条记录失败: {talent_result.get('message', '未知错误')}")
+                        failed_messages.append(f"第{i+1}条记录: {error_msg}")
+                        logging.error(f"处理第 {i+1} 条记录失败: {error_msg}")
                 except Exception as talent_error:
                     failed_count += 1
                     error_msg = f"处理人才数据异常: {str(talent_error)}"
@@ -1157,6 +1168,7 @@ def add_parsed_talents(api_response_data):
                         'success': False,
                         'message': error_msg
                     })
+                    failed_messages.append(f"第{i+1}条记录: {error_msg}")
                     logging.error(error_msg, exc_info=True)
                     
             except Exception as item_error:
@@ -1167,6 +1179,7 @@ def add_parsed_talents(api_response_data):
                     'success': False,
                     'message': error_msg
                 })
+                failed_messages.append(f"第{i+1}条记录: {error_msg}")
                 logging.error(error_msg, exc_info=True)
         
         # 组装最终结果
@@ -1181,24 +1194,35 @@ def add_parsed_talents(api_response_data):
             'processed_time': datetime.now().isoformat()
         }
         
+        # 构建详细的message信息
+        detailed_message = f"批量处理完成,成功 {success_count} 条,失败 {failed_count} 条"
+        
+        # 添加成功处理的详细信息
+        if success_messages:
+            detailed_message += f"。成功处理详情: {'; '.join(success_messages)}"
+        
+        # 添加失败处理的详细信息
+        if failed_messages:
+            detailed_message += f"。失败处理详情: {'; '.join(failed_messages)}"
+        
         # 根据处理结果返回相应的状态
         if failed_count == 0:
             return {
                 'code': 200,
                 'success': True,
-                'message': f'批量处理完成,全部 {success_count} 条记录处理成功'
+                'message': detailed_message
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
-                'message': f'批量处理失败,全部 {failed_count} 条记录处理失败'
+                'message': detailed_message
             }
         else:
             return {
                 'code': 206,  # Partial Content
                 'success': True,
-                'message': f'批量处理部分成功,成功 {success_count} 条,失败 {failed_count} 条'
+                'message': detailed_message
             }
             
     except Exception as e:

+ 61 - 20
app/core/data_parse/parse_web.py

@@ -478,7 +478,8 @@ def process_single_talent_card(talent_data, minio_md_path):
                 talent_data, 
                 image_path,  # 传递图片路径
                 duplicate_check['suspected_duplicates'],
-                duplicate_check['reason']
+                duplicate_check['reason'],
+                task_type='新任命'  # 传递task_type参数
             )
             
             main_card.updated_by = 'webpage_talent_system'
@@ -593,10 +594,13 @@ def process_webpage_with_QWen(markdown_text, publish_time):
     QWEN_API_KEY = os.environ.get('QWEN_API_KEY', 'sk-8f2320dafc9e4076968accdd8eebd8e9')
     
     try:
+        logging.info(f"开始处理网页文本,文本长度: {len(markdown_text) if markdown_text else 0} 字符")
+        
         # 初始化 OpenAI 客户端,配置为阿里云 API
         client = OpenAI(
             api_key=QWEN_API_KEY,
             base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+            timeout=60.0,  # 设置60秒超时
         )
         
         # 构建针对单个人员网页文本的优化提示语
@@ -656,32 +660,65 @@ def process_webpage_with_QWen(markdown_text, publish_time):
 
 """ + markdown_text
         
-        # 调用 Qwen VL Max API
-        logging.info("发送网页文本请求到 Qwen VL Max 模型")
-        completion = client.chat.completions.create(
-            model="qwen-vl-max-latest",
-            messages=[
-                {
-                    "role": "user",
-                    "content": [
-                        {"type": "text", "text": prompt}
-                    ]
-                }
-            ],
-            temperature=0.1,  # 降低温度增加精确性
-            response_format={"type": "json_object"}  # 要求输出JSON格式
-        )
+        # 调用 Qwen VL Max API,添加重试机制
+        max_retries = 3
+        retry_count = 0
+        response_content = None
         
-        # 解析响应
-        response_content = completion.choices[0].message.content
-        logging.info(f"成功从 Qwen 模型获取单个人员文本响应: {response_content}")
+        while retry_count < max_retries:
+            try:
+                logging.info(f"发送网页文本请求到 Qwen VL Max 模型 (尝试 {retry_count + 1}/{max_retries})")
+                
+                # 设置更详细的超时和重试配置
+                completion = client.chat.completions.create(
+                    model="qwen-vl-max-latest",
+                    messages=[
+                        {
+                            "role": "user",
+                            "content": [
+                                {"type": "text", "text": prompt}
+                            ]
+                        }
+                    ],
+                    temperature=0.1,  # 降低温度增加精确性
+                    response_format={"type": "json_object"},  # 要求输出JSON格式
+                    timeout=60  # 设置60秒超时
+                )
+                
+                # 解析响应
+                response_content = completion.choices[0].message.content
+                logging.info(f"成功从 Qwen 模型获取单个人员文本响应: {response_content}")
+                break  # 成功获取响应,跳出重试循环
+                
+            except Exception as api_error:
+                retry_count += 1
+                error_msg = f"Qwen API 调用失败 (尝试 {retry_count}/{max_retries}): {str(api_error)}"
+                logging.warning(error_msg)
+                
+                if retry_count >= max_retries:
+                    # 所有重试都失败了
+                    logging.error(f"Qwen API 调用失败,已重试 {max_retries} 次,最终错误: {str(api_error)}")
+                    raise Exception(f"Qwen API 调用失败,已重试 {max_retries} 次: {str(api_error)}")
+                else:
+                    # 等待一段时间后重试
+                    import time
+                    wait_time = 2 * retry_count
+                    logging.info(f"等待 {wait_time} 秒后重试...")
+                    time.sleep(wait_time)  # 递增等待时间
+                    continue
+        
+        # 检查是否成功获取响应
+        if not response_content:
+            error_msg = "未能从 Qwen API 获取有效响应"
+            logging.error(error_msg)
+            raise Exception(error_msg)
         
         # 直接解析 QWen 返回的 JSON 响应
         try:
             extracted_data = json.loads(response_content)
             logging.info("成功解析 Qwen 单个人员文本响应中的 JSON")
         except json.JSONDecodeError as e:
-            error_msg = f"JSON 解析失败: {str(e)}"
+            error_msg = f"JSON 解析失败: {str(e)}, 响应内容: {response_content[:200]}..."
             logging.error(error_msg)
             raise Exception(error_msg)
 
@@ -726,6 +763,7 @@ def process_webpage_with_QWen(markdown_text, publish_time):
         logging.info(f"为人员 {person_data.get('name_zh', 'Unknown')} 添加了career_path记录: {career_entry}")
         
         # 返回列表格式以保持与其他函数的一致性
+        logging.info(f"process_webpage_with_QWen 函数执行完成,返回 {len([person_data])} 条记录")
         return [person_data]
         
     except Exception as e:
@@ -1233,7 +1271,9 @@ def process_single_markdown_file(minio_path, publish_time, task_id=None, task_ty
         # 直接处理整个文件
         logging.info("直接处理整个markdown文件")
         try:
+            logging.info(f"开始调用 process_webpage_with_QWen 函数处理文件: {minio_path}")
             result = process_webpage_with_QWen(markdown_content, publish_time)
+            logging.info(f"process_webpage_with_QWen 函数执行完成,返回结果: {len(result) if result else 0} 条记录")
             
             parsed_record_ids = []  # 收集成功解析的记录ID
             
@@ -1286,6 +1326,7 @@ def process_single_markdown_file(minio_path, publish_time, task_id=None, task_ty
                         except Exception as record_error:
                             logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
             
+            logging.info(f"单个markdown文件处理完成,成功解析 {len(result) if result else 0} 条记录")
             return {
                 'success': True,
                 'message': '单个markdown文件处理成功',