12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733 |
- from typing import Dict, Any
- from app import db
- from datetime import datetime
- import os
- import boto3
- from botocore.config import Config
- import logging
- import requests
- import json
- import re
- import uuid
- from PIL import Image
- from io import BytesIO
- import pytesseract
- import base64
- from openai import OpenAI
- from app.config.config import DevelopmentConfig, ProductionConfig
# Parsing endpoint kept for testing; not actually used.
def parse_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap ``data`` in a success response envelope.

    Args:
        data: Payload to parse. It is currently returned unchanged.

    Returns:
        A dict with ``code``, ``status``, ``message`` and the original
        ``data`` object.
    """
    # TODO: implement the actual parsing logic
    envelope: Dict[str, Any] = dict(
        code=200,
        status='success',
        message='Data parsed successfully',
    )
    envelope['data'] = data
    return envelope
# Data model for parsed business cards.
class BusinessCard(db.Model):
    """ORM model mapped to the ``business_cards`` table."""

    __tablename__ = 'business_cards'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name_zh = db.Column(db.String(100), nullable=False)
    name_en = db.Column(db.String(100))
    title_zh = db.Column(db.String(100))
    title_en = db.Column(db.String(100))
    mobile = db.Column(db.String(50))
    phone = db.Column(db.String(50))
    email = db.Column(db.String(100))
    hotel_zh = db.Column(db.String(200))
    hotel_en = db.Column(db.String(200))
    address_zh = db.Column(db.Text)
    address_en = db.Column(db.Text)
    postal_code_zh = db.Column(db.String(20))
    postal_code_en = db.Column(db.String(20))
    brand_zh = db.Column(db.String(100))
    brand_en = db.Column(db.String(100))
    affiliation_zh = db.Column(db.String(200))
    affiliation_en = db.Column(db.String(200))
    image_path = db.Column(db.String(255))  # path of the image stored in MinIO
    career_path = db.Column(db.JSON)  # career history, JSON
    brand_group = db.Column(db.String(200))  # brand portfolio
    created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
    updated_at = db.Column(db.DateTime, onupdate=datetime.now)
    updated_by = db.Column(db.String(50))
    status = db.Column(db.String(20), default='active')

    def to_dict(self):
        """Return the record as a plain dict.

        ``created_at`` and ``updated_at`` are rendered as
        ``'%Y-%m-%d %H:%M:%S'`` strings, or ``None`` when unset; every other
        column is copied as-is.
        """
        def _render(moment):
            if not moment:
                return None
            return moment.strftime('%Y-%m-%d %H:%M:%S')

        # Columns copied verbatim, in the order they appear in the output.
        verbatim = (
            'id', 'name_zh', 'name_en', 'title_zh', 'title_en',
            'mobile', 'phone', 'email', 'hotel_zh', 'hotel_en',
            'address_zh', 'address_en', 'postal_code_zh', 'postal_code_en',
            'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en',
            'image_path', 'career_path', 'brand_group',
        )
        payload = {column: getattr(self, column) for column in verbatim}
        payload['created_at'] = _render(self.created_at)
        payload['updated_at'] = _render(self.updated_at)
        payload['updated_by'] = self.updated_by
        payload['status'] = self.status
        return payload
# Business-card parsing module.

# DeepSeek API configuration.
# SECURITY(review): the fallback value below is a credential committed to
# source control. It is kept only so existing deployments keep working;
# rotate the key and supply it through the DEEPSEEK_API_KEY environment
# variable, then drop the fallback.
DEEPSEEK_API_KEY = os.environ.get('DEEPSEEK_API_KEY', 'sk-2aea6e8b159b448aa3c1e29acd6f4349')
DEEPSEEK_API_URL = os.environ.get('DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions')
# Backup API endpoint.
DEEPSEEK_API_URL_BACKUP = 'https://api.deepseek.com/v1/completions'

# OCR configuration.
# Set the tesseract binary path here if required:
# pytesseract.pytesseract.tesseract_cmd = r'/path/to/tesseract'
# OCR languages; several can be combined with '+'.
OCR_LANG = os.environ.get('OCR_LANG', 'chi_sim+eng')

# Environment-based selection is currently disabled (it used to live in a bare
# string literal, which Python evaluated as a no-op statement):
#     if os.environ.get('FLASK_ENV') == 'production':
#         config = ProductionConfig()
#     else:
#         config = DevelopmentConfig()
# By default the module assumes it runs in production.
config = ProductionConfig()

# MinIO settings derived from the active configuration.
minio_url = f"{'https' if config.MINIO_SECURE else 'http'}://{config.MINIO_HOST}"
minio_access_key = config.MINIO_USER
minio_secret_key = config.MINIO_PASSWORD
minio_bucket = config.MINIO_BUCKET
use_ssl = config.MINIO_SECURE
def get_minio_client():
    """Create a boto3 S3 client for MinIO and ensure the bucket exists.

    Reads the module-level ``minio_url``, ``minio_access_key``,
    ``minio_secret_key`` and ``minio_bucket`` settings.

    Returns:
        The boto3 client, or ``None`` when connecting, listing buckets or
        creating the bucket raises (the error is logged).
    """
    try:
        logging.info(f"尝试连接MinIO服务器: {minio_url}")

        client_options = Config(
            signature_version='s3v4',
            retries={'max_attempts': 3, 'mode': 'standard'},
            connect_timeout=10,
            read_timeout=30,
        )
        s3 = boto3.client(
            's3',
            endpoint_url=minio_url,
            aws_access_key_id=minio_access_key,
            aws_secret_access_key=minio_secret_key,
            config=client_options,
        )

        # Listing buckets doubles as a connectivity check.
        listing = s3.list_buckets()
        existing = []
        for entry in listing.get('Buckets', []):
            existing.append(entry['Name'])
        logging.info(f"成功连接到MinIO服务器,现有存储桶: {existing}")

        bucket_missing = minio_bucket not in existing
        if bucket_missing:
            logging.info(f"创建存储桶: {minio_bucket}")
            s3.create_bucket(Bucket=minio_bucket)
    except Exception as e:
        logging.error(f"MinIO连接错误: {str(e)}")
        return None
    return s3
def extract_text_from_image(image_data):
    """Run OCR on an image and parse the text into business-card fields.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        dict: Fields returned by ``parse_text_with_deepseek``.

    Raises:
        Exception: When OCR yields no text or when OCR/parsing fails. The
            original error is attached as ``__cause__``.
    """
    try:
        # Step 1: extract raw text with OCR.
        ocr_text = ocr_extract_text(image_data)
        if not ocr_text or not ocr_text.strip():
            error_msg = "OCR无法从图像中提取文本"
            logging.error(error_msg)
            raise Exception(error_msg)

        # Build the preview first so the label is logged for short text too;
        # previously the conditional expression swallowed the label whenever
        # the text was 200 characters or shorter.
        preview = f"{ocr_text[:200]}..." if len(ocr_text) > 200 else ocr_text
        logging.info(f"OCR提取的文本: {preview}")

        # Step 2: let the DeepSeek API structure the text.
        return parse_text_with_deepseek(ocr_text)

    except Exception as e:
        error_msg = f"从图像中提取和解析文本失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
def ocr_extract_text(image_data):
    """Extract text from an image with Tesseract OCR.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        str: The recognized text with surrounding whitespace stripped.

    Raises:
        Exception: When the image cannot be opened or OCR fails. The original
            error is attached as ``__cause__``.
    """
    try:
        # The context manager releases the decoded image once OCR is done.
        with Image.open(BytesIO(image_data)) as image:
            text = pytesseract.image_to_string(image, lang=OCR_LANG)

        text = text.strip()
        # Only the length is logged here; the debug print of the full card
        # text to stdout was removed.
        logging.info(f"OCR成功从图像中提取文本,长度: {len(text)}")

        return text
    except Exception as e:
        error_msg = f"OCR提取文本失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
def parse_text_with_deepseek(text):
    """Ask the DeepSeek chat API to turn business-card text into fields.

    Args:
        text (str): Text to parse (typically OCR output).

    Returns:
        dict: Parsed card data. Every field requested in the prompt is
        present (empty string when missing, ``[]`` for ``career_path``). The
        legacy keys ``name``, ``title``, ``company`` and ``address`` are still
        defaulted so existing callers keep working.

    Raises:
        Exception: When the API key is missing, the HTTP call fails, or the
            response cannot be processed.
    """
    if not DEEPSEEK_API_KEY:
        error_msg = "未配置DeepSeek API密钥"
        logging.error(error_msg)
        raise Exception(error_msg)

    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }

    # Prompt embedding the OCR text.
    prompt = f"""请从以下名片文本中提取详细信息,需分别识别中英文内容。
