Jelajahi Sumber

修复markdown文件切分后,task_source更新失败的问题

maxiaolong 2 minggu lalu
induk
melakukan
2fe9905742
2 mengubah file dengan 240 tambahan dan 276 penghapusan
  1. 68 13
      app/core/data_parse/parse_task.py
  2. 172 263
      app/core/data_parse/parse_web.py

+ 68 - 13
app/core/data_parse/parse_task.py

@@ -1419,8 +1419,11 @@ def split_markdown_files(task_id):
                     
                     if not matches:
                         # 没有找到**数字**格式,跳过处理
+                        logging.debug(f"文件 {minio_path} 不包含**数字**格式,跳过处理")
                         continue
                     
+                    logging.info(f"文件 {minio_path} 包含**数字**格式,开始进行拆分处理")
+                    
                     # 获取原始文件名
                     original_filename = item.get('original_filename', '')
                     if not original_filename:
@@ -1432,13 +1435,16 @@ def split_markdown_files(task_id):
                     if base_name.endswith('.md'):
                         base_name = base_name[:-3]
                     
-                    # 切分文件
+                    # 调用_split_markdown_content函数进行切分
                     split_parts = _split_markdown_content(file_content)
                     
                     if len(split_parts) <= 1:
                         # 没有成功切分,跳过
+                        logging.debug(f"文件 {minio_path} 拆分后只有1个或0个部分,跳过处理")
                         continue
                     
+                    logging.info(f"文件 {minio_path} 成功拆分为 {len(split_parts)} 个部分")
+                    
                     # 为每个切分部分创建新的MD文件
                     new_items = []
                     for j, part_content in enumerate(split_parts, 1):
@@ -1452,18 +1458,20 @@ def split_markdown_files(task_id):
                             )
                             
                             if new_minio_path:
-                                # 创建新的task_source元素
+                                # 创建新的task_source元素,status为待解析,parse_flag值为1
                                 new_item = {
                                     'status': '待解析',
-                                    'publish_time':item.get('publish_time', ''),
+                                    'publish_time': item.get('publish_time', ''),
                                     'parse_flag': 1,
                                     'minio_path': new_minio_path,
                                     'original_filename': new_filename
                                 }
                                 new_items.append(new_item)
                                 success_count += 1
+                                logging.info(f"成功创建拆分文件: {new_filename}")
                             else:
                                 failed_count += 1
+                                logging.error(f"上传拆分文件失败: {new_filename}")
                                 
                         except Exception as split_error:
                             logging.error(f"处理切分文件失败: {str(split_error)}")
@@ -1474,6 +1482,7 @@ def split_markdown_files(task_id):
                     
                     # 将原始记录的parse_flag设置为0
                     item['parse_flag'] = 0
+                    logging.info(f"原始文件 {minio_path} 的parse_flag已设置为0")
                     
                 except Exception as download_error:
                     logging.error(f"下载文件失败: {str(download_error)}")
@@ -1483,10 +1492,53 @@ def split_markdown_files(task_id):
                 logging.error(f"处理task_source元素失败: {str(item_error)}")
                 failed_count += 1
         
-        # 更新数据库记录
+        # task_source原有元素都遍历完成后,把task_source修改后的值保存到parse_task_repository数据表里的对应记录中去
         try:
+            # 添加调试信息
+            logging.info(f"准备更新task_id为{task_id}的任务记录")
+            logging.info(f"更新前的task_source长度: {len(task_record.task_source) if task_record.task_source else 0}")
+            logging.info(f"更新后的task_source长度: {len(task_source)}")
+            
+            # 方法1: 使用SQLAlchemy ORM更新
             task_record.task_source = task_source
+            db.session.add(task_record)  # 确保对象在会话中
+            db.session.flush()  # 强制刷新到数据库
+            
+            # 提交事务
             db.session.commit()
