"""Business-card parsing, persistence and talent-graph helpers.

The module covers three areas:

* ``BusinessCard`` - the SQLAlchemy model for a parsed business card.
* Card parsing - OCR + DeepSeek (text) and Qwen-VL (image) extraction,
  MinIO upload of the original image and persistence of the result.
* Neo4j helpers - talent/hotel relationship sync, talent-tag CRUD and a
  natural-language query entry point.

Every service function returns a response envelope of the form
``{'code': int, 'success': bool, 'message': str, 'data': ...}``.
"""
import base64
import json
import logging
import os
import re
import uuid
from datetime import datetime
from io import BytesIO
from typing import Any, Dict

import boto3
import pytesseract
import requests
from botocore.config import Config
from openai import OpenAI
from PIL import Image

from app import db
from app.config.config import DevelopmentConfig, ProductionConfig

# Timestamp layout used for every serialized date in this module.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Plain-text card columns, in the order they are serialized by
# ``BusinessCard.to_dict``.
CARD_TEXT_FIELDS = (
    'name_zh', 'name_en', 'title_zh', 'title_en',
    'mobile', 'phone', 'email',
    'hotel_zh', 'hotel_en',
    'address_zh', 'address_en',
    'postal_code_zh', 'postal_code_en',
    'brand_zh', 'brand_en',
    'affiliation_zh', 'affiliation_en',
)

# Keys every model-extraction result is guaranteed to contain.
_REQUIRED_CARD_FIELDS = (
    'name_zh', 'name_en', 'title_zh', 'title_en',
    'hotel_zh', 'hotel_en',
    'mobile', 'phone', 'email',
    'address_zh', 'address_en',
    'postal_code_zh', 'postal_code_en',
    'brand_group', 'career_path',
)

# Keys the DeepSeek parser historically guaranteed; still filled in so
# that existing callers reading them keep working.
_LEGACY_DEEPSEEK_FIELDS = ('name', 'title', 'company', 'address')


def _result(code, success, message, data):
    """Build the response envelope shared by the service functions."""
    return {'code': code, 'success': success, 'message': message, 'data': data}


def _format_timestamp(value):
    """Format a datetime with ``_TIMESTAMP_FORMAT``; falsy values give None."""
    return value.strftime(_TIMESTAMP_FORMAT) if value else None


def _ensure_fields(data, fields):
    """Add every missing key in ``fields`` to ``data`` with an empty default.

    ``career_path`` defaults to an empty list, everything else to ``""``.
    """
    for field in fields:
        if field not in data:
            data[field] = [] if field == 'career_path' else ""
    return data


