12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846 |
- from typing import Dict, Any
- from app import db
- from datetime import datetime
- import os
- import boto3
- from botocore.config import Config
- import logging
- import requests
- import json
- import re
- import uuid
- from PIL import Image
- from io import BytesIO
- import pytesseract
- import base64
- from openai import OpenAI
- from app.config.config import DevelopmentConfig, ProductionConfig
- # 测试用的解析数据接口。没有实际使用。
def parse_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap ``data`` in a success envelope without modifying it.

    This is a placeholder entry point: no parsing happens yet and the input
    is echoed back under the ``data`` key.

    Args:
        data: Payload to be parsed.

    Returns:
        A dict with ``code``, ``status``, ``message`` and the original ``data``.
    """
    # TODO: implement the actual parsing logic
    envelope: Dict[str, Any] = dict(
        code=200,
        status='success',
        message='Data parsed successfully',
    )
    envelope['data'] = data
    return envelope
- # 名片解析数据模型
class BusinessCard(db.Model):
    """ORM model for one parsed business card (table ``business_cards``)."""

    __tablename__ = 'business_cards'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # Person
    name_zh = db.Column(db.String(100), nullable=False)
    name_en = db.Column(db.String(100))
    title_zh = db.Column(db.String(100))
    title_en = db.Column(db.String(100))
    # Contact details
    mobile = db.Column(db.String(50))
    phone = db.Column(db.String(50))
    email = db.Column(db.String(100))
    # Hotel / company and address
    hotel_zh = db.Column(db.String(200))
    hotel_en = db.Column(db.String(200))
    address_zh = db.Column(db.Text)
    address_en = db.Column(db.Text)
    postal_code_zh = db.Column(db.String(20))
    postal_code_en = db.Column(db.String(20))
    # Brand and affiliation
    brand_zh = db.Column(db.String(100))
    brand_en = db.Column(db.String(100))
    affiliation_zh = db.Column(db.String(200))
    affiliation_en = db.Column(db.String(200))
    image_path = db.Column(db.String(255))  # path of the image stored in MinIO
    career_path = db.Column(db.JSON)  # career track, JSON
    brand_group = db.Column(db.String(200))  # brand portfolio
    # Bookkeeping
    created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
    updated_at = db.Column(db.DateTime, onupdate=datetime.now)
    updated_by = db.Column(db.String(50))
    status = db.Column(db.String(20), default='active')

    def to_dict(self):
        """Return the record as a plain dict; timestamps become
        ``'%Y-%m-%d %H:%M:%S'`` strings, or ``None`` when unset."""
        # Attributes copied verbatim, in output order.
        verbatim = (
            'id', 'name_zh', 'name_en', 'title_zh', 'title_en',
            'mobile', 'phone', 'email', 'hotel_zh', 'hotel_en',
            'address_zh', 'address_en', 'postal_code_zh', 'postal_code_en',
            'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en',
            'image_path', 'career_path', 'brand_group',
        )

        def _stamp(moment):
            if moment:
                return moment.strftime('%Y-%m-%d %H:%M:%S')
            return None

        record = {attr: getattr(self, attr) for attr in verbatim}
        record['created_at'] = _stamp(self.created_at)
        record['updated_at'] = _stamp(self.updated_at)
        record['updated_by'] = self.updated_by
        record['status'] = self.status
        return record
# Business-card parsing module: configuration.

# DeepSeek API settings.
# NOTE(review): the fallback value below embeds what looks like a real API key
# in source control; consider rotating it and requiring the environment
# variable instead.
DEEPSEEK_API_KEY = os.environ.get(
    'DEEPSEEK_API_KEY',
    'sk-2aea6e8b159b448aa3c1e29acd6f4349',
)
DEEPSEEK_API_URL = os.environ.get(
    'DEEPSEEK_API_URL',
    'https://api.deepseek.com/v1/chat/completions',
)
# Backup API endpoint.
DEEPSEEK_API_URL_BACKUP = 'https://api.deepseek.com/v1/completions'

# OCR settings.
# Set the tesseract binary path here if required:
# pytesseract.pytesseract.tesseract_cmd = r'/path/to/tesseract'
# OCR language selection; several languages can be combined with "+".
OCR_LANG = os.environ.get('OCR_LANG', 'chi_sim+eng')

# Environment-based configuration selection is currently disabled:
#   if os.environ.get('FLASK_ENV') == 'production':
#       config = ProductionConfig()
#   else:
#       config = DevelopmentConfig()
# The production configuration is used unconditionally.
config = ProductionConfig()

# MinIO connection settings taken from the configuration object.
use_ssl = config.MINIO_SECURE
minio_url = f"{'https' if use_ssl else 'http'}://{config.MINIO_HOST}"
minio_access_key = config.MINIO_USER
minio_secret_key = config.MINIO_PASSWORD
minio_bucket = config.MINIO_BUCKET
def get_minio_client():
    """Create a boto3 S3 client for the MinIO endpoint and ensure the bucket exists.

    Uses the module-level ``minio_url``, credentials and ``minio_bucket``.

    Returns:
        The client, or ``None`` when creating the client, listing buckets or
        creating the bucket raises (the error is logged).
    """
    try:
        logging.info(f"尝试连接MinIO服务器: {minio_url}")

        client_options = Config(
            signature_version='s3v4',
            retries={'max_attempts': 3, 'mode': 'standard'},
            connect_timeout=10,
            read_timeout=30,
        )
        minio_client = boto3.client(
            's3',
            endpoint_url=minio_url,
            aws_access_key_id=minio_access_key,
            aws_secret_access_key=minio_secret_key,
            config=client_options,
        )

        # Listing buckets doubles as a connectivity check.
        listing = minio_client.list_buckets()
        bucket_names = [entry['Name'] for entry in listing.get('Buckets', [])]
        logging.info(f"成功连接到MinIO服务器,现有存储桶: {bucket_names}")

        if minio_bucket not in bucket_names:
            logging.info(f"创建存储桶: {minio_bucket}")
            minio_client.create_bucket(Bucket=minio_bucket)
    except Exception as e:
        logging.error(f"MinIO连接错误: {str(e)}")
        return None

    return minio_client
def extract_text_from_image(image_data):
    """Run OCR on a card image, then parse the OCR text with the DeepSeek API.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        dict: Extracted card fields as returned by ``parse_text_with_deepseek``.

    Raises:
        Exception: When OCR yields no text, or when OCR or parsing fails. The
            original error is attached as ``__cause__``.
    """
    try:
        # Step 1: OCR
        ocr_text = ocr_extract_text(image_data)
        if not ocr_text or not ocr_text.strip():
            error_msg = "OCR无法从图像中提取文本"
            logging.error(error_msg)
            raise Exception(error_msg)

        # Build the preview first so the log prefix is applied to short texts
        # too (the conditional used to wrap the whole message).
        preview = f"{ocr_text[:200]}..." if len(ocr_text) > 200 else ocr_text
        logging.info(f"OCR提取的文本: {preview}")

        # Step 2: structured extraction through DeepSeek
        return parse_text_with_deepseek(ocr_text)

    except Exception as e:
        error_msg = f"从图像中提取和解析文本失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
def ocr_extract_text(image_data):
    """Extract text from an image with Tesseract OCR.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        str: Recognised text with surrounding whitespace stripped.

    Raises:
        Exception: When the image cannot be opened or OCR fails. The original
            error is attached as ``__cause__``.
    """
    try:
        # The context manager releases the decoder's resources promptly.
        with Image.open(BytesIO(image_data)) as image:
            text = pytesseract.image_to_string(image, lang=OCR_LANG)

        text = text.strip()
        # Only the length is logged here; the text itself is card content and
        # is no longer dumped to stdout.
        logging.info(f"OCR成功从图像中提取文本,长度: {len(text)}")
        return text
    except Exception as e:
        error_msg = f"OCR提取文本失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
def parse_text_with_deepseek(text):
    """Ask the DeepSeek chat API to extract business-card fields from text.

    Args:
        text (str): Text to analyse (typically OCR output).

    Returns:
        dict: Parsed card fields. Every field named in the prompt is present
        (empty string, or ``[]`` for ``career_path``, when missing). The
        legacy keys ``name``, ``title``, ``company`` and ``address`` are
        still defaulted for backward compatibility.

    Raises:
        Exception: When the API key is not configured, the HTTP call fails,
            or the response cannot be processed.
    """
    if not DEEPSEEK_API_KEY:
        error_msg = "未配置DeepSeek API密钥"
        logging.error(error_msg)
        raise Exception(error_msg)

    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }

    # Prompt carrying the OCR text.
    prompt = f"""请从以下名片文本中提取详细信息,需分别识别中英文内容。