+            
+            # 验证更新是否成功
+            db.session.refresh(task_record)
+            logging.info(f"更新后的task_source长度验证: {len(task_record.task_source) if task_record.task_source else 0}")
+            
+            # 如果ORM更新失败,尝试直接SQL更新
+            if len(task_record.task_source) != len(task_source):
+                logging.warning("ORM更新可能失败,尝试直接SQL更新")
+                import json
+                from sqlalchemy import text
+                task_source_json = json.dumps(task_source, ensure_ascii=False)
+                update_sql = text(f"UPDATE parse_task_repository SET task_source = '{task_source_json}' WHERE id = {task_id}")
+                db.session.execute(update_sql)
+                db.session.commit()
+                logging.info("直接SQL更新完成")
+            
+            # 验证更新是否成功 - 直接查询数据库
+            try:
+                from sqlalchemy import text
+                verify_sql = text(f"SELECT task_source FROM parse_task_repository WHERE id = {task_id}")
+                result = db.session.execute(verify_sql).fetchone()
+                if result:
+                    actual_task_source = result[0]
+                    logging.info(f"数据库中的实际task_source长度: {len(actual_task_source) if actual_task_source else 0}")
+                    if len(actual_task_source) != len(task_source):
+                        logging.error(f"数据库更新验证失败!期望长度: {len(task_source)}, 实际长度: {len(actual_task_source)}")
+                    else:
+                        logging.info("数据库更新验证成功")
+                else:
+                    logging.error("无法从数据库查询到更新后的记录")
+            except Exception as verify_error:
+                logging.error(f"验证数据库更新失败: {str(verify_error)}")
+            
             logging.info(f"成功更新task_id为{task_id}的任务记录,切分成功{success_count}个文件,失败{failed_count}个")
         except Exception as update_error:
             logging.error(f"更新任务记录失败: {str(update_error)}")
@@ -1545,18 +1597,21 @@ def _split_markdown_content(content):
     parts = []
     for i in range(len(positions)):
         if i == 0:
-            # 第一个部分:从开始到第一个标记
-            start_pos = 0
-        else:
-            # 其他部分:从上一个标记到当前标记
-            start_pos = positions[i-1]
-        
-        if i == len(positions) - 1:
+            # 第一个部分:从第一个标记到第二个标记
+            start_pos = positions[i]
+            if i + 1 < len(positions):
+                end_pos = positions[i + 1]
+            else:
+                # 如果只有一个标记,从第一个标记到文件末尾
+                end_pos = len(content)
+        elif i == len(positions) - 1:
             # 最后一个部分:从最后一个标记到文件末尾
+            start_pos = positions[i]
             end_pos = len(content)
         else:
-            # 其他部分:到下一个标记
-            end_pos = positions[i]
+            # 其他部分:从当前标记到下一个标记
+            start_pos = positions[i]
+            end_pos = positions[i + 1]
         
         part_content = content[start_pos:end_pos].strip()
         if part_content:

+ 172 - 263
app/core/data_parse/parse_web.py

@@ -800,81 +800,93 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
     批量处理包含多个人员信息的markdown文件
     
     Args:
-        markdown_file_list (list): MinIO对象保存地址组成的数组,每个元素包含publish_time字段
+        markdown_file_list (list): MinIO对象保存地址组成的数组,每个元素包含publish_time字段(已废弃,现在从数据库读取)
         publish_time (str, optional): 发布时间,用于career_path中的date字段(已废弃,从task_source中获取)
