Explorar o código

修改解析任务中task_type字段保存信息。

maxiaolong hai 1 día
pai
achega
771923636c
Modificáronse 2 ficheiros con 602 adicións e 1 borrados
  1. 143 1
      app/api/data_parse/routes.py
  2. 459 0
      app/core/data_parse/parse_task.py

+ 143 - 1
app/api/data_parse/routes.py

@@ -21,7 +21,8 @@ from app.core.data_parse.parse_system import (
 # 导入解析任务相关函数
 from app.core.data_parse.parse_task import (
     get_parse_tasks, 
-    get_parse_task_detail
+    get_parse_task_detail,
+    add_parse_task
 )
 # 导入酒店管理相关函数
 from app.core.data_parse.hotel_management import (
@@ -1943,3 +1944,144 @@ def get_parse_task_detail_route():
             'data': None
         }), 500
 
+
+# 新增解析任务接口
+@bp.route('/add-parse-task', methods=['POST'])
+def add_parse_task_route():
+    """
+    新增解析任务的API接口
+    
+    请求参数:
+        - task_type: 任务类型 (form-data字段,必填)
+                    可选值:'名片', '简历', '新任命', '招聘', '杂项'
+        - files: 文件数组 (multipart/form-data,对于招聘类型可选)
+        - created_by: 创建者 (可选,form-data字段)
+        
+    返回:
+        - JSON: 包含任务创建结果和上传摘要
+        
+    功能说明:
+        - 根据任务类型处理不同格式的文件
+        - 名片任务:JPG/PNG格式图片 → talent_photos目录
+        - 简历任务:PDF格式文件 → resume_files目录
+        - 新任命任务:MD格式文件 → appointment_files目录
+        - 招聘任务:数据库记录处理,无需文件上传
+        - 杂项任务:任意格式文件 → misc_files目录
+        - 使用timestamp+uuid自动生成文件名
+        - 在parse_task_repository表中创建待解析任务记录
+        
+    状态码:
+        - 200: 所有文件上传成功,任务创建成功
+        - 206: 部分文件上传成功,任务创建成功
+        - 400: 请求参数错误
+        - 500: 服务器内部错误
+    """
+    try:
+        # 获取任务类型参数
+        task_type = request.form.get('task_type')
+        
+        # 验证任务类型
+        if not task_type:
+            return jsonify({
+                'success': False,
+                'message': '缺少task_type参数',
+                'data': None
+            }), 400
+        
+        if task_type not in ['名片', '简历', '新任命', '招聘', '杂项']:
+            return jsonify({
+                'success': False,
+                'message': 'task_type参数必须是以下值之一:名片、简历、新任命、招聘、杂项',
+                'data': None
+            }), 400
+        
+        # 获取创建者信息(可选参数)
+        created_by = request.form.get('created_by', 'api_user')
+        
+        # 对于招聘类型,不需要文件上传
+        if task_type == '招聘':
+            # 检查是否误传了文件
+            if 'files' in request.files and request.files.getlist('files'):
+                return jsonify({
+                    'success': False,
+                    'message': '招聘类型任务不需要上传文件',
+                    'data': None
+                }), 400
+            
+            # 记录请求日志
+            logger.info(f"新增招聘任务请求: 创建者={created_by}")
+            
+            # 调用核心业务逻辑
+            result = add_parse_task(None, task_type, created_by)
+        else:
+            # 其他类型需要文件上传
+            if 'files' not in request.files:
+                return jsonify({
+                    'success': False,
+                    'message': f'{task_type}任务需要上传文件,请使用files字段上传文件',
+                    'data': None
+                }), 400
+            
+            # 获取上传的文件列表
+            uploaded_files = request.files.getlist('files')
+            
+            # 检查文件列表是否为空
+            if not uploaded_files or len(uploaded_files) == 0:
+                return jsonify({
+                    'success': False,
+                    'message': '文件数组不能为空',
+                    'data': None
+                }), 400
+            
+            # 验证所有文件
+            valid_files = []
+            for i, file in enumerate(uploaded_files):
+                # 检查文件是否为空
+                if not file or file.filename == '':
+                    return jsonify({
+                        'success': False,
+                        'message': f'第{i+1}个文件为空或未选择',
+                        'data': None
+                    }), 400
+                
+                valid_files.append(file)
+            
+            # 记录请求日志
+            logger.info(f"新增{task_type}任务请求: 文件数量={len(valid_files)}, 创建者={created_by}")
+            
+            # 调用核心业务逻辑
+            result = add_parse_task(valid_files, task_type, created_by)
+        
+        # 根据处理结果设置HTTP状态码
+        if result['success']:
+            if result['code'] == 200:
+                status_code = 200
+            elif result['code'] == 206:
+                status_code = 206
+            else:
+                status_code = 200
+        else:
+            if result['code'] == 400:
+                status_code = 400
+            else:
+                status_code = 500
+        
+        # 返回结果
+        return jsonify({
+            'success': result['success'],
+            'message': result['message'],
+            'data': result['data']
+        }), status_code
+        
+    except Exception as e:
+        # 记录错误日志
+        error_msg = f"新增解析任务接口失败: {str(e)}"
+        logger.error(error_msg, exc_info=True)
+        
+        # 返回错误响应
+        return jsonify({
+            'success': False,
+            'message': error_msg,
+            'data': None
+        }), 500
+