以JSON格式返回,包含以下字段:
- name_zh: 中文姓名
- name_en: 英文姓名
- title_zh: 中文职位/头衔
- title_en: 英文职位/头衔
- hotel_zh: 中文酒店/公司名称
- hotel_en: 英文酒店/公司名称
- mobile: 手机号码
- phone: 固定电话
- email: 电子邮箱
- address_zh: 中文地址
- address_en: 英文地址
- brand_group: 品牌组合(如有多个品牌,以逗号分隔)
- career_path: 职业轨迹(如果能从文本中推断出,以JSON数组格式返回,包含公司名称和职位)
名片文本:
{text}
"""

    model_name = 'deepseek-chat'

    try:
        logging.info("尝试通过DeepSeek API解析文本")
        payload = {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "你是一个专业的名片信息提取助手。请用JSON格式返回结果,不要有多余的文字说明。"},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.1
        }

        logging.info("向DeepSeek API发送请求")
        response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        result = response.json()
        # Tolerate a missing or empty "choices" list instead of raising
        # IndexError.
        choices = result.get("choices") or [{}]
        content = (choices[0].get("message") or {}).get("content") or "{}"

        try:
            # The model sometimes wraps the JSON in extra text; isolate it.
            json_content = extract_json_from_text(content)
            extracted_data = json.loads(json_content)
            logging.info("成功解析DeepSeek API返回的JSON")
        except json.JSONDecodeError:
            logging.warning("无法解析JSON,尝试直接从文本提取信息")
            extracted_data = extract_fields_from_text(content)

        # Default every field the prompt asks for. The previous list used
        # names the prompt never requests.
        expected_fields = [
            'name_zh', 'name_en', 'title_zh', 'title_en',
            'hotel_zh', 'hotel_en', 'mobile', 'phone', 'email',
            'address_zh', 'address_en', 'brand_group', 'career_path',
            # Legacy keys retained for backward compatibility.
            'name', 'title', 'company', 'address',
        ]
        for field in expected_fields:
            if field not in extracted_data:
                extracted_data[field] = [] if field == 'career_path' else ""

        logging.info("成功从DeepSeek API获取解析结果")
        return extracted_data

    except requests.exceptions.HTTPError as e:
        error_msg = f"DeepSeek API调用失败: {str(e)}"
        logging.error(error_msg)

        # A requests.Response is falsy for 4xx/5xx statuses, so compare with
        # None explicitly; otherwise these details are never logged.
        if getattr(e, 'response', None) is not None:
            logging.error(f"错误状态码: {e.response.status_code}")
            logging.error(f"错误内容: {e.response.text}")

        raise Exception(error_msg) from e
    except Exception as e:
        error_msg = f"解析文本过程中发生错误: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
def extract_json_from_text(text):
    """Return the first top-level ``{...}`` object embedded in ``text``.

    Braces inside double-quoted JSON strings (including strings with escaped
    quotes) are ignored when matching, so a value such as ``"}"`` no longer
    cuts the object short.

    Args:
        text (str): Text that may contain a JSON object, possibly surrounded
            by prose or a Markdown code fence.

    Returns:
        str: The extracted JSON text. ``"{}"`` when ``text`` is empty/``None``
        or contains no ``{``. When the opening brace is never closed, the
        remainder of ``text`` from that brace is returned.
    """
    if not text:
        return "{}"

    start_idx = text.find('{')
    if start_idx == -1:
        return "{}"

    depth = 0
    in_string = False
    escaped = False
    for idx, char in enumerate(text[start_idx:], start=start_idx):
        if in_string:
            if escaped:
                # This character is escaped; it cannot end the string.
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '"':
                in_string = False
            continue

        if char == '"':
            in_string = True
        elif char == '{':
            depth += 1
        elif char == '}':
            depth -= 1
            if depth == 0:
                return text[start_idx:idx + 1]

    # No matching closing brace: return everything from the opening brace.
    return text[start_idx:]
# Regex fragments shared by all field patterns.
_QUOTE = r'["\']'
_OPTIONAL_QUOTE = r'["\']?'
_NESTED_SEPARATOR = r'[\s\{:]*'
_NESTED_VALUE = r'[\s\}:]*["\']([^"\']+)["\']'
_FLAT_VALUE = r'[\s:]*["\']([^"\']+)["\']'


def _build_field_pattern(top_key, *nested_keys):
    """Compile the regex for a quoted key, optional nested keys, and its value."""
    pattern = _QUOTE + '(' + top_key + ')' + _QUOTE
    if not nested_keys:
        return re.compile(pattern + _FLAT_VALUE, re.IGNORECASE)
    for key in nested_keys:
        pattern += _NESTED_SEPARATOR + _OPTIONAL_QUOTE + '(' + key + ')' + _OPTIONAL_QUOTE
    return re.compile(pattern + _NESTED_VALUE, re.IGNORECASE)


# (result key, compiled pattern) pairs, in the order of the result dict.
_FIELD_PATTERNS = (
    ('name_zh', _build_field_pattern('姓名', '中文')),
    ('name_en', _build_field_pattern('姓名', '英文')),
    ('title_zh', _build_field_pattern('头衔|职位', '中文')),
    ('title_en', _build_field_pattern('头衔|职位', '英文')),
    ('mobile', _build_field_pattern('手机')),
    ('phone', _build_field_pattern('电话')),
    ('email', _build_field_pattern('邮箱')),
    ('hotel_zh', _build_field_pattern('地址', '中文', '酒店名称')),
    ('hotel_en', _build_field_pattern('地址', '英文', '酒店名称')),
    ('address_zh', _build_field_pattern('地址', '中文', '详细地址')),
    ('address_en', _build_field_pattern('地址', '英文', '详细地址')),
    ('postal_code_zh', _build_field_pattern('地址', '中文', '邮政编码')),
    ('postal_code_en', _build_field_pattern('地址', '英文', '邮政编码')),
    ('brand_zh', _build_field_pattern('公司', '中文', '品牌名称')),
    ('brand_en', _build_field_pattern('公司', '英文', '品牌名称')),
    ('affiliation_zh', _build_field_pattern('公司', '中文', '隶属关系')),
    ('affiliation_en', _build_field_pattern('公司', '英文', '隶属关系')),
)


def extract_fields_from_text(text):
    """Pull business-card fields out of loosely JSON-like text with regexes.

    Used as a fallback when the model output is not valid JSON. The patterns
    look for quoted Chinese keys (e.g. ``"姓名": {"中文": "..."}``) and capture
    the quoted value that follows.

    NOTE(review): each nested pattern requires its language key to follow the
    top-level key directly, so only the first nested entry of an object can
    match. This mirrors the previous hand-written regexes.

    Args:
        text (str): Text to scan.

    Returns:
        dict: One entry per supported field; ``''`` when nothing matched.
    """
    result = {}
    for field, pattern in _FIELD_PATTERNS:
        match = pattern.search(text)
        # The value is always the last capture group of a pattern.
        result[field] = match.groups()[-1] if match else ''
    return result
def _sniff_image_mime(image_data):
    """Guess an image MIME type from its leading magic bytes (JPEG by default)."""
    if image_data.startswith(b'\x89PNG\r\n\x1a\n'):
        return 'image/png'
    if image_data[:4] == b'RIFF' and image_data[8:12] == b'WEBP':
        return 'image/webp'
    if image_data.startswith(b'BM'):
        return 'image/bmp'
    return 'image/jpeg'


def parse_text_with_qwen25VLplus(image_data):
    """Extract business-card fields from an image with Alibaba Cloud Qwen-VL.

    The image is sent base64-encoded through the OpenAI-compatible DashScope
    endpoint and the model is asked to answer with a JSON object.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        dict: Parsed card data. Every field requested in the prompt is
        present (empty string when missing, ``[]`` for ``career_path`` and
        ``affiliation``).

    Raises:
        Exception: When the request or response handling fails. The original
            error is attached as ``__cause__``.
    """
    # SECURITY(review): the fallback value is a credential committed to source
    # control. It is kept only so existing deployments keep working; rotate
    # the key and supply it through the QWEN_API_KEY environment variable.
    QWEN_API_KEY = os.environ.get('QWEN_API_KEY', 'sk-8f2320dafc9e4076968accdd8eebd8e9')

    try:
        base64_image = base64.b64encode(image_data).decode('utf-8')
        # Label the data URL with the real image type instead of always
        # claiming JPEG.
        mime_type = _sniff_image_mime(image_data)

        # OpenAI client pointed at the Alibaba Cloud compatible endpoint.
        client = OpenAI(
            api_key=QWEN_API_KEY,
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

        prompt = """你是企业名片的信息提取专家。请仔细分析提供的名片,精确提取以下信息:
## 提取要求
- 区分中英文内容,分别提取
- 保持提取信息的原始格式(如大小写、标点)
- 对于无法识别或名片中不存在的信息,返回空字符串
- 名片中没有的信息,请不要猜测
## 需提取的字段
1. 中文姓名 (name_zh)
2. 英文姓名 (name_en)
3. 中文职位/头衔 (title_zh)
4. 英文职位/头衔 (title_en)
5. 中文酒店/公司名称 (hotel_zh)
6. 英文酒店/公司名称 (hotel_en)
7. 手机号码 (mobile) - 如有多个,使用逗号分隔
8. 固定电话 (phone) - 如有多个,使用逗号分隔
9. 电子邮箱 (email)
10. 中文地址 (address_zh)
11. 英文地址 (address_en)
12. 中文邮政编码 (postal_code_zh)
13. 英文邮政编码 (postal_code_en)
14. 品牌组合 (brand_group) - 如有多个品牌,使用逗号分隔
15. 职业轨迹 (career_path) - 如能从名片中推断,以JSON数组格式返回,包含当前日期,公司名称和职位
16. 隶属关系 (affiliation) - 如能从名片中推断,以JSON数组格式返回,包含公司名称和隶属集团名称
## 输出格式
请以严格的JSON格式返回结果,不要添加任何额外解释文字。JSON格式如下:
```json
{
"name_zh": "",
"name_en": "",
"title_zh": "",
"title_en": "",
"hotel_zh": "",
"hotel_en": "",
"mobile": "",
"phone": "",
"email": "",
"address_zh": "",
"address_en": "",
"postal_code_zh": "",
"postal_code_en": "",
"brand_group": "",
"career_path": [],
"affiliation": []
}
```"""

        logging.info("发送请求到 Qwen 2.5 VL Plus 模型")
        completion = client.chat.completions.create(
            model="qwen-vl-plus",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}
                    ]
                }
            ],
            temperature=0.1,  # low temperature for more deterministic extraction
            response_format={"type": "json_object"}  # request JSON output
        )

        response_content = completion.choices[0].message.content
        logging.info(f"成功从 Qwen 模型获取响应: {response_content}")

        try:
            json_content = extract_json_from_text(response_content)
            extracted_data = json.loads(json_content)
            logging.info("成功解析 Qwen 响应中的 JSON")
        except json.JSONDecodeError:
            logging.warning("无法解析 JSON,尝试从文本中提取信息")
            extracted_data = extract_fields_from_text(response_content)

        # Fields that default to a list; everything else defaults to "".
        list_fields = ('career_path', 'affiliation')
        required_fields = [
            'name_zh', 'name_en', 'title_zh', 'title_en',
            'hotel_zh', 'hotel_en', 'mobile', 'phone',
            'email', 'address_zh', 'address_en',
            'postal_code_zh', 'postal_code_en', 'brand_group',
            'career_path', 'affiliation',
        ]
        for field in required_fields:
            if field not in extracted_data:
                extracted_data[field] = [] if field in list_fields else ""

        return extracted_data

    except Exception as e:
        error_msg = f"Qwen 2.5 VL Plus 模型解析失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
# Plain-text card fields copied from the model output into the record.
_CARD_TEXT_FIELDS = (
    'name_zh', 'name_en', 'title_zh', 'title_en',
    'mobile', 'phone', 'email', 'hotel_zh', 'hotel_en',
    'address_zh', 'address_en', 'postal_code_zh', 'postal_code_en',
    'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en',
)


def process_business_card(image_file):
    """Parse an uploaded business-card image, store the image, save the record.

    Steps: extract fields with the Qwen vision model, upload the image to
    MinIO (best effort), then insert a ``BusinessCard`` row.

    Args:
        image_file (FileStorage): Uploaded image; ``read``, ``seek``,
            ``filename`` and ``content_type`` are used.

    Returns:
        dict: ``code``/``success``/``message``/``data`` envelope. ``data`` is
        the saved record on success, the extracted fields (``id`` is ``None``)
        when only the database insert failed, and ``None`` otherwise.
    """
    minio_path = None

    try:
        image_data = image_file.read()
        image_file.seek(0)  # rewind so the same stream can be uploaded below

        try:
            logging.info("尝试使用 Qwen 2.5 VL Plus 模型解析名片")
            extracted_data = parse_text_with_qwen25VLplus(image_data)
            logging.info("成功使用 Qwen 2.5 VL Plus 模型解析名片")
        except Exception as qwen_error:
            # Stop here. Previously the failure was only logged, so the image
            # was still uploaded and the code then failed with a NameError on
            # the unbound ``extracted_data``.
            logging.warning(f"Qwen 模型解析失败,错误原因: {str(qwen_error)}")
            return {
                'code': 500,
                'success': False,
                'message': f"名片解析失败: {str(qwen_error)}",
                'data': None
            }

        # Best-effort upload: any failure leaves ``minio_path`` as None and
        # processing continues.
        try:
            file_ext = os.path.splitext(image_file.filename or '')[1].lower() or '.jpg'
            object_key = f"{uuid.uuid4().hex}{file_ext}"

            minio_client = get_minio_client()
            if minio_client:
                logging.info(f"上传文件到MinIO: {object_key}")
                minio_client.put_object(
                    Bucket=minio_bucket,
                    Key=object_key,
                    Body=image_file,
                    ContentType=image_file.content_type
                )
                logging.info(f"图片已上传到MinIO: {object_key}")
                minio_path = object_key
            else:
                logging.warning("MinIO客户端未初始化,图片未上传")
        except Exception as upload_err:
            logging.error(f"上传图片到MinIO失败: {str(upload_err)}", exc_info=True)
            minio_path = None

        # Shared by the ORM constructor and the fallback response.
        card_content = {field: extracted_data.get(field, '') for field in _CARD_TEXT_FIELDS}
        card_content['image_path'] = minio_path  # relative object path
        card_content['career_path'] = extracted_data.get('career_path', [])
        card_content['brand_group'] = extracted_data.get('brand_group', '')

        try:
            business_card = BusinessCard(
                **card_content,
                status='active',
                updated_by='system'
            )

            db.session.add(business_card)
            db.session.commit()

            logging.info(f"名片信息已保存到数据库,ID: {business_card.id}")

            return {
                'code': 200,
                'success': True,
                'message': '名片解析成功',
                'data': business_card.to_dict()
            }
        except Exception as e:
            db.session.rollback()
            error_msg = f"保存名片信息到数据库失败: {str(e)}"
            logging.error(error_msg, exc_info=True)

            # Still hand the extracted information back to the caller.
            return {
                'code': 500,
                'success': False,
                'message': error_msg,
                'data': {
                    'id': None,
                    **card_content,
                    'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'updated_at': None,
                    'updated_by': 'system',
                    'status': 'active'
                }
            }

    except Exception as e:
        db.session.rollback()
        error_msg = f"名片处理失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
# Columns a caller may overwrite through ``update_business_card``.
_UPDATABLE_CARD_FIELDS = (
    'name_zh', 'name_en', 'title_zh', 'title_en',
    'mobile', 'phone', 'email', 'hotel_zh', 'hotel_en',
    'address_zh', 'address_en', 'postal_code_zh', 'postal_code_en',
    'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en',
    'career_path', 'brand_group',
)


def _sync_card_to_graph(card_id, card):
    """Mirror the card's talent/hotel data and WORKS_FOR relation into Neo4j."""
    # Imported lazily, as in the original code.
    from app.services.neo4j_driver import neo4j_driver
    from app.core.graph.graph_operations import create_or_get_node

    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    talent_node_id = create_or_get_node(
        'talent',
        pg_id=card_id,  # ID of the row in the relational database
        name_zh=card.name_zh,
        name_en=card.name_en,
        mobile=card.mobile,
        email=card.email,
        updated_at=current_time,
    )

    # Without hotel information there is nothing to link.
    if not (card.hotel_zh or card.hotel_en):
        return

    hotel_node_id = create_or_get_node(
        'hotel',
        hotel_zh=card.hotel_zh,
        hotel_en=card.hotel_en,
        updated_at=current_time,
    )
    if not (talent_node_id and hotel_node_id):
        return

    cypher_query = """
    MATCH (t:talent), (h:hotel)
    WHERE id(t) = $talent_id AND id(h) = $hotel_id
    MERGE (t)-[r:WORKS_FOR]->(h)
    SET r.title_zh = $title_zh,
        r.title_en = $title_en,
        r.updated_at = $updated_at
    RETURN r
    """

    with neo4j_driver.get_session() as session:
        session.run(
            cypher_query,
            talent_id=talent_node_id,
            hotel_id=hotel_node_id,
            title_zh=card.title_zh,
            title_en=card.title_en,
            updated_at=current_time
        )

    # The log now names the relationship type the query actually writes.
    logging.info(f"已成功更新人才(ID:{talent_node_id})与酒店(ID:{hotel_node_id})的WORKS_FOR关系")


def update_business_card(card_id, data):
    """Update a business-card record and mirror the change into Neo4j.

    Args:
        card_id (int): ID of the record to update.
        data (dict): Fields to overwrite; keys that are absent keep their
            current value. ``updated_by`` defaults to ``'user'``.

    Returns:
        dict: ``code``/``success``/``message``/``data`` envelope with the
        updated record, or ``data=None`` when the record is missing or the
        update fails. A Neo4j failure is logged but does not fail the call.
    """
    try:
        card = BusinessCard.query.get(card_id)

        if not card:
            return {
                'code': 500,
                'success': False,
                'message': f'未找到ID为{card_id}的名片记录',
                'data': None
            }

        for field in _UPDATABLE_CARD_FIELDS:
            if field in data:
                setattr(card, field, data[field])
        card.updated_by = data.get('updated_by', 'user')  # adjust to the current user if available

        db.session.commit()

        # The relational update is already committed; a graph failure must
        # not change the result.
        try:
            _sync_card_to_graph(card_id, card)
            logging.info("Neo4j图数据库关系更新成功")
        except Exception as e:
            logging.error(f"更新Neo4j图数据库关系失败: {str(e)}", exc_info=True)

        return {
            'code': 200,
            'success': True,
            'message': '名片信息已更新',
            'data': card.to_dict()
        }

    except Exception as e:
        db.session.rollback()
        error_msg = f"更新名片信息失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
def get_business_cards():
    """Return every business-card record as a list of dicts.

    Returns:
        dict: ``code``/``success``/``message``/``data`` envelope. ``data`` is
        the list of serialized records, or ``[]`` when the query fails.
    """
    try:
        serialized = []
        for record in BusinessCard.query.all():
            serialized.append(record.to_dict())
    except Exception as e:
        error_msg = f"获取名片列表失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }

    return {
        'code': 200,
        'success': True,
        'message': '获取名片列表成功',
        'data': serialized
    }
def update_business_card_status(card_id, status):
    """Set a business card's status to ``'active'`` or ``'inactive'``.

    Args:
        card_id (int): ID of the record to update.
        status (str): New status; must be ``'active'`` or ``'inactive'``.

    Returns:
        dict: ``code``/``success``/``message``/``data`` envelope with the
        updated record, or ``data=None`` when the record is missing, the
        status is invalid, or the update fails.
    """
    def _failure(message):
        # All failure paths share the same envelope shape.
        return {
            'code': 500,
            'success': False,
            'message': message,
            'data': None
        }

    try:
        card = BusinessCard.query.get(card_id)
        if not card:
            return _failure(f'未找到ID为{card_id}的名片记录')

        # The status is validated after the lookup, so a missing record is
        # reported first.
        if status not in ('active', 'inactive'):
            return _failure(f'无效的状态值: {status},必须为 active 或 inactive')

        card.status = status
        card.updated_at = datetime.now()
        card.updated_by = 'system'  # adjust to the current user if available

        db.session.commit()
    except Exception as e:
        db.session.rollback()
        error_msg = f"更新名片状态失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        return _failure(error_msg)

    try:
        serialized = card.to_dict()
    except Exception as e:
        # Serialization errors take the same failure path as before.
        db.session.rollback()
        error_msg = f"更新名片状态失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        return _failure(error_msg)

    return {
        'code': 200,
        'success': True,
        'message': f'名片状态已更新为: {status}',
        'data': serialized
    }
def create_talent_tag(tag_data):
    """Create a talent-tag node (label ``data_label``) in the graph database.

    Args:
        tag_data: Dict describing the tag:
            - name: tag name (required)
            - category: tag category
            - description: tag description
            - status: enabled state

    Returns:
        dict: ``code``/``success``/``message``/``data`` envelope. ``data``
        holds the node id and stored properties on success, otherwise ``None``.
    """
    try:
        from app.services.neo4j_driver import neo4j_driver

        # A non-empty name is mandatory.
        has_name = bool(tag_data and 'name' in tag_data and tag_data['name'])
        if not has_name:
            return {
                'code': 400,
                'success': False,
                'message': '标签名称为必填项',
                'data': None
            }

        tag_properties = dict(
            name=tag_data.get('name'),
            category=tag_data.get('category', '未分类'),
            # Stored as "describe" to stay consistent with the existing system.
            describe=tag_data.get('description', ''),
            status=tag_data.get('status', 'active'),
            time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        )

        from app.core.graph.graph_operations import create_or_get_node

        # Best-effort English name. The name is known to be non-empty here,
        # so the original's repeated check was always true.
        try:
            from app.api.data_interface.routes import translate_and_parse
            translated = translate_and_parse(tag_data['name'])
            if translated and isinstance(translated, list):
                tag_properties['en_name'] = translated[0]
            else:
                tag_properties['en_name'] = ''
        except Exception as e:
            logging.warning(f"获取标签英文名失败: {str(e)}")
            tag_properties['en_name'] = ''

        node_id = create_or_get_node('data_label', **tag_properties)

        if not node_id:
            return {
                'code': 500,
                'success': False,
                'message': '人才标签创建失败',
                'data': None
            }

        created = {'id': node_id}
        created.update(tag_properties)
        return {
            'code': 200,
            'success': True,
            'message': '人才标签创建成功',
            'data': created
        }

    except Exception as e:
        logging.error(f"创建人才标签失败: {str(e)}", exc_info=True)
        return {
            'code': 500,
            'success': False,
            'message': f'创建人才标签失败: {str(e)}',
            'data': None
        }
def get_talent_tag_list():
    """
    Fetch the talent tag list from the Neo4j graph database.

    Selects ``data_label`` nodes whose category contains 'talent' or '人才',
    ordered by their ``time`` property, newest first.

    Returns:
        dict: Result envelope; ``data`` is the list of tag dicts on success
            (code 200) and an empty list on failure (code 500).
    """
    tag_fields = ('id', 'name', 'en_name', 'category', 'description', 'status', 'time')

    try:
        from app.services.neo4j_driver import neo4j_driver

        cypher = """
        MATCH (n:data_label)
        WHERE n.category CONTAINS 'talent' OR n.category CONTAINS '人才'
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
        n.category as category, n.describe as description,
        n.status as status, n.time as time
        ORDER BY n.time DESC
        """

        # Copy each returned row into a plain dict while the session is open.
        with neo4j_driver.get_session() as session:
            tag_list = [
                {field: row[field] for field in tag_fields}
                for row in session.run(cypher)
            ]

    except Exception as e:
        failure_message = f"获取人才标签列表失败: {str(e)}"
        logging.error(failure_message, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': failure_message,
            'data': []
        }

    return {
        'code': 200,
        'success': True,
        'message': '获取人才标签列表成功',
        'data': tag_list
    }
def update_talent_tag(tag_id, tag_data):
    """
    Update the properties of a talent tag node.

    Args:
        tag_id: Internal ID of the ``data_label`` node.
        tag_data: Dict with the values to change; any of:
            - name: tag name (non-empty; also refreshes ``en_name``)
            - category: tag category (non-empty)
            - description: tag description (stored as ``describe``)
            - status: enabled state

    Returns:
        dict: Result envelope. ``code`` is 400 when ``tag_data`` carries
            nothing to update, 404 when no node has the given ID, 200 with
            the updated tag on success and 500 on unexpected errors.
    """
    if not isinstance(tag_data, dict):
        return {
            'code': 400,
            'success': False,
            'message': '未提供任何可更新的属性',
            'data': None
        }

    try:
        update_properties = {}

        new_name = tag_data.get('name')
        if new_name:
            update_properties['name'] = new_name

            # Best effort: when translation fails the stored en_name is left
            # untouched rather than failing the whole update.
            try:
                from app.api.data_interface.routes import translate_and_parse
                en_name = translate_and_parse(new_name)
                update_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
            except Exception as e:
                logging.warning(f"更新标签英文名失败: {str(e)}")

        if tag_data.get('category'):
            update_properties['category'] = tag_data['category']

        if 'description' in tag_data:
            update_properties['describe'] = tag_data['description']

        if 'status' in tag_data:
            update_properties['status'] = tag_data['status']

        # This check must run BEFORE the timestamp is added; otherwise the
        # dict is never empty and the 400 response can never be produced.
        if not update_properties:
            return {
                'code': 400,
                'success': False,
                'message': '未提供任何可更新的属性',
                'data': None
            }

        update_properties['time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        from app.services.neo4j_driver import neo4j_driver

        # Property names come from the fixed set above, never from the
        # caller, so interpolating them into the SET clause is safe. Values
        # are always passed as query parameters.
        params = {'nodeId': tag_id}
        set_clauses = []
        for key, value in update_properties.items():
            param_name = f"param_{key}"
            set_clauses.append(f"n.{key} = ${param_name}")
            params[param_name] = value

        set_clause = ", ".join(set_clauses)

        query = f"""
        MATCH (n:data_label)
        WHERE id(n) = $nodeId
        SET {set_clause}
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
        n.category as category, n.describe as description,
        n.status as status, n.time as time
        """

        with neo4j_driver.get_session() as session:
            record = session.run(query, **params).single()

            if not record:
                return {
                    'code': 404,
                    'success': False,
                    'message': f'未找到ID为{tag_id}的标签',
                    'data': None
                }

            updated_tag = {
                'id': record['id'],
                'name': record['name'],
                'en_name': record['en_name'],
                'category': record['category'],
                'description': record['description'],
                'status': record['status'],
                'time': record['time']
            }

        return {
            'code': 200,
            'success': True,
            'message': '人才标签更新成功',
            'data': updated_tag
        }

    except Exception as e:
        error_msg = f"更新人才标签失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
def delete_talent_tag(tag_id):
    """
    Delete a talent tag node together with its relationships.

    The tag is read first so its properties can be returned to the caller
    once the deletion has succeeded.

    Args:
        tag_id: Internal ID of the ``data_label`` node.

    Returns:
        dict: Result envelope. ``code`` is 200 with the deleted tag's data,
            404 when the tag does not exist or nothing was deleted, and 500
            on unexpected errors.
    """
    returned_fields = ('id', 'name', 'en_name', 'category', 'description', 'status', 'time')

    # Reads the tag so it can be echoed back after deletion.
    fetch_cypher = """
    MATCH (n:data_label)
    WHERE id(n) = $nodeId
    RETURN id(n) as id, n.name as name, n.en_name as en_name,
    n.category as category, n.describe as description,
    n.status as status, n.time as time
    """

    # Removes the node and every relationship attached to it.
    remove_cypher = """
    MATCH (n:data_label)
    WHERE id(n) = $nodeId
    OPTIONAL MATCH (n)-[r]-()
    DELETE r, n
    RETURN count(n) AS deleted
    """

    try:
        from app.services.neo4j_driver import neo4j_driver

        with neo4j_driver.get_session() as session:
            existing = session.run(fetch_cypher, nodeId=tag_id).single()

            if not existing:
                return {
                    'code': 404,
                    'success': False,
                    'message': f'未找到ID为{tag_id}的标签',
                    'data': None
                }

            snapshot = {field: existing[field] for field in returned_fields}

            removed_rows = session.run(remove_cypher, nodeId=tag_id).single()['deleted']

            if removed_rows > 0:
                return {
                    'code': 200,
                    'success': True,
                    'message': '人才标签删除成功',
                    'data': snapshot
                }

            return {
                'code': 404,
                'success': False,
                'message': f'未能删除ID为{tag_id}的标签',
                'data': None
            }

    except Exception as e:
        failure_message = f"删除人才标签失败: {str(e)}"
        logging.error(failure_message, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': failure_message,
            'data': None
        }
def query_neo4j_graph(query_requirement):
    """
    Query the Neo4j graph database with a Cypher script generated by the
    Deepseek API from a natural-language requirement.

    Args:
        query_requirement (str): Natural-language description of the query.

    Returns:
        dict: Result envelope. On success (code 200) it carries the executed
            script under ``query`` and the rows under ``data``. ``code`` is
            400 for an empty requirement or a generated script that contains
            a write clause, and 500 when the API call or the query fails.
    """
    if not query_requirement or not str(query_requirement).strip():
        return {
            'code': 400,
            'success': False,
            'message': '查询需求不能为空',
            'data': []
        }

    # Imported outside the try block: the except clause below evaluates
    # ``requests.exceptions.HTTPError``, so the name must already be bound
    # even when an earlier statement inside the try block fails.
    import requests

    try:
        from app.services.neo4j_driver import neo4j_driver

        api_key = DEEPSEEK_API_KEY
        api_url = DEEPSEEK_API_URL

        # Prompt describing the graph schema, the requirement and the
        # expected output format (a single bare Cypher statement).
        prompt = f"""
        请根据以下Neo4j图数据库结构和查询需求,生成一个Cypher查询脚本。

        ## 图数据库结构

        ### 节点
        1. talent - 人才节点
           属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名),
                 mobile(手机号码), email(电子邮箱), updated_at(更新时间)

        2. data_label - 人才标签节点

        ### 关系
        BELONGS_TO - 从属关系
           (talent)-[BELONGS_TO]->(data_label) - 人才属于某标签

        ## 查询需求
        {query_requirement}。从查询需求中提取出需要查询的标签。用MATCH和WHERE语句描述。
        只用一个MATCH语句,描述(t:talent)-[:BELONGS_TO]->(dl:data_label)关系。
        WHERE语句可以包含多个标签,用AND连接。

        ## 输出要求
        1. 只输出有效的Cypher查询语句,不要包含任何解释或注释
        2. 确保return语句中包含talent节点属性
        3. 尽量利用图数据库的特性来优化查询效率

        注意:请直接返回Cypher查询语句,无需任何其他文本。
        例如:
        查找需求为:查找有新开酒店经验和五星级酒店经验,担任总经理的人。

        生成的Cypher查询语句为:
        MATCH (t:talent)-[:BELONGS_TO]->(dl:data_label)
        WHERE dl.name IN ['新开酒店经验', '五星级酒店', '总经理']
        WITH t, COLLECT(DISTINCT dl.name) AS labels
        WHERE size(labels) = 3
        RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at
        """

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.1
        }

        logging.info("发送请求到Deepseek API生成Cypher脚本")
        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        # Tolerate a missing/empty "choices" list and a null "content".
        api_result = response.json()
        choices = api_result.get("choices") or [{}]
        content = choices[0].get("message", {}).get("content") or ""
        cypher_script = _strip_markdown_fence(content)

        logging.info(f"生成的Cypher脚本: {cypher_script}")

        if not cypher_script:
            return {
                'code': 500,
                'success': False,
                'message': 'Deepseek API未返回有效的Cypher脚本',
                'data': []
            }

        # SECURITY: the script is model output derived from caller-supplied
        # text, so it is untrusted. This function is a read-only query
        # endpoint; refuse anything containing a data-modifying clause.
        write_keyword = _find_write_clause(cypher_script)
        if write_keyword:
            logging.warning(f"拒绝执行包含写操作的Cypher脚本: {cypher_script}")
            return {
                'code': 400,
                'success': False,
                'message': f'生成的Cypher脚本包含写操作({write_keyword}),已拒绝执行',
                'query': cypher_script,
                'data': []
            }

        with neo4j_driver.get_session() as session:
            records = [record.data() for record in session.run(cypher_script)]

        return {
            'code': 200,
            'success': True,
            'message': '查询成功执行',
            'query': cypher_script,
            'data': records
        }

    except requests.exceptions.HTTPError as e:
        error_msg = f"调用Deepseek API失败: {str(e)}"
        logging.error(error_msg)
        # Compare with None: a Response is falsy for every 4xx/5xx status,
        # which is exactly the case being handled here.
        if getattr(e, 'response', None) is not None:
            logging.error(f"错误状态码: {e.response.status_code}")
            logging.error(f"错误内容: {e.response.text}")

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }
    except Exception as e:
        error_msg = f"查询Neo4j图数据库失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }


def _strip_markdown_fence(text):
    """Return *text* without a surrounding Markdown code fence, if any."""
    import re

    text = text.strip()
    # Accept a bare fence as well as a cypher/cql language tag in any case.
    text = re.sub(r"^```(?:cypher|cql)?", "", text, flags=re.IGNORECASE)
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _find_write_clause(cypher_script):
    """Return the first data-modifying keyword in the script, or None."""
    import re

    # Blank out quoted literals first so that a tag name such as
    # 'set design' is not mistaken for a SET clause.
    without_literals = re.sub(
        r"'(?:\\.|[^'\\])*'|\"(?:\\.|[^\"\\])*\"", "''", cypher_script
    )
    match = re.search(
        r"\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|FOREACH|LOAD|CALL)\b",
        without_literals,
        flags=re.IGNORECASE,
    )
    return match.group(1).upper() if match else None
def talent_get_tags(talent_id):
    """
    Return the tags linked to a talent node via BELONGS_TO relationships.

    Args:
        talent_id (int): ``pg_id`` of the talent node; anything accepted by
            ``int()`` is allowed.

    Returns:
        dict: Result envelope. ``data`` is a list of
            ``{'talent': <pg_id>, 'tag': <tag name>}`` dicts (empty when the
            talent has no tags). ``code`` is 400 when ``talent_id`` is not an
            integer value, 200 on success and 500 on unexpected errors.
    """
    # A malformed ID is a client error; report it as such instead of letting
    # int() fail inside the generic 500 handler.
    try:
        pg_id = int(talent_id)
    except (TypeError, ValueError):
        return {
            'code': 400,
            'success': False,
            'message': f'无效的人才ID: {talent_id}',
            'data': []
        }

    try:
        from app.services.neo4j_driver import neo4j_driver

        response_data = {
            'code': 200,
            'success': True,
            'message': '获取人才标签成功',
            'data': []
        }

        cypher_query = """
        MATCH (t:talent)-[r:BELONGS_TO]->(tag:data_label)
        WHERE t.pg_id = $talent_id
        RETURN t.pg_id as talent_id, tag.name as tag_name
        """

        with neo4j_driver.get_session() as session:
            records = list(session.run(cypher_query, talent_id=pg_id))

        # No tags is still a successful lookup; only the message differs.
        if not records:
            response_data['message'] = f'人才pg_id {talent_id} 没有关联的标签'
            return response_data

        response_data['data'] = [
            {'talent': record['talent_id'], 'tag': record['tag_name']}
            for record in records
        ]

        return response_data

    except Exception as e:
        error_msg = f"获取人才标签失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }
