Преглед изворни кода

调整resume解析,添加resume解析结果的minio路径
调整名片解析,添加名片解析结果的minio路径
调整图片解析,添加图片解析结果的minio路径

maxiaolong пре 3 недеља
родитељ
комит
a7d1b2ff1d

+ 201 - 80
app/api/data_parse/routes.py

@@ -1,4 +1,5 @@
 from flask import jsonify, request, make_response, Blueprint, current_app, send_file
+from datetime import datetime
 from app.api.data_parse import bp
 from app.core.data_parse.parse_system import (
     update_business_card, 
@@ -1623,70 +1624,132 @@ def execute_parse_task():
     - 杂项: batch_process_images
     
     请求参数:
-    - task_type (str): 任务类型,可选值:'名片', '简历', '新任命', '招聘', '杂项'
-    - data (list): 数据列表,根据task_type不同,数据格式不同
-    - publish_time (str, optional): 发布时间,仅新任命任务需要
+    - data (dict): 包含完整任务信息的对象,格式如下:
+        {
+            "id": 123,
+            "task_name": "parse_task_20241201_a1b2c3d4",
+            "task_status": "待解析",
+            "task_type": "名片",
+            "task_source": [
+                {
+                    "original_filename": "张三名片.jpg",
+                    "minio_path": "https://192.168.3.143:9000/dataops-platform/talent_photos/20241201_001234_张三名片.jpg",
+                    "status": "正常"
+                }
+            ],
+            "collection_count": 2,
+            "parse_count": 0,
+            "parse_result": null,
+            "created_at": "2024-12-01 10:30:45",
+            "created_by": "api_user",
+            "updated_at": "2024-12-01 10:30:45",
+            "updated_by": "api_user"
+        }
+        
+    对于新任命类型,task_source中的每个对象还需要包含publish_time字段:
+        {
+            "publish_time": "20250731",
+            "original_filename": "张三任命.md",
+            "minio_path": "https://192.168.3.143:9000/dataops-platform/appointment_files/20241201_001234_张三任命.md",
+            "status": "正常"
+        }
     """
     try:
         # 获取请求数据
-        data = request.get_json()
+        request_data = request.get_json()
         
-        if not data:
+        if not request_data:
             return jsonify({
                 'success': False,
                 'message': '请求数据不能为空',
                 'data': None
             }), 400
         
+        # 验证请求数据格式
+        if not isinstance(request_data, dict) or 'data' not in request_data:
+            return jsonify({
+                'success': False,
+                'message': '请求数据格式错误,必须包含data字段',
+                'data': None
+            }), 400
+        
+        # 获取任务数据
+        task_data = request_data.get('data')
+        if not task_data:
+            return jsonify({
+                'success': False,
+                'message': '任务数据不能为空',
+                'data': None
+            }), 400
+        
+        # 验证任务数据格式
+        if not isinstance(task_data, dict):
+            return jsonify({
+                'success': False,
+                'message': '任务数据必须是对象格式',
+                'data': None
+            }), 400
+        
         # 获取任务类型
-        task_type = data.get('task_type', '').strip()
+        task_type = task_data.get('task_type', '').strip()
         if not task_type:
             return jsonify({
                 'success': False,
-                'message': 'task_type参数不能为空',
+                'message': '任务类型不能为空',
                 'data': None
             }), 400
         
-        # 获取数据列表
-        task_data = data.get('data')
-        if not task_data:
+        # 获取任务源数据
+        task_source = task_data.get('task_source', [])
+        if not task_source:
             return jsonify({
                 'success': False,
-                'message': 'data参数不能为空',
+                'message': '任务源数据不能为空',
                 'data': None
             }), 400
         
+        # 验证任务源数据格式
+        if not isinstance(task_source, list):
+            return jsonify({
+                'success': False,
+                'message': '任务源数据必须是数组格式',
+                'data': None
+            }), 400
+        
+        # 获取任务ID
+        task_id = task_data.get('id')
+        
         # 根据任务类型执行相应的处理函数
         try:
             if task_type == '名片':
                 # 调用名片批量处理函数
-                result = batch_process_business_card_images(task_data)
+                result = batch_process_business_card_images(task_source, task_id, task_type)
                 
             elif task_type == '简历':
                 # 调用简历批量处理函数
-                result = batch_parse_resumes(task_data)
+                result = batch_parse_resumes(task_source, task_id, task_type)
                 
             elif task_type == '新任命':
-                # 获取发布时间参数
-                publish_time = data.get('publish_time', '')
-                if not publish_time:
-                    return jsonify({
-                        'success': False,
-                        'message': '新任命任务需要提供publish_time参数',
-                        'data': None
-                    }), 400
+                # 验证新任命任务的publish_time字段
+                for source_item in task_source:
+                    if not isinstance(source_item, dict) or 'publish_time' not in source_item:
+                        return jsonify({
+                            'success': False,
+                            'message': '新任命任务的每个源数据必须包含publish_time字段',
+                            'data': None
+                        }), 400
                 
                 # 调用新任命批量处理函数
-                result = batch_process_md(task_data, publish_time)
+                result = batch_process_md(task_source, task_id=task_id, task_type=task_type)
                 
             elif task_type == '招聘':
                 # 调用招聘数据批量处理函数
-                result = batch_process_menduner_data(task_data)
+                result = batch_process_menduner_data(task_source, task_id, task_type)
                 
             elif task_type == '杂项':
                 # 调用图片批量处理函数(表格类型)
-                process_type = data.get('process_type', 'table')
-                result = batch_process_images(task_data, process_type)
+                process_type = request_data.get('process_type', 'table')
+                result = batch_process_images(task_source, process_type, task_id, task_type)
                 
             else:
                 return jsonify({
@@ -1696,8 +1759,7 @@ def execute_parse_task():
                 }), 400
             
             # 记录处理结果日志并更新任务状态
-            from app.core.data_parse.parse_system import db, ParseTaskRepository, record_parsed_talents
-            task_id = data.get('id')
+            from app.core.data_parse.parse_system import db, ParseTaskRepository
             task_obj = None
             
             if task_id:
@@ -1707,59 +1769,75 @@ def execute_parse_task():
             if result.get('success'):
                 logging.info(f"执行{task_type}解析任务成功: {result.get('message', '')}")
                 
-                # 检查是否有部分成功的情况
-                has_partial_success = False
-                if 'code' in result and result['code'] == 206:
-                    has_partial_success = True
-                elif 'summary' in result.get('data', {}):
-                    summary = result['data']['summary']
-                    if summary.get('failed_count', 0) > 0 and summary.get('success_count', 0) > 0:
-                        has_partial_success = True
+                # 获取解析结果数据
+                result_data = result.get('data', {})
+                success_count = result_data.get('success_count', 0)
+                failed_count = result_data.get('failed_count', 0)
+                # 对于新任命类型,parsed_record_ids在process_single_markdown_file中已经处理
+                parsed_record_ids = result_data.get('parsed_record_ids', [])
                 
-                # 设置任务状态
+                # 确定任务状态
+                if failed_count == 0:
+                    task_status = '解析成功'
+                elif success_count > 0:
+                    task_status = '部分解析成功'
+                else:
+                    task_status = '不成功'
+                
+                # 更新任务记录
                 if task_obj:
-                    if has_partial_success:
-                        task_obj.task_status = '部分成功'
+                    task_obj.task_status = task_status
+                    task_obj.parse_count = success_count
+                    # 对于新任命类型,需要从数据库中查询实际的记录ID
+                    if task_type == '新任命':
+                        try:
+                            from app.core.data_parse.parse_system import ParsedTalent
+                            # 查询该任务相关的所有记录
+                            parsed_records = ParsedTalent.query.filter_by(task_id=task_id, task_type=task_type).all()
+                            record_ids = [str(record.id) for record in parsed_records]
+                            task_obj.parse_result = ','.join(record_ids) if record_ids else ''
+                        except Exception as e:
+                            logging.error(f"查询新任命记录ID失败: {str(e)}")
+                            task_obj.parse_result = ''
                     else:
-                        task_obj.task_status = '解析成功'
-                    task_obj.parse_result = result.get('data')
+                        task_obj.parse_result = ','.join(parsed_record_ids) if parsed_record_ids else ''
+                    task_obj.updated_at = datetime.now()
+                    task_obj.updated_by = 'admin'
                     db.session.commit()
                     logging.info(f"已更新解析任务记录: id={getattr(task_obj, 'id', None)}, 状态={task_obj.task_status}")
                 
-                # 调用record_parsed_talents函数将解析结果写入parsed_talents表
-                try:
-                    # 为result添加任务信息
-                    result_with_task_info = result.copy()
-                    if 'data' in result_with_task_info:
-                        result_with_task_info['data'] = result_with_task_info['data'].copy() if isinstance(result_with_task_info['data'], dict) else {}
-                        result_with_task_info['data']['task_id'] = str(task_id) if task_id else ''
-                        result_with_task_info['data']['task_type'] = task_type
-                    
-                    record_result = record_parsed_talents(result_with_task_info)
-                    if record_result.get('success'):
-                        logging.info(f"成功将解析结果写入parsed_talents表: {record_result.get('message', '')}")
-                    else:
-                        logging.warning(f"写入parsed_talents表失败: {record_result.get('message', '')}")
-                except Exception as record_error:
-                    logging.error(f"调用record_parsed_talents函数失败: {str(record_error)}")
+                # 构建返回数据,按照请求参数格式返回
+                return_data = task_data.copy() if task_data else {}
                 
-                # 构建返回数据,格式与add-parse-task保持一致
-                if task_obj:
-                    return_data = task_obj.to_dict()
+                # 对于新任命类型,需要从数据库中查询实际的记录ID
+                if task_type == '新任命':
+                    try:
+                        from app.core.data_parse.parse_system import ParsedTalent
+                        # 查询该任务相关的所有记录
+                        parsed_records = ParsedTalent.query.filter_by(task_id=task_id, task_type=task_type).all()
+                        record_ids = [str(record.id) for record in parsed_records]
+                        parse_result = ','.join(record_ids) if record_ids else ''
+                    except Exception as e:
+                        logging.error(f"查询新任命记录ID失败: {str(e)}")
+                        parse_result = ''
                 else:
-                    # 如果没有找到任务记录,返回简化的成功信息
-                    return_data = {
-                        'success': True,
-                        'message': result.get('message', '解析完成'),
-                        'task_type': task_type,
-                        'parse_result': result.get('data')
-                    }
+                    parse_result = ','.join(parsed_record_ids) if parsed_record_ids else ''
+                
+                return_data.update({
+                    'task_status': task_status,
+                    'parse_count': success_count,
+                    'parse_result': parse_result,
+                    'updated_at': datetime.now().isoformat(),
+                    'updated_by': 'admin'
+                })
                 
                 # 确定HTTP状态码
-                if has_partial_success:
+                if failed_count == 0:
+                    status_code = 200  # 完全成功
+                elif success_count > 0:
                     status_code = 206  # 部分成功
                 else:
-                    status_code = 200  # 完全成功
+                    status_code = 500  # 完全失败
                 
                 return jsonify({
                     'success': True,
@@ -1773,14 +1851,27 @@ def execute_parse_task():
                 # 设置任务状态为不成功
                 if task_obj:
                     task_obj.task_status = '不成功'
-                    task_obj.parse_result = result.get('data')
+                    task_obj.parse_count = 0
+                    task_obj.parse_result = ''
+                    task_obj.updated_at = datetime.now()
+                    task_obj.updated_by = 'admin'
                     db.session.commit()
                     logging.info(f"已更新解析任务记录: id={getattr(task_obj, 'id', None)}, 状态=不成功")
                 
+                # 构建返回数据,按照请求参数格式返回
+                return_data = task_data.copy() if task_data else {}
+                return_data.update({
+                    'task_status': '不成功',
+                    'parse_count': 0,
+                    'parse_result': '',
+                    'updated_at': datetime.now().isoformat(),
+                    'updated_by': 'admin'
+                })
+                
                 return jsonify({
                     'success': False,
                     'message': result.get('message', '解析失败'),
-                    'data': None
+                    'data': return_data
                 }), 500
             
         except Exception as process_error:
@@ -1814,21 +1905,51 @@ def add_parsed_talents_route():
     请求参数:
         - 请求体: 包含任务ID和人才数据的JSON对象 (JSON格式)
           - task_id: 任务ID,用于更新任务状态(可选)
+          - task_type: 任务类型(可选)
           - data: 包含人才解析结果的数据对象
         
-    请求体示例:
+    请求体格式(严格按照样例格式):
         {
-           "task_id": 123,
+           "task_id": "119",
+           "task_type": "名片",
            "data": {
                "results": [
                     {
-                        "index": 0,
-                        "success": true,
-                        "data": {
-                            "name_zh": "张三",
-                            "title_zh": "经理",
-                            "hotel_zh": "某酒店"
-                        }
+                        "name_zh": "王仁",
+                        "name_en": "Owen Wang",
+                        "title_zh": "总经理",
+                        "title_en": "General Manager",
+                        "mobile": "+86 138 1685 0647",
+                        "phone": null,
+                        "email": "rwang5@urcove-hotels.com",
+                        "hotel_zh": "上海静安逸扉酒店",
+                        "hotel_en": "UrCove by HYATT Shanghai Jing'an",
+                        "brand_zh": null,
+                        "brand_en": null,
+                        "affiliation_zh": null,
+                        "affiliation_en": null,
+                        "brand_group": "UrCove, HYATT",
+                        "address_zh": "中国上海市静安区武定西路1185号",
+                        "address_en": "No.1185 West Wuding Road, Jing'an District",
+                        "postal_code_zh": "200042",
+                        "postal_code_en": "200042",
+                        "birthday": null,
+                        "residence": null,
+                        "age": 0,
+                        "native_place": null,
+                        "talent_profile": "测试用名片",
+                        "career_path": [
+                            {
+                                "date": "2025-08-01",
+                                "hotel_en": "UrCove by HYATT Shanghai Jing'an",
+                                "hotel_zh": "上海静安逸扉酒店",
+                                "image_path": "",
+                                "source": "business_card_creation",
+                                "title_en": "General Manager",
+                                "title_zh": "总经理"
+                            }
+                        ],
+                        "minio_path": "http://example.com/path/to/image.jpg"  // 可选字段
                     }
                 ]
             }
@@ -1839,7 +1960,7 @@ def add_parsed_talents_route():
         
     功能说明:
         - 接收包含人才数据的请求体
-        - 处理 results 数组中的人才数据
+        - 严格按照样例格式处理 results 数组中的人才数据
         - 调用 add_single_talent 函数将人才信息写入 business_cards 表
         - 成功处理后,更新对应任务记录状态为"已入库"
         - 提供详细的处理统计和结果追踪

+ 5 - 0
app/config/config.py

@@ -56,6 +56,11 @@ class BaseConfig:
     QWEN_API_KEY = os.environ.get('QWEN_API_KEY', "sk-db68e37f00974031935395315bfe07f0")
     QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
     
+    # Qwen API配置 - 用于文本生成(替代Deepseek)
+    QWEN_TEXT_API_KEY = os.environ.get('QWEN_TEXT_API_KEY', "sk-db68e37f00974031935395315bfe07f0")
+    QWEN_TEXT_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+    QWEN_TEXT_MODEL = "qwen-turbo"
+    
     # 日志基础配置
     LOG_FORMAT = '%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(lineno)s - %(message)s'
     LOG_ENCODING = 'UTF-8'

+ 70 - 19
app/core/data_parse/parse_card.py

@@ -540,12 +540,14 @@ def delete_business_card(card_id):
         }
 
 
-def batch_process_business_card_images(minio_paths_json):
+def batch_process_business_card_images(minio_paths_json, task_id=None, task_type=None):
     """
     批量处理名片图片,从MinIO下载图片并进行解析
     
     Args:
-        minio_paths_json (list): 包含MinIO对象访问地址的JSON数组
+        minio_paths_json (list): 包含MinIO对象访问地址的JSON数组,可以是字符串数组或字典数组
+        task_id (str, optional): 任务ID
+        task_type (str, optional): 任务类型
         
     Returns:
         dict: 批量处理结果,包含所有解析结果的数组
@@ -583,10 +585,41 @@ def batch_process_business_card_images(minio_paths_json):
         results = []
         success_count = 0
         failed_count = 0
+        parsed_record_ids = []  # 收集成功解析的记录ID
         
         # 逐一处理每个MinIO路径
-        for i, minio_path in enumerate(minio_paths_json):
+        for i, item in enumerate(minio_paths_json):
             try:
+                # 处理输入格式:支持字符串或字典格式
+                if isinstance(item, dict):
+                    minio_path = item.get('minio_path')
+                    original_filename = item.get('original_filename', '')
+                    status = item.get('status', '')
+                    if not minio_path:
+                        failed_count += 1
+                        results.append({
+                            'index': i,
+                            'minio_path': str(item),
+                            'success': False,
+                            'error': f'字典中缺少minio_path字段: {item}',
+                            'data': None
+                        })
+                        continue
+                elif isinstance(item, str):
+                    minio_path = item
+                    original_filename = ''
+                    status = ''
+                else:
+                    failed_count += 1
+                    results.append({
+                        'index': i,
+                        'minio_path': str(item),
+                        'success': False,
+                        'error': f'不支持的数据格式: {type(item)}',
+                        'data': None
+                    })
+                    continue
+                
                 logging.info(f"处理第 {i+1}/{len(minio_paths_json)} 个文件: {minio_path}")
                 
                 # 解析MinIO URL获取对象路径
@@ -620,7 +653,7 @@ def batch_process_business_card_images(minio_paths_json):
                         continue
                     
                     # 获取文件名和内容类型
-                    filename = object_key.split('/')[-1]
+                    filename = original_filename if original_filename else object_key.split('/')[-1]
                     content_type = _get_content_type_by_filename(filename)
                     
                     # 创建FileStorage对象模拟上传的文件
@@ -635,6 +668,23 @@ def batch_process_business_card_images(minio_paths_json):
                     process_result = process_business_card_image(file_storage)
                     
                     if process_result.get('success', False):
+                        # 记录成功解析的人才信息到parsed_talents表
+                        try:
+                            from app.core.data_parse.parse_task import record_parsed_talent
+                            talent_data = process_result.get('data')
+                            if talent_data and isinstance(talent_data, dict):
+                                record_result = record_parsed_talent(talent_data, task_id, task_type)
+                                if record_result.get('success'):
+                                    # 收集成功解析的记录ID
+                                    parsed_record = record_result.get('data', {})
+                                    if parsed_record and 'id' in parsed_record:
+                                        parsed_record_ids.append(str(parsed_record['id']))
+                                    logging.info(f"成功记录人才信息到parsed_talents表: {talent_data.get('name_zh', '')}")
+                                else:
+                                    logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
+                        except Exception as record_error:
+                            logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
+                        
                         success_count += 1
                         results.append({
                             'index': i,
@@ -679,44 +729,45 @@ def batch_process_business_card_images(minio_paths_json):
                 logging.error(error_msg, exc_info=True)
                 results.append({
                     'index': i,
-                    'minio_path': minio_path,
+                    'minio_path': str(item) if isinstance(item, (str, dict)) else 'unknown',
                     'success': False,
                     'error': error_msg,
                     'data': None
                 })
         
         # 组装最终结果
-        batch_result = {
-            'summary': {
-                'total_files': len(minio_paths_json),
-                'success_count': success_count,
-                'failed_count': failed_count,
-                'success_rate': round((success_count / len(minio_paths_json)) * 100, 2) if len(minio_paths_json) > 0 else 0
-            },
-            'results': results,
-            'processed_time': datetime.now().isoformat()
-        }
-        
         if failed_count == 0:
             return {
                 'code': 200,
                 'success': True,
                 'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
                 'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         else:
             return {
                 'code': 206,  # Partial Content
                 'success': True,
                 'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
             
     except Exception as e:

+ 34 - 15
app/core/data_parse/parse_menduner.py

@@ -366,12 +366,14 @@ def validate_menduner_data(data: Dict[str, Any]) -> Dict[str, Any]:
         }
 
 
-def batch_process_menduner_data(data_list: List[Dict[str, Any]]) -> Dict[str, Any]:
+def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, task_type=None) -> Dict[str, Any]:
     """
     批量处理门墩儿人才数据
     
     Args:
         data_list (List[Dict[str, Any]]): 待处理的人才数据列表
+        task_id (str, optional): 任务ID
+        task_type (str, optional): 任务类型
         
     Returns:
         Dict[str, Any]: 批量处理结果,格式与parse_result保持一致
@@ -405,6 +407,7 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]]) -> Dict[str, An
         results = []
         success_count = 0
         failed_count = 0
+        parsed_record_ids = []  # 收集成功解析的记录ID
         
         logging.info(f"开始批量处理门墩儿人才数据,共 {len(data_list)} 条记录")
         
@@ -420,6 +423,21 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]]) -> Dict[str, An
                 validation = validate_menduner_data(normalized)
                 
                 if validation.get('is_valid', False):
+                    # 记录成功解析的人才信息到parsed_talents表
+                    try:
+                        from app.core.data_parse.parse_task import record_parsed_talent
+                        record_result = record_parsed_talent(normalized, task_id, task_type)
+                        if record_result.get('success'):
+                            # 收集成功解析的记录ID
+                            parsed_record = record_result.get('data', {})
+                            if parsed_record and 'id' in parsed_record:
+                                parsed_record_ids.append(str(parsed_record['id']))
+                            logging.info(f"成功记录人才信息到parsed_talents表: {normalized.get('name_zh', '')}")
+                        else:
+                            logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
+                    except Exception as record_error:
+                        logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
+                    
                     success_count += 1
                     results.append({
                         "data": normalized,
@@ -463,37 +481,38 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]]) -> Dict[str, An
                 })
         
         # 组装最终结果
-        batch_result = {
-            'summary': {
-                'total_files': len(data_list),
-                'success_count': success_count,
-                'failed_count': failed_count,
-                'success_rate': round((success_count / len(data_list)) * 100, 2) if len(data_list) > 0 else 0
-            },
-            'results': results,
-            'processed_time': datetime.now().isoformat()
-        }
-        
         if failed_count == 0:
             return {
                 'code': 200,
                 'success': True,
                 'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
                 'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         else:
             return {
                 'code': 206,  # Partial Content
                 'success': True,
                 'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
             
     except Exception as e:

+ 122 - 25
app/core/data_parse/parse_pic.py

@@ -858,13 +858,15 @@ def parse_table_with_qwen(base64_image: str) -> List[Dict[str, Any]]:
         raise Exception(error_msg)
 
 
-def batch_process_images(image_paths: List[str], process_type: str = 'table') -> Dict[str, Any]:
+def batch_process_images(image_paths: List[Any], process_type: str = 'table', task_id=None, task_type=None) -> Dict[str, Any]:
     """
     批量处理图片
     
     Args:
-        image_paths (List[str]): 图片路径列表
+        image_paths (List[Any]): 图片路径列表,可以是字符串数组或字典数组
         process_type (str): 处理类型,只支持 'table'
+        task_id (str, optional): 任务ID
+        task_type (str, optional): 任务类型
         
     Returns:
         Dict[str, Any]: 批量处理结果,格式与parse_result保持一致
@@ -911,12 +913,67 @@ def batch_process_images(image_paths: List[str], process_type: str = 'table') ->
         results = []
         success_count = 0
         failed_count = 0
+        parsed_record_ids = []  # 收集成功解析的记录ID
         
         logging.info(f"开始批量处理图片,共 {len(image_paths)} 个文件")
         
         # 逐一处理每个图片路径
-        for i, image_path in enumerate(image_paths):
+        for i, item in enumerate(image_paths):
             try:
+                # 处理输入格式:支持字符串或字典格式
+                if isinstance(item, dict):
+                    image_path = item.get('minio_path')
+                    original_filename = item.get('original_filename', '')
+                    status = item.get('status', '')
+                    
+                    # 确保image_path是字符串类型
+                    if not image_path:
+                        failed_count += 1
+                        results.append({
+                            "data": None,
+                            "error": f"字典中缺少minio_path字段: {item}",
+                            "filename": str(item),
+                            "index": i,
+                            "message": "图片路径格式无效",
+                            "minio_path": "",
+                            "object_key": "",
+                            "success": False
+                        })
+                        logging.warning(f"第 {i+1} 个文件缺少minio_path字段")
+                        continue
+                    elif not isinstance(image_path, str):
+                        failed_count += 1
+                        results.append({
+                            "data": None,
+                            "error": f"minio_path字段不是字符串类型: {type(image_path)}",
+                            "filename": str(item),
+                            "index": i,
+                            "message": "图片路径格式无效",
+                            "minio_path": "",
+                            "object_key": "",
+                            "success": False
+                        })
+                        logging.warning(f"第 {i+1} 个文件minio_path字段类型错误")
+                        continue
+                elif isinstance(item, str):
+                    image_path = item
+                    original_filename = ''
+                    status = ''
+                else:
+                    failed_count += 1
+                    results.append({
+                        "data": None,
+                        "error": f"不支持的数据格式: {type(item)}",
+                        "filename": str(item),
+                        "index": i,
+                        "message": "图片路径格式无效",
+                        "minio_path": "",
+                        "object_key": "",
+                        "success": False
+                    })
+                    logging.warning(f"第 {i+1} 个文件格式无效")
+                    continue
+                
                 logging.info(f"处理第 {i+1}/{len(image_paths)} 个文件: {image_path}")
                 
                 # 调用表格处理函数
@@ -929,15 +986,36 @@ def batch_process_images(image_paths: List[str], process_type: str = 'table') ->
                     if extracted_data and isinstance(extracted_data, list):
                         # 为每个人员创建一个结果记录
                         for person_idx, person_data in enumerate(extracted_data):
+                            # 记录成功解析的人才信息到parsed_talents表
+                            try:
+                                from app.core.data_parse.parse_task import record_parsed_talent
+                                record_result = record_parsed_talent(person_data, task_id, task_type)
+                                if record_result.get('success'):
+                                    # 收集成功解析的记录ID
+                                    parsed_record = record_result.get('data', {})
+                                    if parsed_record and 'id' in parsed_record:
+                                        parsed_record_ids.append(str(parsed_record['id']))
+                                    logging.info(f"成功记录人才信息到parsed_talents表: {person_data.get('name_zh', '')}")
+                                else:
+                                    logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
+                            except Exception as record_error:
+                                logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
+                            
                             success_count += 1
                             # 构建完整的MinIO URL路径
-                            relative_path = f"misc_files/{os.path.basename(image_path)}" if image_path else f'misc_files/file_{i}.jpg'
+                            if original_filename:
+                                filename = original_filename
+                            elif isinstance(image_path, str) and image_path:
+                                filename = os.path.basename(image_path)
+                            else:
+                                filename = f'table_file_{i}.jpg'
+                            relative_path = f"misc_files/{filename}"
                             complete_minio_path = f"{minio_url}/{minio_bucket}/{relative_path}"
                             
                             results.append({
                                 "data": person_data,
                                 "error": None,
-                                "filename": os.path.basename(image_path) if image_path else f'table_file_{i}.jpg',
+                                "filename": filename,
                                 "index": len(results),  # 使用连续的索引
                                 "message": "表格图片解析成功",
                                 "minio_path": complete_minio_path,
@@ -949,13 +1027,19 @@ def batch_process_images(image_paths: List[str], process_type: str = 'table') ->
                         # 没有提取到有效数据
                         failed_count += 1
                         # 构建完整的MinIO URL路径
-                        relative_path = f"misc_files/{os.path.basename(image_path)}" if image_path else f'misc_files/file_{i}.jpg'
+                        if original_filename:
+                            filename = original_filename
+                        elif isinstance(image_path, str) and image_path:
+                            filename = os.path.basename(image_path)
+                        else:
+                            filename = f'table_file_{i}.jpg'
+                        relative_path = f"misc_files/{filename}"
                         complete_minio_path = f"{minio_url}/{minio_bucket}/{relative_path}"
                         
                         results.append({
                             "data": None,
                             "error": "未从表格图片中提取到人员信息",
-                            "filename": os.path.basename(image_path) if image_path else f'table_file_{i}.jpg',
+                            "filename": filename,
                             "index": i,
                             "message": "表格图片解析失败",
                             "minio_path": complete_minio_path,
@@ -966,13 +1050,19 @@ def batch_process_images(image_paths: List[str], process_type: str = 'table') ->
                 else:
                     failed_count += 1
                     # 构建完整的MinIO URL路径
-                    relative_path = f"misc_files/{os.path.basename(image_path)}" if image_path else f'misc_files/file_{i}.jpg'
+                    if original_filename:
+                        filename = original_filename
+                    elif isinstance(image_path, str) and image_path:
+                        filename = os.path.basename(image_path)
+                    else:
+                        filename = f'table_file_{i}.jpg'
+                    relative_path = f"misc_files/{filename}"
                     complete_minio_path = f"{minio_url}/{minio_bucket}/{relative_path}"
                     
                     results.append({
                         "data": None,
                         "error": result.get('error', '处理失败'),
-                        "filename": os.path.basename(image_path) if image_path else f'table_file_{i}.jpg',
+                        "filename": filename,
                         "index": i,
                         "message": "表格图片解析失败",
                         "minio_path": complete_minio_path,
@@ -986,13 +1076,19 @@ def batch_process_images(image_paths: List[str], process_type: str = 'table') ->
                 error_msg = f"处理图片失败: {str(item_error)}"
                 logging.error(error_msg, exc_info=True)
                 # 构建完整的MinIO URL路径
-                relative_path = f"misc_files/{os.path.basename(image_path)}" if image_path else f'misc_files/file_{i}.jpg'
+                if original_filename:
+                    filename = original_filename
+                elif isinstance(image_path, str) and image_path:
+                    filename = os.path.basename(image_path)
+                else:
+                    filename = f'table_file_{i}.jpg'
+                relative_path = f"misc_files/{filename}"
                 complete_minio_path = f"{minio_url}/{minio_bucket}/{relative_path}"
                 
                 results.append({
                     "data": None,
                     "error": error_msg,
-                    "filename": os.path.basename(image_path) if image_path else f'table_file_{i}.jpg',
+                    "filename": filename,
                     "index": i,
                     "message": "表格图片解析失败",
                     "minio_path": complete_minio_path,
@@ -1001,37 +1097,38 @@ def batch_process_images(image_paths: List[str], process_type: str = 'table') ->
                 })
         
         # 组装最终结果
-        batch_result = {
-            'summary': {
-                'total_files': len(image_paths),
-                'success_count': success_count,
-                'failed_count': failed_count,
-                'success_rate': round((success_count / len(image_paths)) * 100, 2) if len(image_paths) > 0 else 0
-            },
-            'results': results,
-            'processed_time': datetime.now().isoformat()
-        }
-        
         if failed_count == 0:
             return {
                 'code': 200,
                 'success': True,
                 'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
                 'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         else:
             return {
                 'code': 206,  # Partial Content
                 'success': True,
                 'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
             
     except Exception as e:

+ 58 - 29
app/core/data_parse/parse_resume.py

@@ -121,7 +121,7 @@ def parse_resume_with_qwen(resume_text: str) -> Dict[str, Any]:
 5. 中文酒店/公司名称 (hotel_zh)
 6. 英文酒店/公司名称 (hotel_en)
 7. 手机号码 (mobile) - 如有多个手机号码,使用逗号分隔,最多提取3个
-8. 固定电话 (phone) - 如有多个,使用逗号分隔
+8. 固定电话 (phone) - 如有多个,只提取一个
 9. 电子邮箱 (email)
 10. 中文地址 (address_zh)
 11. 英文地址 (address_en)
@@ -623,12 +623,14 @@ def validate_resume_format(file_path: str) -> bool:
         return False
 
 
-def batch_parse_resumes(file_paths: List[str]) -> Dict[str, Any]:
+def batch_parse_resumes(file_paths: List[str], task_id=None, task_type=None) -> Dict[str, Any]:
     """
     批量解析简历文件
     
     Args:
         file_paths (List[str]): 简历文件路径列表
+        task_id (str, optional): 任务ID
+        task_type (str, optional): 任务类型
         
     Returns:
         Dict[str, Any]: 批量解析结果,格式与parse_result保持一致
@@ -662,15 +664,26 @@ def batch_parse_resumes(file_paths: List[str]) -> Dict[str, Any]:
         results = []
         success_count = 0
         failed_count = 0
+        parsed_record_ids = []  # 收集成功解析的记录ID
         
         logging.info(f"开始批量解析简历文件,共 {len(file_paths)} 个文件")
         
         # 逐一处理每个简历文件
-        for i, file_path in enumerate(file_paths):
+        for i, file_item in enumerate(file_paths):
             try:
-                logging.info(f"处理第 {i+1}/{len(file_paths)} 个文件: {file_path}")
+                # 从文件项中获取minio_path和original_filename
+                if isinstance(file_item, dict):
+                    minio_path = file_item.get('minio_path', '')
+                    original_filename = file_item.get('original_filename', f'resume_{i}.pdf')
+                    file_status = file_item.get('status', '正常')
+                else:
+                    minio_path = file_item
+                    original_filename = _get_filename_from_path(file_item) if file_item else f'resume_{i}.pdf'
+                    file_status = '正常'
+                
+                logging.info(f"处理第 {i+1}/{len(file_paths)} 个文件: {file_item}")
                 
-                result = parse_resume_file(file_path)
+                result = parse_resume_file(minio_path)
                 
                 if result.get('success', False):
                     # 提取并转换为标准名片格式
@@ -700,32 +713,47 @@ def batch_parse_resumes(file_paths: List[str]) -> Dict[str, Any]:
                         "title_zh": resume_data.get('title_zh', '')
                     }
                     
+                    # 记录成功解析的人才信息到parsed_talents表
+                    try:
+                        from app.core.data_parse.parse_task import record_parsed_talent
+                        record_result = record_parsed_talent(standardized_data, task_id, task_type)
+                        if record_result.get('success'):
+                            # 收集成功解析的记录ID
+                            parsed_record = record_result.get('data', {})
+                            if parsed_record and 'id' in parsed_record:
+                                parsed_record_ids.append(str(parsed_record['id']))
+                            logging.info(f"成功记录人才信息到parsed_talents表: {standardized_data.get('name_zh', '')}")
+                        else:
+                            logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
+                    except Exception as record_error:
+                        logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
+                    
                     success_count += 1
                     # 构建完整的MinIO URL路径
-                    relative_path = f"resume_files/{_get_filename_from_path(file_path)}" if file_path else f'resume_files/file_{i}.pdf'
-                    complete_minio_path = f"{minio_url}/{minio_bucket}/{relative_path}"
+                    relative_path = f"resume_files/{original_filename}"
+                    complete_minio_path = minio_path if minio_path.startswith('http') else f"{minio_url}/{minio_bucket}/{relative_path}"
                     
                     results.append({
                         "data": standardized_data,
                         "error": None,
-                        "filename": _get_filename_from_path(file_path) if file_path else f'resume_{i}.pdf',
+                        "filename": original_filename,
                         "index": i,
                         "message": "简历文件解析成功",
                         "minio_path": complete_minio_path,
                         "object_key": relative_path,
                         "success": True
                     })
-                    logging.info(f"成功处理第 {i+1} 个文件: {_get_filename_from_path(file_path)}")
+                    logging.info(f"成功处理第 {i+1} 个文件: {original_filename}")
                 else:
                     failed_count += 1
                     # 构建完整的MinIO URL路径
-                    relative_path = f"resume_files/{_get_filename_from_path(file_path)}" if file_path else f'resume_files/file_{i}.pdf'
-                    complete_minio_path = f"{minio_url}/{minio_bucket}/{relative_path}"
+                    relative_path = f"resume_files/{original_filename}"
+                    complete_minio_path = minio_path if minio_path.startswith('http') else f"{minio_url}/{minio_bucket}/{relative_path}"
                     
                     results.append({
                         "data": None,
                         "error": result.get('error', '处理失败'),
-                        "filename": _get_filename_from_path(file_path) if file_path else f'resume_{i}.pdf',
+                        "filename": original_filename,
                         "index": i,
                         "message": "简历文件解析失败",
                         "minio_path": complete_minio_path,
@@ -739,13 +767,13 @@ def batch_parse_resumes(file_paths: List[str]) -> Dict[str, Any]:
                 error_msg = f"处理简历文件失败: {str(item_error)}"
                 logging.error(error_msg, exc_info=True)
                 # 构建完整的MinIO URL路径
-                relative_path = f"resume_files/{_get_filename_from_path(file_path)}" if file_path else f'resume_files/file_{i}.pdf'
-                complete_minio_path = f"{minio_url}/{minio_bucket}/{relative_path}"
+                relative_path = f"resume_files/{original_filename}"
+                complete_minio_path = minio_path if minio_path.startswith('http') else f"{minio_url}/{minio_bucket}/{relative_path}"
                 
                 results.append({
                     "data": None,
                     "error": error_msg,
-                    "filename": _get_filename_from_path(file_path) if file_path else f'resume_{i}.pdf',
+                    "filename": original_filename,
                     "index": i,
                     "message": "简历文件解析失败",
                     "minio_path": complete_minio_path,
@@ -754,37 +782,38 @@ def batch_parse_resumes(file_paths: List[str]) -> Dict[str, Any]:
                 })
         
         # 组装最终结果
-        batch_result = {
-            'summary': {
-                'total_files': len(file_paths),
-                'success_count': success_count,
-                'failed_count': failed_count,
-                'success_rate': round((success_count / len(file_paths)) * 100, 2) if len(file_paths) > 0 else 0
-            },
-            'results': results,
-            'processed_time': datetime.now().isoformat()
-        }
-        
         if failed_count == 0:
             return {
                 'code': 200,
                 'success': True,
                 'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
                 'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
         else:
             return {
                 'code': 206,  # Partial Content
                 'success': True,
                 'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
             
     except Exception as e:

+ 40 - 54
app/core/data_parse/parse_system.py

@@ -241,11 +241,14 @@ minio_bucket = getattr(config, 'MINIO_BUCKET', 'dataops')
 use_ssl = getattr(config, 'MINIO_SECURE', False)
 
 # API密钥配置
-DEEPSEEK_API_KEY = getattr(config, 'DEEPSEEK_API_KEY', '')
-DEEPSEEK_API_URL = getattr(config, 'DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions')
 QWEN_API_KEY = getattr(config, 'QWEN_API_KEY', '')
 QWEN_API_URL = getattr(config, 'QWEN_API_URL', 'https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation')
 
+# Qwen文本API配置(替代Deepseek)
+QWEN_TEXT_API_KEY = getattr(config, 'QWEN_TEXT_API_KEY', '')
+QWEN_TEXT_BASE_URL = getattr(config, 'QWEN_TEXT_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
+QWEN_TEXT_MODEL = getattr(config, 'QWEN_TEXT_MODEL', 'qwen-turbo')
+
 # OCR配置
 OCR_LANG = getattr(config, 'OCR_LANG', 'chi_sim+eng')
 
@@ -1474,7 +1477,7 @@ def delete_talent_tag(tag_id):
 
 def query_neo4j_graph(query_requirement):
     """
-    查询Neo4j图数据库,通过Deepseek API生成Cypher脚本
+    查询Neo4j图数据库,通过阿里千问API生成Cypher脚本
     
     Args:
         query_requirement (str): 查询需求描述
@@ -1485,12 +1488,19 @@ def query_neo4j_graph(query_requirement):
     try:
         # 导入必要的模块
         from app.services.neo4j_driver import neo4j_driver
-        import requests
+        from openai import OpenAI
         import json
         
-        # Deepseek API配置
-        api_key = DEEPSEEK_API_KEY
-        api_url = DEEPSEEK_API_URL
+        # 阿里千问API配置
+        api_key = QWEN_TEXT_API_KEY
+        base_url = QWEN_TEXT_BASE_URL
+        model_name = QWEN_TEXT_MODEL
+        
+        # 初始化OpenAI客户端(配置为阿里云API)
+        client = OpenAI(
+            api_key=api_key,
+            base_url=base_url,
+        )
         
         # 步骤1: 从Neo4j获取所有标签列表
         logging.info("第一步:从Neo4j获取人才类别的标签列表")
@@ -1508,8 +1518,8 @@ def query_neo4j_graph(query_requirement):
         
         logging.info(f"获取到{len(all_labels)}个人才标签: {all_labels}")
         
-        # 步骤2: 使用Deepseek判断查询需求中的关键信息与标签的对应关系
-        logging.info("第二步:调用Deepseek API匹配查询需求与标签")
+        # 步骤2: 使用阿里千问判断查询需求中的关键信息与标签的对应关系
+        logging.info("第二步:调用阿里千问API匹配查询需求与标签")
         
         # 构建所有标签的JSON字符串
         labels_json = json.dumps(all_labels, ensure_ascii=False)
@@ -1530,34 +1540,26 @@ def query_neo4j_graph(query_requirement):
         3. 如果没有找到匹配的标签,请返回空数组 []
         """
         
-        # 调用Deepseek API匹配标签
-        headers = {
-            "Authorization": f"Bearer {api_key}",
-            "Content-Type": "application/json"
-        }
+        # 调用阿里千问API匹配标签
+        logging.info("发送请求到阿里千问API匹配标签:"+matching_prompt)
         
-        payload = {
-            "model": "deepseek-chat",
-            "messages": [
+        completion = client.chat.completions.create(
+            model=model_name,
+            messages=[
                 {"role": "system", "content": "你是一个专业的文本分析和匹配专家。"},
                 {"role": "user", "content": matching_prompt}
             ],
-            "temperature": 0.1,
-            "response_format": {"type": "json_object"}
-        }
-        
-        logging.info("发送请求到Deepseek API匹配标签:"+matching_prompt)
-        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
-        response.raise_for_status()
+            temperature=0.1,
+            response_format={"type": "json_object"}
+        )
         
         # 解析API响应
-        result = response.json()
-        matching_content = result.get("choices", [{}])[0].get("message", {}).get("content", "[]")
+        matching_content = completion.choices[0].message.content
         
         # 提取JSON数组
         try:
             # 尝试直接解析返回结果,预期格式为 ["新开酒店经验", "五星级酒店", "总经理"]
-            logging.info(f"Deepseek返回的匹配内容: {matching_content}")
+            logging.info(f"阿里千问返回的匹配内容: {matching_content}")
             
             # 如果返回的是JSON字符串,先去除可能的前后缀文本
             if isinstance(matching_content, str):
@@ -1611,7 +1613,7 @@ def query_neo4j_graph(query_requirement):
         ## 图数据库结构
         
         ### 节点
-        1. talent - 人才节点
+        1. Talent - 人才节点
            属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名), 
                 mobile(手机号码), email(电子邮箱), updated_at(更新时间)
         
@@ -1619,7 +1621,7 @@ def query_neo4j_graph(query_requirement):
                       
         ### 关系
         BELONGS_TO - 从属关系
-           (talent)-[BELONGS_TO]->(DataLabel) - 人才属于某标签
+           (Talent)-[BELONGS_TO]->(DataLabel) - 人才属于某标签
         
         ## 匹配的标签列表
         [{matched_labels_str}]
@@ -1646,23 +1648,20 @@ def query_neo4j_graph(query_requirement):
         RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at
         """
         
-        # 调用Deepseek API生成Cypher脚本
-        payload = {
-            "model": "deepseek-chat",
-            "messages": [
+        # 调用阿里千问API生成Cypher脚本
+        logging.info("发送请求到阿里千问API生成Cypher脚本")
+        
+        completion = client.chat.completions.create(
+            model=model_name,
+            messages=[
                 {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"},
                 {"role": "user", "content": cypher_prompt}
             ],
-            "temperature": 0.1
-        }
-        
-        logging.info("发送请求到Deepseek API生成Cypher脚本")
-        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
-        response.raise_for_status()
+            temperature=0.1
+        )
         
         # 解析API响应
-        result = response.json()
-        cypher_script = result.get("choices", [{}])[0].get("message", {}).get("content", "")
+        cypher_script = completion.choices[0].message.content
         
         # 清理Cypher脚本,移除不必要的markdown格式或注释
         cypher_script = cypher_script.strip()
@@ -1694,19 +1693,6 @@ def query_neo4j_graph(query_requirement):
         
         return response_data
         
-    except requests.exceptions.HTTPError as e:
-        error_msg = f"调用Deepseek API失败: {str(e)}"
-        logging.error(error_msg)
-        if hasattr(e, 'response') and e.response:
-            logging.error(f"错误状态码: {e.response.status_code}")
-            logging.error(f"错误内容: {e.response.text}")
-        
-        return {
-            'code': 500,
-            'success': False,
-            'message': error_msg,
-            'data': []
-        }
     except Exception as e:
         error_msg = f"查询Neo4j图数据库失败: {str(e)}"
         logging.error(error_msg, exc_info=True)

+ 175 - 56
app/core/data_parse/parse_task.py

@@ -1013,7 +1013,8 @@ def add_parsed_talents(api_response_data):
     处理解析任务响应数据,提取人才信息并写入business_cards表
     
     Args:
-        api_response_data (dict): 请求数据,格式为 {"data": {"results": [...]}}
+        api_response_data (dict): 请求数据,严格按照样例格式:
+            {"task_id": "119", "task_type": "名片", "data": {"results": [{"name_zh": "...", ...}]}}
         
     Returns:
         dict: 批量处理结果,格式与其他batch函数保持一致
@@ -1066,91 +1067,75 @@ def add_parsed_talents(api_response_data):
         failed_count = 0
         
         # 逐一处理每个结果项
-        for i, result_item in enumerate(results):
+        for i, talent_data in enumerate(results):
             try:
                 logging.debug(f"处理第 {i+1}/{len(results)} 条记录")
                 
-                # 检查结果项是否成功
-                if not result_item.get('success', False):
+                # 验证人才数据格式
+                if not talent_data or not isinstance(talent_data, dict):
                     failed_count += 1
                     processed_results.append({
                         'index': i,
-                        'original_index': result_item.get('index', i),
                         'success': False,
-                        'error': f"原始解析失败: {result_item.get('error', '未知错误')}",
+                        'error': '人才数据格式无效,必须是字典格式',
                         'data': None
                     })
-                    logging.warning(f"第 {i+1} 条记录原始解析失败,跳过处理")
+                    logging.warning(f"第 {i+1} 条记录人才数据格式无效")
                     continue
                 
-                # 获取人才数据
-                item_data = result_item.get('data')
-                if not item_data:
+                # 检查必要字段
+                if not talent_data.get('name_zh'):
                     failed_count += 1
                     processed_results.append({
                         'index': i,
-                        'original_index': result_item.get('index', i),
                         'success': False,
-                        'error': '结果项中缺少data字段',
+                        'error': '人才数据缺少必要字段name_zh',
                         'data': None
                     })
-                    logging.warning(f"第 {i+1} 条记录缺少data字段")
+                    logging.warning(f"第 {i+1} 条记录缺少name_zh字段")
                     continue
                 
-                # 从结果项中提取人才数据和图片保存地址
-                talent_data = item_data
-                
-                # 根据task_type决定如何提取minio_path
-                if task_type == "招聘":
-                    # 如果task_type为"招聘",从result_item中提取id字段的值
-                    minio_path = str(result_item.get('id', ''))
-                else:
-                    # 其他情况,从result_item中提取minio_path字段
-                    minio_path = result_item.get('minio_path', '')
+                # 获取minio_path(如果存在)
+                minio_path = talent_data.get('minio_path', '')
                 
                 # 处理单个人才数据
-                if talent_data and isinstance(talent_data, dict):
-                    try:
-                        talent_result = add_single_talent(talent_data, minio_path, task_type)
-                        if talent_result.get('success', False):
-                            success_count += 1
-                            processed_results.append({
-                                'index': i,
-                                'original_index': result_item.get('index', i),
-                                'success': True,
-                                'error': None,
-                                'data': talent_result.get('data'),
-                                'message': f'成功处理人员: {talent_data.get("name_zh", "未知")}'
-                            })
-                            logging.debug(f"成功处理第 {i+1} 条记录")
-                        else:
-                            failed_count += 1
-                            processed_results.append({
-                                'index': i,
-                                'original_index': result_item.get('index', i),
-                                'success': False,
-                                'error': talent_result.get('message', '处理失败'),
-                                'data': None
-                            })
-                            logging.error(f"处理第 {i+1} 条记录失败: {talent_result.get('message', '未知错误')}")
-                    except Exception as talent_error:
+                try:
+                    talent_result = add_single_talent(talent_data, minio_path, task_type)
+                    if talent_result.get('success', False):
+                        success_count += 1
+                        processed_results.append({
+                            'index': i,
+                            'success': True,
+                            'error': None,
+                            'data': talent_result.get('data'),
+                            'message': f'成功处理人员: {talent_data.get("name_zh", "未知")}'
+                        })
+                        logging.debug(f"成功处理第 {i+1} 条记录")
+                    else:
                         failed_count += 1
-                        error_msg = f"处理人才数据异常: {str(talent_error)}"
                         processed_results.append({
                             'index': i,
-                            'original_index': result_item.get('index', i),
                             'success': False,
-                            'error': error_msg,
+                            'error': talent_result.get('message', '处理失败'),
                             'data': None
                         })
-                        logging.error(error_msg, exc_info=True)
+                        logging.error(f"处理第 {i+1} 条记录失败: {talent_result.get('message', '未知错误')}")
+                except Exception as talent_error:
+                    failed_count += 1
+                    error_msg = f"处理人才数据异常: {str(talent_error)}"
+                    processed_results.append({
+                        'index': i,
+                        'success': False,
+                        'error': error_msg,
+                        'data': None
+                    })
+                    logging.error(error_msg, exc_info=True)
                     
             except Exception as item_error:
                 failed_count += 1
                 error_msg = f"处理结果项异常: {str(item_error)}"
                 processed_results.append({
                     'index': i,
-                    'original_index': result_item.get('index', i),
                     'success': False,
                     'error': error_msg,
                     'data': None
@@ -1169,18 +1154,19 @@ def add_parsed_talents(api_response_data):
             'processed_time': datetime.now().isoformat()
         }
         
+        # 根据处理结果返回相应的状态
         if failed_count == 0:
             return {
                 'code': 200,
                 'success': True,
-                'message': f'批量处理完成,全部 {success_count} 条人才数据写入成功',
+                'message': f'批量处理完成,全部 {success_count} 条记录处理成功',
                 'data': batch_result
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
-                'message': f'批量处理失败,全部 {failed_count} 条人才数据写入失败',
+                'message': f'批量处理失败,全部 {failed_count} 条记录处理失败',
                 'data': batch_result
             }
         else:
@@ -1192,7 +1178,7 @@ def add_parsed_talents(api_response_data):
             }
             
     except Exception as e:
-        error_msg = f"处理人才数据失败: {str(e)}"
+        error_msg = f"批量处理人才数据失败: {str(e)}"
         logging.error(error_msg, exc_info=True)
         
         return {
@@ -1200,4 +1186,137 @@ def add_parsed_talents(api_response_data):
             'success': False,
             'message': error_msg,
             'data': None
+        } 
+
+
+def _clean_field_value(value, field_type='string'):
+    """
+    清理字段值,将空字符串转换为None(适用于数据库字段)
+    
+    Args:
+        value: 原始值
+        field_type: 字段类型 ('string', 'date', 'int')
+        
+    Returns:
+        清理后的值
+    """
+    if value is None:
+        return None
+    
+    if field_type == 'string':
+        return value if value != '' else None
+    elif field_type == 'date':
+        if value == '' or value is None:
+            return None
+        # 如果已经是date对象,直接返回
+        if hasattr(value, 'date'):
+            return value
+        # 如果是字符串,尝试转换为date对象
+        if isinstance(value, str):
+            try:
+                from datetime import datetime
+                return datetime.strptime(value, '%Y-%m-%d').date()
+            except ValueError:
+                # 如果日期格式不正确,返回None
+                return None
+        return value
+    elif field_type == 'int':
+        if value == '' or value is None:
+            return None
+        try:
+            return int(value)
+        except (ValueError, TypeError):
+            return None
+    
+    return value
+
+
+def record_parsed_talent(talent_data, task_id=None, task_type=None):
+    """
+    记录单条解析成功的人才信息到parsed_talents表
+    
+    Args:
+        talent_data (dict): 人才数据字典,包含解析出的人才信息
+        task_id (str, optional): 任务ID
+        task_type (str, optional): 任务类型
+        
+    Returns:
+        dict: 包含操作结果的字典
+    """
+    try:
+        from app.core.data_parse.parse_system import ParsedTalent, db
+        from datetime import datetime
+        
+        # 验证人才数据
+        if not talent_data or not isinstance(talent_data, dict):
+            return {
+                'success': False,
+                'message': '人才数据不能为空且必须是字典格式',
+                'data': None
+            }
+        
+        # 检查必要字段
+        if not talent_data.get('name_zh'):
+            return {
+                'success': False,
+                'message': '人才数据必须包含name_zh字段',
+                'data': None
+            }
+        
+        # 创建ParsedTalent记录
+        parsed_talent = ParsedTalent(
+            name_zh=_clean_field_value(talent_data.get('name_zh', ''), 'string'),
+            name_en=_clean_field_value(talent_data.get('name_en', ''), 'string'),
+            title_zh=_clean_field_value(talent_data.get('title_zh', ''), 'string'),
+            title_en=_clean_field_value(talent_data.get('title_en', ''), 'string'),
+            mobile=_clean_field_value(talent_data.get('mobile', ''), 'string'),
+            phone=_clean_field_value(talent_data.get('phone', ''), 'string'),
+            email=_clean_field_value(talent_data.get('email', ''), 'string'),
+            hotel_zh=_clean_field_value(talent_data.get('hotel_zh', ''), 'string'),
+            hotel_en=_clean_field_value(talent_data.get('hotel_en', ''), 'string'),
+            address_zh=_clean_field_value(talent_data.get('address_zh', ''), 'string'),
+            address_en=_clean_field_value(talent_data.get('address_en', ''), 'string'),
+            postal_code_zh=_clean_field_value(talent_data.get('postal_code_zh', ''), 'string'),
+            postal_code_en=_clean_field_value(talent_data.get('postal_code_en', ''), 'string'),
+            brand_zh=_clean_field_value(talent_data.get('brand_zh', ''), 'string'),
+            brand_en=_clean_field_value(talent_data.get('brand_en', ''), 'string'),
+            affiliation_zh=_clean_field_value(talent_data.get('affiliation_zh', ''), 'string'),
+            affiliation_en=_clean_field_value(talent_data.get('affiliation_en', ''), 'string'),
+            image_path=_clean_field_value(talent_data.get('image_path', ''), 'string'),
+            career_path=talent_data.get('career_path', []),
+            brand_group=_clean_field_value(talent_data.get('brand_group', ''), 'string'),
+            birthday=_clean_field_value(talent_data.get('birthday'), 'date'),
+            residence=_clean_field_value(talent_data.get('residence', ''), 'string'),
+            age=_clean_field_value(talent_data.get('age'), 'int'),
+            native_place=_clean_field_value(talent_data.get('native_place', ''), 'string'),
+            origin_source=talent_data.get('origin_source', []),
+            talent_profile=_clean_field_value(talent_data.get('talent_profile', ''), 'string'),
+            task_id=str(task_id) if task_id else '',
+            task_type=task_type or '',
+            status='待审核',  # 统一设置为待审核状态
+            created_at=datetime.now(),
+            updated_by='system'
+        )
+        
+        # 添加到数据库会话并提交
+        db.session.add(parsed_talent)
+        db.session.commit()
+        
+        logging.info(f"成功记录人才信息到parsed_talents表: {talent_data.get('name_zh', '')}")
+        
+        return {
+            'success': True,
+            'message': '成功记录人才信息',
+            'data': parsed_talent.to_dict()
+        }
+        
+    except Exception as e:
+        db.session.rollback()
+        error_msg = f"记录人才信息失败: {str(e)}"
+        logging.error(error_msg, exc_info=True)
+        
+        return {
+            'success': False,
+            'message': error_msg,
+            'data': None
         } 

+ 76 - 30
app/core/data_parse/parse_web.py

@@ -795,13 +795,15 @@ def _convert_webpage_to_card_format(webpage_data: Dict[str, Any], publish_time:
     return standardized
 
 
-def batch_process_md(markdown_file_list, publish_time):
+def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_type=None):
     """
     批量处理包含多个人员信息的markdown文件
     
     Args:
-        markdown_file_list (list): MinIO对象保存地址组成的数组
-        publish_time (str): 发布时间,用于career_path中的date字段
+        markdown_file_list (list): MinIO对象保存地址组成的数组,每个元素包含publish_time字段
+        publish_time (str, optional): 发布时间,用于career_path中的date字段(已废弃,从task_source中获取)
+        task_id (str, optional): 任务ID
+        task_type (str, optional): 任务类型
         
     Returns:
         dict: 批量处理结果,格式与parse_result保持一致
@@ -820,17 +822,25 @@ def batch_process_md(markdown_file_list, publish_time):
                 }
             }
         
-        if not publish_time or not isinstance(publish_time, str):
-            return {
-                "processed_time": datetime.now().isoformat(),
-                "results": [],
-                "summary": {
-                    "failed_count": len(markdown_file_list),
-                    "success_count": 0,
-                    "success_rate": 0,
-                    "total_files": len(markdown_file_list)
+        # 从task_source中获取publish_time(如果publish_time参数未提供)
+        if not publish_time:
+            # 尝试从第一个元素中获取publish_time
+            if markdown_file_list and len(markdown_file_list) > 0:
+                first_item = markdown_file_list[0]
+                if isinstance(first_item, dict) and 'publish_time' in first_item:
+                    publish_time = first_item['publish_time']
+            
+            if not publish_time:
+                return {
+                    "processed_time": datetime.now().isoformat(),
+                    "results": [],
+                    "summary": {
+                        "failed_count": len(markdown_file_list),
+                        "success_count": 0,
+                        "success_rate": 0,
+                        "total_files": len(markdown_file_list)
+                    }
                 }
-            }
         
         logging.info(f"开始批量处理 {len(markdown_file_list)} 个markdown文件")
         
@@ -840,12 +850,20 @@ def batch_process_md(markdown_file_list, publish_time):
         total_records = 0  # 总记录数(人员数)
         
         # 逐个处理每个markdown文件
-        for i, minio_path in enumerate(markdown_file_list):
+        for i, file_item in enumerate(markdown_file_list):
             try:
+                # 从文件项中获取minio_path和publish_time
+                if isinstance(file_item, dict):
+                    minio_path = file_item.get('minio_path', '')
+                    file_publish_time = file_item.get('publish_time', publish_time)
+                else:
+                    minio_path = file_item
+                    file_publish_time = publish_time
+                
                 logging.info(f"处理第 {i+1}/{len(markdown_file_list)} 个文件: {minio_path}")
                 
                 # 处理单个文件
-                file_result = process_single_markdown_file(minio_path, publish_time)
+                file_result = process_single_markdown_file(minio_path, file_publish_time, task_id, task_type)
                 
                 if file_result.get('success', False):
                     # 提取处理结果中的人员信息
@@ -943,37 +961,35 @@ def batch_process_md(markdown_file_list, publish_time):
                 })
         
         # 组装最终结果
-        batch_result = {
-            'summary': {
-                'total_files': len(markdown_file_list),
-                'success_count': success_count,
-                'failed_count': failed_count,
-                'success_rate': round((success_count / len(markdown_file_list)) * 100, 2) if len(markdown_file_list) > 0 else 0
-            },
-            'results': results,
-            'processed_time': datetime.now().isoformat()
-        }
-        
         if failed_count == 0:
             return {
                 'code': 200,
                 'success': True,
                 'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count
+                }
             }
         elif success_count == 0:
             return {
                 'code': 500,
                 'success': False,
                 'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count
+                }
             }
         else:
             return {
                 'code': 206,  # Partial Content
                 'success': True,
                 'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
-                'data': batch_result
+                'data': {
+                    'success_count': success_count,
+                    'failed_count': failed_count
+                }
             }
             
     except Exception as e:
@@ -1150,7 +1166,7 @@ def save_section_to_minio(minio_client, section_content, original_minio_path, se
         return None
 
 
-def process_single_markdown_file(minio_path, publish_time):
+def process_single_markdown_file(minio_path, publish_time, task_id=None, task_type=None):
     """
     处理单个markdown文件,从MinIO获取内容并判断是否需要分割
     
@@ -1194,9 +1210,24 @@ def process_single_markdown_file(minio_path, publish_time):
                 if result:
                     for person in result:
                         person['pic_url'] = minio_path  # 设置原始文件路径
+                        person['image_path'] = minio_path  # 设置image_path
+                        person['origin_source'] = minio_path  # 设置origin_source
                         if 'career_path' in person and person['career_path']:
                             for career_entry in person['career_path']:
                                 career_entry['image_path'] = minio_path  # 设置原始文件路径
+                        
+                        # 记录成功解析的人才信息到parsed_talents表
+                        if task_id and task_type:
+                            try:
+                                from app.core.data_parse.parse_task import record_parsed_talent
+                                standardized_data = _convert_webpage_to_card_format(person, publish_time)
+                                record_result = record_parsed_talent(standardized_data, task_id, task_type)
+                                if record_result.get('success'):
+                                    logging.info(f"成功记录人才信息到parsed_talents表: {person.get('name_zh', '')}")
+                                else:
+                                    logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
+                            except Exception as record_error:
+                                logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
                 
                 return {
                     'success': True,
@@ -1282,9 +1313,24 @@ def process_single_markdown_file(minio_path, publish_time):
                 if section_result:
                     for person in section_result:
                         person['pic_url'] = section_minio_path  # 设置分割后的文件路径
+                        person['image_path'] = section_minio_path  # 设置image_path
+                        person['origin_source'] = section_minio_path  # 设置origin_source
                         if 'career_path' in person and person['career_path']:
                             for career_entry in person['career_path']:
                                 career_entry['image_path'] = section_minio_path  # 设置分割后的文件路径
+                        
+                        # 记录成功解析的人才信息到parsed_talents表
+                        if task_id and task_type:
+                            try:
+                                from app.core.data_parse.parse_task import record_parsed_talent
+                                standardized_data = _convert_webpage_to_card_format(person, publish_time)
+                                record_result = record_parsed_talent(standardized_data, task_id, task_type)
+                                if record_result.get('success'):
+                                    logging.info(f"成功记录人才信息到parsed_talents表: {person.get('name_zh', '')}")
+                                else:
+                                    logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
+                            except Exception as record_error:
+                                logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
                 
                 if section_result:
                     results['processed_sections'] += 1

+ 207 - 0
test_batch_process_images_fix.py

@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+测试batch_process_images函数的修复
+"""
+
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+
+def test_dict_format_input():
+    """测试字典格式输入的处理"""
+    print("测试字典格式输入处理...")
+    
+    # 模拟字典格式的输入(实际传入的格式)
+    test_input = [
+        {
+            'minio_path': 'http://192.168.3.143:9000/dataops-bucket/misc_files/misc_file_20250801_212256_b61ddd94.png',
+            'original_filename': '杂项表格样例.png',
+            'status': '正常'
+        }
+    ]
+    
+    print(f"测试输入格式: {test_input}")
+    
+    # 模拟处理逻辑
+    def simulate_processing(items):
+        processed_count = 0
+        for i, item in enumerate(items):
+            # 处理输入格式:支持字符串或字典格式
+            if isinstance(item, dict):
+                image_path = item.get('minio_path')
+                original_filename = item.get('original_filename', '')
+                status = item.get('status', '')
+                if not image_path:
+                    print(f"第 {i+1} 个文件:缺少minio_path字段")
+                    continue
+            elif isinstance(item, str):
+                image_path = item
+                original_filename = ''
+                status = ''
+            else:
+                print(f"第 {i+1} 个文件:格式无效")
+                continue
+            
+            # 处理成功
+            processed_count += 1
+            print(f"第 {i+1} 个文件:成功处理 {image_path}")
+            print(f"  原始文件名: {original_filename}")
+            print(f"  状态: {status}")
+        
+        return processed_count
+    
+    processed_count = simulate_processing(test_input)
+    assert processed_count == 1, "应该成功处理1个文件"
+    
+    print("✓ 字典格式输入处理验证通过")
+
+def test_string_format_input():
+    """测试字符串格式输入的处理"""
+    print("\n测试字符串格式输入处理...")
+    
+    # 模拟字符串格式的输入(向后兼容)
+    test_input = [
+        'http://192.168.3.143:9000/dataops-bucket/misc_files/test.png'
+    ]
+    
+    print(f"测试输入格式: {test_input}")
+    
+    # 模拟处理逻辑
+    def simulate_processing(items):
+        processed_count = 0
+        for i, item in enumerate(items):
+            # 处理输入格式:支持字符串或字典格式
+            if isinstance(item, dict):
+                image_path = item.get('minio_path')
+                original_filename = item.get('original_filename', '')
+                status = item.get('status', '')
+                if not image_path:
+                    print(f"第 {i+1} 个文件:缺少minio_path字段")
+                    continue
+            elif isinstance(item, str):
+                image_path = item
+                original_filename = ''
+                status = ''
+            else:
+                print(f"第 {i+1} 个文件:格式无效")
+                continue
+            
+            # 处理成功
+            processed_count += 1
+            print(f"第 {i+1} 个文件:成功处理 {image_path}")
+            print(f"  原始文件名: {original_filename}")
+            print(f"  状态: {status}")
+        
+        return processed_count
+    
+    processed_count = simulate_processing(test_input)
+    assert processed_count == 1, "应该成功处理1个文件"
+    
+    print("✓ 字符串格式输入处理验证通过")
+
+def test_invalid_format_input():
+    """测试无效格式输入的处理"""
+    print("\n测试无效格式输入处理...")
+    
+    # 模拟无效格式的输入
+    test_input = [
+        None,
+        {'wrong_field': 'value'},
+        {'minio_path': ''},  # 空路径
+        123  # 数字
+    ]
+    
+    print(f"测试输入格式: {test_input}")
+    
+    # 模拟处理逻辑
+    def simulate_processing(items):
+        processed_count = 0
+        for i, item in enumerate(items):
+            # 处理输入格式:支持字符串或字典格式
+            if isinstance(item, dict):
+                image_path = item.get('minio_path')
+                original_filename = item.get('original_filename', '')
+                status = item.get('status', '')
+                if not image_path:
+                    print(f"第 {i+1} 个文件:缺少minio_path字段或路径为空")
+                    continue
+            elif isinstance(item, str):
+                image_path = item
+                original_filename = ''
+                status = ''
+            else:
+                print(f"第 {i+1} 个文件:格式无效 ({type(item)})")
+                continue
+            
+            # 处理成功
+            processed_count += 1
+            print(f"第 {i+1} 个文件:成功处理 {image_path}")
+        
+        return processed_count
+    
+    processed_count = simulate_processing(test_input)
+    assert processed_count == 0, "应该拒绝所有无效格式"
+    
+    print("✓ 无效格式输入处理验证通过")
+
+def test_filename_extraction():
+    """测试文件名提取逻辑"""
+    print("\n测试文件名提取逻辑...")
+    
+    # 测试用例
+    test_cases = [
+        {
+            'input': {'minio_path': 'http://example.com/file.png', 'original_filename': '原始文件.png'},
+            'expected': '原始文件.png'
+        },
+        {
+            'input': {'minio_path': 'http://example.com/file.png'},
+            'expected': 'file.png'
+        },
+        {
+            'input': 'http://example.com/file.png',
+            'expected': 'file.png'
+        }
+    ]
+    
+    for i, case in enumerate(test_cases):
+        item = case['input']
+        expected = case['expected']
+        
+        # 模拟文件名提取逻辑
+        if isinstance(item, dict):
+            original_filename = item.get('original_filename', '')
+            image_path = item.get('minio_path', '')
+            if original_filename:
+                filename = original_filename
+            elif isinstance(image_path, str) and image_path:
+                filename = os.path.basename(image_path)
+            else:
+                filename = f'table_file_{i}.jpg'
+        elif isinstance(item, str):
+            if item:
+                filename = os.path.basename(item)
+            else:
+                filename = f'table_file_{i}.jpg'
+        else:
+            filename = f'table_file_{i}.jpg'
+        
+        print(f"测试用例 {i+1}: 输入={item}, 期望={expected}, 实际={filename}")
+        assert filename == expected, f"文件名提取错误: 期望 {expected}, 实际 {filename}"
+    
+    print("✓ 文件名提取逻辑验证通过")
+
+if __name__ == "__main__":
+    print("开始测试batch_process_images函数的修复...")
+    
+    try:
+        test_dict_format_input()
+        test_string_format_input()
+        test_invalid_format_input()
+        test_filename_extraction()
+        print("\n所有测试通过!batch_process_images函数修复成功。")
+    except Exception as e:
+        print(f"\n测试失败: {str(e)}")
+        sys.exit(1) 

+ 114 - 0
test_parse_pic_fix.py

@@ -0,0 +1,114 @@
+#!/usr/bin/env python3
+"""
+测试parse_pic.py的修复
+验证batch_process_images函数能正确处理字典格式的输入
+"""
+
+import sys
+import os
+import logging
+
+# 添加项目路径
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+
+# 设置日志
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(funcName)s - %(lineno)d - %(message)s')
+
+def test_batch_process_images_fix():
+    """测试batch_process_images函数的修复"""
+    print("测试batch_process_images函数的修复...")
+    
+    try:
+        from app.core.data_parse.parse_pic import batch_process_images
+        
+        # 测试数据 - 模拟错误日志中的输入格式
+        test_input = [
+            {
+                'minio_path': 'http://192.168.3.143:9000/dataops-bucket/misc_files/misc_file_20250801_212256_b61ddd94.png',
+                'original_filename': '杂项表格样例.png',
+                'status': '正常'
+            }
+        ]
+        
+        print(f"测试输入: {test_input}")
+        
+        # 调用函数(这里只是测试参数处理,不会真正处理图片)
+        # 由于没有真实的图片文件,我们期望函数能正确处理参数格式而不抛出类型错误
+        result = batch_process_images(test_input, process_type='table')
+        
+        print(f"函数执行结果: {result}")
+        print("✓ batch_process_images函数修复验证通过")
+        
+    except TypeError as e:
+        if "expected str, bytes or os.PathLike object, not dict" in str(e):
+            print(f"❌ 仍然存在类型错误: {e}")
+            return False
+        else:
+            print(f"其他类型错误: {e}")
+            return False
+    except Exception as e:
+        print(f"其他错误: {e}")
+        return False
+    
+    return True
+
+def test_invalid_input_handling():
+    """测试无效输入的处理"""
+    print("\n测试无效输入处理...")
+    
+    try:
+        from app.core.data_parse.parse_pic import batch_process_images
+        
+        # 测试各种无效输入
+        invalid_inputs = [
+            [{'minio_path': None}],  # minio_path为None
+            [{'minio_path': {}}],    # minio_path为字典
+            [{'minio_path': 123}],   # minio_path为数字
+            [{'wrong_field': 'value'}],  # 缺少minio_path字段
+            [{'minio_path': ''}],    # minio_path为空字符串
+        ]
+        
+        for i, invalid_input in enumerate(invalid_inputs):
+            print(f"测试无效输入 {i+1}: {invalid_input}")
+            try:
+                result = batch_process_images(invalid_input, process_type='table')
+                print(f"  处理结果: {result.get('success', False)}")
+                # 期望处理失败但不抛出类型错误
+                if not result.get('success', False):
+                    print(f"  ✓ 正确处理了无效输入")
+                else:
+                    print(f"  ⚠ 意外成功处理了无效输入")
+            except TypeError as e:
+                if "expected str, bytes or os.PathLike object, not dict" in str(e):
+                    print(f"  ❌ 仍然存在类型错误: {e}")
+                    return False
+                else:
+                    print(f"  ✓ 正确处理了类型错误: {e}")
+            except Exception as e:
+                print(f"  ✓ 正确处理了其他错误: {e}")
+        
+        print("✓ 无效输入处理验证通过")
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试失败: {e}")
+        return False
+
+if __name__ == "__main__":
+    print("开始测试parse_pic.py的修复...")
+    
+    success = True
+    
+    # 测试基本功能
+    if not test_batch_process_images_fix():
+        success = False
+    
+    # 测试无效输入处理
+    if not test_invalid_input_handling():
+        success = False
+    
+    if success:
+        print("\n🎉 所有测试通过!parse_pic.py修复成功。")
+    else:
+        print("\n❌ 部分测试失败,需要进一步修复。")
+        sys.exit(1) 

+ 44 - 0
人才入库上传数据样例.txt

@@ -0,0 +1,44 @@
+{
+  "task_id": "119",
+  "task_type": "名片",
+  "data": {
+    "results": [
+      {
+        "name_zh": "王仁",
+        "name_en": "Owen Wang",
+        "title_zh": "总经理",
+        "title_en": "General Manager",
+        "mobile": "+86 138 1685 0647",
+        "phone": null,
+        "email": "rwang5@urcove-hotels.com",
+        "hotel_zh": "上海静安逸扉酒店",
+        "hotel_en": "UrCove by HYATT Shanghai Jing'an",
+        "brand_zh": null,
+        "brand_en": null,
+        "affiliation_zh": null,
+        "affiliation_en": null,
+        "brand_group": "UrCove, HYATT",
+        "address_zh": "中国上海市静安区武定西路1185号",
+        "address_en": "No.1185 West Wuding Road, Jing'an District",
+        "postal_code_zh": "200042",
+        "postal_code_en": "200042",
+        "birthday": null,
+        "residence": null,
+        "age": 0,
+        "native_place": null,
+        "talent_profile": "测试用名片",
+        "career_path": [
+          {
+            "date": "2025-08-01",
+            "hotel_en": "UrCove by HYATT Shanghai Jing'an",
+            "hotel_zh": "上海静安逸扉酒店",
+            "image_path": "",
+            "source": "business_card_creation",
+            "title_en": "General Manager",
+            "title_zh": "总经理"
+          }
+        ]
+      }
+    ]
+  }
+}

+ 43 - 0
任务解析请求参数.txt

@@ -0,0 +1,43 @@
+'名片', '简历', '杂项'的请求参数格式
+"data": {
+      "id": 123,
+      "task_name": "parse_task_20241201_a1b2c3d4",
+      "task_status": "待解析",
+      "task_type": "名片",
+      "task_source": [
+         {"original_filename": "张三名片.jpg",
+          "minio_path":"https://192.168.3.143:9000/dataops-platform/talent_photos/20241201_001234_张三名片.jpg",
+         "status":"正常"},
+        {"original_filename": "李四名片.png",
+         "minio_path":"https://192.168.3.143:9000/dataops-platform/talent_photos/20241201_001235_李四名片.png",
+         "status":"出错"}
+        ],
+      "collection_count": 2,
+      "parse_count": 0,
+      "parse_result": null,
+      "created_at": "2024-12-01 10:30:45",
+      "created_by": "api_user",
+      "updated_at": "2024-12-01 10:30:45",
+      "updated_by": "api_user"
+  }
+
+'新任命'的请求参数格式
+"data": {
+      "id": 123,
+      "task_name": "parse_task_20241201_a1b2c3d4",
+      "task_status": "待解析",
+      "task_type": "名片",
+      "task_source": [
+         {"publish_time":"20250731",
+          "original_filename": "张三名片.md",
+          "minio_path":"https://192.168.3.143:9000/dataops-platform/talent_photos/20241201_001234_张三名片.md",
+         "status":"正常"},
+        ],
+      "collection_count": 2,
+      "parse_count": 0,
+      "parse_result": null,
+      "created_at": "2024-12-01 10:30:45",
+      "created_by": "api_user",
+      "updated_at": "2024-12-01 10:30:45",
+      "updated_by": "api_user"
+  }