以JSON格式返回,包含以下字段:
- name_zh: 中文姓名
- name_en: 英文姓名
- title_zh: 中文职位/头衔
- title_en: 英文职位/头衔
- hotel_zh: 中文酒店/公司名称
- hotel_en: 英文酒店/公司名称
- mobile: 手机号码
- phone: 固定电话
- email: 电子邮箱
- address_zh: 中文地址
- address_en: 英文地址
- brand_group: 品牌组合(如有多个品牌,以逗号分隔)
- career_path: 职业轨迹(如果能从文本中推断出,以JSON数组格式返回,包含公司名称和职位)
名片文本:
{text}
"""

    model_name = 'deepseek-chat'

    try:
        logging.info("尝试通过DeepSeek API解析文本")
        payload = {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "你是一个专业的名片信息提取助手。请用JSON格式返回结果,不要有多余的文字说明。"},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.1
        }

        logging.info("向DeepSeek API发送请求")
        response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        result = response.json()
        # Tolerate an empty ``choices`` list and a null ``content``.
        choices = result.get("choices") or [{}]
        content = choices[0].get("message", {}).get("content") or "{}"

        try:
            # The model sometimes wraps the JSON in extra text.
            json_content = extract_json_from_text(content)
            extracted_data = json.loads(json_content)
            logging.info("成功解析DeepSeek API返回的JSON")
        except json.JSONDecodeError:
            logging.warning("无法解析JSON,尝试直接从文本提取信息")
            extracted_data = extract_fields_from_text(content)

        if not isinstance(extracted_data, dict):
            # Valid JSON that is not an object cannot hold named fields.
            logging.warning("DeepSeek API返回的JSON不是对象,尝试直接从文本提取信息")
            extracted_data = extract_fields_from_text(content)

        # Default every field the prompt asks for; the last four are the keys
        # this function defaulted before, kept so existing callers still find
        # them.
        expected_fields = [
            'name_zh', 'name_en', 'title_zh', 'title_en',
            'hotel_zh', 'hotel_en', 'mobile', 'phone', 'email',
            'address_zh', 'address_en', 'brand_group', 'career_path',
            'name', 'title', 'company', 'address',
        ]
        for field in expected_fields:
            extracted_data.setdefault(field, [] if field == 'career_path' else "")

        logging.info("成功从DeepSeek API获取解析结果")
        return extracted_data

    except requests.exceptions.HTTPError as e:
        error_msg = f"DeepSeek API调用失败: {str(e)}"
        logging.error(error_msg)

        # A Response is falsy for 4xx/5xx (its truth value is ``.ok``), so
        # compare against None or the details would never be logged.
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            logging.error(f"错误状态码: {failed_response.status_code}")
            logging.error(f"错误内容: {failed_response.text}")

        raise Exception(error_msg) from e
    except Exception as e:
        error_msg = f"解析文本过程中发生错误: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
def extract_json_from_text(text):
    """Return the first top-level ``{...}`` object found in ``text``.

    Braces are matched while skipping over double-quoted JSON string
    literals (including backslash escapes), so a ``{`` or ``}`` inside a
    string value does not end the object early.

    Args:
        text (str): Text that may contain a JSON object, possibly surrounded
            by other content.

    Returns:
        str: The extracted JSON text; ``"{}"`` when ``text`` is empty/``None``
        or contains no ``{``; the remainder of the text starting at the first
        ``{`` when no matching closing brace is found.
    """
    if not text:
        return "{}"

    start_idx = text.find('{')
    if start_idx == -1:
        return "{}"

    depth = 0
    in_string = False
    escaped = False
    for offset, char in enumerate(text[start_idx:]):
        if in_string:
            if escaped:
                # Character after a backslash is taken literally.
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '"':
                in_string = False
            continue

        if char == '"':
            in_string = True
        elif char == '{':
            depth += 1
        elif char == '}':
            depth -= 1
            if depth == 0:
                return text[start_idx:start_idx + offset + 1]

    # No matching closing brace: hand back everything from the opening brace.
    return text[start_idx:]
def extract_fields_from_text(text):
    """Pull business-card fields out of loosely JSON-like text with regexes.

    Used as a fallback when the text cannot be parsed as JSON. Each field is
    looked up by a quoted Chinese label, optionally nested under a language
    label and a sub-field label.

    NOTE(review): the labels searched for are Chinese ("姓名", "手机", ...);
    the prompts visible in this module request English keys — confirm this
    fallback still matches real model output.

    Args:
        text (str): Text to scan.

    Returns:
        dict: The 17 card fields; a field is ``''`` when nothing matched.
    """
    # Pattern shapes; the captured value is always the last group.
    flat = r'["\'](%s)["\'][\s:]*["\']([^"\']+)["\']'
    by_language = r'["\'](%s)["\'][\s\{:]*["\']?(%s)["\']?[\s\}:]*["\']([^"\']+)["\']'
    by_language_and_part = (
        r'["\'](%s)["\'][\s\{:]*["\']?(%s)["\']?[\s\{:]*["\']?(%s)["\']?'
        r'[\s\}:]*["\']([^"\']+)["\']'
    )

    # (result key, pattern shape, labels substituted into the shape)
    rules = (
        ('name_zh', by_language, ('姓名', '中文')),
        ('name_en', by_language, ('姓名', '英文')),
        ('title_zh', by_language, ('头衔|职位', '中文')),
        ('title_en', by_language, ('头衔|职位', '英文')),
        ('mobile', flat, ('手机',)),
        ('phone', flat, ('电话',)),
        ('email', flat, ('邮箱',)),
        ('hotel_zh', by_language_and_part, ('地址', '中文', '酒店名称')),
        ('hotel_en', by_language_and_part, ('地址', '英文', '酒店名称')),
        ('address_zh', by_language_and_part, ('地址', '中文', '详细地址')),
        ('address_en', by_language_and_part, ('地址', '英文', '详细地址')),
        ('postal_code_zh', by_language_and_part, ('地址', '中文', '邮政编码')),
        ('postal_code_en', by_language_and_part, ('地址', '英文', '邮政编码')),
        ('brand_zh', by_language_and_part, ('公司', '中文', '品牌名称')),
        ('brand_en', by_language_and_part, ('公司', '英文', '品牌名称')),
        ('affiliation_zh', by_language_and_part, ('公司', '中文', '隶属关系')),
        ('affiliation_en', by_language_and_part, ('公司', '英文', '隶属关系')),
    )

    # Start with every field empty, in a fixed key order.
    result = dict.fromkeys(
        (
            'name_zh', 'name_en', 'title_zh', 'title_en',
            'mobile', 'phone', 'email',
            'hotel_zh', 'hotel_en', 'address_zh', 'address_en',
            'postal_code_zh', 'postal_code_en',
            'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en',
        ),
        '',
    )

    for key, shape, labels in rules:
        found = re.search(shape % labels, text, re.IGNORECASE)
        if found:
            # One group per label, then the value group.
            result[key] = found.group(len(labels) + 1)

    return result
def parse_text_with_qwen25VLplus(image_data):
    """Extract business-card fields from an image with the Qwen VL Plus model.

    The image is sent base64-encoded through the OpenAI-compatible DashScope
    endpoint and the JSON in the reply is parsed.

    Args:
        image_data (bytes): Raw image bytes.

    Returns:
        dict: Parsed card fields. Every field requested in the prompt is
        present (``[]`` for ``career_path`` and ``affiliation``, empty string
        otherwise, when the model omitted it).

    Raises:
        Exception: When the request fails, the model returns no content, or
            the reply cannot be processed. The original error is attached as
            ``__cause__``.
    """
    # NOTE(review): the fallback embeds what looks like a real API key in
    # source control; consider rotating it and requiring the env variable.
    QWEN_API_KEY = os.environ.get('QWEN_API_KEY', 'sk-8f2320dafc9e4076968accdd8eebd8e9')

    try:
        base64_image = base64.b64encode(image_data).decode('utf-8')

        # OpenAI client pointed at the DashScope compatible-mode endpoint.
        client = OpenAI(
            api_key=QWEN_API_KEY,
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

        prompt = """你是企业名片的信息提取专家。请仔细分析提供的名片,精确提取以下信息:
## 提取要求
- 区分中英文内容,分别提取
- 保持提取信息的原始格式(如大小写、标点)
- 对于无法识别或名片中不存在的信息,返回空字符串
- 名片中没有的信息,请不要猜测
## 需提取的字段
1. 中文姓名 (name_zh)
2. 英文姓名 (name_en)
3. 中文职位/头衔 (title_zh)
4. 英文职位/头衔 (title_en)
5. 中文酒店/公司名称 (hotel_zh)
6. 英文酒店/公司名称 (hotel_en)
7. 手机号码 (mobile) - 如有多个,使用逗号分隔
8. 固定电话 (phone) - 如有多个,使用逗号分隔
9. 电子邮箱 (email)
10. 中文地址 (address_zh)
11. 英文地址 (address_en)
12. 中文邮政编码 (postal_code_zh)
13. 英文邮政编码 (postal_code_en)
14. 品牌组合 (brand_group) - 如有多个品牌,使用逗号分隔
15. 职业轨迹 (career_path) - 如能从名片中推断,以JSON数组格式返回,包含当前日期,公司名称和职位
16. 隶属关系 (affiliation) - 如能从名片中推断,以JSON数组格式返回,包含公司名称和隶属集团名称
## 输出格式
请以严格的JSON格式返回结果,不要添加任何额外解释文字。JSON格式如下:
```json
{
"name_zh": "",
"name_en": "",
"title_zh": "",
"title_en": "",
"hotel_zh": "",
"hotel_en": "",
"mobile": "",
"phone": "",
"email": "",
"address_zh": "",
"address_en": "",
"postal_code_zh": "",
"postal_code_en": "",
"brand_group": "",
"career_path": [],
"affiliation": []
}
```"""

        logging.info("发送请求到 Qwen 2.5 VL Plus 模型")
        completion = client.chat.completions.create(
            model="qwen-vl-plus",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
                    ]
                }
            ],
            temperature=0.1,  # low temperature for more deterministic extraction
            response_format={"type": "json_object"}  # request JSON output
        )

        response_content = completion.choices[0].message.content
        if not response_content:
            # Fail with a clear message instead of crashing on None or
            # silently producing an all-empty card.
            raise ValueError("Qwen 模型返回内容为空")
        logging.info(f"成功从 Qwen 模型获取响应: {response_content}")

        try:
            json_content = extract_json_from_text(response_content)
            extracted_data = json.loads(json_content)
            logging.info("成功解析 Qwen 响应中的 JSON")
        except json.JSONDecodeError:
            logging.warning("无法解析 JSON,尝试从文本中提取信息")
            extracted_data = extract_fields_from_text(response_content)

        if not isinstance(extracted_data, dict):
            # Valid JSON that is not an object cannot hold named fields.
            logging.warning("Qwen 响应中的 JSON 不是对象,尝试从文本中提取信息")
            extracted_data = extract_fields_from_text(response_content)

        # Default every field the prompt asks for; the two list-valued fields
        # default to an empty list.
        list_fields = ('career_path', 'affiliation')
        required_fields = [
            'name_zh', 'name_en', 'title_zh', 'title_en',
            'hotel_zh', 'hotel_en', 'mobile', 'phone',
            'email', 'address_zh', 'address_en',
            'postal_code_zh', 'postal_code_en', 'brand_group',
            'career_path', 'affiliation',
        ]
        for field in required_fields:
            extracted_data.setdefault(field, [] if field in list_fields else "")

        return extracted_data

    except Exception as e:
        error_msg = f"Qwen 2.5 VL Plus 模型解析失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        raise Exception(error_msg) from e
# Text columns copied one-to-one from the extracted data onto the record.
_CARD_TEXT_FIELDS = (
    'name_zh', 'name_en', 'title_zh', 'title_en',
    'mobile', 'phone', 'email',
    'hotel_zh', 'hotel_en', 'address_zh', 'address_en',
    'postal_code_zh', 'postal_code_en',
    'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en',
)


def _upload_card_image(image_file):
    """Upload the card image to MinIO; return its object key, or None on any failure."""
    try:
        # Unique object name; default to ".jpg" when the upload has no extension.
        file_ext = os.path.splitext(image_file.filename or '')[1].lower() or '.jpg'
        minio_path = f"{uuid.uuid4().hex}{file_ext}"

        minio_client = get_minio_client()
        if not minio_client:
            logging.warning("MinIO客户端未初始化,图片未上传")
            return None

        try:
            logging.info(f"上传文件到MinIO: {minio_path}")
            minio_client.put_object(
                Bucket=minio_bucket,
                Key=minio_path,
                Body=image_file,
                ContentType=image_file.content_type
            )
            logging.info(f"图片已上传到MinIO: {minio_path}")
            return minio_path
        except Exception as upload_err:
            # Upload failure is not fatal: the card is stored without an image.
            logging.error(f"上传文件到MinIO时出错: {str(upload_err)}")
            return None
    except Exception as e:
        logging.error(f"上传图片到MinIO失败: {str(e)}", exc_info=True)
        return None


def process_business_card(image_file):
    """Parse an uploaded business-card image, store the image and save the record.

    Steps: extract fields with the Qwen model, upload the image to MinIO
    (best effort), then insert a ``BusinessCard`` row.

    Args:
        image_file (FileStorage): Uploaded card image.

    Returns:
        dict: Envelope with ``code``, ``success``, ``message`` and ``data``.
        ``data`` is the saved record on success, the extracted fields (with
        ``id`` None) when only the database write failed, and ``None`` when
        parsing failed or an unexpected error occurred.
    """
    try:
        image_data = image_file.read()
        image_file.seek(0)  # rewind so the upload can read the file again

        try:
            logging.info("尝试使用 Qwen 2.5 VL Plus 模型解析名片")
            extracted_data = parse_text_with_qwen25VLplus(image_data)
            logging.info("成功使用 Qwen 2.5 VL Plus 模型解析名片")
        except Exception as e:
            # Stop here: previously the error was swallowed, the image was
            # still uploaded, and the code then crashed on the unbound
            # ``extracted_data``.
            logging.warning(f"Qwen 模型解析失败,错误原因: {str(e)}")
            return {
                'code': 500,
                'success': False,
                'message': f"名片解析失败: {str(e)}",
                'data': None
            }

        minio_path = _upload_card_image(image_file)

        card_values = {name: extracted_data.get(name, '') for name in _CARD_TEXT_FIELDS}
        career_path = extracted_data.get('career_path', [])
        brand_group = extracted_data.get('brand_group', '')

        try:
            business_card = BusinessCard(
                **card_values,
                image_path=minio_path,  # object key relative to the bucket
                career_path=career_path,
                brand_group=brand_group,
                status='active',
                updated_by='system'
            )

            db.session.add(business_card)
            db.session.commit()

            logging.info(f"名片信息已保存到数据库,ID: {business_card.id}")

            return {
                'code': 200,
                'success': True,
                'message': '名片解析成功',
                'data': business_card.to_dict()
            }
        except Exception as e:
            db.session.rollback()
            error_msg = f"保存名片信息到数据库失败: {str(e)}"
            logging.error(error_msg, exc_info=True)

            # Still hand the extracted fields back so the caller can show them.
            return {
                'code': 500,
                'success': False,
                'message': error_msg,
                'data': {
                    'id': None,
                    **card_values,
                    'image_path': minio_path,
                    'career_path': career_path,
                    'brand_group': brand_group,
                    'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'updated_at': None,
                    'updated_by': 'system',
                    'status': 'active'
                }
            }

    except Exception as e:
        db.session.rollback()
        error_msg = f"名片处理失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
def update_business_card(card_id, data):
    """Update a business-card record and mirror the change into Neo4j.

    Fields absent from ``data`` keep their current value. After the database
    commit, the talent node, the hotel node and the relationship between them
    are created or updated in Neo4j; a failure there is logged and does not
    affect the result.

    Args:
        card_id (int): ID of the record to update.
        data (dict): Fields to change.

    Returns:
        dict: Envelope with ``code``, ``success``, ``message`` and the updated
        record (``None`` on failure).
    """
    editable_fields = (
        'name_zh', 'name_en', 'title_zh', 'title_en',
        'mobile', 'phone', 'email',
        'hotel_zh', 'hotel_en', 'address_zh', 'address_en',
        'postal_code_zh', 'postal_code_en',
        'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en',
        'career_path', 'brand_group',
    )

    try:
        card = BusinessCard.query.get(card_id)
        if not card:
            return {
                'code': 500,
                'success': False,
                'message': f'未找到ID为{card_id}的名片记录',
                'data': None
            }

        for field_name in editable_fields:
            setattr(card, field_name, data.get(field_name, getattr(card, field_name)))
        # Could be replaced with the current user where one is available.
        card.updated_by = data.get('updated_by', 'user')

        db.session.commit()

        # Mirror the talent/hotel relationship into the graph database.
        try:
            from app.services.neo4j_driver import neo4j_driver
            from app.core.graph.graph_operations import create_or_get_node

            stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            talent_node_id = create_or_get_node(
                'talent',
                pg_id=card_id,          # ID in the relational database
                name_zh=card.name_zh,
                name_en=card.name_en,
                mobile=card.mobile,
                email=card.email,
                updated_at=stamp,
            )

            has_hotel = card.hotel_zh or card.hotel_en
            if has_hotel:
                hotel_node_id = create_or_get_node(
                    'hotel',
                    hotel_zh=card.hotel_zh,
                    hotel_en=card.hotel_en,
                    updated_at=stamp,
                )

                if talent_node_id and hotel_node_id:
                    # Create or refresh the relationship between the nodes.
                    cypher_query = """
                    MATCH (t:talent), (h:hotel)
                    WHERE id(t) = $talent_id AND id(h) = $hotel_id
                    MERGE (t)-[r:WORKS_FOR]->(h)
                    SET r.title_zh = $title_zh,
                    r.title_en = $title_en,
                    r.updated_at = $updated_at
                    RETURN r
                    """

                    with neo4j_driver.get_session() as session:
                        session.run(
                            cypher_query,
                            talent_id=talent_node_id,
                            hotel_id=hotel_node_id,
                            title_zh=card.title_zh,
                            title_en=card.title_en,
                            updated_at=stamp
                        )

                    logging.info(f"已成功更新人才(ID:{talent_node_id})与酒店(ID:{hotel_node_id})的WORK_FOR关系")

            logging.info("Neo4j图数据库关系更新成功")
        except Exception as e:
            # A graph failure must not undo the committed relational update.
            logging.error(f"更新Neo4j图数据库关系失败: {str(e)}", exc_info=True)

        return {
            'code': 200,
            'success': True,
            'message': '名片信息已更新',
            'data': card.to_dict()
        }

    except Exception as e:
        db.session.rollback()
        error_msg = f"更新名片信息失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
def get_business_cards():
    """Return every business-card record as a list of dicts.

    Returns:
        dict: Envelope with ``code``, ``success``, ``message`` and ``data``
        (the list of records; an empty list when the query fails).
    """
    try:
        cards_data = []
        for card in BusinessCard.query.all():
            cards_data.append(card.to_dict())
    except Exception as e:
        error_msg = f"获取名片列表失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }

    return {
        'code': 200,
        'success': True,
        'message': '获取名片列表成功',
        'data': cards_data
    }
def update_business_card_status(card_id, status):
    """Set a business card's status to ``'active'`` or ``'inactive'``.

    Args:
        card_id (int): ID of the record to update.
        status (str): New status, ``'active'`` or ``'inactive'``.

    Returns:
        dict: Envelope with ``code``, ``success``, ``message`` and the updated
        record (``None`` on failure).
    """
    def _failed(message):
        # Every failure path shares this envelope.
        return {
            'code': 500,
            'success': False,
            'message': message,
            'data': None
        }

    try:
        card = BusinessCard.query.get(card_id)
        if not card:
            return _failed(f'未找到ID为{card_id}的名片记录')

        allowed_statuses = ('active', 'inactive')
        if status not in allowed_statuses:
            return _failed(f'无效的状态值: {status},必须为 active 或 inactive')

        card.status = status
        card.updated_at = datetime.now()
        # Could be replaced with the current user where one is available.
        card.updated_by = 'system'
        db.session.commit()

        return {
            'code': 200,
            'success': True,
            'message': f'名片状态已更新为: {status}',
            'data': card.to_dict()
        }
    except Exception as e:
        db.session.rollback()
        error_msg = f"更新名片状态失败: {str(e)}"
        logging.error(error_msg, exc_info=True)
        return _failed(error_msg)
def create_talent_tag(tag_data):
    """Create a talent tag node (label ``data_label``) in the graph database.

    Args:
        tag_data: Dict describing the tag:
            - name: tag name (required)
            - category: tag category
            - description: tag description
            - status: enabled state

    Returns:
        dict: Envelope with ``code``, ``success``, ``message`` and ``data``
        (node id plus stored properties on success, ``None`` otherwise).
    """
    try:
        from app.services.neo4j_driver import neo4j_driver

        # The tag name is mandatory.
        has_name = bool(tag_data) and 'name' in tag_data and bool(tag_data['name'])
        if not has_name:
            return {
                'code': 400,
                'success': False,
                'message': '标签名称为必填项',
                'data': None
            }

        tag_name = tag_data.get('name')
        tag_properties = {
            'name': tag_name,
            'category': tag_data.get('category', '未分类'),
            # Stored under "describe" to stay consistent with the existing system.
            'describe': tag_data.get('description', ''),
            'status': tag_data.get('status', 'active'),
            'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

        from app.core.graph.graph_operations import create_or_get_node

        # Best effort: derive an English name; the name is known to be
        # non-empty at this point.
        try:
            from app.api.data_interface.routes import translate_and_parse
            en_name = translate_and_parse(tag_data['name'])
            if en_name and isinstance(en_name, list):
                tag_properties['en_name'] = en_name[0]
            else:
                tag_properties['en_name'] = ''
        except Exception as e:
            logging.warning(f"获取标签英文名失败: {str(e)}")
            tag_properties['en_name'] = ''

        node_id = create_or_get_node('data_label', **tag_properties)
        if not node_id:
            return {
                'code': 500,
                'success': False,
                'message': '人才标签创建失败',
                'data': None
            }

        return {
            'code': 200,
            'success': True,
            'message': '人才标签创建成功',
            'data': {
                'id': node_id,
                **tag_properties
            }
        }

    except Exception as e:
        logging.error(f"创建人才标签失败: {str(e)}", exc_info=True)
        return {
            'code': 500,
            'success': False,
            'message': f'创建人才标签失败: {str(e)}',
            'data': None
        }
def get_talent_tag_list():
    """
    List the talent tags stored in the Neo4j graph database.

    Selects ``data_label`` nodes whose category contains 'talent' or '人才',
    newest first.

    Returns:
        dict: Result envelope with ``code``, ``success``, ``message`` and
            ``data``. On success ``data`` is a list of tag dictionaries; on
            failure ``code`` is 500 and ``data`` is an empty list.
    """
    # Columns returned by the query, in the order they appear in each tag dict.
    columns = ('id', 'name', 'en_name', 'category', 'description', 'status', 'time')

    try:
        from app.services.neo4j_driver import neo4j_driver

        query = """
        MATCH (n:data_label)
        WHERE n.category CONTAINS 'talent' OR n.category CONTAINS '人才'
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
               n.category as category, n.describe as description,
               n.status as status, n.time as time
        ORDER BY n.time DESC
        """

        # Records must be read while the session is still open.
        with neo4j_driver.get_session() as session:
            tags = [
                {column: row[column] for column in columns}
                for row in session.run(query)
            ]

        return {
            'code': 200,
            'success': True,
            'message': '获取人才标签列表成功',
            'data': tags
        }

    except Exception as e:
        error_msg = f"获取人才标签列表失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }
def update_talent_tag(tag_id, tag_data):
    """
    Update the properties of a talent tag node.

    Args:
        tag_id: Internal id of the ``data_label`` node.
        tag_data: Dictionary with the fields to update; any of:
            - name: tag name (ignored when empty)
            - category: tag category (ignored when empty)
            - description: tag description (stored as property ``describe``)
            - status: enabled state

    Returns:
        dict: Result envelope with ``code``, ``success``, ``message`` and
            ``data``. ``code`` is 400 when nothing updatable was supplied,
            404 when the node does not exist, 200 with the updated tag on
            success and 500 on unexpected errors.
    """
    try:
        # Treat a missing / malformed payload as "nothing to update".
        if not isinstance(tag_data, dict):
            tag_data = {}

        update_properties = {}

        if tag_data.get('name'):
            update_properties['name'] = tag_data['name']

            # A renamed tag gets a refreshed English name; this is best
            # effort and must not block the update.
            try:
                from app.api.data_interface.routes import translate_and_parse
                en_name = translate_and_parse(tag_data['name'])
                update_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
            except Exception as e:
                logging.warning(f"更新标签英文名失败: {str(e)}")

        if tag_data.get('category'):
            update_properties['category'] = tag_data['category']

        if 'description' in tag_data:
            update_properties['describe'] = tag_data['description']

        if 'status' in tag_data:
            update_properties['status'] = tag_data['status']

        # This check must run BEFORE the timestamp is added; otherwise the
        # dictionary is never empty and the 400 branch is unreachable.
        if not update_properties:
            return {
                'code': 400,
                'success': False,
                'message': '未提供任何可更新的属性',
                'data': None
            }

        update_properties['time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        from app.services.neo4j_driver import neo4j_driver

        # Property names come from the fixed whitelist above, so they are
        # safe to interpolate; values always go through query parameters.
        params = {'nodeId': tag_id}
        set_clauses = []
        for key, value in update_properties.items():
            param_name = f"param_{key}"
            set_clauses.append(f"n.{key} = ${param_name}")
            params[param_name] = value

        set_clause = ", ".join(set_clauses)

        query = f"""
        MATCH (n:data_label)
        WHERE id(n) = $nodeId
        SET {set_clause}
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
               n.category as category, n.describe as description,
               n.status as status, n.time as time
        """

        with neo4j_driver.get_session() as session:
            record = session.run(query, **params).single()

            # No row back means the MATCH found no node with that id.
            if not record:
                return {
                    'code': 404,
                    'success': False,
                    'message': f'未找到ID为{tag_id}的标签',
                    'data': None
                }

            updated_tag = {
                'id': record['id'],
                'name': record['name'],
                'en_name': record['en_name'],
                'category': record['category'],
                'description': record['description'],
                'status': record['status'],
                'time': record['time']
            }

        return {
            'code': 200,
            'success': True,
            'message': '人才标签更新成功',
            'data': updated_tag
        }

    except Exception as e:
        error_msg = f"更新人才标签失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
def delete_talent_tag(tag_id):
    """
    Delete a talent tag node together with its relationships.

    The node is read first so its properties can be returned to the caller
    after it has been removed.

    Args:
        tag_id: Internal id of the ``data_label`` node.

    Returns:
        dict: Result envelope with ``code``, ``success``, ``message`` and
            ``data``. ``code`` is 404 when the node is missing or nothing was
            deleted, 200 with the deleted tag's properties on success and 500
            on unexpected errors.
    """
    # Columns returned by the lookup query, copied into the response as-is.
    columns = ('id', 'name', 'en_name', 'category', 'description', 'status', 'time')

    try:
        from app.services.neo4j_driver import neo4j_driver

        # Lookup of the node that is about to be removed.
        get_query = """
        MATCH (n:data_label)
        WHERE id(n) = $nodeId
        RETURN id(n) as id, n.name as name, n.en_name as en_name,
               n.category as category, n.describe as description,
               n.status as status, n.time as time
        """

        # Removal of the node and every relationship attached to it.
        delete_query = """
        MATCH (n:data_label)
        WHERE id(n) = $nodeId
        OPTIONAL MATCH (n)-[r]-()
        DELETE r, n
        RETURN count(n) AS deleted
        """

        with neo4j_driver.get_session() as session:
            existing = session.run(get_query, nodeId=tag_id).single()

            if not existing:
                return {
                    'code': 404,
                    'success': False,
                    'message': f'未找到ID为{tag_id}的标签',
                    'data': None
                }

            # Snapshot taken before deletion, used as the response payload.
            tag_info = {column: existing[column] for column in columns}

            removed = session.run(delete_query, nodeId=tag_id).single()['deleted']

        if removed > 0:
            return {
                'code': 200,
                'success': True,
                'message': '人才标签删除成功',
                'data': tag_info
            }

        return {
            'code': 404,
            'success': False,
            'message': f'未能删除ID为{tag_id}的标签',
            'data': None
        }

    except Exception as e:
        error_msg = f"删除人才标签失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
def _parse_matched_labels(content):
    """Extract a list of label strings from the label-matching reply, or []."""
    import json

    logging.info(f"Deepseek返回的匹配内容: {content}")

    labels = []
    if isinstance(content, str):
        # The reply may wrap the array in other text (or in a JSON object),
        # so cut out the span from the first '[' to the last ']'.
        start_idx = content.find('[')
        end_idx = content.rfind(']') + 1
        if start_idx >= 0 and end_idx > start_idx:
            try:
                labels = json.loads(content[start_idx:end_idx])
            except json.JSONDecodeError as e:
                logging.error(f"JSON解析错误: {str(e)}")
                return []

    if labels and all(isinstance(item, str) for item in labels):
        logging.info(f"成功解析到标签列表: {labels}")
        return labels

    logging.warning("解析结果不是预期的字符串列表格式,将使用空列表")
    return []


def _strip_markdown_fence(script):
    """Remove a surrounding markdown code fence from a generated script."""
    script = (script or "").strip()
    if script.startswith("```cypher"):
        script = script[9:]
    elif script.startswith("```"):
        script = script[3:]
    if script.endswith("```"):
        script = script[:-3]
    return script.strip()


def _is_read_only_cypher(script):
    """Return True when the script contains no write / procedure clause."""
    import re

    # Blank out quoted string literals first so a label such as 'Data Set'
    # does not trip the keyword scan.
    without_literals = re.sub(r"'(?:\\.|[^'\\])*'|\"(?:\\.|[^\"\\])*\"", "''", script)
    forbidden = r"\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|FOREACH|CALL|LOAD\s+CSV)\b"
    return re.search(forbidden, without_literals, re.IGNORECASE) is None


def query_neo4j_graph(query_requirement):
    """
    Query the Neo4j graph database using a Cypher script generated through
    the Deepseek API.

    Steps: (1) load the talent-category tag names from Neo4j, (2) ask
    Deepseek which of them match the requirement, (3) ask Deepseek to write a
    Cypher query for the matched tags, (4) run that query and return its rows.

    Args:
        query_requirement (str): Natural-language description of the query.

    Returns:
        dict: Result envelope with ``code``, ``success``, ``message`` and
            ``data`` (list of row dictionaries). Successful responses also
            carry ``query`` (the executed Cypher) and, when tags matched,
            ``matched_labels``. Failures return ``code`` 500 and ``data`` [].
    """
    # Bound before the ``try`` on purpose: ``requests`` is a function-local
    # name, and the ``except`` clause below evaluates it. If it were imported
    # inside the ``try`` after a failing statement, the handler itself would
    # raise UnboundLocalError instead of returning an error envelope.
    import json
    import requests

    try:
        from app.services.neo4j_driver import neo4j_driver

        # Deepseek API configuration (module-level constants).
        api_key = DEEPSEEK_API_KEY
        api_url = DEEPSEEK_API_URL

        # Step 1: fetch the talent-category tag names from Neo4j.
        logging.info("第一步:从Neo4j获取人才类别的标签列表")
        all_labels_query = """
        MATCH (dl:data_label)
        WHERE dl.category CONTAINS '人才' OR dl.category CONTAINS 'talent'
        RETURN dl.name as name
        """

        with neo4j_driver.get_session() as session:
            all_labels = [record['name'] for record in session.run(all_labels_query)]

        logging.info(f"获取到{len(all_labels)}个人才标签: {all_labels}")

        # Step 2: let Deepseek map the requirement onto the known tags.
        logging.info("第二步:调用Deepseek API匹配查询需求与标签")

        labels_json = json.dumps(all_labels, ensure_ascii=False)

        matching_prompt = f"""
    请分析以下查询需求,并从标签列表中找出与查询需求相关的标签。

    ## 查询需求
    {query_requirement}

    ## 可用标签列表
    {labels_json}

    ## 输出要求
    1. 请以JSON数组格式返回匹配的标签名称列表,格式如: ["标签1", "标签2", "标签3"]
    2. 只返回标签名称数组,不要包含任何解释或其他文本
    3. 如果没有找到匹配的标签,请返回空数组 []
    """

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": "你是一个专业的文本分析和匹配专家。"},
                {"role": "user", "content": matching_prompt}
            ],
            "temperature": 0.1,
            "response_format": {"type": "json_object"}
        }

        logging.info("发送请求到Deepseek API匹配标签:"+matching_prompt)
        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        # ``or [{}]`` also covers an explicitly empty "choices" list.
        choices = response.json().get("choices") or [{}]
        matching_content = choices[0].get("message", {}).get("content", "[]")

        matched_labels = _parse_matched_labels(matching_content)
        logging.info(f"匹配到的标签: {matched_labels}")

        # Nothing matched: there is nothing to query for.
        if not matched_labels:
            return {
                'code': 200,
                'success': True,
                'message': '未找到与查询需求匹配的标签',
                'query': '',
                'data': []
            }

        # Step 3: build the prompt that asks for the Cypher query.
        logging.info("第三步:构建提示文本生成Cypher查询语句")

        matched_labels_str = ", ".join([f"'{label}'" for label in matched_labels])

        cypher_prompt = f"""
    请根据以下Neo4j图数据库结构和已匹配的标签,生成一个Cypher查询脚本。

    ## 图数据库结构

    ### 节点
    1. talent - 人才节点
    属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名),
    mobile(手机号码), email(电子邮箱), updated_at(更新时间)

    2. data_label - 人才标签节点

    ### 关系
    BELONGS_TO - 从属关系
    (talent)-[BELONGS_TO]->(data_label) - 人才属于某标签

    ## 匹配的标签列表
    [{matched_labels_str}]

    ## 查询需求
    {query_requirement}

    ## 输出要求
    1. 只输出有效的Cypher查询语句,不要包含任何解释或注释
    2. 确保return语句中包含talent节点属性
    3. 尽量利用图数据库的特性来优化查询效率
    4. 使用WITH子句和COLLECT函数收集标签,确保查询到同时拥有所有标签的人才

    注意:请直接返回Cypher查询语句,无需任何其他文本。

    以下是一个示例:
    假设匹配的标签是 ['五星级酒店', '新开酒店经验', '总经理']

    生成的Cypher查询语句应该是:
    MATCH (t:talent)-[:BELONGS_TO]->(dl:data_label)
    WHERE dl.name IN ['五星级酒店', '新开酒店经验', '总经理']
    WITH t, COLLECT(DISTINCT dl.name) AS labels
    WHERE size(labels) = 3
    RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at
    """

        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"},
                {"role": "user", "content": cypher_prompt}
            ],
            "temperature": 0.1
        }

        logging.info("发送请求到Deepseek API生成Cypher脚本")
        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        choices = response.json().get("choices") or [{}]
        cypher_script = _strip_markdown_fence(
            choices[0].get("message", {}).get("content", "")
        )

        logging.info(f"生成的Cypher脚本: {cypher_script}")

        # SECURITY: the script is model output derived from user-supplied
        # text, so it is untrusted. Only read-only statements may run;
        # anything with a write or procedure clause is rejected.
        if not _is_read_only_cypher(cypher_script):
            error_msg = "生成的Cypher脚本包含写操作,已拒绝执行"
            logging.error(error_msg)
            return {
                'code': 500,
                'success': False,
                'message': error_msg,
                'data': []
            }

        # Step 4: run the generated script.
        logging.info("第四步:执行Cypher脚本并返回结果")
        with neo4j_driver.get_session() as session:
            records = [record.data() for record in session.run(cypher_script)]

        return {
            'code': 200,
            'success': True,
            'message': '查询成功执行',
            'query': cypher_script,
            'matched_labels': matched_labels,
            'data': records
        }

    except requests.exceptions.RequestException as e:
        # Covers HTTP error statuses as well as timeouts and connection
        # failures, all of which are Deepseek-side problems.
        error_msg = f"调用Deepseek API失败: {str(e)}"
        logging.error(error_msg)
        # Compare with None explicitly: a Response is falsy for 4xx/5xx
        # statuses, which is exactly when the details matter.
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            logging.error(f"错误状态码: {failed_response.status_code}")
            logging.error(f"错误内容: {failed_response.text}")

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }
    except Exception as e:
        error_msg = f"查询Neo4j图数据库失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }
def talent_get_tags(talent_id):
    """
    Look up the tags attached to a talent node.

    Args:
        talent_id (int): ``pg_id`` of the talent node.

    Returns:
        dict: Result envelope with ``code``, ``success``, ``message`` and
            ``data``. ``data`` is a list of ``{'talent': ..., 'tag': ...}``
            pairs, empty when the talent has no tags (``code`` stays 200 with
            a message saying so). On errors ``code`` is 500 and ``data`` is
            an empty list.
    """
    try:
        from app.services.neo4j_driver import neo4j_driver

        # Follow BELONGS_TO relationships from the talent to its tags.
        cypher_query = """
        MATCH (t:talent)-[r:BELONGS_TO]->(tag:data_label)
        WHERE t.pg_id = $talent_id
        RETURN t.pg_id as talent_id, tag.name as tag_name
        """

        # Materialise the rows while the session is open.
        with neo4j_driver.get_session() as session:
            rows = list(session.run(cypher_query, talent_id=int(talent_id)))

        pairs = [
            {'talent': row['talent_id'], 'tag': row['tag_name']}
            for row in rows
        ]

        if pairs:
            message = '获取人才标签成功'
        else:
            message = f'人才pg_id {talent_id} 没有关联的标签'

        return {
            'code': 200,
            'success': True,
            'message': message,
            'data': pairs
        }

    except Exception as e:
        error_msg = f"获取人才标签失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': []
        }
def talent_update_tags(data):
    """
    Replace the BELONGS_TO relationships between talents and tags.

    For every talent mentioned in ``data`` all existing BELONGS_TO
    relationships are removed, then one relationship per listed tag is
    created (missing tag nodes are created with category 'talent').

    Args:
        data (list): Objects with ``talent`` and ``tag`` fields, e.g.
            [
                {"talent": 12345, "tag": "市场营销"},
                {"talent": 12345, "tag": "酒店管理"}
            ]

    Returns:
        dict: Result envelope with ``code``, ``success``, ``message`` and
            ``data``. ``code`` is 400 for a malformed payload, 200 when every
            item succeeded, 206 on partial success and 500 when nothing
            succeeded or an unexpected error occurred. Except for 400 and
            unexpected errors, ``data`` holds ``success_count``,
            ``total_count`` and ``failed_items``.
    """
    try:
        # Payload checks come first so bad requests need no database access.
        if not isinstance(data, list):
            return {
                'code': 400,
                'success': False,
                'message': '参数格式错误,需要JSON数组',
                'data': None
            }

        if len(data) == 0:
            return {
                'code': 400,
                'success': False,
                'message': '数据列表为空',
                'data': None
            }

        success_count = 0
        failed_items = []

        # Group tag names by talent id.
        talent_tags = {}
        for item in data:
            if not isinstance(item, dict) or 'talent' not in item or 'tag' not in item:
                failed_items.append(item)
                continue

            raw_talent_id = item.get('talent')
            tag_name = item.get('tag')

            if not raw_talent_id or not tag_name or not isinstance(tag_name, str):
                failed_items.append(item)
                continue

            # Normalise the id up front. Grouping by the raw value would put
            # "123" and 123 into separate groups, and the second group would
            # wipe the relationships the first one just created. A
            # non-numeric id is reported as a failed item instead of aborting
            # the whole batch halfway through.
            try:
                talent_id = int(raw_talent_id)
            except (TypeError, ValueError, OverflowError):
                failed_items.append(item)
                continue

            talent_tags.setdefault(talent_id, []).append(tag_name)

        # Only touch the database when at least one item is valid.
        if talent_tags:
            from app.services.neo4j_driver import neo4j_driver

            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            check_talent_query = """
            MATCH (t:talent)
            WHERE t.pg_id = $talent_id
            RETURN t
            """

            clear_relations_query = """
            MATCH (t:talent)-[r:BELONGS_TO]->(:data_label)
            WHERE t.pg_id = $talent_id
            DELETE r
            RETURN count(r) as deleted_count
            """

            # MERGE finds the tag node or creates it in a single statement.
            ensure_tag_query = """
            MERGE (tag:data_label {name: $name})
            ON CREATE SET tag.category = $category, tag.updated_at = $updated_at
            """

            # MERGE (not CREATE) so a tag listed twice for the same talent
            # does not produce duplicate relationships.
            create_relation_query = """
            MATCH (t:talent), (tag:data_label)
            WHERE t.pg_id = $talent_id AND tag.name = $tag_name
            MERGE (t)-[r:BELONGS_TO]->(tag)
            ON CREATE SET r.created_at = $current_time
            RETURN r
            """

            with neo4j_driver.get_session() as session:
                for talent_id, tags in talent_tags.items():
                    # The talent node must already exist.
                    talent_result = session.run(check_talent_query, talent_id=talent_id)
                    if not talent_result.single():
                        for tag in tags:
                            failed_items.append({'talent_pg_id': talent_id, 'tag': tag})
                        continue

                    # Replace semantics: drop the current relationships first.
                    clear_result = session.run(clear_relations_query, talent_id=talent_id)
                    deleted_count = clear_result.single()['deleted_count']
                    logging.info(f"已删除talent_id={talent_id}的{deleted_count}个已有标签关系")

                    for tag_name in tags:
                        try:
                            # 1. Make sure the tag node exists.
                            session.run(
                                ensure_tag_query,
                                name=tag_name,
                                category='talent',
                                updated_at=current_time
                            ).consume()

                            # 2. Link the talent to the tag.
                            relation_result = session.run(
                                create_relation_query,
                                talent_id=talent_id,
                                tag_name=tag_name,
                                current_time=current_time
                            )

                            if relation_result.single():
                                success_count += 1
                            else:
                                failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name})

                        except Exception as tag_error:
                            # One failing tag must not stop the remaining ones.
                            logging.error(f"为标签 {tag_name} 创建关系时出错: {str(tag_error)}")
                            failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name})

        total_items = len(data)
        if success_count == total_items:
            return {
                'code': 200,
                'success': True,
                'message': f'成功创建或更新了 {success_count} 个标签关系',
                'data': {
                    'success_count': success_count,
                    'total_count': total_items,
                    'failed_items': []
                }
            }

        if success_count > 0:
            return {
                'code': 206,  # Partial Content
                'success': True,
                'message': f'部分成功: 创建或更新了 {success_count}/{total_items} 个标签关系',
                'data': {
                    'success_count': success_count,
                    'total_count': total_items,
                    'failed_items': failed_items
                }
            }

        return {
            'code': 500,
            'success': False,
            'message': '无法创建任何标签关系',
            'data': {
                'success_count': 0,
                'total_count': total_items,
                'failed_items': failed_items
            }
        }

    except Exception as e:
        error_msg = f"更新人才标签关系失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
def get_business_card(card_id):
    """
    Fetch a single business card record from PostgreSQL by its id.

    Args:
        card_id (int): Primary key of the business card record.

    Returns:
        dict: Result envelope with ``code``, ``success``, ``message`` and
            ``data``. ``code`` is 200 with ``card.to_dict()`` when found, 404
            when no such record exists and 500 on unexpected errors (``data``
            is None in both failure cases).
    """
    try:
        card = BusinessCard.query.get(card_id)

        # Found: hand the record back in dictionary form.
        if card:
            return {
                'code': 200,
                'success': True,
                'message': '获取名片记录成功',
                'data': card.to_dict()
            }

        return {
            'code': 404,
            'success': False,
            'message': f'未找到ID为{card_id}的名片记录',
            'data': None
        }

    except Exception as e:
        error_msg = f"获取名片记录失败: {str(e)}"
        logging.error(error_msg, exc_info=True)

        return {
            'code': 500,
            'success': False,
            'message': error_msg,
            'data': None
        }
|