# Test-only parsing endpoint; not used in production code paths.
def parse_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``data`` wrapped in a success envelope.

    Args:
        data: The payload to parse.

    Returns:
        A dict with ``code``, ``status``, ``message`` and the unchanged
        ``data``.
    """
    # TODO: implement the actual parsing logic.
    return {
        'code': 200,
        'status': 'success',
        'message': 'Data parsed successfully',
        'data': data
    }


class BusinessCard(db.Model):
    """SQLAlchemy model for one parsed business card."""

    __tablename__ = 'business_cards'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name_zh = db.Column(db.String(100), nullable=False)
    name_en = db.Column(db.String(100))
    title_zh = db.Column(db.String(100))
    title_en = db.Column(db.String(100))
    mobile = db.Column(db.String(50))
    phone = db.Column(db.String(50))
    email = db.Column(db.String(100))
    hotel_zh = db.Column(db.String(200))
    hotel_en = db.Column(db.String(200))
    address_zh = db.Column(db.Text)
    address_en = db.Column(db.Text)
    postal_code_zh = db.Column(db.String(20))
    postal_code_en = db.Column(db.String(20))
    brand_zh = db.Column(db.String(100))
    brand_en = db.Column(db.String(100))
    affiliation_zh = db.Column(db.String(200))
    affiliation_en = db.Column(db.String(200))
    image_path = db.Column(db.String(255))  # object key of the image in MinIO
    career_path = db.Column(db.JSON)  # career history, stored as JSON
    brand_group = db.Column(db.String(200))  # brand portfolio
    created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
    updated_at = db.Column(db.DateTime, onupdate=datetime.now)
    updated_by = db.Column(db.String(50))
    status = db.Column(db.String(20), default='active')

    def to_dict(self):
        """Serialize the record to a plain dict with formatted timestamps."""
        payload = {'id': self.id}
        payload.update({field: getattr(self, field) for field in CARD_TEXT_FIELDS})
        payload.update({
            'image_path': self.image_path,
            'career_path': self.career_path,
            'brand_group': self.brand_group,
            'created_at': _format_timestamp(self.created_at),
            'updated_at': _format_timestamp(self.updated_at),
            'updated_by': self.updated_by,
            'status': self.status,
        })
        return payload


# ---------------------------------------------------------------------------
# Business-card parsing configuration
# ---------------------------------------------------------------------------

# DeepSeek API configuration.
# NOTE(security): a real-looking API key is hard-coded as the fallback value.
# It is kept so existing deployments keep working, but it should be rotated
# and supplied only through the DEEPSEEK_API_KEY environment variable.
DEEPSEEK_API_KEY = os.environ.get('DEEPSEEK_API_KEY', 'sk-2aea6e8b159b448aa3c1e29acd6f4349')
DEEPSEEK_API_URL = os.environ.get('DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions')
# Backup API endpoint.
DEEPSEEK_API_URL_BACKUP = 'https://api.deepseek.com/v1/completions'

# OCR configuration.
# Set the tesseract binary path here if it is not on PATH:
# pytesseract.pytesseract.tesseract_cmd = r'/path/to/tesseract'
# OCR languages; several can be combined with '+'.
OCR_LANG = os.environ.get('OCR_LANG', 'chi_sim+eng')

# Environment-based selection is currently disabled; it used to read:
#     if os.environ.get('FLASK_ENV') == 'production':
#         config = ProductionConfig()
#     else:
#         config = DevelopmentConfig()
# The module assumes it runs in production by default.
config = ProductionConfig()

# MinIO settings derived from the active configuration.
minio_url = f"{'https' if config.MINIO_SECURE else 'http'}://{config.MINIO_HOST}"
minio_access_key = config.MINIO_USER
minio_secret_key = config.MINIO_PASSWORD
minio_bucket = config.MINIO_BUCKET
use_ssl = config.MINIO_SECURE


def get_minio_client():
    """Create an S3 client for MinIO and make sure the bucket exists.

    Returns:
        The boto3 S3 client, or ``None`` when the connection or the bucket
        check fails (the error is logged).
    """
    try:
        logging.info(f"尝试连接MinIO服务器: {minio_url}")
        minio_client = boto3.client(
            's3',
            endpoint_url=minio_url,
            aws_access_key_id=minio_access_key,
            aws_secret_access_key=minio_secret_key,
            config=Config(
                signature_version='s3v4',
                retries={'max_attempts': 3, 'mode': 'standard'},
                connect_timeout=10,
                read_timeout=30
            )
        )

        # Make sure the target bucket exists.
        buckets = minio_client.list_buckets()
        bucket_names = [bucket['Name'] for bucket in buckets.get('Buckets', [])]
        logging.info(f"成功连接到MinIO服务器,现有存储桶: {bucket_names}")

        if minio_bucket not in bucket_names:
            logging.info(f"创建存储桶: {minio_bucket}")
            minio_client.create_bucket(Bucket=minio_bucket)

        return minio_client
    except Exception as e:
        logging.error(f"MinIO连接错误: {e}")
        return None


def extract_text_from_image(image_data):
    """Run OCR on an image and parse the text with the DeepSeek API.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        dict: The extracted card fields.

    Raises:
        Exception: When OCR yields no text, or OCR / the API call fails.
    """
    try:
        # Step 1: OCR.
        ocr_text = ocr_extract_text(image_data)
        if not ocr_text or ocr_text.strip() == "":
            error_msg = "OCR无法从图像中提取文本"
            logging.error(error_msg)
            raise Exception(error_msg)

        # Log at most the first 200 characters, always with the prefix.
        preview = f"{ocr_text[:200]}..." if len(ocr_text) > 200 else ocr_text
        logging.info(f"OCR提取的文本: {preview}")

        # Step 2: structured extraction from the OCR text.
        return parse_text_with_deepseek(ocr_text)
    except Exception as e:
        error_msg = f"从图像中提取和解析文本失败: {e}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e


def ocr_extract_text(image_data):
    """Extract text from an image with pytesseract.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        str: The recognized text with surrounding whitespace stripped.

    Raises:
        Exception: When the image cannot be opened or OCR fails.
    """
    try:
        with Image.open(BytesIO(image_data)) as image:
            text = pytesseract.image_to_string(image, lang=OCR_LANG)

        text = text.strip()
        logging.info(f"OCR成功从图像中提取文本,长度: {len(text)}")
        # The text may contain personal data, so it is only logged at DEBUG.
        logging.debug(text)
        return text
    except Exception as e:
        error_msg = f"OCR提取文本失败: {e}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e


def parse_text_with_deepseek(text):
    """Extract business-card fields from text through the DeepSeek API.

    Args:
        text (str): The text to analyse (typically OCR output).

    Returns:
        dict: Extracted fields. All keys of ``_REQUIRED_CARD_FIELDS`` (and
        the legacy ``name``/``title``/``company``/``address`` keys) are
        present, with empty defaults where the model returned nothing.

    Raises:
        Exception: When the API key is missing, the HTTP call fails or the
            response cannot be processed.
    """
    if not DEEPSEEK_API_KEY:
        error_msg = "未配置DeepSeek API密钥"
        logging.error(error_msg)
        raise Exception(error_msg)

    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }

    # Prompt containing the OCR text.
    prompt = f"""请从以下名片文本中提取详细信息,需分别识别中英文内容。