def talent_update_tags(data):
    """
    Replace the BELONGS_TO tag relationships of talent nodes.

    For every talent mentioned in ``data`` all existing BELONGS_TO
    relationships are removed and one relationship per listed tag is created;
    tags that do not exist yet are created with category 'talent'.

    Args:
        data (list): Objects with ``talent`` and ``tag`` fields, e.g.
            [
                {"talent": 12345, "tag": "市场营销"},
                {"talent": 12345, "tag": "酒店管理"}
            ]

    Returns:
        dict: Result envelope. ``code`` is 400 for a non-list or empty
            ``data``, 200 when every item succeeded, 206 for partial success
            and 500 when nothing succeeded or an unexpected error occurred.
            ``data`` reports ``success_count``, ``total_count`` and
            ``failed_items``.
    """
    if not isinstance(data, list):
        return {
            'code': 400,
            'success': False,
            'message': '参数格式错误,需要JSON数组',
            'data': None
        }

    if not data:
        return {
            'code': 400,
            'success': False,
            'message': '数据列表为空',
            'data': None
        }

    try:
        success_count = 0
        failed_items = []

        # Group tag names by talent. IDs are normalised to int here so that
        # 123 and "123" land in the same group (separate groups would make
        # the second one wipe the relations just created by the first) and so
        # that a non-numeric ID fails only its own item instead of aborting
        # the batch after other talents were already modified.
        talent_tags = {}
        for item in data:
            if not isinstance(item, dict) or 'talent' not in item or 'tag' not in item:
                failed_items.append(item)
                continue

            raw_talent_id = item.get('talent')
            tag_name = item.get('tag')

            if not raw_talent_id or not tag_name or not isinstance(tag_name, str):
                failed_items.append(item)
                continue

            try:
                talent_id = int(raw_talent_id)
            except (TypeError, ValueError):
                failed_items.append(item)
                continue

            talent_tags.setdefault(talent_id, []).append(tag_name)

        # Only touch the database when at least one item is valid.
        if talent_tags:
            from app.services.neo4j_driver import neo4j_driver

            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            with neo4j_driver.get_session() as session:
                for talent_id, tags in talent_tags.items():
                    check_talent_query = """
                    MATCH (t:talent)
                    WHERE t.pg_id = $talent_id
                    RETURN t
                    """
                    talent_result = session.run(check_talent_query, talent_id=talent_id)
                    if not talent_result.single():
                        # Unknown talent: every tag requested for it fails.
                        failed_items.extend(
                            {'talent_pg_id': talent_id, 'tag': tag} for tag in tags
                        )
                        continue

                    # Replace semantics: drop all current tag relations first.
                    clear_relations_query = """
                    MATCH (t:talent)-[r:BELONGS_TO]->(:data_label)
                    WHERE t.pg_id = $talent_id
                    DELETE r
                    RETURN count(r) as deleted_count
                    """
                    clear_result = session.run(clear_relations_query, talent_id=talent_id)
                    deleted_count = clear_result.single()['deleted_count']
                    logging.info(f"已删除talent_id={talent_id}的{deleted_count}个已有标签关系")

                    for tag_name in tags:
                        try:
                            linked = _link_talent_to_tag(session, talent_id, tag_name, current_time)
                        except Exception as tag_error:
                            logging.error(f"为标签 {tag_name} 创建关系时出错: {str(tag_error)}")
                            linked = False

                        if linked:
                            success_count += 1
                        else:
                            failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name})

        total_items = len(data)
        if success_count == total_items:
            return {
                'code': 200,
                'success': True,
                'message': f'成功创建或更新了 {success_count} 个标签关系',
                'data': {
                    'success_count': success_count,
                    'total_count': total_items,
                    'failed_items': []
                }
            }

        if success_count > 0:
            return {
                'code': 206,  # Partial Content
                'success': True,
                'message': f'部分成功: 创建或更新了 {success_count}/{total_items} 个标签关系',
                'data': {
                    'success_count': success_count,
                    'total_count': total_items,
                    'failed_items': failed_items
                }
            }

        return {
            'code': 500,
            'success': False,
            'message': '无法创建任何标签关系',
            'data': {
                'success_count': 0,
                'total_count': total_items,
                'failed_items': failed_items
            }
        }

    except Exception as e:
        error_msg = f"更新人才标签关系失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }


