from typing import Dict, Any from app import db from datetime import datetime import os import boto3 from botocore.config import Config import logging import requests import json import re import uuid from PIL import Image from io import BytesIO import pytesseract import base64 from openai import OpenAI from app.config.config import DevelopmentConfig, ProductionConfig # 名片解析数据模型 class BusinessCard(db.Model): __tablename__ = 'business_cards' id = db.Column(db.Integer, primary_key=True, autoincrement=True) name_zh = db.Column(db.String(100), nullable=False) name_en = db.Column(db.String(100)) title_zh = db.Column(db.String(100)) title_en = db.Column(db.String(100)) mobile = db.Column(db.String(100)) phone = db.Column(db.String(50)) email = db.Column(db.String(100)) hotel_zh = db.Column(db.String(200)) hotel_en = db.Column(db.String(200)) address_zh = db.Column(db.Text) address_en = db.Column(db.Text) postal_code_zh = db.Column(db.String(20)) postal_code_en = db.Column(db.String(20)) brand_zh = db.Column(db.String(100)) brand_en = db.Column(db.String(100)) affiliation_zh = db.Column(db.String(200)) affiliation_en = db.Column(db.String(200)) birthday = db.Column(db.Date) # 生日,存储年月日 age = db.Column(db.Integer) # 年龄字段 native_place = db.Column(db.Text) # 籍贯字段 residence = db.Column(db.Text) # 居住地 image_path = db.Column(db.String(255)) # MinIO中存储的路径 career_path = db.Column(db.JSON) # 职业轨迹,JSON格式 brand_group = db.Column(db.String(200)) # 品牌组合 origin_source = db.Column(db.JSON) # 原始资料记录,JSON格式 talent_profile = db.Column(db.Text) # 人才档案,文本格式 created_at = db.Column(db.DateTime, default=datetime.now, nullable=False) updated_at = db.Column(db.DateTime, onupdate=datetime.now) updated_by = db.Column(db.String(50)) status = db.Column(db.String(20), default='active') def to_dict(self): return { 'id': self.id, 'name_zh': self.name_zh, 'name_en': self.name_en, 'title_zh': self.title_zh, 'title_en': self.title_en, 'mobile': self.mobile, 'phone': self.phone, 'email': self.email, 'hotel_zh': self.hotel_zh, 'hotel_en': self.hotel_en, 'address_zh': self.address_zh, 'address_en': self.address_en, 'postal_code_zh': self.postal_code_zh, 'postal_code_en': self.postal_code_en, 'brand_zh': self.brand_zh, 'brand_en': self.brand_en, 'affiliation_zh': self.affiliation_zh, 'affiliation_en': self.affiliation_en, 'birthday': self.birthday.strftime('%Y-%m-%d') if self.birthday else None, 'age': self.age, 'native_place': self.native_place, 'residence': self.residence, 'image_path': self.image_path, 'career_path': self.career_path, 'brand_group': self.brand_group, 'origin_source': self.origin_source, 'talent_profile': self.talent_profile, 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None, 'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None, 'updated_by': self.updated_by, 'status': self.status } # 重复名片处理数据模型 class DuplicateBusinessCard(db.Model): __tablename__ = 'duplicate_business_cards' id = db.Column(db.Integer, primary_key=True, autoincrement=True) main_card_id = db.Column(db.Integer, db.ForeignKey('business_cards.id'), nullable=False) # 新创建的主记录ID suspected_duplicates = db.Column(db.JSON, nullable=False) # 疑似重复记录列表,JSON格式 duplicate_reason = db.Column(db.String(200), nullable=False) # 重复原因 processing_status = db.Column(db.String(20), default='pending') # 处理状态:pending/processed/ignored created_at = db.Column(db.DateTime, default=datetime.now, nullable=False) processed_at = db.Column(db.DateTime) # 处理时间 processed_by = db.Column(db.String(50)) # 处理人 processing_notes = db.Column(db.Text) # 处理备注 # 关联主记录 main_card = db.relationship('BusinessCard', backref=db.backref('as_main_duplicate_records', lazy=True)) def to_dict(self): return { 'id': self.id, 'main_card_id': self.main_card_id, 'suspected_duplicates': self.suspected_duplicates, 'duplicate_reason': self.duplicate_reason, 'processing_status': self.processing_status, 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None, 'processed_at': self.processed_at.strftime('%Y-%m-%d %H:%M:%S') if self.processed_at else None, 'processed_by': self.processed_by, 'processing_notes': self.processing_notes } # 解析任务存储库数据模型 class ParseTaskRepository(db.Model): __tablename__ = 'parse_task_repository' id = db.Column(db.Integer, primary_key=True, autoincrement=True) task_name = db.Column(db.String(100), nullable=False) task_status = db.Column(db.String(10), nullable=False) task_type = db.Column(db.String(50), nullable=False) task_source = db.Column(db.String(300), nullable=False) collection_count = db.Column(db.Integer, nullable=False, default=0) parse_count = db.Column(db.Integer, nullable=False, default=0) parse_result = db.Column(db.JSON) created_at = db.Column(db.DateTime, default=datetime.now, nullable=False) created_by = db.Column(db.String(50), nullable=False) updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, nullable=False) updated_by = db.Column(db.String(50), nullable=False) def to_dict(self): return { 'id': self.id, 'task_name': self.task_name, 'task_status': self.task_status, 'task_type': self.task_type, 'task_source': self.task_source, 'collection_count': self.collection_count, 'parse_count': self.parse_count, 'parse_result': self.parse_result, 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None, 'created_by': self.created_by, 'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None, 'updated_by': self.updated_by } # 配置变量 # 使用配置变量,缺省认为在生产环境运行 config = ProductionConfig() # 使用配置变量 minio_url = f"{'https' if getattr(config, 'MINIO_SECURE', False) else 'http'}://{getattr(config, 'MINIO_HOST', 'localhost')}" minio_access_key = getattr(config, 'MINIO_USER', 'minioadmin') minio_secret_key = getattr(config, 'MINIO_PASSWORD', 'minioadmin') minio_bucket = getattr(config, 'MINIO_BUCKET', 'dataops') use_ssl = getattr(config, 'MINIO_SECURE', False) # API密钥配置 DEEPSEEK_API_KEY = getattr(config, 'DEEPSEEK_API_KEY', '') DEEPSEEK_API_URL = getattr(config, 'DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions') QWEN_API_KEY = getattr(config, 'QWEN_API_KEY', '') QWEN_API_URL = getattr(config, 'QWEN_API_URL', 'https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation') # OCR配置 OCR_LANG = getattr(config, 'OCR_LANG', 'chi_sim+eng') # 名片解析功能模块 def normalize_mobile_numbers(mobile_str): """ 标准化手机号码字符串,去重并限制最多3个 Args: mobile_str (str): 手机号码字符串,可能包含多个手机号码,用逗号分隔 Returns: str: 标准化后的手机号码字符串,最多3个,用逗号分隔 """ if not mobile_str or not mobile_str.strip(): return '' # 按逗号分割并清理每个手机号码 mobiles = [] for mobile in mobile_str.split(','): mobile = mobile.strip() if mobile and mobile not in mobiles: # 去重 mobiles.append(mobile) # 限制最多3个手机号码 return ','.join(mobiles[:3]) def mobile_numbers_overlap(mobile1, mobile2): """ 检查两个手机号码字符串是否有重叠 Args: mobile1 (str): 第一个手机号码字符串 mobile2 (str): 第二个手机号码字符串 Returns: bool: 是否有重叠的手机号码 """ if not mobile1 or not mobile2: return False mobiles1 = set(mobile.strip() for mobile in mobile1.split(',') if mobile.strip()) mobiles2 = set(mobile.strip() for mobile in mobile2.split(',') if mobile.strip()) return bool(mobiles1 & mobiles2) # 检查交集 def merge_mobile_numbers(existing_mobile, new_mobile): """ 合并手机号码,去重并限制最多3个 Args: existing_mobile (str): 现有手机号码字符串 new_mobile (str): 新手机号码字符串 Returns: str: 合并后的手机号码字符串,最多3个,用逗号分隔 """ mobiles = [] # 添加现有手机号码 if existing_mobile: for mobile in existing_mobile.split(','): mobile = mobile.strip() if mobile and mobile not in mobiles: mobiles.append(mobile) # 添加新手机号码 if new_mobile: for mobile in new_mobile.split(','): mobile = mobile.strip() if mobile and mobile not in mobiles: mobiles.append(mobile) # 限制最多3个手机号码 return ','.join(mobiles[:3]) def check_duplicate_business_card(extracted_data): """ 检查是否存在重复的名片记录 Args: extracted_data (dict): 提取的名片信息 Returns: dict: 包含检查结果的字典,格式为: { 'is_duplicate': bool, 'action': str, # 'update', 'create_with_duplicates' 或 'create_new' 'existing_card': BusinessCard 或 None, 'suspected_duplicates': list, # 疑似重复记录列表 'reason': str } """ try: # 提取关键信息进行匹配 name_zh = extracted_data.get('name_zh', '').strip() mobile = extracted_data.get('mobile', '').strip() # 如果没有姓名,无法进行有效的重复检测 if not name_zh: return { 'is_duplicate': False, 'action': 'create_new', 'existing_card': None, 'suspected_duplicates': [], 'reason': '缺少中文姓名,无法进行重复检测' } # 根据姓名进行精确匹配 name_matches = BusinessCard.query.filter_by(name_zh=name_zh).all() # 如果有手机号,同时检查手机号匹配 mobile_matches = [] if mobile: # 标准化手机号进行比较 normalized_mobile = normalize_mobile_numbers(mobile) if normalized_mobile: # 查找所有有手机号的记录 all_cards_with_mobile = BusinessCard.query.filter(BusinessCard.mobile.isnot(None)).all() for card in all_cards_with_mobile: if card.mobile and mobile_numbers_overlap(normalized_mobile, card.mobile): mobile_matches.append(card) # 合并姓名匹配和手机号匹配的结果,去重 all_matches = [] for card in name_matches + mobile_matches: if card not in all_matches: all_matches.append(card) if not all_matches: # 没有找到匹配记录,创建新记录 return { 'is_duplicate': False, 'action': 'create_new', 'existing_card': None, 'suspected_duplicates': [], 'reason': '未找到重复记录,将创建新记录' } elif len(all_matches) == 1: # 找到一个匹配记录 existing_card = all_matches[0] # 检查是否是完全匹配(姓名和手机号都相同) existing_mobile = existing_card.mobile or '' is_name_match = existing_card.name_zh == name_zh is_mobile_match = mobile and mobile_numbers_overlap(mobile, existing_mobile) if is_name_match and is_mobile_match: # 完全匹配,更新现有记录 return { 'is_duplicate': True, 'action': 'update', 'existing_card': existing_card, 'suspected_duplicates': [], 'reason': f'找到完全匹配的记录 (ID: {existing_card.id}),将更新现有记录' } else: # 部分匹配,标记为疑似重复 return { 'is_duplicate': True, 'action': 'create_with_duplicates', 'existing_card': None, 'suspected_duplicates': [existing_card], 'reason': f'找到疑似重复记录 (ID: {existing_card.id}),将创建新记录并标记重复' } else: # 找到多个匹配记录,标记为疑似重复 return { 'is_duplicate': True, 'action': 'create_with_duplicates', 'existing_card': None, 'suspected_duplicates': all_matches, 'reason': f'找到 {len(all_matches)} 个疑似重复记录,将创建新记录并标记重复' } except Exception as e: logging.error(f"重复检测过程中发生错误: {str(e)}", exc_info=True) # 出错时默认创建新记录 return { 'is_duplicate': False, 'action': 'create_new', 'existing_card': None, 'suspected_duplicates': [], 'reason': f'重复检测失败: {str(e)},将创建新记录' } def update_career_path(existing_card, new_data, image_path=None): """ 更新名片的职业轨迹信息 Args: existing_card: 现有的名片记录对象 new_data (dict): 新的职位信息 image_path (str): 新的图片路径 Returns: list: 更新后的职业轨迹列表 """ try: # 获取现有的职业轨迹,如果没有则初始化为空列表 career_path = existing_card.career_path if existing_card.career_path else [] # 确保career_path是列表格式 if not isinstance(career_path, list): career_path = [] # 构建新的职业记录 new_career_entry = { 'hotel_zh': new_data.get('hotel_zh', ''), 'hotel_en': new_data.get('hotel_en', ''), 'title_zh': new_data.get('title_zh', ''), 'title_en': new_data.get('title_en', ''), 'start_date': datetime.now().strftime('%Y-%m-%d'), 'image_path': image_path or existing_card.image_path } # 检查是否与最新的职业记录相同,避免重复添加 if career_path: latest_entry = career_path[-1] if (latest_entry.get('hotel_zh') == new_career_entry['hotel_zh'] and latest_entry.get('title_zh') == new_career_entry['title_zh']): # 如果职位信息相同,只更新图片路径和时间 latest_entry['image_path'] = new_career_entry['image_path'] latest_entry['start_date'] = new_career_entry['start_date'] return career_path # 添加新的职业记录 career_path.append(new_career_entry) # 限制职业轨迹记录数量(最多保留10条) if len(career_path) > 10: career_path = career_path[-10:] return career_path except Exception as e: logging.error(f"更新职业轨迹失败: {str(e)}", exc_info=True) # 出错时返回原有的职业轨迹 return existing_card.career_path if existing_card.career_path else [] def create_main_card_with_duplicates(extracted_data, minio_path, suspected_duplicates, reason): """ 创建主名片记录并标记疑似重复记录 Args: extracted_data (dict): 提取的名片信息 minio_path (str): MinIO中的图片路径 suspected_duplicates (list): 疑似重复的名片记录列表 reason (str): 重复原因描述 Returns: BusinessCard: 创建的主名片记录 """ try: # 标准化手机号码 mobile = normalize_mobile_numbers(extracted_data.get('mobile', '')) # 构建职业轨迹 career_path = [] if extracted_data.get('hotel_zh') or extracted_data.get('title_zh'): career_entry = { 'hotel_zh': extracted_data.get('hotel_zh', ''), 'hotel_en': extracted_data.get('hotel_en', ''), 'title_zh': extracted_data.get('title_zh', ''), 'title_en': extracted_data.get('title_en', ''), 'start_date': datetime.now().strftime('%Y-%m-%d'), 'image_path': minio_path } career_path.append(career_entry) # 创建新的主名片记录 main_card = BusinessCard( name_zh=extracted_data.get('name_zh', ''), name_en=extracted_data.get('name_en', ''), title_zh=extracted_data.get('title_zh', ''), title_en=extracted_data.get('title_en', ''), mobile=mobile, phone=extracted_data.get('phone', ''), email=extracted_data.get('email', ''), hotel_zh=extracted_data.get('hotel_zh', ''), hotel_en=extracted_data.get('hotel_en', ''), address_zh=extracted_data.get('address_zh', ''), address_en=extracted_data.get('address_en', ''), postal_code_zh=extracted_data.get('postal_code_zh', ''), postal_code_en=extracted_data.get('postal_code_en', ''), brand_zh=extracted_data.get('brand_zh', ''), brand_en=extracted_data.get('brand_en', ''), affiliation_zh=extracted_data.get('affiliation_zh', ''), affiliation_en=extracted_data.get('affiliation_en', ''), brand_group=extracted_data.get('brand_group', ''), image_path=minio_path, career_path=career_path, origin_source={'source': 'manual_upload', 'timestamp': datetime.now().isoformat()}, created_at=datetime.now(), updated_by='system', status='active' ) # 保存主记录到数据库 db.session.add(main_card) db.session.flush() # 获取主记录的ID # 创建重复记录标记 suspected_duplicates_data = [] for duplicate_card in suspected_duplicates: suspected_duplicates_data.append({ 'id': duplicate_card.id, 'name_zh': duplicate_card.name_zh, 'mobile': duplicate_card.mobile, 'hotel_zh': duplicate_card.hotel_zh, 'title_zh': duplicate_card.title_zh }) duplicate_record = DuplicateBusinessCard( main_card_id=main_card.id, suspected_duplicates=suspected_duplicates_data, duplicate_reason=reason, processing_status='pending', created_at=datetime.now() ) # 保存重复记录标记 db.session.add(duplicate_record) db.session.commit() logging.info(f"成功创建主名片记录 ID: {main_card.id},并标记 {len(suspected_duplicates)} 个疑似重复记录") return main_card except Exception as e: db.session.rollback() error_msg = f"创建主名片记录失败: {str(e)}" logging.error(error_msg, exc_info=True) raise Exception(error_msg) def get_minio_client(): """获取MinIO客户端连接""" try: # 使用全局配置变量 global minio_url, minio_access_key, minio_secret_key, minio_bucket, use_ssl logging.info(f"尝试连接MinIO服务器: {minio_url}") minio_client = boto3.client( 's3', endpoint_url=minio_url, aws_access_key_id=minio_access_key, aws_secret_access_key=minio_secret_key, config=Config( signature_version='s3v4', retries={'max_attempts': 3, 'mode': 'standard'}, connect_timeout=10, read_timeout=30 ) ) # 确保存储桶存在 buckets = minio_client.list_buckets() bucket_names = [bucket['Name'] for bucket in buckets.get('Buckets', [])] logging.info(f"成功连接到MinIO服务器,现有存储桶: {bucket_names}") if minio_bucket not in bucket_names: logging.info(f"创建存储桶: {minio_bucket}") minio_client.create_bucket(Bucket=minio_bucket) return minio_client except Exception as e: logging.error(f"MinIO连接错误: {str(e)}") return None def get_business_cards(): """ 获取所有名片记录 Returns: dict: 包含名片记录列表的字典 """ try: # 查询所有名片记录,按创建时间倒序排列 cards = BusinessCard.query.filter_by(status='active').order_by(BusinessCard.created_at.desc()).all() # 转换为字典格式 cards_data = [card.to_dict() for card in cards] return { 'code': 200, 'success': True, 'message': '获取名片列表成功', 'data': cards_data, 'count': len(cards_data) } except Exception as e: error_msg = f"获取名片列表失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': [], 'count': 0 } def get_business_card(card_id): """ 根据ID获取单个名片记录 Args: card_id (int): 名片记录ID Returns: dict: 包含名片记录的字典 """ try: card = BusinessCard.query.get(card_id) if not card: return { 'code': 404, 'success': False, 'message': f'未找到ID为{card_id}的名片记录', 'data': None } return { 'code': 200, 'success': True, 'message': '获取名片记录成功', 'data': card.to_dict() } except Exception as e: error_msg = f"获取名片记录失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def update_business_card(card_id, data): """ 更新名片记录 Args: card_id (int): 名片记录ID data (dict): 要更新的数据 Returns: dict: 包含更新结果的字典 """ try: card = BusinessCard.query.get(card_id) if not card: return { 'code': 404, 'success': False, 'message': f'未找到ID为{card_id}的名片记录', 'data': None } # 更新字段 updatable_fields = ['name_zh', 'name_en', 'title_zh', 'title_en', 'mobile', 'phone', 'email', 'hotel_zh', 'hotel_en', 'address_zh', 'address_en', 'postal_code_zh', 'postal_code_en', 'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en', 'brand_group', 'talent_profile'] for field in updatable_fields: if field in data and data[field] is not None: setattr(card, field, data[field]) # 处理手机号标准化 if 'mobile' in data: card.mobile = normalize_mobile_numbers(data['mobile']) card.updated_at = datetime.now() card.updated_by = data.get('updated_by', 'system') db.session.commit() return { 'code': 200, 'success': True, 'message': '名片记录更新成功', 'data': card.to_dict() } except Exception as e: db.session.rollback() error_msg = f"更新名片记录失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def update_business_card_status(card_id, status): """ 更新名片记录状态 Args: card_id (int): 名片记录ID status (str): 新的状态 Returns: dict: 包含更新结果的字典 """ try: card = BusinessCard.query.get(card_id) if not card: return { 'code': 404, 'success': False, 'message': f'未找到ID为{card_id}的名片记录', 'data': None } card.status = status card.updated_at = datetime.now() db.session.commit() return { 'code': 200, 'success': True, 'message': '名片状态更新成功', 'data': card.to_dict() } except Exception as e: db.session.rollback() error_msg = f"更新名片状态失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def search_business_cards_by_mobile(mobile_number): """ 根据手机号搜索名片记录 Args: mobile_number (str): 手机号码 Returns: dict: 包含搜索结果的字典 """ try: if not mobile_number or not mobile_number.strip(): return { 'code': 400, 'success': False, 'message': '手机号码不能为空', 'data': [], 'count': 0 } # 查询包含该手机号的记录 cards = BusinessCard.query.filter( BusinessCard.mobile.contains(mobile_number.strip()) ).all() # 转换为字典格式 cards_data = [card.to_dict() for card in cards] return { 'code': 200, 'success': True, 'message': f'找到 {len(cards_data)} 条匹配记录', 'data': cards_data, 'count': len(cards_data) } except Exception as e: error_msg = f"搜索名片记录失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': [], 'count': 0 } # 重复记录管理函数 def get_duplicate_records(status=None): """ 获取重复记录列表 Args: status (str, optional): 筛选特定状态的记录 Returns: dict: 包含操作结果和重复记录列表 """ try: query = DuplicateBusinessCard.query if status: query = query.filter_by(processing_status=status) duplicate_records = query.order_by(DuplicateBusinessCard.created_at.desc()).all() records_data = [] for record in duplicate_records: record_dict = record.to_dict() if record.main_card: record_dict['main_card'] = record.main_card.to_dict() records_data.append(record_dict) return { 'code': 200, 'success': True, 'message': '获取重复记录列表成功', 'data': records_data, 'count': len(records_data) } except Exception as e: error_msg = f"获取重复记录列表失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': [], 'count': 0 } def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, processed_by=None, notes=None): """ 处理重复记录 Args: duplicate_id (int): 重复记录ID action (str): 处理动作 selected_duplicate_id (int, optional): 选择的重复记录ID processed_by (str, optional): 处理人 notes (str, optional): 处理备注 Returns: dict: 包含操作结果 """ try: duplicate_record = DuplicateBusinessCard.query.filter_by(main_card_id=duplicate_id).first() if not duplicate_record: return { 'code': 404, 'success': False, 'message': f'未找到main_card_id为{duplicate_id}的重复记录', 'data': None } if duplicate_record.processing_status != 'pending': return { 'code': 400, 'success': False, 'message': f'重复记录状态为{duplicate_record.processing_status},无法处理', 'data': None } main_card = duplicate_record.main_card if not main_card: return { 'code': 404, 'success': False, 'message': '未找到对应的主记录', 'data': None } result_data = None if action == 'merge_to_suspected': if not selected_duplicate_id: return { 'code': 400, 'success': False, 'message': '执行合并操作时必须提供selected_duplicate_id', 'data': None } target_card = BusinessCard.query.get(selected_duplicate_id) if not target_card: return { 'code': 404, 'success': False, 'message': f'未找到ID为{selected_duplicate_id}的目标记录', 'data': None } # 合并信息到目标记录 target_card.name_en = main_card.name_en or target_card.name_en target_card.title_zh = main_card.title_zh or target_card.title_zh target_card.title_en = main_card.title_en or target_card.title_en if main_card.mobile: target_card.mobile = merge_mobile_numbers(target_card.mobile, main_card.mobile) target_card.phone = main_card.phone or target_card.phone target_card.email = main_card.email or target_card.email target_card.hotel_zh = main_card.hotel_zh or target_card.hotel_zh target_card.hotel_en = main_card.hotel_en or target_card.hotel_en target_card.address_zh = main_card.address_zh or target_card.address_zh target_card.address_en = main_card.address_en or target_card.address_en target_card.brand_group = main_card.brand_group or target_card.brand_group target_card.image_path = main_card.image_path target_card.updated_by = processed_by or 'system' new_data = { 'hotel_zh': main_card.hotel_zh, 'hotel_en': main_card.hotel_en, 'title_zh': main_card.title_zh, 'title_en': main_card.title_en } target_card.career_path = update_career_path(target_card, new_data, main_card.image_path) db.session.delete(duplicate_record) db.session.delete(main_card) result_data = target_card.to_dict() elif action == 'keep_main': result_data = main_card.to_dict() elif action == 'ignore': result_data = main_card.to_dict() if action != 'merge_to_suspected': duplicate_record.processing_status = 'processed' duplicate_record.processed_at = datetime.now() duplicate_record.processed_by = processed_by or 'system' duplicate_record.processing_notes = notes or f'执行操作: {action}' db.session.commit() return { 'code': 200, 'success': True, 'message': f'重复记录处理成功,操作: {action}', 'data': { 'duplicate_record': duplicate_record.to_dict() if action != 'merge_to_suspected' else None, 'result': result_data } } except Exception as e: db.session.rollback() error_msg = f"处理重复记录失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def get_duplicate_record_detail(duplicate_id): """ 获取重复记录详情 Args: duplicate_id (int): 重复记录ID Returns: dict: 包含重复记录详细信息 """ try: duplicate_record = DuplicateBusinessCard.query.filter_by(main_card_id=duplicate_id).first() if not duplicate_record: return { 'code': 404, 'success': False, 'message': f'未找到main_card_id为{duplicate_id}的重复记录', 'data': None } record_dict = duplicate_record.to_dict() if duplicate_record.main_card: record_dict['main_card'] = duplicate_record.main_card.to_dict() else: record_dict['main_card'] = None suspected_duplicates_details = [] if duplicate_record.suspected_duplicates: for suspected_item in duplicate_record.suspected_duplicates: try: if isinstance(suspected_item, dict): card_id = suspected_item.get('id') else: card_id = suspected_item if card_id: card_result = get_business_card(card_id) if card_result['success']: suspected_duplicates_details.append(card_result['data']) except Exception as e: logging.warning(f"获取疑似重复记录详情失败: {str(e)}") continue record_dict['suspected_duplicates_details'] = suspected_duplicates_details return { 'code': 200, 'success': True, 'message': '获取重复记录详情成功', 'data': record_dict } except Exception as e: error_msg = f"获取重复记录详情失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def fix_broken_duplicate_records(): """ 修复损坏的重复记录 Returns: dict: 包含修复结果 """ try: broken_records = DuplicateBusinessCard.query.filter_by(main_card_id=None).all() fixed_count = 0 for record in broken_records: db.session.delete(record) fixed_count += 1 db.session.commit() return { 'code': 200, 'success': True, 'message': f'成功修复 {fixed_count} 条损坏的重复记录', 'data': {'fixed_count': fixed_count} } except Exception as e: db.session.rollback() error_msg = f"修复重复记录失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def get_parse_tasks(page=1, per_page=10, task_type=None, task_status=None): """ 获取解析任务列表 Args: page (int): 页码 per_page (int): 每页记录数 task_type (str): 任务类型过滤 task_status (str): 任务状态过滤 Returns: dict: 包含查询结果和分页信息 """ try: if page < 1 or per_page < 1 or per_page > 100: return { 'code': 400, 'success': False, 'message': '分页参数错误', 'data': None } query = ParseTaskRepository.query if task_type: query = query.filter_by(task_type=task_type) if task_status: query = query.filter_by(task_status=task_status) query = query.order_by(ParseTaskRepository.created_at.desc()) pagination = query.paginate(page=page, per_page=per_page, error_out=False) tasks = [task.to_dict() for task in pagination.items] return { 'code': 200, 'success': True, 'message': '获取解析任务列表成功', 'data': { 'tasks': tasks, 'pagination': { 'page': page, 'per_page': per_page, 'total': pagination.total, 'pages': pagination.pages, 'has_next': pagination.has_next, 'has_prev': pagination.has_prev } } } except Exception as e: error_msg = f"获取解析任务列表失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def get_parse_task_detail(task_name): """ 获取解析任务详情 Args: task_name (str): 任务名称 Returns: dict: 包含查询结果 """ try: if not task_name: return { 'code': 400, 'success': False, 'message': '任务名称不能为空', 'data': None } task = ParseTaskRepository.query.filter_by(task_name=task_name).first() if not task: return { 'code': 404, 'success': False, 'message': f'未找到任务名称为 {task_name} 的记录', 'data': None } return { 'code': 200, 'success': True, 'message': f'成功获取任务 {task_name} 的详细信息', 'data': task.to_dict() } except Exception as e: error_msg = f"获取解析任务详情失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def create_talent_tag(tag_data): """ 创建人才标签节点 Args: tag_data: 包含标签信息的字典,包括: - name: 标签名称 - category: 标签分类 - description: 标签描述 - status: 启用状态 Returns: dict: 操作结果字典 """ try: from app.services.neo4j_driver import neo4j_driver # 验证必要参数存在 if not tag_data or 'name' not in tag_data or not tag_data['name']: return { 'code': 400, 'success': False, 'message': '标签名称为必填项', 'data': None } # 准备节点属性 tag_properties = { 'name': tag_data.get('name'), 'category': tag_data.get('category', '未分类'), 'describe': tag_data.get('description', ''), # 使用describe与现有系统保持一致 'status': tag_data.get('status', 'active'), 'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # 生成标签的英文名(可选) from app.core.graph.graph_operations import create_or_get_node # 如果提供了名称,尝试获取英文翻译 if 'name' in tag_data and tag_data['name']: try: from app.api.data_interface.routes import translate_and_parse en_name = translate_and_parse(tag_data['name']) tag_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else '' except Exception as e: logging.warning(f"获取标签英文名失败: {str(e)}") tag_properties['en_name'] = '' # 创建节点 node_id = create_or_get_node('DataLabel', **tag_properties) if node_id: return { 'code': 200, 'success': True, 'message': '人才标签创建成功', 'data': { 'id': node_id, **tag_properties } } else: return { 'code': 500, 'success': False, 'message': '人才标签创建失败', 'data': None } except Exception as e: logging.error(f"创建人才标签失败: {str(e)}", exc_info=True) return { 'code': 500, 'success': False, 'message': f'创建人才标签失败: {str(e)}', 'data': None } def get_talent_tag_list(): """ 从Neo4j图数据库获取人才标签列表 Returns: dict: 包含操作结果和标签列表的字典 """ try: from app.services.neo4j_driver import neo4j_driver # 构建Cypher查询语句,获取分类为talent的标签 query = """ MATCH (n:DataLabel) WHERE n.category CONTAINS 'talent' OR n.category CONTAINS '人才' RETURN id(n) as id, n.name as name, n.en_name as en_name, n.category as category, n.describe as description, n.status as status, n.time as time ORDER BY n.time DESC """ # 执行查询 tags = [] with neo4j_driver.get_session() as session: result = session.run(query) # 处理查询结果 for record in result: tag = { 'id': record['id'], 'name': record['name'], 'en_name': record['en_name'], 'category': record['category'], 'description': record['description'], 'status': record['status'], 'time': record['time'] } tags.append(tag) return { 'code': 200, 'success': True, 'message': '获取人才标签列表成功', 'data': tags } except Exception as e: error_msg = f"获取人才标签列表失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': [] } def update_talent_tag(tag_id, tag_data): """ 更新人才标签节点属性 Args: tag_id: 标签节点ID tag_data: 包含更新信息的字典,可能包括: - name: 标签名称 - category: 标签分类 - description: 标签描述 - status: 启用状态 Returns: dict: 操作结果字典 """ try: from app.services.neo4j_driver import neo4j_driver # 准备要更新的属性 update_properties = {} # 检查并添加需要更新的属性 if 'name' in tag_data and tag_data['name']: update_properties['name'] = tag_data['name'] # 如果名称更新了,尝试更新英文名称 try: from app.api.data_interface.routes import translate_and_parse en_name = translate_and_parse(tag_data['name']) update_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else '' except Exception as e: logging.warning(f"更新标签英文名失败: {str(e)}") if 'category' in tag_data and tag_data['category']: update_properties['category'] = tag_data['category'] if 'description' in tag_data: update_properties['describe'] = tag_data['description'] if 'status' in tag_data: update_properties['status'] = tag_data['status'] # 添加更新时间 update_properties['time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 如果没有可更新的属性,返回错误 if not update_properties: return { 'code': 400, 'success': False, 'message': '未提供任何可更新的属性', 'data': None } # 构建更新的Cypher查询 set_clauses = [] params = {'nodeId': tag_id} for key, value in update_properties.items(): param_name = f"param_{key}" set_clauses.append(f"n.{key} = ${param_name}") params[param_name] = value set_clause = ", ".join(set_clauses) query = f""" MATCH (n:DataLabel) WHERE id(n) = $nodeId SET {set_clause} RETURN id(n) as id, n.name as name, n.en_name as en_name, n.category as category, n.describe as description, n.status as status, n.time as time """ # 执行更新查询 with neo4j_driver.get_session() as session: result = session.run(query, **params) record = result.single() if not record: return { 'code': 404, 'success': False, 'message': f'未找到ID为{tag_id}的标签', 'data': None } # 提取更新后的标签信息 updated_tag = { 'id': record['id'], 'name': record['name'], 'en_name': record['en_name'], 'category': record['category'], 'description': record['description'], 'status': record['status'], 'time': record['time'] } return { 'code': 200, 'success': True, 'message': '人才标签更新成功', 'data': updated_tag } except Exception as e: error_msg = f"更新人才标签失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def delete_talent_tag(tag_id): """ 删除人才标签节点及其相关关系 Args: tag_id: 标签节点ID Returns: dict: 操作结果字典 """ try: from app.services.neo4j_driver import neo4j_driver # 首先获取要删除的标签信息,以便在成功后返回 get_query = """ MATCH (n:DataLabel) WHERE id(n) = $nodeId RETURN id(n) as id, n.name as name, n.en_name as en_name, n.category as category, n.describe as description, n.status as status, n.time as time """ # 构建删除节点和关系的Cypher查询 delete_query = """ MATCH (n:DataLabel) WHERE id(n) = $nodeId OPTIONAL MATCH (n)-[r]-() DELETE r, n RETURN count(n) AS deleted """ # 执行查询 tag_info = None with neo4j_driver.get_session() as session: # 先获取标签信息 result = session.run(get_query, nodeId=tag_id) record = result.single() if not record: return { 'code': 404, 'success': False, 'message': f'未找到ID为{tag_id}的标签', 'data': None } # 保存标签信息用于返回 tag_info = { 'id': record['id'], 'name': record['name'], 'en_name': record['en_name'], 'category': record['category'], 'description': record['description'], 'status': record['status'], 'time': record['time'] } # 执行删除操作 delete_result = session.run(delete_query, nodeId=tag_id) deleted = delete_result.single()['deleted'] if deleted > 0: return { 'code': 200, 'success': True, 'message': '人才标签删除成功', 'data': tag_info } else: return { 'code': 404, 'success': False, 'message': f'未能删除ID为{tag_id}的标签', 'data': None } except Exception as e: error_msg = f"删除人才标签失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def query_neo4j_graph(query_requirement): """ 查询Neo4j图数据库,通过Deepseek API生成Cypher脚本 Args: query_requirement (str): 查询需求描述 Returns: dict: 包含查询结果的字典,JSON格式 """ try: # 导入必要的模块 from app.services.neo4j_driver import neo4j_driver import requests import json # Deepseek API配置 api_key = DEEPSEEK_API_KEY api_url = DEEPSEEK_API_URL # 步骤1: 从Neo4j获取所有标签列表 logging.info("第一步:从Neo4j获取人才类别的标签列表") all_labels_query = """ MATCH (dl:DataLabel) WHERE dl.category CONTAINS '人才' OR dl.category CONTAINS 'talent' RETURN dl.name as name """ all_labels = [] with neo4j_driver.get_session() as session: result = session.run(all_labels_query) for record in result: all_labels.append(record['name']) logging.info(f"获取到{len(all_labels)}个人才标签: {all_labels}") # 步骤2: 使用Deepseek判断查询需求中的关键信息与标签的对应关系 logging.info("第二步:调用Deepseek API匹配查询需求与标签") # 构建所有标签的JSON字符串 labels_json = json.dumps(all_labels, ensure_ascii=False) # 构建匹配标签的提示语 matching_prompt = f""" 请分析以下查询需求,并从标签列表中找出与查询需求相关的标签。 ## 查询需求 {query_requirement} ## 可用标签列表 {labels_json} ## 输出要求 1. 请以JSON数组格式返回匹配的标签名称列表,格式如: ["标签1", "标签2", "标签3"] 2. 只返回标签名称数组,不要包含任何解释或其他文本 3. 如果没有找到匹配的标签,请返回空数组 [] """ # 调用Deepseek API匹配标签 headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "model": "deepseek-chat", "messages": [ {"role": "system", "content": "你是一个专业的文本分析和匹配专家。"}, {"role": "user", "content": matching_prompt} ], "temperature": 0.1, "response_format": {"type": "json_object"} } logging.info("发送请求到Deepseek API匹配标签:"+matching_prompt) response = requests.post(api_url, headers=headers, json=payload, timeout=30) response.raise_for_status() # 解析API响应 result = response.json() matching_content = result.get("choices", [{}])[0].get("message", {}).get("content", "[]") # 提取JSON数组 try: # 尝试直接解析返回结果,预期格式为 ["新开酒店经验", "五星级酒店", "总经理"] logging.info(f"Deepseek返回的匹配内容: {matching_content}") # 如果返回的是JSON字符串,先去除可能的前后缀文本 if isinstance(matching_content, str): # 查找JSON数组的开始和结束位置 start_idx = matching_content.find('[') end_idx = matching_content.rfind(']') + 1 if start_idx >= 0 and end_idx > start_idx: json_str = matching_content[start_idx:end_idx] matched_labels = json.loads(json_str) else: matched_labels = [] else: matched_labels = [] # 确保结果是字符串列表 if matched_labels and all(isinstance(item, str) for item in matched_labels): logging.info(f"成功解析到标签列表: {matched_labels}") else: logging.warning("解析结果不是预期的字符串列表格式,将使用空列表") matched_labels = [] except json.JSONDecodeError as e: logging.error(f"JSON解析错误: {str(e)}") matched_labels = [] except Exception as e: logging.error(f"解析匹配标签时出错: {str(e)}") matched_labels = [] logging.info(f"匹配到的标签: {matched_labels}") # 如果没有匹配到标签,返回空结果 if not matched_labels: return { 'code': 200, 'success': True, 'message': '未找到与查询需求匹配的标签', 'query': '', 'data': [] } # 步骤3: 构建Cypher生成提示文本 logging.info("第三步:构建提示文本生成Cypher查询语句") # 将匹配的标签转换为字符串 matched_labels_str = ", ".join([f"'{label}'" for label in matched_labels]) # 构建生成Cypher的提示语 cypher_prompt = f""" 请根据以下Neo4j图数据库结构和已匹配的标签,生成一个Cypher查询脚本。 ## 图数据库结构 ### 节点 1. talent - 人才节点 属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名), mobile(手机号码), email(电子邮箱), updated_at(更新时间) 2. DataLabel - 人才标签节点 ### 关系 BELONGS_TO - 从属关系 (talent)-[BELONGS_TO]->(DataLabel) - 人才属于某标签 ## 匹配的标签列表 [{matched_labels_str}] ## 查询需求 {query_requirement} ## 输出要求 1. 只输出有效的Cypher查询语句,不要包含任何解释或注释 2. 确保return语句中包含talent节点属性 3. 尽量利用图数据库的特性来优化查询效率 4. 使用WITH子句和COLLECT函数收集标签,确保查询到同时拥有所有标签的人才 注意:请直接返回Cypher查询语句,无需任何其他文本。 以下是一个示例: 假设匹配的标签是 ['五星级酒店', '新开酒店经验', '总经理'] 生成的Cypher查询语句应该是: MATCH (t:talent)-[:BELONGS_TO]->(dl:DataLabel) WHERE dl.name IN ['五星级酒店', '新开酒店经验', '总经理'] WITH t, COLLECT(DISTINCT dl.name) AS labels WHERE size(labels) = 3 RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at """ # 调用Deepseek API生成Cypher脚本 payload = { "model": "deepseek-chat", "messages": [ {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"}, {"role": "user", "content": cypher_prompt} ], "temperature": 0.1 } logging.info("发送请求到Deepseek API生成Cypher脚本") response = requests.post(api_url, headers=headers, json=payload, timeout=30) response.raise_for_status() # 解析API响应 result = response.json() cypher_script = result.get("choices", [{}])[0].get("message", {}).get("content", "") # 清理Cypher脚本,移除不必要的markdown格式或注释 cypher_script = cypher_script.strip() if cypher_script.startswith("```cypher"): cypher_script = cypher_script[9:] elif cypher_script.startswith("```"): cypher_script = cypher_script[3:] if cypher_script.endswith("```"): cypher_script = cypher_script[:-3] cypher_script = cypher_script.strip() logging.info(f"生成的Cypher脚本: {cypher_script}") # 步骤4: 执行Cypher脚本 logging.info("第四步:执行Cypher脚本并返回结果") with neo4j_driver.get_session() as session: result = session.run(cypher_script) records = [record.data() for record in result] # 构建查询结果 response_data = { 'code': 200, 'success': True, 'message': '查询成功执行', 'query': cypher_script, 'matched_labels': matched_labels, 'data': records } return response_data except requests.exceptions.HTTPError as e: error_msg = f"调用Deepseek API失败: {str(e)}" logging.error(error_msg) if hasattr(e, 'response') and e.response: logging.error(f"错误状态码: {e.response.status_code}") logging.error(f"错误内容: {e.response.text}") return { 'code': 500, 'success': False, 'message': error_msg, 'data': [] } except Exception as e: error_msg = f"查询Neo4j图数据库失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': [] } def talent_get_tags(talent_id): """ 根据talent ID获取人才节点关联的标签 Args: talent_id (int): 人才节点pg_id Returns: dict: 包含人才ID和关联标签的字典,JSON格式 """ try: # 导入必要的模块 from app.services.neo4j_driver import neo4j_driver # 准备查询返回数据 response_data = { 'code': 200, 'success': True, 'message': '获取人才标签成功', 'data': [] } # 构建Cypher查询语句,获取人才节点关联的标签 cypher_query = """ MATCH (t:talent)-[r:BELONGS_TO]->(tag:DataLabel) WHERE t.pg_id = $talent_id RETURN t.pg_id as talent_id, tag.name as tag_name """ # 执行查询 with neo4j_driver.get_session() as session: result = session.run(cypher_query, talent_id=int(talent_id)) records = list(result) # 如果没有查询到标签,返回空数组 if not records: response_data['message'] = f'人才pg_id {talent_id} 没有关联的标签' return response_data # 处理查询结果 for record in records: talent_tag = { 'talent': record['talent_id'], 'tag': record['tag_name'] } response_data['data'].append(talent_tag) return response_data except Exception as e: error_msg = f"获取人才标签失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': [] } def talent_update_tags(data): """ 根据传入的JSON数据为人才节点创建与标签的BELONGS_TO关系 Args: data (list): 包含talent和tag字段的对象列表 例如: [ {"talent": 12345, "tag": "市场营销"}, {"talent": 12345, "tag": "酒店管理"} ] Returns: dict: 操作结果和状态信息 """ try: # 导入必要的模块 from app.services.neo4j_driver import neo4j_driver # 验证输入参数 if not isinstance(data, list): return { 'code': 400, 'success': False, 'message': '参数格式错误,需要JSON数组', 'data': None } if len(data) == 0: return { 'code': 400, 'success': False, 'message': '数据列表为空', 'data': None } # 获取当前时间 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 成功和失败计数 success_count = 0 failed_items = [] # 按talent分组处理数据 talent_tags = {} for item in data: # 验证每个项目的格式 if not isinstance(item, dict) or 'talent' not in item or 'tag' not in item: failed_items.append(item) continue talent_id = item.get('talent') tag_name = item.get('tag') # 验证talent_id和tag_name的值 if not talent_id or not tag_name or not isinstance(tag_name, str): failed_items.append(item) continue # 按talent_id分组 if talent_id not in talent_tags: talent_tags[talent_id] = [] talent_tags[talent_id].append(tag_name) with neo4j_driver.get_session() as session: # 处理每个talent及其标签 for talent_id, tags in talent_tags.items(): # 首先验证talent节点是否存在 check_talent_query = """ MATCH (t:talent) WHERE t.pg_id = $talent_id RETURN t """ talent_result = session.run(check_talent_query, talent_id=int(talent_id)) if not talent_result.single(): # 该talent不存在,记录失败项并继续下一个talent for tag in tags: failed_items.append({'talent_pg_id': talent_id, 'tag': tag}) continue # 首先清除所有现有的BELONGS_TO关系 clear_relations_query = """ MATCH (t:talent)-[r:BELONGS_TO]->(:DataLabel) WHERE t.pg_id = $talent_id DELETE r RETURN count(r) as deleted_count """ clear_result = session.run(clear_relations_query, talent_id=int(talent_id)) deleted_count = clear_result.single()['deleted_count'] logging.info(f"已删除talent_id={talent_id}的{deleted_count}个已有标签关系") # 处理每个标签 for tag_name in tags: try: # 1. 查找或创建标签节点 # 先查找是否存在该标签 find_tag_query = """ MATCH (tag:DataLabel) WHERE tag.name = $tag_name RETURN id(tag) as tag_id """ tag_result = session.run(find_tag_query, tag_name=tag_name) tag_record = tag_result.single() if tag_record: tag_id = tag_record['tag_id'] else: # 创建新标签 create_tag_query = """ CREATE (tag:DataLabel {name: $name, category: $category, updated_at: $updated_at}) RETURN id(tag) as tag_id """ tag_result = session.run( create_tag_query, name=tag_name, category='talent', updated_at=current_time ) tag_record = tag_result.single() tag_id = tag_record['tag_id'] # 2. 创建人才与标签的BELONGS_TO关系 create_relation_query = """ MATCH (t:talent), (tag:DataLabel) WHERE t.pg_id = $talent_id AND tag.name = $tag_name CREATE (t)-[r:BELONGS_TO]->(tag) SET r.created_at = $current_time RETURN r """ relation_result = session.run( create_relation_query, talent_id=int(talent_id), tag_name=tag_name, current_time=current_time ) if relation_result.single(): success_count += 1 else: failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name}) except Exception as tag_error: logging.error(f"为标签 {tag_name} 创建关系时出错: {str(tag_error)}") failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name}) # 返回结果 total_items = len(data) if success_count == total_items: return { 'code': 200, 'success': True, 'message': f'成功创建或更新了 {success_count} 个标签关系', 'data': { 'success_count': success_count, 'total_count': total_items, 'failed_items': [] } } elif success_count > 0: return { 'code': 206, # Partial Content 'success': True, 'message': f'部分成功: 创建或更新了 {success_count}/{total_items} 个标签关系', 'data': { 'success_count': success_count, 'total_count': total_items, 'failed_items': failed_items } } else: return { 'code': 500, 'success': False, 'message': '无法创建任何标签关系', 'data': { 'success_count': 0, 'total_count': total_items, 'failed_items': failed_items } } except Exception as e: error_msg = f"更新人才标签关系失败: {str(e)}" logging.error(error_msg, exc_info=True) return { 'code': 500, 'success': False, 'message': error_msg, 'data': None } def parse_text_with_qwen25VLplus(image_data): """ 使用阿里云的 Qwen VL Max 模型解析图像中的名片信息 Args: image_data (bytes): 图像的二进制数据 Returns: dict: 解析的名片信息 """ try: # 将图片数据转为 base64 编码 base64_image = base64.b64encode(image_data).decode('utf-8') # 初始化 OpenAI 客户端,配置为阿里云 API client = OpenAI( api_key=QWEN_API_KEY, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) # 构建优化后的提示语 prompt = """你是企业名片的信息提取专家。请仔细分析提供的图片,精确提取名片信息。 ## 提取要求 - 区分中英文内容,分别提取 - 保持提取信息的原始格式(如大小写、标点) - 对于无法识别或名片中不存在的信息,返回空字符串 - 名片中没有的信息,请不要猜测 ## 需提取的字段 1. 中文姓名 (name_zh) 2. 英文姓名 (name_en) 3. 中文职位/头衔 (title_zh) 4. 英文职位/头衔 (title_en) 5. 中文酒店/公司名称 (hotel_zh) 6. 英文酒店/公司名称 (hotel_en) 7. 手机号码 (mobile) - 如有多个手机号码,使用逗号分隔,最多提取3个 8. 固定电话 (phone) - 如有多个,使用逗号分隔 9. 电子邮箱 (email) 10. 中文地址 (address_zh) 11. 英文地址 (address_en) 12. 中文邮政编码 (postal_code_zh) 13. 英文邮政编码 (postal_code_en) 14. 生日 (birthday) - 格式为YYYY-MM-DD,如1990-01-01 15. 年龄 (age) - 数字格式,如30 16. 籍贯 (native_place) - 出生地或户籍所在地信息 17. 居住地 (residence) - 个人居住地址信息 18. 品牌组合 (brand_group) - 如有多个品牌,使用逗号分隔 19. 职业轨迹 (career_path) - 如能从名片中推断,以JSON数组格式返回,包含当前日期,公司名称和职位。自动生成当前日期。 20. 隶属关系 (affiliation) - 如能从名片中推断,以JSON数组格式返回,包含公司名称和隶属集团名称 ## 输出格式 请以严格的JSON格式返回结果,不要添加任何额外解释文字。JSON格式如下: ```json { "name_zh": "", "name_en": "", "title_zh": "", "title_en": "", "hotel_zh": "", "hotel_en": "", "mobile": "", "phone": "", "email": "", "address_zh": "", "address_en": "", "postal_code_zh": "", "postal_code_en": "", "birthday": "", "age": 0, "native_place": "", "residence": "", "brand_group": "", "career_path": [], "affiliation": [] } ```""" # 调用 Qwen VL Max API logging.info("发送请求到 Qwen VL Max 模型") completion = client.chat.completions.create( # model="qwen-vl-plus", model="qwen-vl-max-latest", messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}} ] } ], temperature=0.1, # 降低温度增加精确性 response_format={"type": "json_object"} # 要求输出JSON格式 ) # 解析响应 response_content = completion.choices[0].message.content logging.info(f"成功从 Qwen 模型获取响应: {response_content}") # 尝试从响应中提取 JSON try: extracted_data = json.loads(response_content) logging.info("成功解析 Qwen 响应中的 JSON") except json.JSONDecodeError: logging.warning("无法解析 JSON,尝试从文本中提取信息") # 这里可以调用其他的解析函数,但为了简化,先返回错误 raise Exception("无法解析 Qwen 返回的 JSON 格式") # 确保所有必要字段存在 required_fields = [ 'name_zh', 'name_en', 'title_zh', 'title_en', 'hotel_zh', 'hotel_en', 'mobile', 'phone', 'email', 'address_zh', 'address_en', 'postal_code_zh', 'postal_code_en', 'birthday', 'age', 'native_place', 'residence', 'brand_group', 'career_path' ] for field in required_fields: if field not in extracted_data: if field == 'career_path': extracted_data[field] = [] elif field == 'age': extracted_data[field] = 0 else: extracted_data[field] = "" # 为career_path增加一条记录 if extracted_data.get('hotel_zh') or extracted_data.get('hotel_en') or extracted_data.get('title_zh') or extracted_data.get('title_en'): career_entry = { 'date': datetime.now().strftime('%Y-%m-%d'), 'hotel_en': extracted_data.get('hotel_en', ''), 'hotel_zh': extracted_data.get('hotel_zh', ''), 'image_path': '', 'source': 'business_card_creation', 'title_en': extracted_data.get('title_en', ''), 'title_zh': extracted_data.get('title_zh', '') } # 直接清空原有的career_path内容,用career_entry写入 extracted_data['career_path'] = [career_entry] logging.info(f"为解析结果设置了career_path记录: {career_entry}") return extracted_data except Exception as e: error_msg = f"Qwen VL Max 模型解析失败: {str(e)}" logging.error(error_msg, exc_info=True) raise Exception(error_msg)