以JSON格式返回,包含以下字段:
- name_zh: 中文姓名
- name_en: 英文姓名
- title_zh: 中文职位/头衔
- title_en: 英文职位/头衔
- hotel_zh: 中文酒店/公司名称
- hotel_en: 英文酒店/公司名称
- mobile: 手机号码
- phone: 固定电话
- email: 电子邮箱
- address_zh: 中文地址
- address_en: 英文地址
- brand_group: 品牌组合(如有多个品牌,以逗号分隔)
- career_path: 职业轨迹(如果能从文本中推断出,以JSON数组格式返回,包含公司名称和职位)

名片文本:
{text}
"""

    model_name = 'deepseek-chat'

    try:
        logging.info("尝试通过DeepSeek API解析文本")
        payload = {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "你是一个专业的名片信息提取助手。请用JSON格式返回结果,不要有多余的文字说明。"},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.1
        }

        logging.info("向DeepSeek API发送请求")
        response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        result = response.json()
        # A null/missing content is treated as an empty JSON object.
        content = result.get("choices", [{}])[0].get("message", {}).get("content") or "{}"

        try:
            # The model sometimes wraps the JSON in extra text.
            json_content = extract_json_from_text(content)
            extracted_data = json.loads(json_content)
            logging.info("成功解析DeepSeek API返回的JSON")
        except json.JSONDecodeError:
            logging.warning("无法解析JSON,尝试直接从文本提取信息")
            extracted_data = extract_fields_from_text(content)

        # Guarantee the schema requested in the prompt, plus the legacy keys.
        _ensure_fields(extracted_data, _REQUIRED_CARD_FIELDS)
        _ensure_fields(extracted_data, _LEGACY_DEEPSEEK_FIELDS)

        logging.info("成功从DeepSeek API获取解析结果")
        return extracted_data

    except requests.exceptions.HTTPError as e:
        error_msg = f"DeepSeek API调用失败: {e}"
        logging.error(error_msg)
        # Response.__bool__ is False for 4xx/5xx, so compare against None.
        if getattr(e, 'response', None) is not None:
            logging.error(f"错误状态码: {e.response.status_code}")
            logging.error(f"错误内容: {e.response.text}")
        raise Exception(error_msg) from e
    except Exception as e:
        error_msg = f"解析文本过程中发生错误: {e}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e


def extract_json_from_text(text):
    """Return the first brace-balanced ``{...}`` span found in ``text``.

    Braces are counted without regard to string literals.

    Args:
        text (str): Text that may contain a JSON object.

    Returns:
        str: The balanced span; ``"{}"`` when ``text`` is empty or has no
        opening brace; the tail from the first ``{`` when it is never closed.
    """
    if not text:
        return "{}"

    start_idx = text.find('{')
    if start_idx == -1:
        return "{}"

    depth = 0
    for idx in range(start_idx, len(text)):
        char = text[idx]
        if char == '{':
            depth += 1
        elif char == '}':
            depth -= 1
            if depth == 0:
                return text[start_idx:idx + 1]

    # No matching closing brace: return everything from the opening brace.
    return text[start_idx:]


# Regex building blocks for the text fallback extractor.
_QUOTE = r'["\']'
_OPTIONAL_QUOTE = r'["\']?'
_QUOTED_VALUE = r'["\']([^"\']+)["\']'
_OPEN_SEP = r'[\s\{:]*'
_CLOSE_SEP = r'[\s\}:]*'


def _flat_pattern(key):
    """Pattern for ``"key": "value"``."""
    return re.compile(
        _QUOTE + '(' + key + ')' + _QUOTE + r'[\s:]*' + _QUOTED_VALUE,
        re.IGNORECASE,
    )


def _localized_pattern(key, language):
    """Pattern for ``"key": {"language": "value"}``."""
    return re.compile(
        _QUOTE + '(' + key + ')' + _QUOTE + _OPEN_SEP
        + _OPTIONAL_QUOTE + '(' + language + ')' + _OPTIONAL_QUOTE + _CLOSE_SEP
        + _QUOTED_VALUE,
        re.IGNORECASE,
    )


def _nested_pattern(key, language, sub_key):
    """Pattern for ``"key": {"language": {"sub_key": "value"}}``."""
    return re.compile(
        _QUOTE + '(' + key + ')' + _QUOTE + _OPEN_SEP
        + _OPTIONAL_QUOTE + '(' + language + ')' + _OPTIONAL_QUOTE + _OPEN_SEP
        + _OPTIONAL_QUOTE + '(' + sub_key + ')' + _OPTIONAL_QUOTE + _CLOSE_SEP
        + _QUOTED_VALUE,
        re.IGNORECASE,
    )


# (result field, compiled pattern); the value is always the last group.
_FIELD_PATTERNS = (
    ('name_zh', _localized_pattern('姓名', '中文')),
    ('name_en', _localized_pattern('姓名', '英文')),
    ('title_zh', _localized_pattern('头衔|职位', '中文')),
    ('title_en', _localized_pattern('头衔|职位', '英文')),
    ('mobile', _flat_pattern('手机')),
    ('phone', _flat_pattern('电话')),
    ('email', _flat_pattern('邮箱')),
    ('hotel_zh', _nested_pattern('地址', '中文', '酒店名称')),
    ('hotel_en', _nested_pattern('地址', '英文', '酒店名称')),
    ('address_zh', _nested_pattern('地址', '中文', '详细地址')),
    ('address_en', _nested_pattern('地址', '英文', '详细地址')),
    ('postal_code_zh', _nested_pattern('地址', '中文', '邮政编码')),
    ('postal_code_en', _nested_pattern('地址', '英文', '邮政编码')),
    ('brand_zh', _nested_pattern('公司', '中文', '品牌名称')),
    ('brand_en', _nested_pattern('公司', '英文', '品牌名称')),
    ('affiliation_zh', _nested_pattern('公司', '中文', '隶属关系')),
    ('affiliation_en', _nested_pattern('公司', '英文', '隶属关系')),
)


def extract_fields_from_text(text):
    """Extract card fields from text with regular expressions.

    Used as a fallback when a model response is not valid JSON. The
    patterns look for Chinese-keyed, quoted key/value pairs.

    Args:
        text (str): The text to analyse.

    Returns:
        dict: One entry per name in ``CARD_TEXT_FIELDS``; fields that were
        not found are empty strings.
    """
    result = dict.fromkeys(CARD_TEXT_FIELDS, '')
    for field, pattern in _FIELD_PATTERNS:
        match = pattern.search(text)
        if match:
            result[field] = match.group(match.lastindex)
    return result


def parse_text_with_qwen25VLplus(image_data):
    """Extract business-card fields from an image with the Qwen VL model.

    The image is sent base64-encoded to the ``qwen-vl-plus`` model through
    the DashScope OpenAI-compatible endpoint.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        dict: Extracted fields; every key of ``_REQUIRED_CARD_FIELDS`` is
        present.

    Raises:
        Exception: When the API call or response handling fails.
    """
    # NOTE(security): a real-looking API key is hard-coded as the fallback
    # value. It is kept for backward compatibility but should be rotated and
    # supplied only through the QWEN_API_KEY environment variable.
    QWEN_API_KEY = os.environ.get('QWEN_API_KEY', 'sk-8f2320dafc9e4076968accdd8eebd8e9')

    try:
        base64_image = base64.b64encode(image_data).decode('utf-8')

        # OpenAI client pointed at the Alibaba Cloud compatible endpoint.
        client = OpenAI(
            api_key=QWEN_API_KEY,
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

        prompt = """你是企业名片的信息提取专家。请仔细分析提供的名片,精确提取以下信息:

## 提取要求
- 区分中英文内容,分别提取
- 保持提取信息的原始格式(如大小写、标点)
- 对于无法识别或名片中不存在的信息,返回空字符串
- 名片中没有的信息,请不要猜测

## 需提取的字段
1. 中文姓名 (name_zh)
2. 英文姓名 (name_en)
3. 中文职位/头衔 (title_zh)
4. 英文职位/头衔 (title_en)
5. 中文酒店/公司名称 (hotel_zh)
6. 英文酒店/公司名称 (hotel_en)
7. 手机号码 (mobile) - 如有多个,使用逗号分隔
8. 固定电话 (phone) - 如有多个,使用逗号分隔
9. 电子邮箱 (email)
10. 中文地址 (address_zh)
11. 英文地址 (address_en)
12. 中文邮政编码 (postal_code_zh)
13. 英文邮政编码 (postal_code_en)
14. 品牌组合 (brand_group) - 如有多个品牌,使用逗号分隔
15. 职业轨迹 (career_path) - 如能从名片中推断,以JSON数组格式返回,包含当前日期,公司名称和职位
16. 隶属关系 (affiliation) - 如能从名片中推断,以JSON数组格式返回,包含公司名称和隶属集团名称

## 输出格式
请以严格的JSON格式返回结果,不要添加任何额外解释文字。JSON格式如下:
```json
{
  "name_zh": "",
  "name_en": "",
  "title_zh": "",
  "title_en": "",
  "hotel_zh": "",
  "hotel_en": "",
  "mobile": "",
  "phone": "",
  "email": "",
  "address_zh": "",
  "address_en": "",
  "postal_code_zh": "",
  "postal_code_en": "",
  "brand_group": "",
  "career_path": [],
  "affiliation": []
}
```"""

        logging.info("发送请求到 Qwen 2.5 VL Plus 模型")
        completion = client.chat.completions.create(
            model="qwen-vl-plus",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
                    ]
                }
            ],
            temperature=0.1,  # low temperature for more deterministic output
            response_format={"type": "json_object"}  # request JSON output
        )

        # A null content is treated as an empty response.
        response_content = completion.choices[0].message.content or ""
        logging.info(f"成功从 Qwen 模型获取响应: {response_content}")

        try:
            json_content = extract_json_from_text(response_content)
            extracted_data = json.loads(json_content)
            logging.info("成功解析 Qwen 响应中的 JSON")
        except json.JSONDecodeError:
            logging.warning("无法解析 JSON,尝试从文本中提取信息")
            extracted_data = extract_fields_from_text(response_content)

        return _ensure_fields(extracted_data, _REQUIRED_CARD_FIELDS)

    except Exception as e:
        error_msg = f"Qwen 2.5 VL Plus 模型解析失败: {e}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e


def _upload_card_image(image_file, image_data):
    """Upload the card image to MinIO under a random object key.

    Upload problems are logged and never raised, so that a storage outage
    does not prevent the parsed card from being saved.

    Returns:
        The object key on success, otherwise ``None``.
    """
    try:
        # Keep the original extension; fall back to .jpg when there is none.
        file_ext = os.path.splitext(image_file.filename or '')[1].lower() or '.jpg'
        object_key = f"{uuid.uuid4().hex}{file_ext}"

        minio_client = get_minio_client()
        if minio_client is None:
            logging.warning("MinIO客户端未初始化,图片未上传")
            return None

        try:
            logging.info(f"上传文件到MinIO: {object_key}")
            minio_client.put_object(
                Bucket=minio_bucket,
                Key=object_key,
                # Send the bytes already read instead of re-reading the stream.
                Body=image_data,
                ContentType=image_file.content_type or 'application/octet-stream'
            )
            logging.info(f"图片已上传到MinIO: {object_key}")
            return object_key
        except Exception as upload_err:
            logging.error(f"上传文件到MinIO时出错: {upload_err}")
            return None
    except Exception as e:
        logging.error(f"上传图片到MinIO失败: {e}", exc_info=True)
        return None


def process_business_card(image_file):
    """Parse an uploaded business-card image, store it and persist the result.

    Steps: extract fields with the Qwen VL model, upload the image to MinIO
    (best effort), then insert a ``BusinessCard`` row.

    Args:
        image_file (FileStorage): The uploaded image file.

    Returns:
        dict: Response envelope. On success ``data`` is the saved record.
        When extraction fails ``data`` is ``None``. When only the database
        write fails, ``data`` still carries the extracted fields with
        ``id`` set to ``None``.
    """
    minio_path = None

    try:
        image_data = image_file.read()
        image_file.seek(0)  # rewind so the stream can be read again

        try:
            logging.info("尝试使用 Qwen 2.5 VL Plus 模型解析名片")
            extracted_data = parse_text_with_qwen25VLplus(image_data)
            logging.info("成功使用 Qwen 2.5 VL Plus 模型解析名片")
        except Exception as qwen_error:
            # No fallback parser is enabled (OCR + DeepSeek is disabled), so
            # stop here instead of continuing without extracted data.
            logging.warning(f"Qwen 模型解析失败,错误原因: {qwen_error}")
            return _result(500, False, f"名片解析失败: {qwen_error}", None)

        minio_path = _upload_card_image(image_file, image_data)

        # A null from the model is stored as '' (name_zh is NOT NULL).
        text_values = {
            field: extracted_data.get(field) or ''
            for field in CARD_TEXT_FIELDS
        }
        career_path = extracted_data.get('career_path', [])
        brand_group = extracted_data.get('brand_group', '')

        try:
            business_card = BusinessCard(
                image_path=minio_path,  # relative path (object key)
                career_path=career_path,
                brand_group=brand_group,
                status='active',
                updated_by='system',
                **text_values
            )

            db.session.add(business_card)
            db.session.commit()

            logging.info(f"名片信息已保存到数据库,ID: {business_card.id}")

            return _result(200, True, '名片解析成功', business_card.to_dict())
        except Exception as e:
            db.session.rollback()
            error_msg = f"保存名片信息到数据库失败: {e}"
            logging.error(error_msg, exc_info=True)

            # Still hand the extracted information back to the caller.
            fallback = {'id': None}
            fallback.update(text_values)
            fallback.update({
                'image_path': minio_path,
                'career_path': career_path,
                'brand_group': brand_group,
                'created_at': datetime.now().strftime(_TIMESTAMP_FORMAT),
                'updated_at': None,
                'updated_by': 'system',
                'status': 'active',
            })
            return _result(500, False, error_msg, fallback)

    except Exception as e:
        db.session.rollback()
        error_msg = f"名片处理失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, None)


def _sync_talent_hotel_graph(card_id, card):
    """Mirror a card's talent/hotel data into the Neo4j graph.

    Creates or fetches the talent node, and - when the card names a hotel -
    the hotel node plus a ``WORKS_FOR`` relationship carrying the titles.
    """
    from app.services.neo4j_driver import neo4j_driver
    from app.core.graph.graph_operations import create_or_get_node

    current_time = datetime.now().strftime(_TIMESTAMP_FORMAT)

    talent_node_id = create_or_get_node(
        'talent',
        pg_id=card_id,  # ID of the row in the relational database
        name_zh=card.name_zh,
        name_en=card.name_en,
        mobile=card.mobile,
        email=card.email,
        updated_at=current_time,
    )

    if not (card.hotel_zh or card.hotel_en):
        return

    hotel_node_id = create_or_get_node(
        'hotel',
        hotel_zh=card.hotel_zh,
        hotel_en=card.hotel_en,
        updated_at=current_time,
    )

    if not (talent_node_id and hotel_node_id):
        return

    cypher_query = """
    MATCH (t:talent), (h:hotel)
    WHERE id(t) = $talent_id AND id(h) = $hotel_id
    MERGE (t)-[r:WORKS_FOR]->(h)
    SET r.title_zh = $title_zh,
        r.title_en = $title_en,
        r.updated_at = $updated_at
    RETURN r
    """

    with neo4j_driver.get_session() as session:
        session.run(
            cypher_query,
            talent_id=talent_node_id,
            hotel_id=hotel_node_id,
            title_zh=card.title_zh,
            title_en=card.title_en,
            updated_at=current_time
        )
        logging.info(f"已成功更新人才(ID:{talent_node_id})与酒店(ID:{hotel_node_id})的WORKS_FOR关系")


def update_business_card(card_id, data):
    """Update a business-card record and sync the change to Neo4j.

    Only keys present in ``data`` overwrite the stored values. A failure of
    the Neo4j sync is logged and does not affect the returned result.

    Args:
        card_id (int): ID of the card record.
        data (dict): Fields to update.

    Returns:
        dict: Response envelope with the updated record in ``data``.
    """
    try:
        card = BusinessCard.query.get(card_id)

        if not card:
            return _result(500, False, f'未找到ID为{card_id}的名片记录', None)

        for field in CARD_TEXT_FIELDS + ('career_path', 'brand_group'):
            setattr(card, field, data.get(field, getattr(card, field)))
        # Could be replaced by the authenticated user once available.
        card.updated_by = data.get('updated_by', 'user')

        db.session.commit()

        # Graph sync is best effort: the relational update already succeeded.
        try:
            _sync_talent_hotel_graph(card_id, card)
            logging.info("Neo4j图数据库关系更新成功")
        except Exception as e:
            logging.error(f"更新Neo4j图数据库关系失败: {e}", exc_info=True)

        return _result(200, True, '名片信息已更新', card.to_dict())

    except Exception as e:
        db.session.rollback()
        error_msg = f"更新名片信息失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, None)


def get_business_cards():
    """Return every business-card record.

    Returns:
        dict: Response envelope with a list of card dicts in ``data``
        (an empty list on failure).
    """
    try:
        cards_data = [card.to_dict() for card in BusinessCard.query.all()]
        return _result(200, True, '获取名片列表成功', cards_data)

    except Exception as e:
        error_msg = f"获取名片列表失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, [])


def update_business_card_status(card_id, status):
    """Activate or deactivate a business card.

    Args:
        card_id (int): ID of the card record.
        status (str): New status, ``'active'`` or ``'inactive'``.

    Returns:
        dict: Response envelope with the updated record in ``data``.
    """
    try:
        card = BusinessCard.query.get(card_id)

        if not card:
            return _result(500, False, f'未找到ID为{card_id}的名片记录', None)

        if status not in ('active', 'inactive'):
            return _result(
                500, False,
                f'无效的状态值: {status},必须为 active 或 inactive',
                None,
            )

        card.status = status
        card.updated_at = datetime.now()
        card.updated_by = 'system'  # could be replaced by the current user

        db.session.commit()

        return _result(200, True, f'名片状态已更新为: {status}', card.to_dict())

    except Exception as e:
        db.session.rollback()
        error_msg = f"更新名片状态失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, None)


def _tag_record_to_dict(record):
    """Convert a Neo4j tag record into the dict returned to API callers."""
    return {
        'id': record['id'],
        'name': record['name'],
        'en_name': record['en_name'],
        'category': record['category'],
        'description': record['description'],
        'status': record['status'],
        'time': record['time']
    }


def create_talent_tag(tag_data):
    """Create a talent-tag node (label ``data_label``).

    Args:
        tag_data: Dict with the tag information:
            - name: tag name
            - category: tag category
            - description: tag description
            - status: enabled status

    Returns:
        dict: Response envelope; on success ``data`` holds the node id and
        the stored properties.
    """
    try:
        from app.core.graph.graph_operations import create_or_get_node

        tag_properties = {
            'name': tag_data.get('name'),
            'category': tag_data.get('category', '未分类'),
            # Stored as 'describe' to stay consistent with the existing graph.
            'describe': tag_data.get('description', ''),
            'status': tag_data.get('status', 'active'),
            'time': datetime.now().strftime(_TIMESTAMP_FORMAT)
        }

        # Best-effort English name derived from the tag name.
        if tag_data.get('name'):
            try:
                from app.api.data_interface.routes import translate_and_parse
                en_name = translate_and_parse(tag_data['name'])
                tag_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
            except Exception as e:
                logging.warning(f"获取标签英文名失败: {e}")
                tag_properties['en_name'] = ''

        node_id = create_or_get_node('data_label', **tag_properties)

        if node_id:
            return _result(200, True, '人才标签创建成功', {'id': node_id, **tag_properties})
        return _result(500, False, '人才标签创建失败', None)

    except Exception as e:
        logging.error(f"创建人才标签失败: {e}", exc_info=True)
        return _result(500, False, f'创建人才标签失败: {e}', None)


def get_talent_tag_list():
    """List talent tags stored in Neo4j, newest first.

    A tag is included when its category contains ``talent`` or ``人才``.

    Returns:
        dict: Response envelope with the tag list in ``data`` (an empty
        list on failure).
    """
    try:
        from app.services.neo4j_driver import neo4j_driver

        query = """
        MATCH (n:data_label)
        WHERE n.category CONTAINS 'talent' OR n.category CONTAINS '人才'
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
               n.category as category, n.describe as description,
               n.status as status, n.time as time
        ORDER BY n.time DESC
        """

        with neo4j_driver.get_session() as session:
            tags = [_tag_record_to_dict(record) for record in session.run(query)]

        return _result(200, True, '获取人才标签列表成功', tags)

    except Exception as e:
        error_msg = f"获取人才标签列表失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, [])


def update_talent_tag(tag_id, tag_data):
    """Update the properties of a talent-tag node.

    Args:
        tag_id: ID of the tag node.
        tag_data: Dict with the values to change; may contain:
            - name: tag name (also refreshes the English name)
            - category: tag category
            - description: tag description
            - status: enabled status

    Returns:
        dict: Response envelope. Code 400 when ``tag_data`` holds nothing
        to update, 404 when the tag does not exist.
    """
    try:
        from app.services.neo4j_driver import neo4j_driver

        update_properties = {}

        if tag_data.get('name'):
            update_properties['name'] = tag_data['name']

            # A new name also refreshes the English name (best effort).
            try:
                from app.api.data_interface.routes import translate_and_parse
                en_name = translate_and_parse(tag_data['name'])
                update_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
            except Exception as e:
                logging.warning(f"更新标签英文名失败: {e}")

        if tag_data.get('category'):
            update_properties['category'] = tag_data['category']

        if 'description' in tag_data:
            update_properties['describe'] = tag_data['description']

        if 'status' in tag_data:
            update_properties['status'] = tag_data['status']

        # Must run before the timestamp is added, otherwise it never fires.
        if not update_properties:
            return _result(400, False, '未提供任何可更新的属性', None)

        update_properties['time'] = datetime.now().strftime(_TIMESTAMP_FORMAT)

        # Property names come from the fixed set above, so interpolating
        # them is safe; values are always passed as query parameters.
        set_clauses = []
        params = {'nodeId': tag_id}

        for key, value in update_properties.items():
            param_name = f"param_{key}"
            set_clauses.append(f"n.{key} = ${param_name}")
            params[param_name] = value

        set_clause = ", ".join(set_clauses)

        query = f"""
        MATCH (n:data_label)
        WHERE id(n) = $nodeId
        SET {set_clause}
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
               n.category as category, n.describe as description,
               n.status as status, n.time as time
        """

        with neo4j_driver.get_session() as session:
            record = session.run(query, **params).single()

            if not record:
                return _result(404, False, f'未找到ID为{tag_id}的标签', None)

            return _result(200, True, '人才标签更新成功', _tag_record_to_dict(record))

    except Exception as e:
        error_msg = f"更新人才标签失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, None)


def delete_talent_tag(tag_id):
    """Delete a talent-tag node together with its relationships.

    Args:
        tag_id: ID of the tag node.

    Returns:
        dict: Response envelope; on success ``data`` holds the properties
        of the deleted tag. Code 404 when the tag does not exist.
    """
    try:
        from app.services.neo4j_driver import neo4j_driver

        # Read the tag first so its data can be returned after deletion.
        get_query = """
        MATCH (n:data_label)
        WHERE id(n) = $nodeId
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
               n.category as category, n.describe as description,
               n.status as status, n.time as time
        """

        delete_query = """
        MATCH (n:data_label)
        WHERE id(n) = $nodeId
        OPTIONAL MATCH (n)-[r]-()
        DELETE r, n
        RETURN count(n) AS deleted
        """

        with neo4j_driver.get_session() as session:
            record = session.run(get_query, nodeId=tag_id).single()

            if not record:
                return _result(404, False, f'未找到ID为{tag_id}的标签', None)

            tag_info = _tag_record_to_dict(record)

            deleted = session.run(delete_query, nodeId=tag_id).single()['deleted']

            if deleted > 0:
                return _result(200, True, '人才标签删除成功', tag_info)
            return _result(404, False, f'未能删除ID为{tag_id}的标签', None)

    except Exception as e:
        error_msg = f"删除人才标签失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, None)


# Cypher clauses that modify the graph; generated scripts containing any of
# them are rejected before execution.
_CYPHER_WRITE_RE = re.compile(
    r'\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|FOREACH|LOAD\s+CSV)\b',
    re.IGNORECASE,
)


def _clean_cypher_script(script):
    """Strip whitespace and a surrounding markdown code fence from a script."""
    script = (script or "").strip()
    script = re.sub(r'^```(?:cypher)?\s*', '', script, flags=re.IGNORECASE)
    if script.endswith("```"):
        script = script[:-3]
    return script.strip()


def query_neo4j_graph(query_requirement):
    """Answer a natural-language request by generating and running Cypher.

    The request is turned into a Cypher script by the DeepSeek API and the
    script is executed against Neo4j.

    SECURITY: the executed script is model-generated from caller-supplied
    text. Scripts containing write clauses are rejected, but this keyword
    check is a safety net, not a sandbox - the database account used here
    should itself be read-only.

    Args:
        query_requirement (str): Description of the desired query.

    Returns:
        dict: ``code``, ``success``, ``message`` and ``data`` (list of
        result rows); ``query`` holds the generated script whenever one
        was produced. Code 400 when the script contains write clauses.
    """
    try:
        from app.services.neo4j_driver import neo4j_driver

        api_key = DEEPSEEK_API_KEY
        api_url = DEEPSEEK_API_URL

        # Prompt describing the graph schema and the request.
        prompt = f"""
        请根据以下Neo4j图数据库结构和查询需求,生成一个Cypher查询脚本。

        ## 图数据库结构

        ### 节点
        1. talent - 人才节点
           属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名),
                 mobile(手机号码), email(电子邮箱), updated_at(更新时间)

        2. hotel - 酒店节点
           属性: hotel_zh(酒店中文名称), hotel_en(酒店英文名称), updated_at(更新时间)

        3. talent_tag - 人才标签节点
           属性: name(标签名称), category(标签分类), en_name(英文名称)

        4. hotel_tag - 酒店标签节点
           属性: name(标签名称), category(标签分类), en_name(英文名称)

        5. brand_group - 品牌集团节点
           属性: name(集团名称), en_name(英文名称)

        ### 关系
        1. WORKS_FOR - 工作关系,人才在酒店工作
           (talent)-[WORKS_FOR]->(hotel)
           属性: title_zh(中文职位), title_en(英文职位), updated_at(更新时间)

        2. BELONGS_TO - 从属关系
           (talent)-[BELONGS_TO]->(talent_tag) - 人才属于某标签
           (hotel)-[BELONGS_TO]->(hotel_tag) - 酒店属于某标签
           (hotel)-[BELONGS_TO]->(brand_group) - 酒店属于某品牌集团

        ## 查询需求
        {query_requirement}

        ## 输出要求
        1. 只输出有效的Cypher查询语句,不要包含任何解释或注释
        2. 确保查询结果包含有意义的列名
        3. 根据需要使用适当的过滤、排序、聚合和限制
        4. 尽量利用图数据库的特性来优化查询效率

        注意:请直接返回Cypher查询语句,无需任何其他文本。
        """

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.1
        }

        logging.info("发送请求到Deepseek API生成Cypher脚本")
        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        result = response.json()
        raw_script = result.get("choices", [{}])[0].get("message", {}).get("content", "")

        # Remove markdown fences the model may have added.
        cypher_script = _clean_cypher_script(raw_script)

        logging.info(f"生成的Cypher脚本: {cypher_script}")

        if not cypher_script:
            return _result(500, False, '未能生成有效的Cypher脚本', [])

        # Refuse to run anything that could modify the graph.
        if _CYPHER_WRITE_RE.search(cypher_script):
            logging.warning(f"拒绝执行包含写操作的Cypher脚本: {cypher_script}")
            return {
                'code': 400,
                'success': False,
                'message': '生成的Cypher脚本包含写操作,已拒绝执行',
                'query': cypher_script,
                'data': []
            }

        with neo4j_driver.get_session() as session:
            records = [record.data() for record in session.run(cypher_script)]

        return {
            'code': 200,
            'success': True,
            'message': '查询成功执行',
            'query': cypher_script,
            'data': records
        }

    except requests.exceptions.HTTPError as e:
        error_msg = f"调用Deepseek API失败: {e}"
        logging.error(error_msg)

        # Response.__bool__ is False for 4xx/5xx, so compare against None.
        if getattr(e, 'response', None) is not None:
            logging.error(f"错误状态码: {e.response.status_code}")
            logging.error(f"错误内容: {e.response.text}")

        return _result(500, False, error_msg, [])

    except Exception as e:
        error_msg = f"查询Neo4j图数据库失败: {e}"
        logging.error(error_msg, exc_info=True)

        return _result(500, False, error_msg, [])