-        task_id (str, optional): 任务ID
+        task_id (str, optional): 任务ID,用于从数据库读取task_source
         task_type (str, optional): 任务类型
         
     Returns:
         dict: 批量处理结果,格式与parse_result保持一致
     """
     try:
-        # 参数验证
-        if not markdown_file_list or not isinstance(markdown_file_list, list):
+        # 根据task_id从parse_task_repository表读取记录
+        if not task_id:
             return {
-                "processed_time": datetime.now().isoformat(),
-                "results": [],
-                "summary": {
-                    "failed_count": len(markdown_file_list) if markdown_file_list else 0,
-                    "success_count": 0,
-                    "success_rate": 0,
-                    "total_files": len(markdown_file_list) if markdown_file_list else 0
-                }
+                'code': 400,
+                'success': False,
+                'message': '缺少task_id参数',
+                'data': None
             }
         
-        # 从task_source中获取publish_time(如果publish_time参数未提供)
-        if not publish_time:
-            # 尝试从第一个元素中获取publish_time
-            if markdown_file_list and len(markdown_file_list) > 0:
-                first_item = markdown_file_list[0]
-                if isinstance(first_item, dict) and 'publish_time' in first_item:
-                    publish_time = first_item['publish_time']
-            
-            if not publish_time:
-                return {
-                    "processed_time": datetime.now().isoformat(),
-                    "results": [],
-                    "summary": {
-                        "failed_count": len(markdown_file_list),
-                        "success_count": 0,
-                        "success_rate": 0,
-                        "total_files": len(markdown_file_list)
-                    }
-                }
+        # 导入数据库模型
+        from app.core.data_parse.parse_system import ParseTaskRepository, db
+        
+        # 查询对应的任务记录
+        task_record = ParseTaskRepository.query.get(task_id)
+        if not task_record:
+            return {
+                'code': 404,
+                'success': False,
+                'message': f'未找到task_id为{task_id}的任务记录',
+                'data': None
+            }
         
-        logging.info(f"开始批量处理 {len(markdown_file_list)} 个markdown文件")
+        # 获取task_source作为需要处理的数据列表
+        task_source = task_record.task_source
+        if not task_source or not isinstance(task_source, list):
+            return {
+                'code': 400,
+                'success': False,
+                'message': 'task_source为空或格式不正确',
+                'data': None
+            }
         
         results = []
         success_count = 0
         failed_count = 0
-        total_records = 0  # 总记录数(人员数)
+        parsed_record_ids = []  # 收集成功解析的记录ID
+        
+        logging.info(f"开始批量处理markdown文件,共 {len(task_source)} 条记录")
         
-        # 逐个处理每个markdown文件
-        for i, file_item in enumerate(markdown_file_list):
+        # 逐一处理每条数据
+        for i, data in enumerate(task_source):
             try:
-                # 从文件项中获取minio_path和publish_time
-                if isinstance(file_item, dict):
-                    minio_path = file_item.get('minio_path', '')
-                    file_publish_time = file_item.get('publish_time', publish_time)
-                else:
-                    minio_path = file_item
-                    file_publish_time = publish_time
+                # 只处理parse_flag为1的记录
+                if not isinstance(data, dict) or data.get('parse_flag') != 1:
+                    logging.debug(f"跳过第 {i+1} 条数据,parse_flag不为1或格式不正确")
+                    continue
+                
+                logging.info(f"处理第 {i+1}/{len(task_source)} 条数据")
                 
-                logging.info(f"处理第 {i+1}/{len(markdown_file_list)} 个文件: {minio_path}")
+                # 从数据中获取必要信息
+                minio_path = data.get('minio_path', '')
+                file_publish_time = data.get('publish_time', publish_time)
+                
+                if not minio_path:
+                    logging.warning(f"第 {i+1} 条数据缺少minio_path")
+                    failed_count += 1
+                    # 更新task_source中对应记录的parse_flag和status
+                    data['parse_flag'] = 1
+                    data['status'] = '解析失败'
+                    continue
                 
-                # 处理单个文件
+                # 处理单个markdown文件
                 file_result = process_single_markdown_file(minio_path, file_publish_time, task_id, task_type)
                 
                 if file_result.get('success', False):
                     # 提取处理结果中的人员信息
                     persons_data = file_result.get('data', {}).get('all_results', [])
                     
+                    # 收集parsed_record_ids
+                    file_parsed_record_ids = file_result.get('data', {}).get('parsed_record_ids', [])
+                    if file_parsed_record_ids:
+                        parsed_record_ids.extend(file_parsed_record_ids)
+                    
                     if persons_data and isinstance(persons_data, list):
                         # 为每个人员创建一个结果记录
                         for person_idx, person_data in enumerate(persons_data):
-                            total_records += 1
                             # 转换为标准名片格式
-                            standardized_data = _convert_webpage_to_card_format(person_data, publish_time)
+                            standardized_data = _convert_webpage_to_card_format(person_data, file_publish_time)
                             
                             success_count += 1
                             # 构建完整的MinIO URL路径
@@ -896,9 +908,12 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                                 "success": True
                             })
                             logging.info(f"成功提取人员 {person_idx+1}: {person_data.get('name_zh', 'Unknown')}")
+                        
+                        # 更新task_source中对应记录的parse_flag和status
+                        data['parse_flag'] = 0
+                        data['status'] = '解析成功'
                     else:
                         # 没有提取到有效数据,这算作一个失败记录
-                        total_records += 1
                         failed_count += 1
                         # 构建完整的MinIO URL路径
                         if minio_path.startswith('http'):
@@ -919,9 +934,12 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                             "success": False
                         })
                         logging.warning(f"第 {i+1} 个文件未提取到人员信息")
+                        
+                        # 更新task_source中对应记录的parse_flag和status
+                        data['parse_flag'] = 1
+                        data['status'] = '解析失败'
                 else:
                     # 文件处理失败,算作一个失败记录
-                    total_records += 1
                     failed_count += 1
                     error_msg = file_result.get('message', '处理失败')
                     # 构建完整的MinIO URL路径
@@ -944,22 +962,56 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                     })
                     logging.error(f"处理第 {i+1} 个文件失败: {error_msg}")
                     
+                    # 更新task_source中对应记录的parse_flag和status
+                    data['parse_flag'] = 1
+                    data['status'] = '解析失败'
+                    
             except Exception as item_error:
-                total_records += 1
                 failed_count += 1
                 error_msg = f"处理markdown文件失败: {str(item_error)}"
                 logging.error(error_msg, exc_info=True)
+                
+                # 更新task_source中对应记录的parse_flag和status
+                if isinstance(data, dict):
+                    data['parse_flag'] = 1
+                    data['status'] = '解析失败'
+                
                 results.append({
                     "data": None,
                     "error": error_msg,
-                    "filename": minio_path.split('/')[-1] if '/' in minio_path else minio_path,
+                    "filename": data.get('filename', f'markdown_record_{i}.md') if isinstance(data, dict) else f'markdown_record_{i}.md',
                     "index": len(results),
                     "message": "网页人才信息解析失败",
-                    "minio_path": minio_path,
-                    "object_key": minio_path,
+                    "minio_path": data.get('minio_path', '') if isinstance(data, dict) else '',
+                    "object_key": data.get('object_key', f'markdown_data/record_{i}.md') if isinstance(data, dict) else f'markdown_data/record_{i}.md',
                     "success": False
                 })
         
+        # 根据处理结果更新task_status
+        if failed_count == 0:
+            task_status = '解析成功'
+        elif success_count == 0:
+            task_status = '解析失败'
+        else:
+            task_status = '部分解析成功'
+        
+        # 所有task_source记录处理完成后,将更新后的task_source和task_status保存到数据库
+        try:
+            task_record.task_source = task_source
+            task_record.task_status = task_status
+            task_record.parse_count = success_count
+            task_record.parse_result = {
+                'success_count': success_count,
+                'failed_count': failed_count,
+                'parsed_record_ids': parsed_record_ids,
+                'processed_time': datetime.now().isoformat()
+            }
+            db.session.commit()
+            logging.info(f"成功更新task_id为{task_id}的任务记录,task_status={task_status},处理成功{success_count}条,失败{failed_count}条")
+        except Exception as update_error:
+            logging.error(f"更新任务记录失败: {str(update_error)}")
+            db.session.rollback()
+        
         # 组装最终结果
         if failed_count == 0:
             return {
@@ -968,7 +1020,8 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                 'message': f'批量处理完成,全部 {success_count} 个文件处理成功',
                 'data': {
                     'success_count': success_count,
-                    'failed_count': failed_count
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
                 }
             }
         elif success_count == 0:
@@ -978,7 +1031,8 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                 'message': f'批量处理失败,全部 {failed_count} 个文件处理失败',
                 'data': {
                     'success_count': success_count,
-                    'failed_count': failed_count
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
                 }
             }
         else:
@@ -988,7 +1042,8 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
                 'message': f'批量处理部分成功,成功 {success_count} 个,失败 {failed_count} 个',
                 'data': {
                     'success_count': success_count,
-                    'failed_count': failed_count
+                    'failed_count': failed_count,
+                    'parsed_record_ids': parsed_record_ids
                 }
             }
             
@@ -1168,7 +1223,7 @@ def save_section_to_minio(minio_client, section_content, original_minio_path, se
 
 def process_single_markdown_file(minio_path, publish_time, task_id=None, task_type=None):
     """