+ 459 - 0
app/core/data_parse/parse_task.py

@@ -1,7 +1,54 @@
 from app import db
 from datetime import datetime
 import logging
+import uuid
+import os
+import boto3
+from botocore.config import Config
+from io import BytesIO
+import json
 from .parse_system import ParseTaskRepository
+from app.config.config import DevelopmentConfig, ProductionConfig
+
+# 配置变量
+config = ProductionConfig()
+minio_url = f"{'https' if getattr(config, 'MINIO_SECURE', False) else 'http'}://{getattr(config, 'MINIO_HOST', 'localhost')}"
+minio_access_key = getattr(config, 'MINIO_USER', 'minioadmin')
+minio_secret_key = getattr(config, 'MINIO_PASSWORD', 'minioadmin')
+minio_bucket = getattr(config, 'MINIO_BUCKET', 'dataops')
+
+
+def get_minio_client():
+    """获取MinIO客户端连接"""
+    try:
+        logging.info(f"尝试连接MinIO服务器: {minio_url}")
+        
+        minio_client = boto3.client(
+            's3',
+            endpoint_url=minio_url,
+            aws_access_key_id=minio_access_key,
+            aws_secret_access_key=minio_secret_key,
+            config=Config(
+                signature_version='s3v4',
+                retries={'max_attempts': 3, 'mode': 'standard'},
+                connect_timeout=10,
+                read_timeout=30
+            )
+        )
+        
+        # 确保存储桶存在
+        buckets = minio_client.list_buckets()
+        bucket_names = [bucket['Name'] for bucket in buckets.get('Buckets', [])]
+        logging.info(f"成功连接到MinIO服务器,现有存储桶: {bucket_names}")
+        
+        if minio_bucket not in bucket_names:
+            logging.info(f"创建存储桶: {minio_bucket}")
+            minio_client.create_bucket(Bucket=minio_bucket)
+            
+        return minio_client
+    except Exception as e:
+        logging.error(f"MinIO连接错误: {str(e)}")
+        return None
 
 
 def get_parse_tasks(page=1, per_page=10, task_type=None, task_status=None):
@@ -108,6 +155,418 @@ def get_parse_task_detail(task_name):
         error_msg = f"获取解析任务详情失败: {str(e)}"
         logging.error(error_msg, exc_info=True)
         