def _link_talent_to_tag(session, talent_id, tag_name, current_time):
    """Ensure the tag node exists and link the talent to it; return success."""
    find_tag_query = """
    MATCH (tag:data_label)
    WHERE tag.name = $tag_name
    RETURN id(tag) as tag_id
    """
    if not session.run(find_tag_query, tag_name=tag_name).single():
        create_tag_query = """
        CREATE (tag:data_label {name: $name, category: $category, updated_at: $updated_at})
        RETURN id(tag) as tag_id
        """
        # Read the result so the creation has completed before linking.
        session.run(
            create_tag_query,
            name=tag_name,
            category='talent',
            updated_at=current_time
        ).single()

    create_relation_query = """
    MATCH (t:talent), (tag:data_label)
    WHERE t.pg_id = $talent_id AND tag.name = $tag_name
    CREATE (t)-[r:BELONGS_TO]->(tag)
    SET r.created_at = $current_time
    RETURN r
    """
    relation_result = session.run(
        create_relation_query,
        talent_id=talent_id,
        tag_name=tag_name,
        current_time=current_time
    )
    return bool(relation_result.single())
def get_business_card(card_id):
    """
    Fetch a single business card record by ID from the PostgreSQL database.

    Args:
        card_id (int): ID of the business card record.

    Returns:
        dict: Result envelope. ``code`` is 200 with ``card.to_dict()`` in
            ``data`` on success, 404 when no record has the given ID and 500
            on unexpected errors (``data`` is ``None`` in both error cases).
    """
    try:
        record = BusinessCard.query.get(card_id)

        if record:
            # Found: hand the record back as a plain dict.
            outcome = {
                'code': 200,
                'success': True,
                'message': '获取名片记录成功',
                'data': record.to_dict()
            }
        else:
            outcome = {
                'code': 404,
                'success': False,
                'message': f'未找到ID为{card_id}的名片记录',
                'data': None
            }

        return outcome

    except Exception as e:
        failure_message = f"获取名片记录失败: {str(e)}"
        logging.error(failure_message, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': failure_message,
            'data': None
        }
|