-    处理单个markdown文件,从MinIO获取内容并判断是否需要分割
+    处理单个markdown文件,从MinIO获取内容并直接处理
     
     Args:
         minio_path (str): MinIO中的文件路径
@@ -1196,219 +1251,73 @@ def process_single_markdown_file(minio_path, publish_time, task_id=None, task_ty
                 'data': None
             }
         
-        # 检查是否存在 **1**, **2** 等分隔结构
-        pattern = r'\*\*(\d+)\*\*'
-        matches = re.findall(pattern, markdown_content)
-        
-        if not matches:
-            # 如果没有找到分隔符,直接处理整个文件
-            logging.info("未发现数字分隔符,直接处理整个markdown文件")
-            try:
-                result = process_webpage_with_QWen(markdown_content, publish_time)
-                
-                # 更新解析结果中的路径信息
-                if result:
-                    for person in result:
-                        person['pic_url'] = minio_path  # 设置原始文件路径
-                        person['image_path'] = minio_path  # 设置image_path
-                        person['origin_source'] = minio_path  # 设置origin_source
-                        if 'career_path' in person and person['career_path']:
-                            for career_entry in person['career_path']:
-                                career_entry['image_path'] = minio_path  # 设置原始文件路径
-                        
-                        # 记录成功解析的人才信息到parsed_talents表
-                        if task_id and task_type:
-                            try:
-                                from app.core.data_parse.parse_task import record_parsed_talent
-                                standardized_data = _convert_webpage_to_card_format(person, publish_time)
-                                
-                                # 在记录到parsed_talents表之前,设置image_path和origin_source
-                                standardized_data['image_path'] = minio_path
-                                
-                                # 设置origin_source为JSON数组格式
-                                current_date = datetime.now().strftime('%Y-%m-%d')
-                                origin_source_entry = {
-                                    "task_type": "新任命",
-                                    "minio_path": minio_path,
-                                    "source_date": current_date
-                                }
-                                standardized_data['origin_source'] = [origin_source_entry]
-                                
-                                record_result = record_parsed_talent(standardized_data, task_id, task_type)
-                                if record_result.get('success'):
-                                    logging.info(f"成功记录人才信息到parsed_talents表: {person.get('name_zh', '')}")
-                                else:
-                                    logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
-                            except Exception as record_error:
-                                logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
-                
-                return {
-                    'success': True,
-                    'message': '单个markdown文件处理成功',
-                    'data': {
-                        'total_sections': 1,
-                        'processed_sections': 1,
-                        'total_persons': len(result) if result else 0,
-                        'all_results': result,
-                        'section_results': [],
-                        'failed_sections_info': []
-                    }
-                }
-            except Exception as e:
-                error_msg = f"处理单个markdown文件失败: {str(e)}"
-                logging.error(error_msg, exc_info=True)
-                return {
-                    'success': False,
-                    'message': error_msg,
-                    'data': None
-                }
-        
-        # 发现分隔符,按分隔符拆分内容
-        logging.info(f"发现 {len(matches)} 个数字分隔符: {matches}")
-        
-        # 使用正则表达式分割内容
-        sections = re.split(r'\*\*\d+\*\*', markdown_content)
-        
-        # 移除第一个空白部分(分隔符前的内容)
-        if sections and not sections[0].strip():
-            sections = sections[1:]
-        
-        # 确保分割后的部分数量与分隔符数量匹配
-        if len(sections) != len(matches):
-            logging.warning(f"分割部分数量 ({len(sections)}) 与分隔符数量 ({len(matches)}) 不匹配")
-            # 取较小的数量以确保安全
-            min_count = min(len(sections), len(matches))
-            sections = sections[:min_count]
-            matches = matches[:min_count]
-        
-        # 处理结果统计
-        results = {
-            'total_sections': len(sections),
-            'processed_sections': 0,
-            'failed_sections': 0,
-            'total_persons': 0,
-            'all_results': [],
-            'section_results': [],
-            'failed_sections_info': []
-        }
-        
-        # 逐个处理每个markdown片段
-        for i, (section_content, section_number) in enumerate(zip(sections, matches)):
-            try:
-                logging.info(f"开始处理第 {section_number} 部分 (索引: {i})")
-                
-                # 清理内容,移除前后空白
-                section_content = section_content.strip()
-                
-                if not section_content:
-                    logging.warning(f"第 {section_number} 部分内容为空,跳过处理")
-                    results['failed_sections'] += 1
-                    results['failed_sections_info'].append({
-                        'section_number': section_number,
-                        'index': i,
-                        'error': '部分内容为空'
-                    })
-                    continue
-                
-                # 重新构建完整的markdown片段,包含分隔符标题
-                full_section_content = f"**{section_number}**\n\n{section_content}"
-                
-                # 将分割后的内容保存到MinIO
-                section_minio_path = save_section_to_minio(minio_client, full_section_content, minio_path, section_number)
-                if not section_minio_path:
-                    logging.warning(f"保存第 {section_number} 部分到MinIO失败,使用原始路径")
-                    section_minio_path = minio_path
-                
-                # 调用process_webpage_with_QWen处理单个片段
-                section_result = process_webpage_with_QWen(full_section_content, publish_time)
-                
-                # 更新解析结果中的路径信息
-                if section_result:
-                    for person in section_result:
-                        person['pic_url'] = section_minio_path  # 设置分割后的文件路径
-                        person['image_path'] = section_minio_path  # 设置image_path
-                        person['origin_source'] = section_minio_path  # 设置origin_source
-                        if 'career_path' in person and person['career_path']:
-                            for career_entry in person['career_path']:
-                                career_entry['image_path'] = section_minio_path  # 设置分割后的文件路径
-                        
-                        # 记录成功解析的人才信息到parsed_talents表
-                        if task_id and task_type:
-                            try:
-                                from app.core.data_parse.parse_task import record_parsed_talent
-                                standardized_data = _convert_webpage_to_card_format(person, publish_time)
-                                
-                                # 在记录到parsed_talents表之前,设置image_path和origin_source
-                                standardized_data['image_path'] = section_minio_path
-                                
-                                # 设置origin_source为JSON数组格式
-                                current_date = datetime.now().strftime('%Y-%m-%d')
-                                origin_source_entry = {
-                                    "task_type": "新任命",
-                                    "minio_path": section_minio_path,
-                                    "source_date": current_date
-                                }
-                                standardized_data['origin_source'] = [origin_source_entry]
-                                
-                                record_result = record_parsed_talent(standardized_data, task_id, task_type)
-                                if record_result.get('success'):
-                                    logging.info(f"成功记录人才信息到parsed_talents表: {person.get('name_zh', '')}")
-                                else:
-                                    logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
-                            except Exception as record_error:
-                                logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
-                
-                if section_result:
-                    results['processed_sections'] += 1
-                    results['total_persons'] += len(section_result)
-                    results['all_results'].extend(section_result)
-                    results['section_results'].append({
-                        'section_number': section_number,
-                        'index': i,
-                        'persons_count': len(section_result),
-                        'persons': section_result
-                    })
-                    logging.info(f"第 {section_number} 部分处理成功,提取 {len(section_result)} 个人员信息")
-                else:
-                    results['failed_sections'] += 1
-                    results['failed_sections_info'].append({
-                        'section_number': section_number,
-                        'index': i,
-                        'error': '未提取到人员信息'
-                    })
-                    logging.warning(f"第 {section_number} 部分未提取到人员信息")
+        # 直接处理整个文件
+        logging.info("直接处理整个markdown文件")
+        try:
+            result = process_webpage_with_QWen(markdown_content, publish_time)
+            
+            parsed_record_ids = []  # 收集成功解析的记录ID
+            
+            # 更新解析结果中的路径信息
+            if result:
+                for person in result:
+                    person['pic_url'] = minio_path  # 设置原始文件路径
+                    person['image_path'] = minio_path  # 设置image_path
+                    person['origin_source'] = minio_path  # 设置origin_source
+                    if 'career_path' in person and person['career_path']:
+                        for career_entry in person['career_path']:
+                            career_entry['image_path'] = minio_path  # 设置原始文件路径
                     