+        return {
+            'code': 500,
+            'success': False,
+            'message': error_msg,
+            'data': None
+        }
+
+
+def _validate_files_by_task_type(files, task_type):
+    """
+    根据任务类型验证文件格式
+    
+    Args:
+        files (list): 文件数组
+        task_type (str): 任务类型
+        
+    Returns:
+        dict: 验证结果
+    """
+    # 定义不同任务类型允许的文件扩展名
+    allowed_extensions = {
+        '名片': {'.jpg', '.jpeg', '.png'},
+        '简历': {'.pdf'},
+        '新任命': {'.md'},
+        '杂项': None  # 杂项不限制文件格式
+    }
+    
+    task_extensions = allowed_extensions.get(task_type)
+    
+    for i, file_obj in enumerate(files):
+        if not hasattr(file_obj, 'filename') or not file_obj.filename:
+            return {
+                'code': 400,
+                'success': False,
+                'message': f'第{i+1}个文件缺少文件名',
+                'data': None
+            }
+        
+        # 杂项类型不验证文件格式
+        if task_type == '杂项':
+            continue
+            
+        file_ext = os.path.splitext(file_obj.filename)[1].lower()
+        if file_ext not in task_extensions:
+            format_desc = {
+                '名片': 'JPG和PNG格式',
+                '简历': 'PDF格式',
+                '新任命': 'MD格式'
+            }
+            return {
+                'code': 400,
+                'success': False,
+                'message': f'第{i+1}个文件格式不支持,{task_type}任务只支持{format_desc[task_type]}',
+                'data': None
+            }
+    
+    return {'success': True}
+
+
+def _handle_recruitment_task(created_by):
+    """
+    处理招聘类型任务(数据库记录,不需要文件上传)
+    
+    Args:
+        created_by (str): 创建者
+        
+    Returns:
+        dict: 处理结果
+    """
+    try:
+        # 生成任务名称
+        current_date = datetime.now().strftime('%Y%m%d')
+        task_uuid = str(uuid.uuid4())[:8]
+        task_name = f"recruitment_task_{current_date}_{task_uuid}"
+        
+        # 构建任务来源信息
+        task_source = {
+            'minio_paths_json': [],  # 招聘任务无文件,空数组
+            'upload_time': datetime.now().isoformat()
+        }
+        
+        # 创建解析任务记录
+        parse_task = ParseTaskRepository(
+            task_name=task_name,
+            task_status='待解析',
+            task_type='招聘',
+            task_source=json.dumps(task_source, ensure_ascii=False),
+            collection_count=0,  # 招聘任务不涉及文件收集
+            parse_count=0,
+            parse_result=None,
+            created_by=created_by,
+            updated_by=created_by
+        )
+        
+        db.session.add(parse_task)
+        db.session.commit()
+        
+        logging.info(f"成功创建招聘任务记录: {task_name}")
+        
+        return {
+            'code': 200,
+            'success': True,
+            'message': '招聘任务创建成功',
+            'data': {
+                'task_info': parse_task.to_dict(),
+                'task_summary': {
+                    'task_type': '招聘',
+                    'description': '数据库记录处理任务',
+                    'requires_files': False
+                }
+            }
+        }
+        
+    except Exception as e:
+        db.session.rollback()
+        error_msg = f"创建招聘任务失败: {str(e)}"
+        logging.error(error_msg, exc_info=True)
+        
+        return {
+            'code': 500,
+            'success': False,
+            'message': error_msg,
+            'data': None
+        }
+
+
+def _get_minio_directory_by_task_type(task_type):
+    """
+    根据任务类型获取MinIO存储目录
+    
+    Args:
+        task_type (str): 任务类型
+        
+    Returns:
+        str: MinIO目录路径
+    """
+    directory_mapping = {
+        '名片': 'talent_photos',
+        '简历': 'resume_files',
+        '新任命': 'appointment_files',
+        '杂项': 'misc_files'
+    }
+    
+    return directory_mapping.get(task_type, 'misc_files')
+
+
+def _generate_filename_by_task_type(task_type, original_filename):
+    """
+    根据任务类型生成文件名
+    
+    Args:
+        task_type (str): 任务类型
+        original_filename (str): 原始文件名
+        
+    Returns:
+        str: 生成的文件名
+    """
+    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+    unique_id = uuid.uuid4().hex[:8]
+    file_ext = os.path.splitext(original_filename)[1].lower()
+    
+    filename_prefix = {
+        '名片': 'talent_photo',
+        '简历': 'resume',
+        '新任命': 'appointment',
+        '杂项': 'misc_file'
+    }
+    
+    prefix = filename_prefix.get(task_type, 'misc_file')
+    return f"{prefix}_{timestamp}_{unique_id}{file_ext}"
+
+
+def _get_content_type_by_extension(filename):
+    """
+    根据文件扩展名获取ContentType
+    
+    Args:
+        filename (str): 文件名
+        
+    Returns:
+        str: ContentType
+    """
+    file_ext = os.path.splitext(filename)[1].lower()
+    
+    content_type_mapping = {
+        '.jpg': 'image/jpeg',
+        '.jpeg': 'image/jpeg',
+        '.png': 'image/png',
+        '.pdf': 'application/pdf',
+        '.md': 'text/markdown'
+    }
+    
+    return content_type_mapping.get(file_ext, 'application/octet-stream')
+
+
+def add_parse_task(files, task_type, created_by='system'):
+    """
+    新增解析任务,根据任务类型处理不同类型的文件
+    
+    Args:
+        files (list): 前端上传的文件数组,每个元素是FileStorage对象
+        task_type (str): 任务类型,可选值:'名片', '简历', '新任命', '招聘', '杂项'
+        created_by (str): 创建者,默认为'system'
+        
+    Returns:
+        dict: 包含操作结果的字典
+    """
+    try:
+        # 参数验证
+        if not task_type or task_type not in ['名片', '简历', '新任命', '招聘', '杂项']:
+            return {
+                'code': 400,
+                'success': False,
+                'message': 'task_type参数必须是以下值之一:名片、简历、新任命、招聘、杂项',
+                'data': None
+            }
+        
+        # 对于招聘类型,不需要文件,直接处理数据库记录
+        if task_type == '招聘':
+            if files:
+                return {
+                    'code': 400,
+                    'success': False,
+                    'message': '招聘类型任务不需要上传文件',
+                    'data': None
+                }
+            # 招聘任务处理逻辑
+            return _handle_recruitment_task(created_by)
+        
+        # 其他类型需要验证文件
+        if not files or not isinstance(files, list):
+            return {
+                'code': 400,
+                'success': False,
+                'message': 'files参数必须是非空数组',
+                'data': None
+            }
+        
+        if len(files) == 0:
+            return {
+                'code': 400,
+                'success': False,
+                'message': '文件数组不能为空',
+                'data': None
+            }
+        
+        # 根据任务类型验证文件格式
+        validation_result = _validate_files_by_task_type(files, task_type)
+        if not validation_result['success']:
+            return validation_result
+        
+        # 获取MinIO客户端
+        minio_client = get_minio_client()
+        if not minio_client:
+            return {
+                'code': 500,
+                'success': False,
+                'message': '无法连接到MinIO服务器',
+                'data': None
+            }
+        
+        # 存储上传结果
+        uploaded_files = []
+        failed_uploads = []
+        
+        # 获取MinIO存储目录
+        minio_directory = _get_minio_directory_by_task_type(task_type)
+        
+        # 上传每个文件到MinIO
+        for i, file_obj in enumerate(files):
+            try:
+                # 生成唯一的文件名
+                filename = _generate_filename_by_task_type(task_type, file_obj.filename)
+                minio_path = f"{minio_directory}/{filename}"
+                
+                # 上传文件到MinIO
+                logging.info(f"开始上传第{i+1}个文件到MinIO: {minio_path}")
+                
+                # 重置文件指针
+                file_obj.seek(0)
+                
+                # 根据文件类型设置ContentType
+                content_type = file_obj.content_type or _get_content_type_by_extension(file_obj.filename)
+                
+                minio_client.put_object(
+                    Bucket=minio_bucket,
+                    Key=minio_path,
+                    Body=file_obj,
+                    ContentType=content_type,
+                    Metadata={
+                        'original_filename': file_obj.filename,
+                        'upload_time': datetime.now().isoformat(),
+                        'task_type': task_type,
+                        'content_type': f'{task_type}_parse_task'
+                    }
+                )
+                
+                # 构建完整的MinIO路径,包含minio_client部分
+                complete_minio_path = f"{minio_url}/{minio_bucket}/{minio_path}"
+                
+                uploaded_files.append({
+                    'original_filename': file_obj.filename,
+                    'minio_path': complete_minio_path,
+                    'relative_path': minio_path,  # 保留相对路径作为参考
+                    'file_size': len(file_obj.read()) if hasattr(file_obj, 'read') else 0
+                })
+                
+                # 重置文件指针
+                file_obj.seek(0)
+                
+                logging.info(f"成功上传第{i+1}个文件到MinIO: {minio_path}")
+                
+            except Exception as upload_error:
+                error_msg = f"上传第{i+1}个文件失败: {str(upload_error)}"
+                logging.error(error_msg, exc_info=True)
+                failed_uploads.append({
+                    'filename': file_obj.filename,
+                    'error': str(upload_error)
+                })
+        
+        # 检查是否有文件上传成功
+        if len(uploaded_files) == 0:
+            return {
+                'code': 500,
+                'success': False,
+                'message': '所有文件上传失败',
+                'data': {
+                    'uploaded_count': 0,
+                    'failed_count': len(failed_uploads),
+                    'failed_uploads': failed_uploads
+                }
+            }
+        
+        # 生成任务名称
+        current_date = datetime.now().strftime('%Y%m%d')
+        task_uuid = str(uuid.uuid4())[:8]
+        task_name = f"parse_task_{current_date}_{task_uuid}"
+        
+        # 构建任务来源信息,包含所有上传文件的完整MinIO路径
+        complete_minio_paths = [file_info['minio_path'] for file_info in uploaded_files]
+        
+        task_source = {
+            'minio_paths_json': complete_minio_paths,  # JSON数组,包含完整的MinIO路径
+            'upload_time': datetime.now().isoformat()
+        }
+        
+        # 创建解析任务记录
+        try:
+            parse_task = ParseTaskRepository(
+                task_name=task_name,
+                task_status='待解析',
+                task_type=task_type,
+                task_source=json.dumps(task_source, ensure_ascii=False),  # 转换为JSON字符串存储
+                collection_count=len(uploaded_files),
+                parse_count=0,  # 解析数量初始为0
+                parse_result=None,  # 解析结果初始为空
+                created_by=created_by,
+                updated_by=created_by
+            )
+            
+            db.session.add(parse_task)
+            db.session.commit()
+            
+            logging.info(f"成功创建解析任务记录: {task_name}")
+            
+            # 返回成功结果
+            result_data = {
+                'task_info': parse_task.to_dict(),
+                'upload_summary': {
+                    'task_type': task_type,
+                    'total_files': len(files),
+                    'uploaded_count': len(uploaded_files),
+                    'failed_count': len(failed_uploads),
+                    'uploaded_files': uploaded_files,
+                    'failed_uploads': failed_uploads if failed_uploads else []
+                }
+            }
+            
+            if len(failed_uploads) > 0:
+                return {
+                    'code': 206,  # Partial Content
+                    'success': True,
+                    'message': f'解析任务创建成功,但有{len(failed_uploads)}个文件上传失败',
+                    'data': result_data
+                }
+            else:
+                return {
+                    'code': 200,
+                    'success': True,
+                    'message': '解析任务创建成功,所有文件上传完成',
+                    'data': result_data
+                }
+                
+        except Exception as db_error:
+            db.session.rollback()
+            error_msg = f"创建解析任务记录失败: {str(db_error)}"
+            logging.error(error_msg, exc_info=True)
+            
+            return {
+                'code': 500,
+                'success': False,
+                'message': error_msg,
+                'data': {
+                    'uploaded_files': uploaded_files,  # 即使数据库失败,也返回已上传的文件信息
+                    'failed_uploads': failed_uploads
+                }
+            }
+            
+    except Exception as e:
+        error_msg = f"新增解析任务失败: {str(e)}"
+        logging.error(error_msg, exc_info=True)
+        
         return {
             'code': 500,
             'success': False,