123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577 |
- from typing import Dict, Any
- from app import db
- from datetime import datetime
- import os
- import boto3
- from botocore.config import Config
- import logging
- import requests
- import json
- import re
- import uuid
- from PIL import Image
- from io import BytesIO
- import pytesseract
- import base64
- from openai import OpenAI
- from app.config.config import DevelopmentConfig, ProductionConfig
- # 测试用的解析数据接口。没有实际使用。
- def parse_data(data: Dict[str, Any]) -> Dict[str, Any]:
- """
- 解析数据的主函数
-
- Args:
- data: 要解析的数据
-
- Returns:
- 解析后的数据
- """
- # TODO: 实现数据解析逻辑
- return {
- 'code': 200,
- 'status': 'success',
- 'message': 'Data parsed successfully',
- 'data': data
- }
- # 名片解析数据模型
- class BusinessCard(db.Model):
- __tablename__ = 'business_cards'
-
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- name_zh = db.Column(db.String(100), nullable=False)
- name_en = db.Column(db.String(100))
- title_zh = db.Column(db.String(100))
- title_en = db.Column(db.String(100))
- mobile = db.Column(db.String(50))
- phone = db.Column(db.String(50))
- email = db.Column(db.String(100))
- hotel_zh = db.Column(db.String(200))
- hotel_en = db.Column(db.String(200))
- address_zh = db.Column(db.Text)
- address_en = db.Column(db.Text)
- postal_code_zh = db.Column(db.String(20))
- postal_code_en = db.Column(db.String(20))
- brand_zh = db.Column(db.String(100))
- brand_en = db.Column(db.String(100))
- affiliation_zh = db.Column(db.String(200))
- affiliation_en = db.Column(db.String(200))
- image_path = db.Column(db.String(255)) # MinIO中存储的路径
- career_path = db.Column(db.JSON) # 职业轨迹,JSON格式
- brand_group = db.Column(db.String(200)) # 品牌组合
- created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
- updated_at = db.Column(db.DateTime, onupdate=datetime.now)
- updated_by = db.Column(db.String(50))
- status = db.Column(db.String(20), default='active')
-
- def to_dict(self):
- return {
- 'id': self.id,
- 'name_zh': self.name_zh,
- 'name_en': self.name_en,
- 'title_zh': self.title_zh,
- 'title_en': self.title_en,
- 'mobile': self.mobile,
- 'phone': self.phone,
- 'email': self.email,
- 'hotel_zh': self.hotel_zh,
- 'hotel_en': self.hotel_en,
- 'address_zh': self.address_zh,
- 'address_en': self.address_en,
- 'postal_code_zh': self.postal_code_zh,
- 'postal_code_en': self.postal_code_en,
- 'brand_zh': self.brand_zh,
- 'brand_en': self.brand_en,
- 'affiliation_zh': self.affiliation_zh,
- 'affiliation_en': self.affiliation_en,
- 'image_path': self.image_path,
- 'career_path': self.career_path,
- 'brand_group': self.brand_group,
- 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
- 'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None,
- 'updated_by': self.updated_by,
- 'status': self.status
- }
- # 名片解析功能模块
- # DeepSeek API配置
- DEEPSEEK_API_KEY = os.environ.get('DEEPSEEK_API_KEY', 'sk-2aea6e8b159b448aa3c1e29acd6f4349')
- DEEPSEEK_API_URL = os.environ.get('DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions')
- # 备用API端点
- DEEPSEEK_API_URL_BACKUP = 'https://api.deepseek.com/v1/completions'
- # OCR配置
- # 设置pytesseract路径(如果需要)
- # pytesseract.pytesseract.tesseract_cmd = r'/path/to/tesseract'
- # OCR语言设置,支持多语言
- OCR_LANG = os.environ.get('OCR_LANG', 'chi_sim+eng')
- # 根据环境选择配置
- """
- if os.environ.get('FLASK_ENV') == 'production':
- config = ProductionConfig()
- else:
- config = DevelopmentConfig()
- """
- # 使用配置变量,缺省认为在生产环境运行
- config = ProductionConfig()
- # 使用配置变量
- minio_url = f"{'https' if config.MINIO_SECURE else 'http'}://{config.MINIO_HOST}"
- minio_access_key = config.MINIO_USER
- minio_secret_key = config.MINIO_PASSWORD
- minio_bucket = config.MINIO_BUCKET
- use_ssl = config.MINIO_SECURE
- def get_minio_client():
- """获取MinIO客户端连接"""
- try:
- # 使用全局配置变量
- global minio_url, minio_access_key, minio_secret_key, minio_bucket, use_ssl
-
- logging.info(f"尝试连接MinIO服务器: {minio_url}")
-
- minio_client = boto3.client(
- 's3',
- endpoint_url=minio_url,
- aws_access_key_id=minio_access_key,
- aws_secret_access_key=minio_secret_key,
- config=Config(
- signature_version='s3v4',
- retries={'max_attempts': 3, 'mode': 'standard'},
- connect_timeout=10,
- read_timeout=30
- )
- )
-
- # 确保存储桶存在
- buckets = minio_client.list_buckets()
- bucket_names = [bucket['Name'] for bucket in buckets.get('Buckets', [])]
- logging.info(f"成功连接到MinIO服务器,现有存储桶: {bucket_names}")
-
- if minio_bucket not in bucket_names:
- logging.info(f"创建存储桶: {minio_bucket}")
- minio_client.create_bucket(Bucket=minio_bucket)
-
- return minio_client
- except Exception as e:
- logging.error(f"MinIO连接错误: {str(e)}")
- return None
- def extract_text_from_image(image_data):
- """
- 使用OCR从图像中提取文本,然后通过DeepSeek API解析名片信息
-
- Args:
- image_data (bytes): 图像的二进制数据
-
- Returns:
- dict: 提取的信息(姓名、职位、公司等)
-
- Raises:
- Exception: 当OCR或API调用失败或配置错误时抛出异常
- """
- try:
- # 步骤1: 使用OCR从图像中提取文本
- ocr_text = ocr_extract_text(image_data)
- if not ocr_text or ocr_text.strip() == "":
- error_msg = "OCR无法从图像中提取文本"
- logging.error(error_msg)
- raise Exception(error_msg)
-
- logging.info(f"OCR提取的文本: {ocr_text[:200]}..." if len(ocr_text) > 200 else ocr_text)
-
- # 步骤2: 使用DeepSeek API解析文本中的信息
- return parse_text_with_deepseek(ocr_text)
-
- except Exception as e:
- error_msg = f"从图像中提取和解析文本失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
- raise Exception(error_msg)
- def ocr_extract_text(image_data):
- """
- 使用OCR从图像中提取文本
-
- Args:
- image_data (bytes): 图像的二进制数据
-
- Returns:
- str: 提取的文本
- """
- try:
- # 将二进制数据转换为PIL图像
- image = Image.open(BytesIO(image_data))
-
- # 使用pytesseract进行OCR文本提取
- text = pytesseract.image_to_string(image, lang=OCR_LANG)
-
- # 清理提取的文本
- text = text.strip()
- logging.info(f"OCR成功从图像中提取文本,长度: {len(text)}")
- print(text)
-
- return text
- except Exception as e:
- error_msg = f"OCR提取文本失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
- raise Exception(error_msg)
- def parse_text_with_deepseek(text):
- """
- 使用DeepSeek API解析文本中的名片信息
-
- Args:
- text (str): 要解析的文本
-
- Returns:
- dict: 解析的名片信息
- """
- # 准备请求DeepSeek API
- if not DEEPSEEK_API_KEY:
- error_msg = "未配置DeepSeek API密钥"
- logging.error(error_msg)
- raise Exception(error_msg)
-
- # 构建API请求的基本信息
- headers = {
- "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
- "Content-Type": "application/json"
- }
-
- # 构建提示语,包含OCR提取的文本
- prompt = f"""请从以下名片文本中提取详细信息,需分别识别中英文内容。
- 以JSON格式返回,包含以下字段:
- - name_zh: 中文姓名
- - name_en: 英文姓名
- - title_zh: 中文职位/头衔
- - title_en: 英文职位/头衔
- - hotel_zh: 中文酒店/公司名称
- - hotel_en: 英文酒店/公司名称
- - mobile: 手机号码
- - phone: 固定电话
- - email: 电子邮箱
- - address_zh: 中文地址
- - address_en: 英文地址
- - brand_group: 品牌组合(如有多个品牌,以逗号分隔)
- - career_path: 职业轨迹(如果能从文本中推断出,以JSON数组格式返回,包含公司名称和职位)
- 名片文本:
- {text}
- """
-
- # 使用模型名称
- model_name = 'deepseek-chat'
-
- try:
- # 尝试调用DeepSeek API
- logging.info(f"尝试通过DeepSeek API解析文本")
- payload = {
- "model": model_name,
- "messages": [
- {"role": "system", "content": "你是一个专业的名片信息提取助手。请用JSON格式返回结果,不要有多余的文字说明。"},
- {"role": "user", "content": prompt}
- ],
- "temperature": 0.1
- }
-
- logging.info(f"向DeepSeek API发送请求")
- response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=30)
-
- # 检查响应状态
- response.raise_for_status()
-
- # 解析API响应
- result = response.json()
- content = result.get("choices", [{}])[0].get("message", {}).get("content", "{}")
-
- # 尝试解析JSON内容
- try:
- # 找到内容中的JSON部分(有时模型会在JSON前后添加额外文本)
- json_content = extract_json_from_text(content)
- extracted_data = json.loads(json_content)
- logging.info(f"成功解析DeepSeek API返回的JSON")
- except json.JSONDecodeError:
- logging.warning(f"无法解析JSON,尝试直接从文本提取信息")
- # 如果无法解析JSON,尝试直接从文本中提取关键信息
- extracted_data = extract_fields_from_text(content)
-
- # 确保所有必要的字段都存在
- required_fields = ['name', 'title', 'company', 'phone', 'email', 'address', 'brand_group', 'career_path']
- for field in required_fields:
- if field not in extracted_data:
- extracted_data[field] = "" if field != 'career_path' else []
-
- logging.info(f"成功从DeepSeek API获取解析结果")
- return extracted_data
-
- except requests.exceptions.HTTPError as e:
- error_msg = f"DeepSeek API调用失败: {str(e)}"
- logging.error(error_msg)
-
- if hasattr(e, 'response') and e.response:
- logging.error(f"错误状态码: {e.response.status_code}")
- logging.error(f"错误内容: {e.response.text}")
-
- raise Exception(error_msg)
- except Exception as e:
- error_msg = f"解析文本过程中发生错误: {str(e)}"
- logging.error(error_msg, exc_info=True)
- raise Exception(error_msg)
- def extract_json_from_text(text):
- """
- 从文本中提取JSON部分
-
- Args:
- text (str): 包含JSON的文本
-
- Returns:
- str: 提取的JSON字符串
- """
- # 尝试找到最外层的花括号对
- start_idx = text.find('{')
- if start_idx == -1:
- return "{}"
-
- # 使用简单的括号匹配算法找到对应的闭合括号
- count = 0
- for i in range(start_idx, len(text)):
- if text[i] == '{':
- count += 1
- elif text[i] == '}':
- count -= 1
- if count == 0:
- return text[start_idx:i+1]
-
- # 如果没有找到闭合括号,返回从开始位置到文本结尾
- return text[start_idx:]
- def extract_fields_from_text(text):
- """
- 从文本中直接提取名片字段信息
-
- Args:
- text (str): 要分析的文本
-
- Returns:
- dict: 提取的字段
- """
- # 初始化结果字典
- result = {
- 'name_zh': '',
- 'name_en': '',
- 'title_zh': '',
- 'title_en': '',
- 'mobile': '',
- 'phone': '',
- 'email': '',
- 'hotel_zh': '',
- 'hotel_en': '',
- 'address_zh': '',
- 'address_en': '',
- 'postal_code_zh': '',
- 'postal_code_en': '',
- 'brand_zh': '',
- 'brand_en': '',
- 'affiliation_zh': '',
- 'affiliation_en': ''
- }
-
- # 提取中文姓名
- name_zh_match = re.search(r'["\'](姓名)["\'][\s\{:]*["\']?(中文)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if name_zh_match:
- result['name_zh'] = name_zh_match.group(3)
-
- # 提取英文姓名
- name_en_match = re.search(r'["\'](姓名)["\'][\s\{:]*["\']?(英文)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if name_en_match:
- result['name_en'] = name_en_match.group(3)
-
- # 提取中文头衔
- title_zh_match = re.search(r'["\'](头衔|职位)["\'][\s\{:]*["\']?(中文)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if title_zh_match:
- result['title_zh'] = title_zh_match.group(3)
-
- # 提取英文头衔
- title_en_match = re.search(r'["\'](头衔|职位)["\'][\s\{:]*["\']?(英文)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if title_en_match:
- result['title_en'] = title_en_match.group(3)
-
- # 提取手机
- mobile_match = re.search(r'["\'](手机)["\'][\s:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if mobile_match:
- result['mobile'] = mobile_match.group(2)
-
- # 提取电话
- phone_match = re.search(r'["\'](电话)["\'][\s:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if phone_match:
- result['phone'] = phone_match.group(2)
-
- # 提取邮箱
- email_match = re.search(r'["\'](邮箱)["\'][\s:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if email_match:
- result['email'] = email_match.group(2)
-
- # 提取中文酒店名称
- hotel_zh_match = re.search(r'["\'](地址)["\'][\s\{:]*["\']?(中文)["\']?[\s\{:]*["\']?(酒店名称)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if hotel_zh_match:
- result['hotel_zh'] = hotel_zh_match.group(4)
-
- # 提取英文酒店名称
- hotel_en_match = re.search(r'["\'](地址)["\'][\s\{:]*["\']?(英文)["\']?[\s\{:]*["\']?(酒店名称)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if hotel_en_match:
- result['hotel_en'] = hotel_en_match.group(4)
-
- # 提取中文详细地址
- address_zh_match = re.search(r'["\'](地址)["\'][\s\{:]*["\']?(中文)["\']?[\s\{:]*["\']?(详细地址)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if address_zh_match:
- result['address_zh'] = address_zh_match.group(4)
-
- # 提取英文详细地址
- address_en_match = re.search(r'["\'](地址)["\'][\s\{:]*["\']?(英文)["\']?[\s\{:]*["\']?(详细地址)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if address_en_match:
- result['address_en'] = address_en_match.group(4)
-
- # 提取中文邮政编码
- postal_code_zh_match = re.search(r'["\'](地址)["\'][\s\{:]*["\']?(中文)["\']?[\s\{:]*["\']?(邮政编码)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if postal_code_zh_match:
- result['postal_code_zh'] = postal_code_zh_match.group(4)
-
- # 提取英文邮政编码
- postal_code_en_match = re.search(r'["\'](地址)["\'][\s\{:]*["\']?(英文)["\']?[\s\{:]*["\']?(邮政编码)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if postal_code_en_match:
- result['postal_code_en'] = postal_code_en_match.group(4)
-
- # 提取中文品牌名称
- brand_zh_match = re.search(r'["\'](公司)["\'][\s\{:]*["\']?(中文)["\']?[\s\{:]*["\']?(品牌名称)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if brand_zh_match:
- result['brand_zh'] = brand_zh_match.group(4)
-
- # 提取英文品牌名称
- brand_en_match = re.search(r'["\'](公司)["\'][\s\{:]*["\']?(英文)["\']?[\s\{:]*["\']?(品牌名称)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if brand_en_match:
- result['brand_en'] = brand_en_match.group(4)
-
- # 提取中文隶属关系
- affiliation_zh_match = re.search(r'["\'](公司)["\'][\s\{:]*["\']?(中文)["\']?[\s\{:]*["\']?(隶属关系)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if affiliation_zh_match:
- result['affiliation_zh'] = affiliation_zh_match.group(4)
-
- # 提取英文隶属关系
- affiliation_en_match = re.search(r'["\'](公司)["\'][\s\{:]*["\']?(英文)["\']?[\s\{:]*["\']?(隶属关系)["\']?[\s\}:]*["\']([^"\']+)["\']', text, re.IGNORECASE)
- if affiliation_en_match:
- result['affiliation_en'] = affiliation_en_match.group(4)
-
- return result
- def parse_text_with_qwen25VLplus(image_data):
- """
- 使用阿里云的 Qwen VL Max 模型解析图像中的名片信息
-
- Args:
- image_data (bytes): 图像的二进制数据
-
- Returns:
- dict: 解析的名片信息
- """
- # 阿里云 Qwen API 配置
- QWEN_API_KEY = os.environ.get('QWEN_API_KEY', 'sk-8f2320dafc9e4076968accdd8eebd8e9')
-
- try:
- # 将图片数据转为 base64 编码
- base64_image = base64.b64encode(image_data).decode('utf-8')
-
- # 初始化 OpenAI 客户端,配置为阿里云 API
- client = OpenAI(
- api_key=QWEN_API_KEY,
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- )
-
- # 构建优化后的提示语
- prompt = """你是企业名片的信息提取专家。请仔细分析提供的图片,精确提取名片信息。
- ## 提取要求
- - 区分中英文内容,分别提取
- - 保持提取信息的原始格式(如大小写、标点)
- - 对于无法识别或名片中不存在的信息,返回空字符串
- - 名片中没有的信息,请不要猜测
- ## 需提取的字段
- 1. 中文姓名 (name_zh)
- 2. 英文姓名 (name_en)
- 3. 中文职位/头衔 (title_zh)
- 4. 英文职位/头衔 (title_en)
- 5. 中文酒店/公司名称 (hotel_zh)
- 6. 英文酒店/公司名称 (hotel_en)
- 7. 手机号码 (mobile) - 如有多个,使用逗号分隔
- 8. 固定电话 (phone) - 如有多个,使用逗号分隔
- 9. 电子邮箱 (email)
- 10. 中文地址 (address_zh)
- 11. 英文地址 (address_en)
- 12. 中文邮政编码 (postal_code_zh)
- 13. 英文邮政编码 (postal_code_en)
- 14. 品牌组合 (brand_group) - 如有多个品牌,使用逗号分隔
- 15. 职业轨迹 (career_path) - 如能从名片中推断,以JSON数组格式返回,包含当前日期,公司名称和职位。自动生成当前日期。
- 16. 隶属关系 (affiliation) - 如能从名片中推断,以JSON数组格式返回,包含公司名称和隶属集团名称
- ## 输出格式
- 请以严格的JSON格式返回结果,不要添加任何额外解释文字。JSON格式如下:
- ```json
- {
- "name_zh": "",
- "name_en": "",
- "title_zh": "",
- "title_en": "",
- "hotel_zh": "",
- "hotel_en": "",
- "mobile": "",
- "phone": "",
- "email": "",
- "address_zh": "",
- "address_en": "",
- "postal_code_zh": "",
- "postal_code_en": "",
- "brand_group": "",
- "career_path": [],
- "affiliation": []
- }
- ```"""
-
- # 调用 Qwen VL Max API
- logging.info("发送请求到 Qwen VL Max 模型")
- completion = client.chat.completions.create(
- # model="qwen-vl-plus",
- model="qwen-vl-max-latest",
- messages=[
- {
- "role": "user",
- "content": [
- {"type": "text", "text": prompt},
- {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
- ]
- }
- ],
- temperature=0.1, # 降低温度增加精确性
- response_format={"type": "json_object"} # 要求输出JSON格式
- )
-
- # 解析响应
- response_content = completion.choices[0].message.content
- logging.info(f"成功从 Qwen 模型获取响应: {response_content}")
-
- # 尝试从响应中提取 JSON
- try:
- json_content = extract_json_from_text(response_content)
- extracted_data = json.loads(json_content)
- logging.info("成功解析 Qwen 响应中的 JSON")
- except json.JSONDecodeError:
- logging.warning("无法解析 JSON,尝试从文本中提取信息")
- extracted_data = extract_fields_from_text(response_content)
-
- # 确保所有必要字段存在
- required_fields = [
- 'name_zh', 'name_en', 'title_zh', 'title_en',
- 'hotel_zh', 'hotel_en', 'mobile', 'phone',
- 'email', 'address_zh', 'address_en',
- 'postal_code_zh', 'postal_code_en', 'brand_group', 'career_path'
- ]
-
- for field in required_fields:
- if field not in extracted_data:
- extracted_data[field] = [] if field == 'career_path' else ""
-
- return extracted_data
-
- except Exception as e:
- error_msg = f"Qwen VL Max 模型解析失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
- raise Exception(error_msg)
- def process_business_card(image_file):
- """
- 处理名片图片并提取信息
-
- Args:
- image_file (FileStorage): 上传的名片图片文件
-
- Returns:
- dict: 处理结果,包含提取的信息和状态
- """
- minio_path = None
-
- try:
- # 读取图片数据
- image_data = image_file.read()
- image_file.seek(0) # 重置文件指针以便后续读取
-
- try:
- # 优先使用 Qwen 2.5 VL Plus 模型直接从图像提取信息
- try:
- logging.info("尝试使用 Qwen 2.5 VL Plus 模型解析名片")
- extracted_data = parse_text_with_qwen25VLplus(image_data)
- logging.info("成功使用 Qwen 2.5 VL Plus 模型解析名片")
- except Exception as qwen_error:
- logging.warning(f"Qwen 模型解析失败,错误原因: {str(qwen_error)}")
- # extracted_data = extract_text_from_image(image_data)
- except Exception as e:
- return {
- 'code': 500,
- 'success': False,
- 'message': f"名片解析失败: {str(e)}",
- 'data': None
- }
-
- try:
- # 生成唯一的文件名
- file_ext = os.path.splitext(image_file.filename)[1].lower()
- if not file_ext:
- file_ext = '.jpg' # 默认扩展名
-
- unique_filename = f"{uuid.uuid4().hex}{file_ext}"
- minio_path = f"{unique_filename}"
-
- # 尝试上传到MinIO
- minio_client = get_minio_client()
- if minio_client:
- try:
- # 上传文件
- logging.info(f"上传文件到MinIO: {minio_path}")
- minio_client.put_object(
- Bucket=minio_bucket,
- Key=minio_path,
- Body=image_file,
- ContentType=image_file.content_type
- )
- logging.info(f"图片已上传到MinIO: {minio_path}")
- except Exception as upload_err:
- logging.error(f"上传文件到MinIO时出错: {str(upload_err)}")
- # 即使上传失败,仍继续处理,但路径为None
- minio_path = None
- else:
- minio_path = None
- logging.warning("MinIO客户端未初始化,图片未上传")
- except Exception as e:
- logging.error(f"上传图片到MinIO失败: {str(e)}", exc_info=True)
- minio_path = None
-
- try:
- # 保存到数据库
- business_card = BusinessCard(
- name_zh=extracted_data.get('name_zh', ''),
- name_en=extracted_data.get('name_en', ''),
- title_zh=extracted_data.get('title_zh', ''),
- title_en=extracted_data.get('title_en', ''),
- mobile=extracted_data.get('mobile', ''),
- phone=extracted_data.get('phone', ''),
- email=extracted_data.get('email', ''),
- hotel_zh=extracted_data.get('hotel_zh', ''),
- hotel_en=extracted_data.get('hotel_en', ''),
- address_zh=extracted_data.get('address_zh', ''),
- address_en=extracted_data.get('address_en', ''),
- postal_code_zh=extracted_data.get('postal_code_zh', ''),
- postal_code_en=extracted_data.get('postal_code_en', ''),
- brand_zh=extracted_data.get('brand_zh', ''),
- brand_en=extracted_data.get('brand_en', ''),
- affiliation_zh=extracted_data.get('affiliation_zh', ''),
- affiliation_en=extracted_data.get('affiliation_en', ''),
- image_path=minio_path, # 存储相对路径
- career_path=extracted_data.get('career_path', []), # 添加职业轨迹
- brand_group=extracted_data.get('brand_group', ''), # 添加品牌组合
- status='active',
- updated_by='system'
- )
-
- db.session.add(business_card)
- db.session.commit()
-
- logging.info(f"名片信息已保存到数据库,ID: {business_card.id}")
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '名片解析成功',
- 'data': business_card.to_dict()
- }
- except Exception as e:
- db.session.rollback()
- error_msg = f"保存名片信息到数据库失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- # 即使数据库操作失败,仍返回提取的信息
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': {
- 'id': None,
- 'name_zh': extracted_data.get('name_zh', ''),
- 'name_en': extracted_data.get('name_en', ''),
- 'title_zh': extracted_data.get('title_zh', ''),
- 'title_en': extracted_data.get('title_en', ''),
- 'mobile': extracted_data.get('mobile', ''),
- 'phone': extracted_data.get('phone', ''),
- 'email': extracted_data.get('email', ''),
- 'hotel_zh': extracted_data.get('hotel_zh', ''),
- 'hotel_en': extracted_data.get('hotel_en', ''),
- 'address_zh': extracted_data.get('address_zh', ''),
- 'address_en': extracted_data.get('address_en', ''),
- 'postal_code_zh': extracted_data.get('postal_code_zh', ''),
- 'postal_code_en': extracted_data.get('postal_code_en', ''),
- 'brand_zh': extracted_data.get('brand_zh', ''),
- 'brand_en': extracted_data.get('brand_en', ''),
- 'affiliation_zh': extracted_data.get('affiliation_zh', ''),
- 'affiliation_en': extracted_data.get('affiliation_en', ''),
- 'image_path': minio_path, # 返回相对路径
- 'career_path': extracted_data.get('career_path', []), # 添加职业轨迹
- 'brand_group': extracted_data.get('brand_group', ''), # 添加品牌组合
- 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'updated_at': None,
- 'updated_by': 'system',
- 'status': 'active'
- }
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"名片处理失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def update_business_card(card_id, data):
- """
- 更新名片信息
-
- Args:
- card_id (int): 名片记录ID
- data (dict): 包含要更新的字段的字典
-
- Returns:
- dict: 包含操作结果和更新后的名片信息
- """
- try:
- # 查找要更新的名片记录
- card = BusinessCard.query.get(card_id)
-
- if not card:
- return {
- 'code': 500,
- 'success': False,
- 'message': f'未找到ID为{card_id}的名片记录',
- 'data': None
- }
-
- # 更新名片信息
- card.name_zh = data.get('name_zh', card.name_zh)
- card.name_en = data.get('name_en', card.name_en)
- card.title_zh = data.get('title_zh', card.title_zh)
- card.title_en = data.get('title_en', card.title_en)
- card.mobile = data.get('mobile', card.mobile)
- card.phone = data.get('phone', card.phone)
- card.email = data.get('email', card.email)
- card.hotel_zh = data.get('hotel_zh', card.hotel_zh)
- card.hotel_en = data.get('hotel_en', card.hotel_en)
- card.address_zh = data.get('address_zh', card.address_zh)
- card.address_en = data.get('address_en', card.address_en)
- card.postal_code_zh = data.get('postal_code_zh', card.postal_code_zh)
- card.postal_code_en = data.get('postal_code_en', card.postal_code_en)
- card.brand_zh = data.get('brand_zh', card.brand_zh)
- card.brand_en = data.get('brand_en', card.brand_en)
- card.affiliation_zh = data.get('affiliation_zh', card.affiliation_zh)
- card.affiliation_en = data.get('affiliation_en', card.affiliation_en)
- card.career_path = data.get('career_path', card.career_path) # 更新职业轨迹
- card.brand_group = data.get('brand_group', card.brand_group) # 更新品牌组合
- card.updated_by = data.get('updated_by', 'user') # 可以根据实际情况修改为当前用户
-
- # 保存更新
- db.session.commit()
-
- # 更新成功后,更新Neo4j图数据库中的人才-酒店关系
- try:
- from app.services.neo4j_driver import neo4j_driver
- from app.core.graph.graph_operations import create_or_get_node
-
- # 获取当前时间
- current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- # 创建或更新人才节点
- talent_properties = {
- 'pg_id': card_id, # PostgreSQL数据库中的ID
- 'name_zh': card.name_zh, # 中文姓名
- 'name_en': card.name_en, # 英文姓名
- 'mobile': card.mobile, # 手机号码
- 'email': card.email, # 电子邮箱
- 'updated_at': current_time # 更新时间
- }
-
- talent_node_id = create_or_get_node('talent', **talent_properties)
-
- # 如果有酒店信息,创建或更新酒店节点
- if card.hotel_zh or card.hotel_en:
- hotel_properties = {
- 'hotel_zh': card.hotel_zh, # 酒店中文名称
- 'hotel_en': card.hotel_en, # 酒店英文名称
- 'updated_at': current_time # 更新时间
- }
-
- hotel_node_id = create_or_get_node('hotel', **hotel_properties)
-
- # 创建或更新人才与酒店之间的WORK_FOR关系
- if talent_node_id and hotel_node_id:
- # 构建Cypher查询以创建或更新关系
- cypher_query = """
- MATCH (t:talent), (h:hotel)
- WHERE id(t) = $talent_id AND id(h) = $hotel_id
- MERGE (t)-[r:WORKS_FOR]->(h)
- SET r.title_zh = $title_zh,
- r.title_en = $title_en,
- r.updated_at = $updated_at
- RETURN r
- """
-
- with neo4j_driver.get_session() as session:
- session.run(
- cypher_query,
- talent_id=talent_node_id,
- hotel_id=hotel_node_id,
- title_zh=card.title_zh,
- title_en=card.title_en,
- updated_at=current_time
- )
-
- logging.info(f"已成功更新人才(ID:{talent_node_id})与酒店(ID:{hotel_node_id})的WORK_FOR关系")
-
- logging.info(f"Neo4j图数据库关系更新成功")
- except Exception as e:
- logging.error(f"更新Neo4j图数据库关系失败: {str(e)}", exc_info=True)
- # 不因为图数据库更新失败而影响PostgreSQL数据库的更新结果
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '名片信息已更新',
- 'data': card.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"更新名片信息失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def get_business_cards():
- """
- 获取所有名片记录列表
-
- Returns:
- dict: 包含操作结果和名片列表
- """
- try:
- # 查询所有名片记录
- cards = BusinessCard.query.all()
-
- # 将所有记录转换为字典格式
- cards_data = [card.to_dict() for card in cards]
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取名片列表成功',
- 'data': cards_data
- }
-
- except Exception as e:
- error_msg = f"获取名片列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- def update_business_card_status(card_id, status):
- """
- 更新名片状态(激活/禁用)
-
- Args:
- card_id (int): 名片记录ID
- status (str): 新状态,'active'或'inactive'
-
- Returns:
- dict: 包含操作结果和更新后的名片信息
- """
- try:
- # 查找要更新的名片记录
- card = BusinessCard.query.get(card_id)
-
- if not card:
- return {
- 'code': 500,
- 'success': False,
- 'message': f'未找到ID为{card_id}的名片记录',
- 'data': None
- }
-
- # 验证状态值
- if status not in ['active', 'inactive']:
- return {
- 'code': 500,
- 'success': False,
- 'message': f'无效的状态值: {status},必须为 active 或 inactive',
- 'data': None
- }
-
- # 更新状态
- card.status = status
- card.updated_at = datetime.now()
- card.updated_by = 'system' # 可以根据实际情况修改为当前用户
-
- # 保存更新
- db.session.commit()
-
- return {
- 'code': 200,
- 'success': True,
- 'message': f'名片状态已更新为: {status}',
- 'data': card.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"更新名片状态失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def create_talent_tag(tag_data):
- """
- 创建人才标签节点
-
- Args:
- tag_data: 包含标签信息的字典,包括:
- - name: 标签名称
- - category: 标签分类
- - description: 标签描述
- - status: 启用状态
-
- Returns:
- dict: 操作结果字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 验证必要参数存在
- if not tag_data or 'name' not in tag_data or not tag_data['name']:
- return {
- 'code': 400,
- 'success': False,
- 'message': '标签名称为必填项',
- 'data': None
- }
-
- # 准备节点属性
- tag_properties = {
- 'name': tag_data.get('name'),
- 'category': tag_data.get('category', '未分类'),
- 'describe': tag_data.get('description', ''), # 使用describe与现有系统保持一致
- 'status': tag_data.get('status', 'active'),
- 'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
-
- # 生成标签的英文名(可选)
- from app.core.graph.graph_operations import create_or_get_node
-
- # 如果提供了名称,尝试获取英文翻译
- if 'name' in tag_data and tag_data['name']:
- try:
- from app.api.data_interface.routes import translate_and_parse
- en_name = translate_and_parse(tag_data['name'])
- tag_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
- except Exception as e:
- logging.warning(f"获取标签英文名失败: {str(e)}")
- tag_properties['en_name'] = ''
-
- # 创建节点
- node_id = create_or_get_node('data_label', **tag_properties)
-
- if node_id:
- return {
- 'code': 200,
- 'success': True,
- 'message': '人才标签创建成功',
- 'data': {
- 'id': node_id,
- **tag_properties
- }
- }
- else:
- return {
- 'code': 500,
- 'success': False,
- 'message': '人才标签创建失败',
- 'data': None
- }
-
- except Exception as e:
- logging.error(f"创建人才标签失败: {str(e)}", exc_info=True)
- return {
- 'code': 500,
- 'success': False,
- 'message': f'创建人才标签失败: {str(e)}',
- 'data': None
- }
- def get_talent_tag_list():
- """
- 从Neo4j图数据库获取人才标签列表
-
- Returns:
- dict: 包含操作结果和标签列表的字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 构建Cypher查询语句,获取分类为talent的标签
- query = """
- MATCH (n:data_label)
- WHERE n.category CONTAINS 'talent' OR n.category CONTAINS '人才'
- RETURN id(n) as id, n.name as name, n.en_name as en_name,
- n.category as category, n.describe as description,
- n.status as status, n.time as time
- ORDER BY n.time DESC
- """
-
- # 执行查询
- tags = []
- with neo4j_driver.get_session() as session:
- result = session.run(query)
-
- # 处理查询结果
- for record in result:
- tag = {
- 'id': record['id'],
- 'name': record['name'],
- 'en_name': record['en_name'],
- 'category': record['category'],
- 'description': record['description'],
- 'status': record['status'],
- 'time': record['time']
- }
- tags.append(tag)
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取人才标签列表成功',
- 'data': tags
- }
-
- except Exception as e:
- error_msg = f"获取人才标签列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- def update_talent_tag(tag_id, tag_data):
- """
- 更新人才标签节点属性
-
- Args:
- tag_id: 标签节点ID
- tag_data: 包含更新信息的字典,可能包括:
- - name: 标签名称
- - category: 标签分类
- - description: 标签描述
- - status: 启用状态
-
- Returns:
- dict: 操作结果字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 准备要更新的属性
- update_properties = {}
-
- # 检查并添加需要更新的属性
- if 'name' in tag_data and tag_data['name']:
- update_properties['name'] = tag_data['name']
-
- # 如果名称更新了,尝试更新英文名称
- try:
- from app.api.data_interface.routes import translate_and_parse
- en_name = translate_and_parse(tag_data['name'])
- update_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
- except Exception as e:
- logging.warning(f"更新标签英文名失败: {str(e)}")
-
- if 'category' in tag_data and tag_data['category']:
- update_properties['category'] = tag_data['category']
-
- if 'description' in tag_data:
- update_properties['describe'] = tag_data['description']
-
- if 'status' in tag_data:
- update_properties['status'] = tag_data['status']
-
- # 添加更新时间
- update_properties['time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- # 如果没有可更新的属性,返回错误
- if not update_properties:
- return {
- 'code': 400,
- 'success': False,
- 'message': '未提供任何可更新的属性',
- 'data': None
- }
-
- # 构建更新的Cypher查询
- set_clauses = []
- params = {'nodeId': tag_id}
-
- for key, value in update_properties.items():
- param_name = f"param_{key}"
- set_clauses.append(f"n.{key} = ${param_name}")
- params[param_name] = value
-
- set_clause = ", ".join(set_clauses)
-
- query = f"""
- MATCH (n:data_label)
- WHERE id(n) = $nodeId
- SET {set_clause}
- RETURN id(n) as id, n.name as name, n.en_name as en_name,
- n.category as category, n.describe as description,
- n.status as status, n.time as time
- """
-
- # 执行更新查询
- with neo4j_driver.get_session() as session:
- result = session.run(query, **params)
- record = result.single()
-
- if not record:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{tag_id}的标签',
- 'data': None
- }
-
- # 提取更新后的标签信息
- updated_tag = {
- 'id': record['id'],
- 'name': record['name'],
- 'en_name': record['en_name'],
- 'category': record['category'],
- 'description': record['description'],
- 'status': record['status'],
- 'time': record['time']
- }
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '人才标签更新成功',
- 'data': updated_tag
- }
-
- except Exception as e:
- error_msg = f"更新人才标签失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def delete_talent_tag(tag_id):
- """
- 删除人才标签节点及其相关关系
-
- Args:
- tag_id: 标签节点ID
-
- Returns:
- dict: 操作结果字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 首先获取要删除的标签信息,以便在成功后返回
- get_query = """
- MATCH (n:data_label)
- WHERE id(n) = $nodeId
- RETURN id(n) as id, n.name as name, n.en_name as en_name,
- n.category as category, n.describe as description,
- n.status as status, n.time as time
- """
-
- # 构建删除节点和关系的Cypher查询
- delete_query = """
- MATCH (n:data_label)
- WHERE id(n) = $nodeId
- OPTIONAL MATCH (n)-[r]-()
- DELETE r, n
- RETURN count(n) AS deleted
- """
-
- # 执行查询
- tag_info = None
- with neo4j_driver.get_session() as session:
- # 先获取标签信息
- result = session.run(get_query, nodeId=tag_id)
- record = result.single()
-
- if not record:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{tag_id}的标签',
- 'data': None
- }
-
- # 保存标签信息用于返回
- tag_info = {
- 'id': record['id'],
- 'name': record['name'],
- 'en_name': record['en_name'],
- 'category': record['category'],
- 'description': record['description'],
- 'status': record['status'],
- 'time': record['time']
- }
-
- # 执行删除操作
- delete_result = session.run(delete_query, nodeId=tag_id)
- deleted = delete_result.single()['deleted']
-
- if deleted > 0:
- return {
- 'code': 200,
- 'success': True,
- 'message': '人才标签删除成功',
- 'data': tag_info
- }
- else:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未能删除ID为{tag_id}的标签',
- 'data': None
- }
-
- except Exception as e:
- error_msg = f"删除人才标签失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def query_neo4j_graph(query_requirement):
- """
- 查询Neo4j图数据库,通过Deepseek API生成Cypher脚本
-
- Args:
- query_requirement (str): 查询需求描述
-
- Returns:
- dict: 包含查询结果的字典,JSON格式
- """
- try:
- # 导入必要的模块
- from app.services.neo4j_driver import neo4j_driver
- import requests
- import json
-
- # Deepseek API配置
- api_key = DEEPSEEK_API_KEY
- api_url = DEEPSEEK_API_URL
-
- # 步骤1: 从Neo4j获取所有标签列表
- logging.info("第一步:从Neo4j获取人才类别的标签列表")
- all_labels_query = """
- MATCH (dl:data_label)
- WHERE dl.category CONTAINS '人才' OR dl.category CONTAINS 'talent'
- RETURN dl.name as name
- """
-
- all_labels = []
- with neo4j_driver.get_session() as session:
- result = session.run(all_labels_query)
- for record in result:
- all_labels.append(record['name'])
-
- logging.info(f"获取到{len(all_labels)}个人才标签: {all_labels}")
-
- # 步骤2: 使用Deepseek判断查询需求中的关键信息与标签的对应关系
- logging.info("第二步:调用Deepseek API匹配查询需求与标签")
-
- # 构建所有标签的JSON字符串
- labels_json = json.dumps(all_labels, ensure_ascii=False)
-
- # 构建匹配标签的提示语
- matching_prompt = f"""
- 请分析以下查询需求,并从标签列表中找出与查询需求相关的标签。
-
- ## 查询需求
- {query_requirement}
-
- ## 可用标签列表
- {labels_json}
-
- ## 输出要求
- 1. 请以JSON数组格式返回匹配的标签名称列表,格式如: ["标签1", "标签2", "标签3"]
- 2. 只返回标签名称数组,不要包含任何解释或其他文本
- 3. 如果没有找到匹配的标签,请返回空数组 []
- """
-
- # 调用Deepseek API匹配标签
- headers = {
- "Authorization": f"Bearer {api_key}",
- "Content-Type": "application/json"
- }
-
- payload = {
- "model": "deepseek-chat",
- "messages": [
- {"role": "system", "content": "你是一个专业的文本分析和匹配专家。"},
- {"role": "user", "content": matching_prompt}
- ],
- "temperature": 0.1,
- "response_format": {"type": "json_object"}
- }
-
- logging.info("发送请求到Deepseek API匹配标签:"+matching_prompt)
- response = requests.post(api_url, headers=headers, json=payload, timeout=30)
- response.raise_for_status()
-
- # 解析API响应
- result = response.json()
- matching_content = result.get("choices", [{}])[0].get("message", {}).get("content", "[]")
-
- # 提取JSON数组
- try:
- # 尝试直接解析返回结果,预期格式为 ["新开酒店经验", "五星级酒店", "总经理"]
- logging.info(f"Deepseek返回的匹配内容: {matching_content}")
-
- # 如果返回的是JSON字符串,先去除可能的前后缀文本
- if isinstance(matching_content, str):
- # 查找JSON数组的开始和结束位置
- start_idx = matching_content.find('[')
- end_idx = matching_content.rfind(']') + 1
-
- if start_idx >= 0 and end_idx > start_idx:
- json_str = matching_content[start_idx:end_idx]
- matched_labels = json.loads(json_str)
- else:
- matched_labels = []
- else:
- matched_labels = []
-
- # 确保结果是字符串列表
- if matched_labels and all(isinstance(item, str) for item in matched_labels):
- logging.info(f"成功解析到标签列表: {matched_labels}")
- else:
- logging.warning("解析结果不是预期的字符串列表格式,将使用空列表")
- matched_labels = []
- except json.JSONDecodeError as e:
- logging.error(f"JSON解析错误: {str(e)}")
- matched_labels = []
- except Exception as e:
- logging.error(f"解析匹配标签时出错: {str(e)}")
- matched_labels = []
-
- logging.info(f"匹配到的标签: {matched_labels}")
-
- # 如果没有匹配到标签,返回空结果
- if not matched_labels:
- return {
- 'code': 200,
- 'success': True,
- 'message': '未找到与查询需求匹配的标签',
- 'query': '',
- 'data': []
- }
-
- # 步骤3: 构建Cypher生成提示文本
- logging.info("第三步:构建提示文本生成Cypher查询语句")
-
- # 将匹配的标签转换为字符串
- matched_labels_str = ", ".join([f"'{label}'" for label in matched_labels])
-
- # 构建生成Cypher的提示语
- cypher_prompt = f"""
- 请根据以下Neo4j图数据库结构和已匹配的标签,生成一个Cypher查询脚本。
-
- ## 图数据库结构
-
- ### 节点
- 1. talent - 人才节点
- 属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名),
- mobile(手机号码), email(电子邮箱), updated_at(更新时间)
-
- 2. data_label - 人才标签节点
-
- ### 关系
- BELONGS_TO - 从属关系
- (talent)-[BELONGS_TO]->(data_label) - 人才属于某标签
-
- ## 匹配的标签列表
- [{matched_labels_str}]
-
- ## 查询需求
- {query_requirement}
-
- ## 输出要求
- 1. 只输出有效的Cypher查询语句,不要包含任何解释或注释
- 2. 确保return语句中包含talent节点属性
- 3. 尽量利用图数据库的特性来优化查询效率
- 4. 使用WITH子句和COLLECT函数收集标签,确保查询到同时拥有所有标签的人才
-
- 注意:请直接返回Cypher查询语句,无需任何其他文本。
-
- 以下是一个示例:
- 假设匹配的标签是 ['五星级酒店', '新开酒店经验', '总经理']
-
- 生成的Cypher查询语句应该是:
- MATCH (t:talent)-[:BELONGS_TO]->(dl:data_label)
- WHERE dl.name IN ['五星级酒店', '新开酒店经验', '总经理']
- WITH t, COLLECT(DISTINCT dl.name) AS labels
- WHERE size(labels) = 3
- RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at
- """
-
- # 调用Deepseek API生成Cypher脚本
- payload = {
- "model": "deepseek-chat",
- "messages": [
- {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"},
- {"role": "user", "content": cypher_prompt}
- ],
- "temperature": 0.1
- }
-
- logging.info("发送请求到Deepseek API生成Cypher脚本")
- response = requests.post(api_url, headers=headers, json=payload, timeout=30)
- response.raise_for_status()
-
- # 解析API响应
- result = response.json()
- cypher_script = result.get("choices", [{}])[0].get("message", {}).get("content", "")
-
- # 清理Cypher脚本,移除不必要的markdown格式或注释
- cypher_script = cypher_script.strip()
- if cypher_script.startswith("```cypher"):
- cypher_script = cypher_script[9:]
- elif cypher_script.startswith("```"):
- cypher_script = cypher_script[3:]
- if cypher_script.endswith("```"):
- cypher_script = cypher_script[:-3]
- cypher_script = cypher_script.strip()
-
- logging.info(f"生成的Cypher脚本: {cypher_script}")
-
- # 步骤4: 执行Cypher脚本
- logging.info("第四步:执行Cypher脚本并返回结果")
- with neo4j_driver.get_session() as session:
- result = session.run(cypher_script)
- records = [record.data() for record in result]
-
- # 构建查询结果
- response_data = {
- 'code': 200,
- 'success': True,
- 'message': '查询成功执行',
- 'query': cypher_script,
- 'matched_labels': matched_labels,
- 'data': records
- }
-
- return response_data
-
- except requests.exceptions.HTTPError as e:
- error_msg = f"调用Deepseek API失败: {str(e)}"
- logging.error(error_msg)
- if hasattr(e, 'response') and e.response:
- logging.error(f"错误状态码: {e.response.status_code}")
- logging.error(f"错误内容: {e.response.text}")
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- except Exception as e:
- error_msg = f"查询Neo4j图数据库失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- def talent_get_tags(talent_id):
- """
- 根据talent ID获取人才节点关联的标签
-
- Args:
- talent_id (int): 人才节点pg_id
-
- Returns:
- dict: 包含人才ID和关联标签的字典,JSON格式
- """
- try:
- # 导入必要的模块
- from app.services.neo4j_driver import neo4j_driver
-
- # 准备查询返回数据
- response_data = {
- 'code': 200,
- 'success': True,
- 'message': '获取人才标签成功',
- 'data': []
- }
-
- # 构建Cypher查询语句,获取人才节点关联的标签
- cypher_query = """
- MATCH (t:talent)-[r:BELONGS_TO]->(tag:data_label)
- WHERE t.pg_id = $talent_id
- RETURN t.pg_id as talent_id, tag.name as tag_name
- """
-
- # 执行查询
- with neo4j_driver.get_session() as session:
- result = session.run(cypher_query, talent_id=int(talent_id))
- records = list(result)
-
- # 如果没有查询到标签,返回空数组
- if not records:
- response_data['message'] = f'人才pg_id {talent_id} 没有关联的标签'
- return response_data
-
- # 处理查询结果
- for record in records:
- talent_tag = {
- 'talent': record['talent_id'],
- 'tag': record['tag_name']
- }
- response_data['data'].append(talent_tag)
-
- return response_data
-
- except Exception as e:
- error_msg = f"获取人才标签失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- def talent_update_tags(data):
- """
- 根据传入的JSON数据为人才节点创建与标签的BELONGS_TO关系
-
- Args:
- data (list): 包含talent和tag字段的对象列表
- 例如: [
- {"talent": 12345, "tag": "市场营销"},
- {"talent": 12345, "tag": "酒店管理"}
- ]
-
- Returns:
- dict: 操作结果和状态信息
- """
- try:
- # 导入必要的模块
- from app.services.neo4j_driver import neo4j_driver
-
- # 验证输入参数
- if not isinstance(data, list):
- return {
- 'code': 400,
- 'success': False,
- 'message': '参数格式错误,需要JSON数组',
- 'data': None
- }
-
- if len(data) == 0:
- return {
- 'code': 400,
- 'success': False,
- 'message': '数据列表为空',
- 'data': None
- }
-
- # 获取当前时间
- current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- # 成功和失败计数
- success_count = 0
- failed_items = []
-
- # 按talent分组处理数据
- talent_tags = {}
- for item in data:
- # 验证每个项目的格式
- if not isinstance(item, dict) or 'talent' not in item or 'tag' not in item:
- failed_items.append(item)
- continue
-
- talent_id = item.get('talent')
- tag_name = item.get('tag')
-
- # 验证talent_id和tag_name的值
- if not talent_id or not tag_name or not isinstance(tag_name, str):
- failed_items.append(item)
- continue
-
- # 按talent_id分组
- if talent_id not in talent_tags:
- talent_tags[talent_id] = []
-
- talent_tags[talent_id].append(tag_name)
-
- with neo4j_driver.get_session() as session:
- # 处理每个talent及其标签
- for talent_id, tags in talent_tags.items():
- # 首先验证talent节点是否存在
- check_talent_query = """
- MATCH (t:talent)
- WHERE t.pg_id = $talent_id
- RETURN t
- """
- talent_result = session.run(check_talent_query, talent_id=int(talent_id))
- if not talent_result.single():
- # 该talent不存在,记录失败项并继续下一个talent
- for tag in tags:
- failed_items.append({'talent_pg_id': talent_id, 'tag': tag})
- continue
-
- # 首先清除所有现有的BELONGS_TO关系
- clear_relations_query = """
- MATCH (t:talent)-[r:BELONGS_TO]->(:data_label)
- WHERE t.pg_id = $talent_id
- DELETE r
- RETURN count(r) as deleted_count
- """
- clear_result = session.run(clear_relations_query, talent_id=int(talent_id))
- deleted_count = clear_result.single()['deleted_count']
- logging.info(f"已删除talent_id={talent_id}的{deleted_count}个已有标签关系")
-
- # 处理每个标签
- for tag_name in tags:
- try:
- # 1. 查找或创建标签节点
- # 先查找是否存在该标签
- find_tag_query = """
- MATCH (tag:data_label)
- WHERE tag.name = $tag_name
- RETURN id(tag) as tag_id
- """
- tag_result = session.run(find_tag_query, tag_name=tag_name)
- tag_record = tag_result.single()
-
- if tag_record:
- tag_id = tag_record['tag_id']
- else:
- # 创建新标签
- create_tag_query = """
- CREATE (tag:data_label {name: $name, category: $category, updated_at: $updated_at})
- RETURN id(tag) as tag_id
- """
- tag_result = session.run(
- create_tag_query,
- name=tag_name,
- category='talent',
- updated_at=current_time
- )
- tag_record = tag_result.single()
- tag_id = tag_record['tag_id']
-
- # 2. 创建人才与标签的BELONGS_TO关系
- create_relation_query = """
- MATCH (t:talent), (tag:data_label)
- WHERE t.pg_id = $talent_id AND tag.name = $tag_name
- CREATE (t)-[r:BELONGS_TO]->(tag)
- SET r.created_at = $current_time
- RETURN r
- """
-
- relation_result = session.run(
- create_relation_query,
- talent_id=int(talent_id),
- tag_name=tag_name,
- current_time=current_time
- )
-
- if relation_result.single():
- success_count += 1
- else:
- failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name})
-
- except Exception as tag_error:
- logging.error(f"为标签 {tag_name} 创建关系时出错: {str(tag_error)}")
- failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name})
-
- # 返回结果
- total_items = len(data)
- if success_count == total_items:
- return {
- 'code': 200,
- 'success': True,
- 'message': f'成功创建或更新了 {success_count} 个标签关系',
- 'data': {
- 'success_count': success_count,
- 'total_count': total_items,
- 'failed_items': []
- }
- }
- elif success_count > 0:
- return {
- 'code': 206, # Partial Content
- 'success': True,
- 'message': f'部分成功: 创建或更新了 {success_count}/{total_items} 个标签关系',
- 'data': {
- 'success_count': success_count,
- 'total_count': total_items,
- 'failed_items': failed_items
- }
- }
- else:
- return {
- 'code': 500,
- 'success': False,
- 'message': '无法创建任何标签关系',
- 'data': {
- 'success_count': 0,
- 'total_count': total_items,
- 'failed_items': failed_items
- }
- }
-
- except Exception as e:
- error_msg = f"更新人才标签关系失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def get_business_card(card_id):
- """
- 根据ID从PostgreSQL数据库中获取名片记录
-
- Args:
- card_id (int): 名片记录ID
-
- Returns:
- dict: 包含操作结果和名片信息的字典
- """
- try:
- # 查询指定ID的名片记录
- card = BusinessCard.query.get(card_id)
-
- if not card:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{card_id}的名片记录',
- 'data': None
- }
-
- # 将记录转换为字典格式返回
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取名片记录成功',
- 'data': card.to_dict()
- }
-
- except Exception as e:
- error_msg = f"获取名片记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- # 酒店职位数据模型
- class HotelPosition(db.Model):
- __tablename__ = 'hotel_positions'
-
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- department_zh = db.Column(db.String(10), nullable=False)
- department_en = db.Column(db.String(50), nullable=False)
- position_zh = db.Column(db.String(20), nullable=False)
- position_en = db.Column(db.String(100), nullable=False)
- position_abbr = db.Column(db.String(20), nullable=True)
- level_zh = db.Column(db.String(10), nullable=False)
- level_en = db.Column(db.String(30), nullable=False)
- created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
- updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
- created_by = db.Column(db.String(50), default='system')
- updated_by = db.Column(db.String(50), default='system')
- status = db.Column(db.String(20), default='active')
-
- def to_dict(self):
- return {
- 'id': self.id,
- 'department_zh': self.department_zh,
- 'department_en': self.department_en,
- 'position_zh': self.position_zh,
- 'position_en': self.position_en,
- 'position_abbr': self.position_abbr,
- 'level_zh': self.level_zh,
- 'level_en': self.level_en,
- 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
- 'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None,
- 'created_by': self.created_by,
- 'updated_by': self.updated_by,
- 'status': self.status
- }
- def get_hotel_positions_list():
- """
- 获取酒店职位数据表的全部记录
-
- Returns:
- dict: 包含操作结果和酒店职位列表的字典
- """
- try:
- # 查询所有酒店职位记录,按部门和职位排序
- positions = HotelPosition.query.order_by(
- HotelPosition.department_zh,
- HotelPosition.position_zh
- ).all()
-
- # 将所有记录转换为字典格式
- positions_data = [position.to_dict() for position in positions]
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取酒店职位列表成功',
- 'data': positions_data,
- 'count': len(positions_data)
- }
-
- except Exception as e:
- error_msg = f"获取酒店职位列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': [],
- 'count': 0
- }
- def add_hotel_positions(position_data):
- """
- 新增酒店职位数据表记录
-
- Args:
- position_data (dict): 包含职位信息的字典,包括:
- - department_zh: 部门中文名称 (必填)
- - department_en: 部门英文名称 (必填)
- - position_zh: 职位中文名称 (必填)
- - position_en: 职位英文名称 (必填)
- - position_abbr: 职位英文缩写 (可选)
- - level_zh: 职级中文名称 (必填)
- - level_en: 职级英文名称 (必填)
- - created_by: 创建者 (可选,默认为'system')
- - updated_by: 更新者 (可选,默认为'system')
- - status: 状态 (可选,默认为'active')
-
- Returns:
- dict: 包含操作结果和创建的职位信息的字典
- """
- try:
- # 验证必填字段
- required_fields = ['department_zh', 'department_en', 'position_zh', 'position_en', 'level_zh', 'level_en']
- missing_fields = []
-
- for field in required_fields:
- if field not in position_data or not position_data[field] or not position_data[field].strip():
- missing_fields.append(field)
-
- if missing_fields:
- return {
- 'code': 400,
- 'success': False,
- 'message': f'缺少必填字段: {", ".join(missing_fields)}',
- 'data': None
- }
-
- # 检查是否已存在相同的职位记录(基于部门和职位的中文名称)
- existing_position = HotelPosition.query.filter_by(
- department_zh=position_data['department_zh'].strip(),
- position_zh=position_data['position_zh'].strip()
- ).first()
-
- if existing_position:
- return {
- 'code': 409,
- 'success': False,
- 'message': f'职位记录已存在:{position_data["department_zh"]} - {position_data["position_zh"]}',
- 'data': existing_position.to_dict()
- }
-
- # 创建新的职位记录
- new_position = HotelPosition(
- department_zh=position_data['department_zh'].strip(),
- department_en=position_data['department_en'].strip(),
- position_zh=position_data['position_zh'].strip(),
- position_en=position_data['position_en'].strip(),
- position_abbr=position_data.get('position_abbr', '').strip() if position_data.get('position_abbr') else None,
- level_zh=position_data['level_zh'].strip(),
- level_en=position_data['level_en'].strip(),
- created_by=position_data.get('created_by', 'system'),
- updated_by=position_data.get('updated_by', 'system'),
- status=position_data.get('status', 'active')
- )
-
- # 保存到数据库
- db.session.add(new_position)
- db.session.commit()
-
- logging.info(f"成功创建酒店职位记录,ID: {new_position.id}")
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '酒店职位记录创建成功',
- 'data': new_position.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"创建酒店职位记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def update_hotel_positions(position_id, position_data):
- """
- 修改酒店职位数据表记录
-
- Args:
- position_id (int): 职位记录ID
- position_data (dict): 包含要更新的职位信息的字典,可能包括:
- - department_zh: 部门中文名称
- - department_en: 部门英文名称
- - position_zh: 职位中文名称
- - position_en: 职位英文名称
- - position_abbr: 职位英文缩写
- - level_zh: 职级中文名称
- - level_en: 职级英文名称
- - updated_by: 更新者
- - status: 状态
-
- Returns:
- dict: 包含操作结果和更新后的职位信息的字典
- """
- try:
- # 查找要更新的职位记录
- position = HotelPosition.query.get(position_id)
-
- if not position:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{position_id}的职位记录',
- 'data': None
- }
-
- # 检查是否有数据需要更新
- if not position_data:
- return {
- 'code': 400,
- 'success': False,
- 'message': '请求数据为空',
- 'data': None
- }
-
- # 如果要更新部门和职位名称,检查是否会与其他记录冲突
- new_department_zh = position_data.get('department_zh', position.department_zh).strip() if position_data.get('department_zh') else position.department_zh
- new_position_zh = position_data.get('position_zh', position.position_zh).strip() if position_data.get('position_zh') else position.position_zh
-
- # 查找是否存在相同的职位记录(排除当前记录)
- existing_position = HotelPosition.query.filter(
- HotelPosition.id != position_id,
- HotelPosition.department_zh == new_department_zh,
- HotelPosition.position_zh == new_position_zh
- ).first()
-
- if existing_position:
- return {
- 'code': 409,
- 'success': False,
- 'message': f'职位记录已存在:{new_department_zh} - {new_position_zh}',
- 'data': existing_position.to_dict()
- }
-
- # 更新职位信息
- if 'department_zh' in position_data and position_data['department_zh']:
- position.department_zh = position_data['department_zh'].strip()
-
- if 'department_en' in position_data and position_data['department_en']:
- position.department_en = position_data['department_en'].strip()
-
- if 'position_zh' in position_data and position_data['position_zh']:
- position.position_zh = position_data['position_zh'].strip()
-
- if 'position_en' in position_data and position_data['position_en']:
- position.position_en = position_data['position_en'].strip()
-
- if 'position_abbr' in position_data:
- # 处理position_abbr,可能为空字符串或None
- if position_data['position_abbr'] and position_data['position_abbr'].strip():
- position.position_abbr = position_data['position_abbr'].strip()
- else:
- position.position_abbr = None
-
- if 'level_zh' in position_data and position_data['level_zh']:
- position.level_zh = position_data['level_zh'].strip()
-
- if 'level_en' in position_data and position_data['level_en']:
- position.level_en = position_data['level_en'].strip()
-
- if 'updated_by' in position_data:
- position.updated_by = position_data['updated_by'] or 'system'
-
- if 'status' in position_data:
- position.status = position_data['status'] or 'active'
-
- # 更新时间会自动设置(onupdate=datetime.now)
-
- # 保存更新
- db.session.commit()
-
- logging.info(f"成功更新酒店职位记录,ID: {position.id}")
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '酒店职位记录更新成功',
- 'data': position.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"更新酒店职位记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def query_hotel_positions(position_id):
- """
- 查找指定ID的酒店职位数据表记录
-
- Args:
- position_id (int): 职位记录ID
-
- Returns:
- dict: 包含操作结果和职位信息的字典
- """
- try:
- # 根据ID查找职位记录
- position = HotelPosition.query.get(position_id)
-
- if not position:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{position_id}的职位记录',
- 'data': None
- }
-
- # 返回找到的记录
- return {
- 'code': 200,
- 'success': True,
- 'message': '查找职位记录成功',
- 'data': position.to_dict()
- }
-
- except Exception as e:
- error_msg = f"查找职位记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def delete_hotel_positions(position_id):
- """
- 删除指定ID的酒店职位数据表记录
-
- Args:
- position_id (int): 职位记录ID
-
- Returns:
- dict: 包含操作结果的字典
- """
- try:
- # 根据ID查找要删除的职位记录
- position = HotelPosition.query.get(position_id)
-
- if not position:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{position_id}的职位记录',
- 'data': None
- }
-
- # 保存被删除记录的信息,用于返回
- deleted_position_info = position.to_dict()
-
- # 执行删除操作
- db.session.delete(position)
- db.session.commit()
-
- logging.info(f"成功删除酒店职位记录,ID: {position_id}")
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '职位记录删除成功',
- 'data': deleted_position_info
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"删除职位记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- # 酒店集团子品牌数据模型
- class HotelGroupBrands(db.Model):
- __tablename__ = 'hotel_group_brands'
-
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- group_name_en = db.Column(db.String(60), nullable=False)
- group_name_zh = db.Column(db.String(20), nullable=False)
- brand_name_en = db.Column(db.String(40), nullable=False)
- brand_name_zh = db.Column(db.String(40), nullable=False)
- positioning_level_en = db.Column(db.String(20), nullable=False)
- positioning_level_zh = db.Column(db.String(5), nullable=False)
- created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
- updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
- created_by = db.Column(db.String(50), default='system')
- updated_by = db.Column(db.String(50), default='system')
- status = db.Column(db.String(20), default='active')
-
- def to_dict(self):
- return {
- 'id': self.id,
- 'group_name_en': self.group_name_en,
- 'group_name_zh': self.group_name_zh,
- 'brand_name_en': self.brand_name_en,
- 'brand_name_zh': self.brand_name_zh,
- 'positioning_level_en': self.positioning_level_en,
- 'positioning_level_zh': self.positioning_level_zh,
- 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
- 'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None,
- 'created_by': self.created_by,
- 'updated_by': self.updated_by,
- 'status': self.status
- }
- def get_hotel_group_brands_list():
- """
- 获取酒店集团子品牌数据表的全部记录
-
- Returns:
- dict: 包含操作结果和酒店集团品牌列表的字典
- """
- try:
- # 查询所有酒店集团品牌记录,按集团和品牌排序
- brands = HotelGroupBrands.query.order_by(
- HotelGroupBrands.group_name_zh,
- HotelGroupBrands.brand_name_zh
- ).all()
-
- # 将所有记录转换为字典格式
- brands_data = [brand.to_dict() for brand in brands]
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取酒店集团品牌列表成功',
- 'data': brands_data,
- 'count': len(brands_data)
- }
-
- except Exception as e:
- error_msg = f"获取酒店集团品牌列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': [],
- 'count': 0
- }
- def add_hotel_group_brands(brand_data):
- """
- 新增酒店集团子品牌数据表记录
-
- Args:
- brand_data (dict): 包含品牌信息的字典,包括:
- - group_name_en: 集团英文名称 (必填)
- - group_name_zh: 集团中文名称 (必填)
- - brand_name_en: 品牌英文名称 (必填)
- - brand_name_zh: 品牌中文名称 (必填)
- - positioning_level_en: 定位级别英文名称 (必填)
- - positioning_level_zh: 定位级别中文名称 (必填)
- - created_by: 创建者 (可选,默认为'system')
- - updated_by: 更新者 (可选,默认为'system')
- - status: 状态 (可选,默认为'active')
-
- Returns:
- dict: 包含操作结果和创建的品牌信息的字典
- """
- try:
- # 验证必填字段
- required_fields = ['group_name_en', 'group_name_zh', 'brand_name_en', 'brand_name_zh', 'positioning_level_en', 'positioning_level_zh']
- missing_fields = []
-
- for field in required_fields:
- if field not in brand_data or not brand_data[field] or not brand_data[field].strip():
- missing_fields.append(field)
-
- if missing_fields:
- return {
- 'code': 400,
- 'success': False,
- 'message': f'缺少必填字段: {", ".join(missing_fields)}',
- 'data': None
- }
-
- # 检查是否已存在相同的品牌记录(基于集团和品牌的中文名称)
- existing_brand = HotelGroupBrands.query.filter_by(
- group_name_zh=brand_data['group_name_zh'].strip(),
- brand_name_zh=brand_data['brand_name_zh'].strip()
- ).first()
-
- if existing_brand:
- return {
- 'code': 409,
- 'success': False,
- 'message': f'品牌记录已存在:{brand_data["group_name_zh"]} - {brand_data["brand_name_zh"]}',
- 'data': existing_brand.to_dict()
- }
-
- # 创建新的品牌记录
- new_brand = HotelGroupBrands(
- group_name_en=brand_data['group_name_en'].strip(),
- group_name_zh=brand_data['group_name_zh'].strip(),
- brand_name_en=brand_data['brand_name_en'].strip(),
- brand_name_zh=brand_data['brand_name_zh'].strip(),
- positioning_level_en=brand_data['positioning_level_en'].strip(),
- positioning_level_zh=brand_data['positioning_level_zh'].strip(),
- created_by=brand_data.get('created_by', 'system'),
- updated_by=brand_data.get('updated_by', 'system'),
- status=brand_data.get('status', 'active')
- )
-
- # 保存到数据库
- db.session.add(new_brand)
- db.session.commit()
-
- logging.info(f"成功创建酒店集团品牌记录,ID: {new_brand.id}")
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '酒店集团品牌记录创建成功',
- 'data': new_brand.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"创建酒店集团品牌记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def update_hotel_group_brands(brand_id, brand_data):
- """
- 修改酒店集团子品牌数据表记录
-
- Args:
- brand_id (int): 品牌记录ID
- brand_data (dict): 包含要更新的品牌信息的字典,可能包括:
- - group_name_en: 集团英文名称
- - group_name_zh: 集团中文名称
- - brand_name_en: 品牌英文名称
- - brand_name_zh: 品牌中文名称
- - positioning_level_en: 定位级别英文名称
- - positioning_level_zh: 定位级别中文名称
- - updated_by: 更新者
- - status: 状态
-
- Returns:
- dict: 包含操作结果和更新后的品牌信息的字典
- """
- try:
- # 查找要更新的品牌记录
- brand = HotelGroupBrands.query.get(brand_id)
-
- if not brand:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{brand_id}的品牌记录',
- 'data': None
- }
-
- # 检查是否有数据需要更新
- if not brand_data:
- return {
- 'code': 400,
- 'success': False,
- 'message': '请求数据为空',
- 'data': None
- }
-
- # 如果要更新集团和品牌名称,检查是否会与其他记录冲突
- new_group_name_zh = brand_data.get('group_name_zh', brand.group_name_zh).strip() if brand_data.get('group_name_zh') else brand.group_name_zh
- new_brand_name_zh = brand_data.get('brand_name_zh', brand.brand_name_zh).strip() if brand_data.get('brand_name_zh') else brand.brand_name_zh
-
- # 查找是否存在相同的品牌记录(排除当前记录)
- existing_brand = HotelGroupBrands.query.filter(
- HotelGroupBrands.id != brand_id,
- HotelGroupBrands.group_name_zh == new_group_name_zh,
- HotelGroupBrands.brand_name_zh == new_brand_name_zh
- ).first()
-
- if existing_brand:
- return {
- 'code': 409,
- 'success': False,
- 'message': f'品牌记录已存在:{new_group_name_zh} - {new_brand_name_zh}',
- 'data': existing_brand.to_dict()
- }
-
- # 更新品牌信息
- if 'group_name_en' in brand_data and brand_data['group_name_en']:
- brand.group_name_en = brand_data['group_name_en'].strip()
-
- if 'group_name_zh' in brand_data and brand_data['group_name_zh']:
- brand.group_name_zh = brand_data['group_name_zh'].strip()
-
- if 'brand_name_en' in brand_data and brand_data['brand_name_en']:
- brand.brand_name_en = brand_data['brand_name_en'].strip()
-
- if 'brand_name_zh' in brand_data and brand_data['brand_name_zh']:
- brand.brand_name_zh = brand_data['brand_name_zh'].strip()
-
- if 'positioning_level_en' in brand_data and brand_data['positioning_level_en']:
- brand.positioning_level_en = brand_data['positioning_level_en'].strip()
-
- if 'positioning_level_zh' in brand_data and brand_data['positioning_level_zh']:
- brand.positioning_level_zh = brand_data['positioning_level_zh'].strip()
-
- if 'updated_by' in brand_data:
- brand.updated_by = brand_data['updated_by'] or 'system'
-
- if 'status' in brand_data:
- brand.status = brand_data['status'] or 'active'
-
- # 更新时间会自动设置(onupdate=datetime.now)
-
- # 保存更新
- db.session.commit()
-
- logging.info(f"成功更新酒店集团品牌记录,ID: {brand.id}")
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '酒店集团品牌记录更新成功',
- 'data': brand.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"更新酒店集团品牌记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def query_hotel_group_brands(brand_id):
- """
- 查找指定ID的酒店集团子品牌数据表记录
-
- Args:
- brand_id (int): 品牌记录ID
-
- Returns:
- dict: 包含操作结果和品牌信息的字典
- """
- try:
- # 根据ID查找品牌记录
- brand = HotelGroupBrands.query.get(brand_id)
-
- if not brand:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{brand_id}的品牌记录',
- 'data': None
- }
-
- # 返回找到的记录
- return {
- 'code': 200,
- 'success': True,
- 'message': '查找品牌记录成功',
- 'data': brand.to_dict()
- }
-
- except Exception as e:
- error_msg = f"查找品牌记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def delete_hotel_group_brands(brand_id):
- """
- 删除指定ID的酒店集团子品牌数据表记录
-
- Args:
- brand_id (int): 品牌记录ID
-
- Returns:
- dict: 包含操作结果的字典
- """
- try:
- # 根据ID查找要删除的品牌记录
- brand = HotelGroupBrands.query.get(brand_id)
-
- if not brand:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{brand_id}的品牌记录',
- 'data': None
- }
-
- # 保存被删除记录的信息,用于返回
- deleted_brand_info = brand.to_dict()
-
- # 执行删除操作
- db.session.delete(brand)
- db.session.commit()
-
- logging.info(f"成功删除酒店集团品牌记录,ID: {brand_id}")
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '品牌记录删除成功',
- 'data': deleted_brand_info
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"删除品牌记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
|