-            except Exception as e:
-                error_msg = f"处理第 {section_number} 部分失败: {str(e)}"
-                logging.error(error_msg, exc_info=True)
-                results['failed_sections'] += 1
-                results['failed_sections_info'].append({
-                    'section_number': section_number,
-                    'index': i,
-                    'error': error_msg
-                })
-        
-        # 生成最终结果
-        if results['processed_sections'] == results['total_sections']:
-            # 全部处理成功
-            return {
-                'success': True,
-                'message': f'所有 {results["total_sections"]} 个部分处理成功,共提取 {results["total_persons"]} 个人员信息',
-                'data': results
-            }
-        elif results['processed_sections'] > 0:
-            # 部分处理成功
+                    # 记录成功解析的人才信息到parsed_talents表
+                    if task_id and task_type:
+                        try:
+                            from app.core.data_parse.parse_task import record_parsed_talent
+                            standardized_data = _convert_webpage_to_card_format(person, publish_time)
+                            
+                            # 在记录到parsed_talents表之前,设置image_path和origin_source
+                            standardized_data['image_path'] = minio_path
+                            
+                            # 设置origin_source为JSON数组格式
+                            current_date = datetime.now().strftime('%Y-%m-%d')
+                            origin_source_entry = {
+                                "task_type": "新任命",
+                                "minio_path": minio_path,
+                                "source_date": current_date
+                            }
+                            standardized_data['origin_source'] = [origin_source_entry]
+                            
+                            record_result = record_parsed_talent(standardized_data, task_id, task_type)
+                            if record_result.get('success'):
+                                # 收集成功解析的记录ID
+                                parsed_record = record_result.get('data', {})
+                                if parsed_record and 'id' in parsed_record:
+                                    parsed_record_ids.append(str(parsed_record['id']))
+                                logging.info(f"成功记录人才信息到parsed_talents表: {person.get('name_zh', '')}")
+                            else:
+                                logging.warning(f"记录人才信息失败: {record_result.get('message', '')}")
+                        except Exception as record_error:
+                            logging.error(f"调用record_parsed_talent函数失败: {str(record_error)}")
+            
             return {
                 'success': True,
-                'message': f'部分处理成功:{results["processed_sections"]}/{results["total_sections"]} 个部分成功,共提取 {results["total_persons"]} 个人员信息',
-                'data': results
+                'message': '单个markdown文件处理成功',
+                'data': {
+                    'total_sections': 1,
+                    'processed_sections': 1,
+                    'total_persons': len(result) if result else 0,
+                    'all_results': result,
+                    'section_results': [],
+                    'failed_sections_info': [],
+                    'parsed_record_ids': parsed_record_ids
+                }
             }
-        else:
-            # 全部处理失败
+        except Exception as e:
+            error_msg = f"处理单个markdown文件失败: {str(e)}"
+            logging.error(error_msg, exc_info=True)
             return {
                 'success': False,
-                'message': f'所有 {results["total_sections"]} 个部分处理失败',
-                'data': results
+                'message': error_msg,
+                'data': None
             }
             
     except Exception as e: