123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154 |
- from typing import Dict, Any
- from app import db
- from datetime import datetime
- import os
- import boto3
- from botocore.config import Config
- import logging
- import requests
- import json
- import re
- import uuid
- from PIL import Image
- from io import BytesIO
- import pytesseract
- import base64
- from openai import OpenAI
- from app.config.config import DevelopmentConfig, ProductionConfig
- # 名片解析数据模型
- class BusinessCard(db.Model):
- __tablename__ = 'business_cards'
-
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- name_zh = db.Column(db.String(100), nullable=False)
- name_en = db.Column(db.String(100))
- title_zh = db.Column(db.String(100))
- title_en = db.Column(db.String(100))
- mobile = db.Column(db.String(100))
- phone = db.Column(db.String(50))
- email = db.Column(db.String(100))
- hotel_zh = db.Column(db.String(200))
- hotel_en = db.Column(db.String(200))
- address_zh = db.Column(db.Text)
- address_en = db.Column(db.Text)
- postal_code_zh = db.Column(db.String(20))
- postal_code_en = db.Column(db.String(20))
- brand_zh = db.Column(db.String(100))
- brand_en = db.Column(db.String(100))
- affiliation_zh = db.Column(db.String(200))
- affiliation_en = db.Column(db.String(200))
- birthday = db.Column(db.Date) # 生日,存储年月日
- age = db.Column(db.Integer) # 年龄字段
- native_place = db.Column(db.Text) # 籍贯字段
- residence = db.Column(db.Text) # 居住地
- image_path = db.Column(db.String(255)) # MinIO中存储的路径
- career_path = db.Column(db.JSON) # 职业轨迹,JSON格式
- brand_group = db.Column(db.String(200)) # 品牌组合
- origin_source = db.Column(db.JSON) # 原始资料记录,JSON格式
- talent_profile = db.Column(db.Text) # 人才档案,文本格式
- created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
- updated_at = db.Column(db.DateTime, onupdate=datetime.now)
- updated_by = db.Column(db.String(50))
- status = db.Column(db.String(20), default='active')
-
- def to_dict(self):
- return {
- 'id': self.id,
- 'name_zh': self.name_zh,
- 'name_en': self.name_en,
- 'title_zh': self.title_zh,
- 'title_en': self.title_en,
- 'mobile': self.mobile,
- 'phone': self.phone,
- 'email': self.email,
- 'hotel_zh': self.hotel_zh,
- 'hotel_en': self.hotel_en,
- 'address_zh': self.address_zh,
- 'address_en': self.address_en,
- 'postal_code_zh': self.postal_code_zh,
- 'postal_code_en': self.postal_code_en,
- 'brand_zh': self.brand_zh,
- 'brand_en': self.brand_en,
- 'affiliation_zh': self.affiliation_zh,
- 'affiliation_en': self.affiliation_en,
- 'birthday': self.birthday.strftime('%Y-%m-%d') if self.birthday else None,
- 'age': self.age,
- 'native_place': self.native_place,
- 'residence': self.residence,
- 'image_path': self.image_path,
- 'career_path': self.career_path,
- 'brand_group': self.brand_group,
- 'origin_source': self.origin_source,
- 'talent_profile': self.talent_profile,
- 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
- 'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None,
- 'updated_by': self.updated_by,
- 'status': self.status
- }
- # 重复名片处理数据模型
- class DuplicateBusinessCard(db.Model):
- __tablename__ = 'duplicate_business_cards'
-
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- main_card_id = db.Column(db.Integer, db.ForeignKey('business_cards.id'), nullable=False) # 新创建的主记录ID
- suspected_duplicates = db.Column(db.JSON, nullable=False) # 疑似重复记录列表,JSON格式
- duplicate_reason = db.Column(db.String(200), nullable=False) # 重复原因
- processing_status = db.Column(db.String(20), default='pending') # 处理状态:pending/processed/ignored
- created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
- processed_at = db.Column(db.DateTime) # 处理时间
- processed_by = db.Column(db.String(50)) # 处理人
- processing_notes = db.Column(db.Text) # 处理备注
-
- # 关联主记录
- main_card = db.relationship('BusinessCard', backref=db.backref('as_main_duplicate_records', lazy=True))
-
- def to_dict(self):
- return {
- 'id': self.id,
- 'main_card_id': self.main_card_id,
- 'suspected_duplicates': self.suspected_duplicates,
- 'duplicate_reason': self.duplicate_reason,
- 'processing_status': self.processing_status,
- 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
- 'processed_at': self.processed_at.strftime('%Y-%m-%d %H:%M:%S') if self.processed_at else None,
- 'processed_by': self.processed_by,
- 'processing_notes': self.processing_notes
- }
- # 解析任务存储库数据模型
- class ParseTaskRepository(db.Model):
- __tablename__ = 'parse_task_repository'
-
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- task_name = db.Column(db.String(100), nullable=False)
- task_status = db.Column(db.String(10), nullable=False)
- task_type = db.Column(db.String(50), nullable=False)
- task_source = db.Column(db.String(300), nullable=False)
- collection_count = db.Column(db.Integer, nullable=False, default=0)
- parse_count = db.Column(db.Integer, nullable=False, default=0)
- parse_result = db.Column(db.JSON)
- created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
- created_by = db.Column(db.String(50), nullable=False)
- updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, nullable=False)
- updated_by = db.Column(db.String(50), nullable=False)
-
- def to_dict(self):
- return {
- 'id': self.id,
- 'task_name': self.task_name,
- 'task_status': self.task_status,
- 'task_type': self.task_type,
- 'task_source': self.task_source,
- 'collection_count': self.collection_count,
- 'parse_count': self.parse_count,
- 'parse_result': self.parse_result,
- 'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
- 'created_by': self.created_by,
- 'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None,
- 'updated_by': self.updated_by
- }
- # 配置变量
- # 使用配置变量,缺省认为在生产环境运行
- config = ProductionConfig()
- # 使用配置变量
- minio_url = f"{'https' if getattr(config, 'MINIO_SECURE', False) else 'http'}://{getattr(config, 'MINIO_HOST', 'localhost')}"
- minio_access_key = getattr(config, 'MINIO_USER', 'minioadmin')
- minio_secret_key = getattr(config, 'MINIO_PASSWORD', 'minioadmin')
- minio_bucket = getattr(config, 'MINIO_BUCKET', 'dataops')
- use_ssl = getattr(config, 'MINIO_SECURE', False)
- # API密钥配置
- DEEPSEEK_API_KEY = getattr(config, 'DEEPSEEK_API_KEY', '')
- DEEPSEEK_API_URL = getattr(config, 'DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions')
- QWEN_API_KEY = getattr(config, 'QWEN_API_KEY', '')
- QWEN_API_URL = getattr(config, 'QWEN_API_URL', 'https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation')
- # OCR配置
- OCR_LANG = getattr(config, 'OCR_LANG', 'chi_sim+eng')
- # 名片解析功能模块
- def normalize_mobile_numbers(mobile_str):
- """
- 标准化手机号码字符串,去重并限制最多3个
-
- Args:
- mobile_str (str): 手机号码字符串,可能包含多个手机号码,用逗号分隔
-
- Returns:
- str: 标准化后的手机号码字符串,最多3个,用逗号分隔
- """
- if not mobile_str or not mobile_str.strip():
- return ''
-
- # 按逗号分割并清理每个手机号码
- mobiles = []
- for mobile in mobile_str.split(','):
- mobile = mobile.strip()
- if mobile and mobile not in mobiles: # 去重
- mobiles.append(mobile)
-
- # 限制最多3个手机号码
- return ','.join(mobiles[:3])
- def mobile_numbers_overlap(mobile1, mobile2):
- """
- 检查两个手机号码字符串是否有重叠
-
- Args:
- mobile1 (str): 第一个手机号码字符串
- mobile2 (str): 第二个手机号码字符串
-
- Returns:
- bool: 是否有重叠的手机号码
- """
- if not mobile1 or not mobile2:
- return False
-
- mobiles1 = set(mobile.strip() for mobile in mobile1.split(',') if mobile.strip())
- mobiles2 = set(mobile.strip() for mobile in mobile2.split(',') if mobile.strip())
-
- return bool(mobiles1 & mobiles2) # 检查交集
- def merge_mobile_numbers(existing_mobile, new_mobile):
- """
- 合并手机号码,去重并限制最多3个
-
- Args:
- existing_mobile (str): 现有手机号码字符串
- new_mobile (str): 新手机号码字符串
-
- Returns:
- str: 合并后的手机号码字符串,最多3个,用逗号分隔
- """
- mobiles = []
-
- # 添加现有手机号码
- if existing_mobile:
- for mobile in existing_mobile.split(','):
- mobile = mobile.strip()
- if mobile and mobile not in mobiles:
- mobiles.append(mobile)
-
- # 添加新手机号码
- if new_mobile:
- for mobile in new_mobile.split(','):
- mobile = mobile.strip()
- if mobile and mobile not in mobiles:
- mobiles.append(mobile)
-
- # 限制最多3个手机号码
- return ','.join(mobiles[:3])
- def check_duplicate_business_card(extracted_data):
- """
- 检查是否存在重复的名片记录
-
- Args:
- extracted_data (dict): 提取的名片信息
-
- Returns:
- dict: 包含检查结果的字典,格式为:
- {
- 'is_duplicate': bool,
- 'action': str, # 'update', 'create_with_duplicates' 或 'create_new'
- 'existing_card': BusinessCard 或 None,
- 'suspected_duplicates': list, # 疑似重复记录列表
- 'reason': str
- }
- """
- try:
- # 提取关键信息进行匹配
- name_zh = extracted_data.get('name_zh', '').strip()
- mobile = extracted_data.get('mobile', '').strip()
-
- # 如果没有姓名,无法进行有效的重复检测
- if not name_zh:
- return {
- 'is_duplicate': False,
- 'action': 'create_new',
- 'existing_card': None,
- 'suspected_duplicates': [],
- 'reason': '缺少中文姓名,无法进行重复检测'
- }
-
- # 根据姓名进行精确匹配
- name_matches = BusinessCard.query.filter_by(name_zh=name_zh).all()
-
- # 如果有手机号,同时检查手机号匹配
- mobile_matches = []
- if mobile:
- # 标准化手机号进行比较
- normalized_mobile = normalize_mobile_numbers(mobile)
- if normalized_mobile:
- # 查找所有有手机号的记录
- all_cards_with_mobile = BusinessCard.query.filter(BusinessCard.mobile.isnot(None)).all()
- for card in all_cards_with_mobile:
- if card.mobile and mobile_numbers_overlap(normalized_mobile, card.mobile):
- mobile_matches.append(card)
-
- # 合并姓名匹配和手机号匹配的结果,去重
- all_matches = []
- for card in name_matches + mobile_matches:
- if card not in all_matches:
- all_matches.append(card)
-
- if not all_matches:
- # 没有找到匹配记录,创建新记录
- return {
- 'is_duplicate': False,
- 'action': 'create_new',
- 'existing_card': None,
- 'suspected_duplicates': [],
- 'reason': '未找到重复记录,将创建新记录'
- }
-
- elif len(all_matches) == 1:
- # 找到一个匹配记录
- existing_card = all_matches[0]
-
- # 检查是否是完全匹配(姓名和手机号都相同)
- existing_mobile = existing_card.mobile or ''
- is_name_match = existing_card.name_zh == name_zh
- is_mobile_match = mobile and mobile_numbers_overlap(mobile, existing_mobile)
-
- if is_name_match and is_mobile_match:
- # 完全匹配,更新现有记录
- return {
- 'is_duplicate': True,
- 'action': 'update',
- 'existing_card': existing_card,
- 'suspected_duplicates': [],
- 'reason': f'找到完全匹配的记录 (ID: {existing_card.id}),将更新现有记录'
- }
- else:
- # 部分匹配,标记为疑似重复
- return {
- 'is_duplicate': True,
- 'action': 'create_with_duplicates',
- 'existing_card': None,
- 'suspected_duplicates': [existing_card],
- 'reason': f'找到疑似重复记录 (ID: {existing_card.id}),将创建新记录并标记重复'
- }
-
- else:
- # 找到多个匹配记录,标记为疑似重复
- return {
- 'is_duplicate': True,
- 'action': 'create_with_duplicates',
- 'existing_card': None,
- 'suspected_duplicates': all_matches,
- 'reason': f'找到 {len(all_matches)} 个疑似重复记录,将创建新记录并标记重复'
- }
-
- except Exception as e:
- logging.error(f"重复检测过程中发生错误: {str(e)}", exc_info=True)
- # 出错时默认创建新记录
- return {
- 'is_duplicate': False,
- 'action': 'create_new',
- 'existing_card': None,
- 'suspected_duplicates': [],
- 'reason': f'重复检测失败: {str(e)},将创建新记录'
- }
- def update_career_path(existing_card, new_data, image_path=None):
- """
- 更新名片的职业轨迹信息
-
- Args:
- existing_card: 现有的名片记录对象
- new_data (dict): 新的职位信息
- image_path (str): 新的图片路径
-
- Returns:
- list: 更新后的职业轨迹列表
- """
- try:
- # 获取现有的职业轨迹,如果没有则初始化为空列表
- career_path = existing_card.career_path if existing_card.career_path else []
-
- # 确保career_path是列表格式
- if not isinstance(career_path, list):
- career_path = []
-
- # 构建新的职业记录
- new_career_entry = {
- 'hotel_zh': new_data.get('hotel_zh', ''),
- 'hotel_en': new_data.get('hotel_en', ''),
- 'title_zh': new_data.get('title_zh', ''),
- 'title_en': new_data.get('title_en', ''),
- 'start_date': datetime.now().strftime('%Y-%m-%d'),
- 'image_path': image_path or existing_card.image_path
- }
-
- # 检查是否与最新的职业记录相同,避免重复添加
- if career_path:
- latest_entry = career_path[-1]
- if (latest_entry.get('hotel_zh') == new_career_entry['hotel_zh'] and
- latest_entry.get('title_zh') == new_career_entry['title_zh']):
- # 如果职位信息相同,只更新图片路径和时间
- latest_entry['image_path'] = new_career_entry['image_path']
- latest_entry['start_date'] = new_career_entry['start_date']
- return career_path
-
- # 添加新的职业记录
- career_path.append(new_career_entry)
-
- # 限制职业轨迹记录数量(最多保留10条)
- if len(career_path) > 10:
- career_path = career_path[-10:]
-
- return career_path
-
- except Exception as e:
- logging.error(f"更新职业轨迹失败: {str(e)}", exc_info=True)
- # 出错时返回原有的职业轨迹
- return existing_card.career_path if existing_card.career_path else []
- def create_main_card_with_duplicates(extracted_data, minio_path, suspected_duplicates, reason):
- """
- 创建主名片记录并标记疑似重复记录
-
- Args:
- extracted_data (dict): 提取的名片信息
- minio_path (str): MinIO中的图片路径
- suspected_duplicates (list): 疑似重复的名片记录列表
- reason (str): 重复原因描述
-
- Returns:
- BusinessCard: 创建的主名片记录
- """
- try:
- # 标准化手机号码
- mobile = normalize_mobile_numbers(extracted_data.get('mobile', ''))
-
- # 构建职业轨迹
- career_path = []
- if extracted_data.get('hotel_zh') or extracted_data.get('title_zh'):
- career_entry = {
- 'hotel_zh': extracted_data.get('hotel_zh', ''),
- 'hotel_en': extracted_data.get('hotel_en', ''),
- 'title_zh': extracted_data.get('title_zh', ''),
- 'title_en': extracted_data.get('title_en', ''),
- 'start_date': datetime.now().strftime('%Y-%m-%d'),
- 'image_path': minio_path
- }
- career_path.append(career_entry)
-
- # 创建新的主名片记录
- main_card = BusinessCard(
- name_zh=extracted_data.get('name_zh', ''),
- name_en=extracted_data.get('name_en', ''),
- title_zh=extracted_data.get('title_zh', ''),
- title_en=extracted_data.get('title_en', ''),
- mobile=mobile,
- phone=extracted_data.get('phone', ''),
- email=extracted_data.get('email', ''),
- hotel_zh=extracted_data.get('hotel_zh', ''),
- hotel_en=extracted_data.get('hotel_en', ''),
- address_zh=extracted_data.get('address_zh', ''),
- address_en=extracted_data.get('address_en', ''),
- postal_code_zh=extracted_data.get('postal_code_zh', ''),
- postal_code_en=extracted_data.get('postal_code_en', ''),
- brand_zh=extracted_data.get('brand_zh', ''),
- brand_en=extracted_data.get('brand_en', ''),
- affiliation_zh=extracted_data.get('affiliation_zh', ''),
- affiliation_en=extracted_data.get('affiliation_en', ''),
- brand_group=extracted_data.get('brand_group', ''),
- image_path=minio_path,
- career_path=career_path,
- origin_source={'source': 'manual_upload', 'timestamp': datetime.now().isoformat()},
- created_at=datetime.now(),
- updated_by='system',
- status='active'
- )
-
- # 保存主记录到数据库
- db.session.add(main_card)
- db.session.flush() # 获取主记录的ID
-
- # 创建重复记录标记
- suspected_duplicates_data = []
- for duplicate_card in suspected_duplicates:
- suspected_duplicates_data.append({
- 'id': duplicate_card.id,
- 'name_zh': duplicate_card.name_zh,
- 'mobile': duplicate_card.mobile,
- 'hotel_zh': duplicate_card.hotel_zh,
- 'title_zh': duplicate_card.title_zh
- })
-
- duplicate_record = DuplicateBusinessCard(
- main_card_id=main_card.id,
- suspected_duplicates=suspected_duplicates_data,
- duplicate_reason=reason,
- processing_status='pending',
- created_at=datetime.now()
- )
-
- # 保存重复记录标记
- db.session.add(duplicate_record)
- db.session.commit()
-
- logging.info(f"成功创建主名片记录 ID: {main_card.id},并标记 {len(suspected_duplicates)} 个疑似重复记录")
-
- return main_card
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"创建主名片记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
- raise Exception(error_msg)
- def get_minio_client():
- """获取MinIO客户端连接"""
- try:
- # 使用全局配置变量
- global minio_url, minio_access_key, minio_secret_key, minio_bucket, use_ssl
-
- logging.info(f"尝试连接MinIO服务器: {minio_url}")
-
- minio_client = boto3.client(
- 's3',
- endpoint_url=minio_url,
- aws_access_key_id=minio_access_key,
- aws_secret_access_key=minio_secret_key,
- config=Config(
- signature_version='s3v4',
- retries={'max_attempts': 3, 'mode': 'standard'},
- connect_timeout=10,
- read_timeout=30
- )
- )
-
- # 确保存储桶存在
- buckets = minio_client.list_buckets()
- bucket_names = [bucket['Name'] for bucket in buckets.get('Buckets', [])]
- logging.info(f"成功连接到MinIO服务器,现有存储桶: {bucket_names}")
-
- if minio_bucket not in bucket_names:
- logging.info(f"创建存储桶: {minio_bucket}")
- minio_client.create_bucket(Bucket=minio_bucket)
-
- return minio_client
- except Exception as e:
- logging.error(f"MinIO连接错误: {str(e)}")
- return None
- def get_business_cards():
- """
- 获取所有名片记录
-
- Returns:
- dict: 包含名片记录列表的字典
- """
- try:
- # 查询所有名片记录,按创建时间倒序排列
- cards = BusinessCard.query.filter_by(status='active').order_by(BusinessCard.created_at.desc()).all()
-
- # 转换为字典格式
- cards_data = [card.to_dict() for card in cards]
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取名片列表成功',
- 'data': cards_data,
- 'count': len(cards_data)
- }
-
- except Exception as e:
- error_msg = f"获取名片列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': [],
- 'count': 0
- }
- def get_business_card(card_id):
- """
- 根据ID获取单个名片记录
-
- Args:
- card_id (int): 名片记录ID
-
- Returns:
- dict: 包含名片记录的字典
- """
- try:
- card = BusinessCard.query.get(card_id)
-
- if not card:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{card_id}的名片记录',
- 'data': None
- }
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取名片记录成功',
- 'data': card.to_dict()
- }
-
- except Exception as e:
- error_msg = f"获取名片记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def update_business_card(card_id, data):
- """
- 更新名片记录
-
- Args:
- card_id (int): 名片记录ID
- data (dict): 要更新的数据
-
- Returns:
- dict: 包含更新结果的字典
- """
- try:
- card = BusinessCard.query.get(card_id)
-
- if not card:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{card_id}的名片记录',
- 'data': None
- }
-
- # 更新字段
- updatable_fields = ['name_zh', 'name_en', 'title_zh', 'title_en', 'mobile', 'phone', 'email',
- 'hotel_zh', 'hotel_en', 'address_zh', 'address_en', 'postal_code_zh', 'postal_code_en',
- 'brand_zh', 'brand_en', 'affiliation_zh', 'affiliation_en', 'brand_group', 'talent_profile']
-
- for field in updatable_fields:
- if field in data and data[field] is not None:
- setattr(card, field, data[field])
-
- # 处理手机号标准化
- if 'mobile' in data:
- card.mobile = normalize_mobile_numbers(data['mobile'])
-
- card.updated_at = datetime.now()
- card.updated_by = data.get('updated_by', 'system')
-
- db.session.commit()
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '名片记录更新成功',
- 'data': card.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"更新名片记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def update_business_card_status(card_id, status):
- """
- 更新名片记录状态
-
- Args:
- card_id (int): 名片记录ID
- status (str): 新的状态
-
- Returns:
- dict: 包含更新结果的字典
- """
- try:
- card = BusinessCard.query.get(card_id)
-
- if not card:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{card_id}的名片记录',
- 'data': None
- }
-
- card.status = status
- card.updated_at = datetime.now()
-
- db.session.commit()
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '名片状态更新成功',
- 'data': card.to_dict()
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"更新名片状态失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def search_business_cards_by_mobile(mobile_number):
- """
- 根据手机号搜索名片记录
-
- Args:
- mobile_number (str): 手机号码
-
- Returns:
- dict: 包含搜索结果的字典
- """
- try:
- if not mobile_number or not mobile_number.strip():
- return {
- 'code': 400,
- 'success': False,
- 'message': '手机号码不能为空',
- 'data': [],
- 'count': 0
- }
-
- # 查询包含该手机号的记录
- cards = BusinessCard.query.filter(
- BusinessCard.mobile.contains(mobile_number.strip())
- ).all()
-
- # 转换为字典格式
- cards_data = [card.to_dict() for card in cards]
-
- return {
- 'code': 200,
- 'success': True,
- 'message': f'找到 {len(cards_data)} 条匹配记录',
- 'data': cards_data,
- 'count': len(cards_data)
- }
-
- except Exception as e:
- error_msg = f"搜索名片记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': [],
- 'count': 0
- }
- # 重复记录管理函数
- def get_duplicate_records(status=None):
- """
- 获取重复记录列表
-
- Args:
- status (str, optional): 筛选特定状态的记录
-
- Returns:
- dict: 包含操作结果和重复记录列表
- """
- try:
- query = DuplicateBusinessCard.query
- if status:
- query = query.filter_by(processing_status=status)
-
- duplicate_records = query.order_by(DuplicateBusinessCard.created_at.desc()).all()
-
- records_data = []
- for record in duplicate_records:
- record_dict = record.to_dict()
- if record.main_card:
- record_dict['main_card'] = record.main_card.to_dict()
- records_data.append(record_dict)
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取重复记录列表成功',
- 'data': records_data,
- 'count': len(records_data)
- }
-
- except Exception as e:
- error_msg = f"获取重复记录列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': [],
- 'count': 0
- }
- def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, processed_by=None, notes=None):
- """
- 处理重复记录
-
- Args:
- duplicate_id (int): 重复记录ID
- action (str): 处理动作
- selected_duplicate_id (int, optional): 选择的重复记录ID
- processed_by (str, optional): 处理人
- notes (str, optional): 处理备注
-
- Returns:
- dict: 包含操作结果
- """
- try:
- duplicate_record = DuplicateBusinessCard.query.filter_by(main_card_id=duplicate_id).first()
- if not duplicate_record:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到main_card_id为{duplicate_id}的重复记录',
- 'data': None
- }
-
- if duplicate_record.processing_status != 'pending':
- return {
- 'code': 400,
- 'success': False,
- 'message': f'重复记录状态为{duplicate_record.processing_status},无法处理',
- 'data': None
- }
-
- main_card = duplicate_record.main_card
- if not main_card:
- return {
- 'code': 404,
- 'success': False,
- 'message': '未找到对应的主记录',
- 'data': None
- }
-
- result_data = None
-
- if action == 'merge_to_suspected':
- if not selected_duplicate_id:
- return {
- 'code': 400,
- 'success': False,
- 'message': '执行合并操作时必须提供selected_duplicate_id',
- 'data': None
- }
-
- target_card = BusinessCard.query.get(selected_duplicate_id)
- if not target_card:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{selected_duplicate_id}的目标记录',
- 'data': None
- }
-
- # 合并信息到目标记录
- target_card.name_en = main_card.name_en or target_card.name_en
- target_card.title_zh = main_card.title_zh or target_card.title_zh
- target_card.title_en = main_card.title_en or target_card.title_en
-
- if main_card.mobile:
- target_card.mobile = merge_mobile_numbers(target_card.mobile, main_card.mobile)
-
- target_card.phone = main_card.phone or target_card.phone
- target_card.email = main_card.email or target_card.email
- target_card.hotel_zh = main_card.hotel_zh or target_card.hotel_zh
- target_card.hotel_en = main_card.hotel_en or target_card.hotel_en
- target_card.address_zh = main_card.address_zh or target_card.address_zh
- target_card.address_en = main_card.address_en or target_card.address_en
- target_card.brand_group = main_card.brand_group or target_card.brand_group
- target_card.image_path = main_card.image_path
- target_card.updated_by = processed_by or 'system'
-
- new_data = {
- 'hotel_zh': main_card.hotel_zh,
- 'hotel_en': main_card.hotel_en,
- 'title_zh': main_card.title_zh,
- 'title_en': main_card.title_en
- }
- target_card.career_path = update_career_path(target_card, new_data, main_card.image_path)
-
- db.session.delete(duplicate_record)
- db.session.delete(main_card)
-
- result_data = target_card.to_dict()
-
- elif action == 'keep_main':
- result_data = main_card.to_dict()
-
- elif action == 'ignore':
- result_data = main_card.to_dict()
-
- if action != 'merge_to_suspected':
- duplicate_record.processing_status = 'processed'
- duplicate_record.processed_at = datetime.now()
- duplicate_record.processed_by = processed_by or 'system'
- duplicate_record.processing_notes = notes or f'执行操作: {action}'
-
- db.session.commit()
-
- return {
- 'code': 200,
- 'success': True,
- 'message': f'重复记录处理成功,操作: {action}',
- 'data': {
- 'duplicate_record': duplicate_record.to_dict() if action != 'merge_to_suspected' else None,
- 'result': result_data
- }
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"处理重复记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def get_duplicate_record_detail(duplicate_id):
- """
- 获取重复记录详情
-
- Args:
- duplicate_id (int): 重复记录ID
-
- Returns:
- dict: 包含重复记录详细信息
- """
- try:
- duplicate_record = DuplicateBusinessCard.query.filter_by(main_card_id=duplicate_id).first()
- if not duplicate_record:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到main_card_id为{duplicate_id}的重复记录',
- 'data': None
- }
-
- record_dict = duplicate_record.to_dict()
-
- if duplicate_record.main_card:
- record_dict['main_card'] = duplicate_record.main_card.to_dict()
- else:
- record_dict['main_card'] = None
-
- suspected_duplicates_details = []
- if duplicate_record.suspected_duplicates:
- for suspected_item in duplicate_record.suspected_duplicates:
- try:
- if isinstance(suspected_item, dict):
- card_id = suspected_item.get('id')
- else:
- card_id = suspected_item
-
- if card_id:
- card_result = get_business_card(card_id)
- if card_result['success']:
- suspected_duplicates_details.append(card_result['data'])
- except Exception as e:
- logging.warning(f"获取疑似重复记录详情失败: {str(e)}")
- continue
-
- record_dict['suspected_duplicates_details'] = suspected_duplicates_details
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取重复记录详情成功',
- 'data': record_dict
- }
-
- except Exception as e:
- error_msg = f"获取重复记录详情失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def fix_broken_duplicate_records():
- """
- 修复损坏的重复记录
-
- Returns:
- dict: 包含修复结果
- """
- try:
- broken_records = DuplicateBusinessCard.query.filter_by(main_card_id=None).all()
-
- fixed_count = 0
- for record in broken_records:
- db.session.delete(record)
- fixed_count += 1
-
- db.session.commit()
-
- return {
- 'code': 200,
- 'success': True,
- 'message': f'成功修复 {fixed_count} 条损坏的重复记录',
- 'data': {'fixed_count': fixed_count}
- }
-
- except Exception as e:
- db.session.rollback()
- error_msg = f"修复重复记录失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def get_parse_tasks(page=1, per_page=10, task_type=None, task_status=None):
- """
- 获取解析任务列表
-
- Args:
- page (int): 页码
- per_page (int): 每页记录数
- task_type (str): 任务类型过滤
- task_status (str): 任务状态过滤
-
- Returns:
- dict: 包含查询结果和分页信息
- """
- try:
- if page < 1 or per_page < 1 or per_page > 100:
- return {
- 'code': 400,
- 'success': False,
- 'message': '分页参数错误',
- 'data': None
- }
-
- query = ParseTaskRepository.query
-
- if task_type:
- query = query.filter_by(task_type=task_type)
- if task_status:
- query = query.filter_by(task_status=task_status)
-
- query = query.order_by(ParseTaskRepository.created_at.desc())
-
- pagination = query.paginate(page=page, per_page=per_page, error_out=False)
-
- tasks = [task.to_dict() for task in pagination.items]
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取解析任务列表成功',
- 'data': {
- 'tasks': tasks,
- 'pagination': {
- 'page': page,
- 'per_page': per_page,
- 'total': pagination.total,
- 'pages': pagination.pages,
- 'has_next': pagination.has_next,
- 'has_prev': pagination.has_prev
- }
- }
- }
-
- except Exception as e:
- error_msg = f"获取解析任务列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def get_parse_task_detail(task_name):
- """
- 获取解析任务详情
-
- Args:
- task_name (str): 任务名称
-
- Returns:
- dict: 包含查询结果
- """
- try:
- if not task_name:
- return {
- 'code': 400,
- 'success': False,
- 'message': '任务名称不能为空',
- 'data': None
- }
-
- task = ParseTaskRepository.query.filter_by(task_name=task_name).first()
-
- if not task:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到任务名称为 {task_name} 的记录',
- 'data': None
- }
-
- return {
- 'code': 200,
- 'success': True,
- 'message': f'成功获取任务 {task_name} 的详细信息',
- 'data': task.to_dict()
- }
-
- except Exception as e:
- error_msg = f"获取解析任务详情失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def create_talent_tag(tag_data):
- """
- 创建人才标签节点
-
- Args:
- tag_data: 包含标签信息的字典,包括:
- - name: 标签名称
- - category: 标签分类
- - description: 标签描述
- - status: 启用状态
-
- Returns:
- dict: 操作结果字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 验证必要参数存在
- if not tag_data or 'name' not in tag_data or not tag_data['name']:
- return {
- 'code': 400,
- 'success': False,
- 'message': '标签名称为必填项',
- 'data': None
- }
-
- # 准备节点属性
- tag_properties = {
- 'name': tag_data.get('name'),
- 'category': tag_data.get('category', '未分类'),
- 'describe': tag_data.get('description', ''), # 使用describe与现有系统保持一致
- 'status': tag_data.get('status', 'active'),
- 'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
-
- # 生成标签的英文名(可选)
- from app.core.graph.graph_operations import create_or_get_node
-
- # 如果提供了名称,尝试获取英文翻译
- if 'name' in tag_data and tag_data['name']:
- try:
- from app.api.data_interface.routes import translate_and_parse
- en_name = translate_and_parse(tag_data['name'])
- tag_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
- except Exception as e:
- logging.warning(f"获取标签英文名失败: {str(e)}")
- tag_properties['en_name'] = ''
-
- # 创建节点
- node_id = create_or_get_node('DataLabel', **tag_properties)
-
- if node_id:
- return {
- 'code': 200,
- 'success': True,
- 'message': '人才标签创建成功',
- 'data': {
- 'id': node_id,
- **tag_properties
- }
- }
- else:
- return {
- 'code': 500,
- 'success': False,
- 'message': '人才标签创建失败',
- 'data': None
- }
-
- except Exception as e:
- logging.error(f"创建人才标签失败: {str(e)}", exc_info=True)
- return {
- 'code': 500,
- 'success': False,
- 'message': f'创建人才标签失败: {str(e)}',
- 'data': None
- }
- def get_talent_tag_list():
- """
- 从Neo4j图数据库获取人才标签列表
-
- Returns:
- dict: 包含操作结果和标签列表的字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 构建Cypher查询语句,获取分类为talent的标签
- query = """
- MATCH (n:DataLabel)
- WHERE n.category CONTAINS 'talent' OR n.category CONTAINS '人才'
- RETURN id(n) as id, n.name as name, n.en_name as en_name,
- n.category as category, n.describe as description,
- n.status as status, n.time as time
- ORDER BY n.time DESC
- """
-
- # 执行查询
- tags = []
- with neo4j_driver.get_session() as session:
- result = session.run(query)
-
- # 处理查询结果
- for record in result:
- tag = {
- 'id': record['id'],
- 'name': record['name'],
- 'en_name': record['en_name'],
- 'category': record['category'],
- 'description': record['description'],
- 'status': record['status'],
- 'time': record['time']
- }
- tags.append(tag)
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '获取人才标签列表成功',
- 'data': tags
- }
-
- except Exception as e:
- error_msg = f"获取人才标签列表失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- def update_talent_tag(tag_id, tag_data):
- """
- 更新人才标签节点属性
-
- Args:
- tag_id: 标签节点ID
- tag_data: 包含更新信息的字典,可能包括:
- - name: 标签名称
- - category: 标签分类
- - description: 标签描述
- - status: 启用状态
-
- Returns:
- dict: 操作结果字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 准备要更新的属性
- update_properties = {}
-
- # 检查并添加需要更新的属性
- if 'name' in tag_data and tag_data['name']:
- update_properties['name'] = tag_data['name']
-
- # 如果名称更新了,尝试更新英文名称
- try:
- from app.api.data_interface.routes import translate_and_parse
- en_name = translate_and_parse(tag_data['name'])
- update_properties['en_name'] = en_name[0] if en_name and isinstance(en_name, list) else ''
- except Exception as e:
- logging.warning(f"更新标签英文名失败: {str(e)}")
-
- if 'category' in tag_data and tag_data['category']:
- update_properties['category'] = tag_data['category']
-
- if 'description' in tag_data:
- update_properties['describe'] = tag_data['description']
-
- if 'status' in tag_data:
- update_properties['status'] = tag_data['status']
-
- # 添加更新时间
- update_properties['time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- # 如果没有可更新的属性,返回错误
- if not update_properties:
- return {
- 'code': 400,
- 'success': False,
- 'message': '未提供任何可更新的属性',
- 'data': None
- }
-
- # 构建更新的Cypher查询
- set_clauses = []
- params = {'nodeId': tag_id}
-
- for key, value in update_properties.items():
- param_name = f"param_{key}"
- set_clauses.append(f"n.{key} = ${param_name}")
- params[param_name] = value
-
- set_clause = ", ".join(set_clauses)
-
- query = f"""
- MATCH (n:DataLabel)
- WHERE id(n) = $nodeId
- SET {set_clause}
- RETURN id(n) as id, n.name as name, n.en_name as en_name,
- n.category as category, n.describe as description,
- n.status as status, n.time as time
- """
-
- # 执行更新查询
- with neo4j_driver.get_session() as session:
- result = session.run(query, **params)
- record = result.single()
-
- if not record:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{tag_id}的标签',
- 'data': None
- }
-
- # 提取更新后的标签信息
- updated_tag = {
- 'id': record['id'],
- 'name': record['name'],
- 'en_name': record['en_name'],
- 'category': record['category'],
- 'description': record['description'],
- 'status': record['status'],
- 'time': record['time']
- }
-
- return {
- 'code': 200,
- 'success': True,
- 'message': '人才标签更新成功',
- 'data': updated_tag
- }
-
- except Exception as e:
- error_msg = f"更新人才标签失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def delete_talent_tag(tag_id):
- """
- 删除人才标签节点及其相关关系
-
- Args:
- tag_id: 标签节点ID
-
- Returns:
- dict: 操作结果字典
- """
- try:
- from app.services.neo4j_driver import neo4j_driver
-
- # 首先获取要删除的标签信息,以便在成功后返回
- get_query = """
- MATCH (n:DataLabel)
- WHERE id(n) = $nodeId
- RETURN id(n) as id, n.name as name, n.en_name as en_name,
- n.category as category, n.describe as description,
- n.status as status, n.time as time
- """
-
- # 构建删除节点和关系的Cypher查询
- delete_query = """
- MATCH (n:DataLabel)
- WHERE id(n) = $nodeId
- OPTIONAL MATCH (n)-[r]-()
- DELETE r, n
- RETURN count(n) AS deleted
- """
-
- # 执行查询
- tag_info = None
- with neo4j_driver.get_session() as session:
- # 先获取标签信息
- result = session.run(get_query, nodeId=tag_id)
- record = result.single()
-
- if not record:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未找到ID为{tag_id}的标签',
- 'data': None
- }
-
- # 保存标签信息用于返回
- tag_info = {
- 'id': record['id'],
- 'name': record['name'],
- 'en_name': record['en_name'],
- 'category': record['category'],
- 'description': record['description'],
- 'status': record['status'],
- 'time': record['time']
- }
-
- # 执行删除操作
- delete_result = session.run(delete_query, nodeId=tag_id)
- deleted = delete_result.single()['deleted']
-
- if deleted > 0:
- return {
- 'code': 200,
- 'success': True,
- 'message': '人才标签删除成功',
- 'data': tag_info
- }
- else:
- return {
- 'code': 404,
- 'success': False,
- 'message': f'未能删除ID为{tag_id}的标签',
- 'data': None
- }
-
- except Exception as e:
- error_msg = f"删除人才标签失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def query_neo4j_graph(query_requirement):
- """
- 查询Neo4j图数据库,通过Deepseek API生成Cypher脚本
-
- Args:
- query_requirement (str): 查询需求描述
-
- Returns:
- dict: 包含查询结果的字典,JSON格式
- """
- try:
- # 导入必要的模块
- from app.services.neo4j_driver import neo4j_driver
- import requests
- import json
-
- # Deepseek API配置
- api_key = DEEPSEEK_API_KEY
- api_url = DEEPSEEK_API_URL
-
- # 步骤1: 从Neo4j获取所有标签列表
- logging.info("第一步:从Neo4j获取人才类别的标签列表")
- all_labels_query = """
- MATCH (dl:DataLabel)
- WHERE dl.category CONTAINS '人才' OR dl.category CONTAINS 'talent'
- RETURN dl.name as name
- """
-
- all_labels = []
- with neo4j_driver.get_session() as session:
- result = session.run(all_labels_query)
- for record in result:
- all_labels.append(record['name'])
-
- logging.info(f"获取到{len(all_labels)}个人才标签: {all_labels}")
-
- # 步骤2: 使用Deepseek判断查询需求中的关键信息与标签的对应关系
- logging.info("第二步:调用Deepseek API匹配查询需求与标签")
-
- # 构建所有标签的JSON字符串
- labels_json = json.dumps(all_labels, ensure_ascii=False)
-
- # 构建匹配标签的提示语
- matching_prompt = f"""
- 请分析以下查询需求,并从标签列表中找出与查询需求相关的标签。
-
- ## 查询需求
- {query_requirement}
-
- ## 可用标签列表
- {labels_json}
-
- ## 输出要求
- 1. 请以JSON数组格式返回匹配的标签名称列表,格式如: ["标签1", "标签2", "标签3"]
- 2. 只返回标签名称数组,不要包含任何解释或其他文本
- 3. 如果没有找到匹配的标签,请返回空数组 []
- """
-
- # 调用Deepseek API匹配标签
- headers = {
- "Authorization": f"Bearer {api_key}",
- "Content-Type": "application/json"
- }
-
- payload = {
- "model": "deepseek-chat",
- "messages": [
- {"role": "system", "content": "你是一个专业的文本分析和匹配专家。"},
- {"role": "user", "content": matching_prompt}
- ],
- "temperature": 0.1,
- "response_format": {"type": "json_object"}
- }
-
- logging.info("发送请求到Deepseek API匹配标签:"+matching_prompt)
- response = requests.post(api_url, headers=headers, json=payload, timeout=30)
- response.raise_for_status()
-
- # 解析API响应
- result = response.json()
- matching_content = result.get("choices", [{}])[0].get("message", {}).get("content", "[]")
-
- # 提取JSON数组
- try:
- # 尝试直接解析返回结果,预期格式为 ["新开酒店经验", "五星级酒店", "总经理"]
- logging.info(f"Deepseek返回的匹配内容: {matching_content}")
-
- # 如果返回的是JSON字符串,先去除可能的前后缀文本
- if isinstance(matching_content, str):
- # 查找JSON数组的开始和结束位置
- start_idx = matching_content.find('[')
- end_idx = matching_content.rfind(']') + 1
-
- if start_idx >= 0 and end_idx > start_idx:
- json_str = matching_content[start_idx:end_idx]
- matched_labels = json.loads(json_str)
- else:
- matched_labels = []
- else:
- matched_labels = []
-
- # 确保结果是字符串列表
- if matched_labels and all(isinstance(item, str) for item in matched_labels):
- logging.info(f"成功解析到标签列表: {matched_labels}")
- else:
- logging.warning("解析结果不是预期的字符串列表格式,将使用空列表")
- matched_labels = []
- except json.JSONDecodeError as e:
- logging.error(f"JSON解析错误: {str(e)}")
- matched_labels = []
- except Exception as e:
- logging.error(f"解析匹配标签时出错: {str(e)}")
- matched_labels = []
-
- logging.info(f"匹配到的标签: {matched_labels}")
-
- # 如果没有匹配到标签,返回空结果
- if not matched_labels:
- return {
- 'code': 200,
- 'success': True,
- 'message': '未找到与查询需求匹配的标签',
- 'query': '',
- 'data': []
- }
-
- # 步骤3: 构建Cypher生成提示文本
- logging.info("第三步:构建提示文本生成Cypher查询语句")
-
- # 将匹配的标签转换为字符串
- matched_labels_str = ", ".join([f"'{label}'" for label in matched_labels])
-
- # 构建生成Cypher的提示语
- cypher_prompt = f"""
- 请根据以下Neo4j图数据库结构和已匹配的标签,生成一个Cypher查询脚本。
-
- ## 图数据库结构
-
- ### 节点
- 1. talent - 人才节点
- 属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名),
- mobile(手机号码), email(电子邮箱), updated_at(更新时间)
-
- 2. DataLabel - 人才标签节点
-
- ### 关系
- BELONGS_TO - 从属关系
- (talent)-[BELONGS_TO]->(DataLabel) - 人才属于某标签
-
- ## 匹配的标签列表
- [{matched_labels_str}]
-
- ## 查询需求
- {query_requirement}
-
- ## 输出要求
- 1. 只输出有效的Cypher查询语句,不要包含任何解释或注释
- 2. 确保return语句中包含talent节点属性
- 3. 尽量利用图数据库的特性来优化查询效率
- 4. 使用WITH子句和COLLECT函数收集标签,确保查询到同时拥有所有标签的人才
-
- 注意:请直接返回Cypher查询语句,无需任何其他文本。
-
- 以下是一个示例:
- 假设匹配的标签是 ['五星级酒店', '新开酒店经验', '总经理']
-
- 生成的Cypher查询语句应该是:
- MATCH (t:talent)-[:BELONGS_TO]->(dl:DataLabel)
- WHERE dl.name IN ['五星级酒店', '新开酒店经验', '总经理']
- WITH t, COLLECT(DISTINCT dl.name) AS labels
- WHERE size(labels) = 3
- RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at
- """
-
- # 调用Deepseek API生成Cypher脚本
- payload = {
- "model": "deepseek-chat",
- "messages": [
- {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"},
- {"role": "user", "content": cypher_prompt}
- ],
- "temperature": 0.1
- }
-
- logging.info("发送请求到Deepseek API生成Cypher脚本")
- response = requests.post(api_url, headers=headers, json=payload, timeout=30)
- response.raise_for_status()
-
- # 解析API响应
- result = response.json()
- cypher_script = result.get("choices", [{}])[0].get("message", {}).get("content", "")
-
- # 清理Cypher脚本,移除不必要的markdown格式或注释
- cypher_script = cypher_script.strip()
- if cypher_script.startswith("```cypher"):
- cypher_script = cypher_script[9:]
- elif cypher_script.startswith("```"):
- cypher_script = cypher_script[3:]
- if cypher_script.endswith("```"):
- cypher_script = cypher_script[:-3]
- cypher_script = cypher_script.strip()
-
- logging.info(f"生成的Cypher脚本: {cypher_script}")
-
- # 步骤4: 执行Cypher脚本
- logging.info("第四步:执行Cypher脚本并返回结果")
- with neo4j_driver.get_session() as session:
- result = session.run(cypher_script)
- records = [record.data() for record in result]
-
- # 构建查询结果
- response_data = {
- 'code': 200,
- 'success': True,
- 'message': '查询成功执行',
- 'query': cypher_script,
- 'matched_labels': matched_labels,
- 'data': records
- }
-
- return response_data
-
- except requests.exceptions.HTTPError as e:
- error_msg = f"调用Deepseek API失败: {str(e)}"
- logging.error(error_msg)
- if hasattr(e, 'response') and e.response:
- logging.error(f"错误状态码: {e.response.status_code}")
- logging.error(f"错误内容: {e.response.text}")
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- except Exception as e:
- error_msg = f"查询Neo4j图数据库失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- def talent_get_tags(talent_id):
- """
- 根据talent ID获取人才节点关联的标签
-
- Args:
- talent_id (int): 人才节点pg_id
-
- Returns:
- dict: 包含人才ID和关联标签的字典,JSON格式
- """
- try:
- # 导入必要的模块
- from app.services.neo4j_driver import neo4j_driver
-
- # 准备查询返回数据
- response_data = {
- 'code': 200,
- 'success': True,
- 'message': '获取人才标签成功',
- 'data': []
- }
-
- # 构建Cypher查询语句,获取人才节点关联的标签
- cypher_query = """
- MATCH (t:talent)-[r:BELONGS_TO]->(tag:DataLabel)
- WHERE t.pg_id = $talent_id
- RETURN t.pg_id as talent_id, tag.name as tag_name
- """
-
- # 执行查询
- with neo4j_driver.get_session() as session:
- result = session.run(cypher_query, talent_id=int(talent_id))
- records = list(result)
-
- # 如果没有查询到标签,返回空数组
- if not records:
- response_data['message'] = f'人才pg_id {talent_id} 没有关联的标签'
- return response_data
-
- # 处理查询结果
- for record in records:
- talent_tag = {
- 'talent': record['talent_id'],
- 'tag': record['tag_name']
- }
- response_data['data'].append(talent_tag)
-
- return response_data
-
- except Exception as e:
- error_msg = f"获取人才标签失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': []
- }
- def talent_update_tags(data):
- """
- 根据传入的JSON数据为人才节点创建与标签的BELONGS_TO关系
-
- Args:
- data (list): 包含talent和tag字段的对象列表
- 例如: [
- {"talent": 12345, "tag": "市场营销"},
- {"talent": 12345, "tag": "酒店管理"}
- ]
-
- Returns:
- dict: 操作结果和状态信息
- """
- try:
- # 导入必要的模块
- from app.services.neo4j_driver import neo4j_driver
-
- # 验证输入参数
- if not isinstance(data, list):
- return {
- 'code': 400,
- 'success': False,
- 'message': '参数格式错误,需要JSON数组',
- 'data': None
- }
-
- if len(data) == 0:
- return {
- 'code': 400,
- 'success': False,
- 'message': '数据列表为空',
- 'data': None
- }
-
- # 获取当前时间
- current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- # 成功和失败计数
- success_count = 0
- failed_items = []
-
- # 按talent分组处理数据
- talent_tags = {}
- for item in data:
- # 验证每个项目的格式
- if not isinstance(item, dict) or 'talent' not in item or 'tag' not in item:
- failed_items.append(item)
- continue
-
- talent_id = item.get('talent')
- tag_name = item.get('tag')
-
- # 验证talent_id和tag_name的值
- if not talent_id or not tag_name or not isinstance(tag_name, str):
- failed_items.append(item)
- continue
-
- # 按talent_id分组
- if talent_id not in talent_tags:
- talent_tags[talent_id] = []
-
- talent_tags[talent_id].append(tag_name)
-
- with neo4j_driver.get_session() as session:
- # 处理每个talent及其标签
- for talent_id, tags in talent_tags.items():
- # 首先验证talent节点是否存在
- check_talent_query = """
- MATCH (t:talent)
- WHERE t.pg_id = $talent_id
- RETURN t
- """
- talent_result = session.run(check_talent_query, talent_id=int(talent_id))
- if not talent_result.single():
- # 该talent不存在,记录失败项并继续下一个talent
- for tag in tags:
- failed_items.append({'talent_pg_id': talent_id, 'tag': tag})
- continue
-
- # 首先清除所有现有的BELONGS_TO关系
- clear_relations_query = """
- MATCH (t:talent)-[r:BELONGS_TO]->(:DataLabel)
- WHERE t.pg_id = $talent_id
- DELETE r
- RETURN count(r) as deleted_count
- """
- clear_result = session.run(clear_relations_query, talent_id=int(talent_id))
- deleted_count = clear_result.single()['deleted_count']
- logging.info(f"已删除talent_id={talent_id}的{deleted_count}个已有标签关系")
-
- # 处理每个标签
- for tag_name in tags:
- try:
- # 1. 查找或创建标签节点
- # 先查找是否存在该标签
- find_tag_query = """
- MATCH (tag:DataLabel)
- WHERE tag.name = $tag_name
- RETURN id(tag) as tag_id
- """
- tag_result = session.run(find_tag_query, tag_name=tag_name)
- tag_record = tag_result.single()
-
- if tag_record:
- tag_id = tag_record['tag_id']
- else:
- # 创建新标签
- create_tag_query = """
- CREATE (tag:DataLabel {name: $name, category: $category, updated_at: $updated_at})
- RETURN id(tag) as tag_id
- """
- tag_result = session.run(
- create_tag_query,
- name=tag_name,
- category='talent',
- updated_at=current_time
- )
- tag_record = tag_result.single()
- tag_id = tag_record['tag_id']
-
- # 2. 创建人才与标签的BELONGS_TO关系
- create_relation_query = """
- MATCH (t:talent), (tag:DataLabel)
- WHERE t.pg_id = $talent_id AND tag.name = $tag_name
- CREATE (t)-[r:BELONGS_TO]->(tag)
- SET r.created_at = $current_time
- RETURN r
- """
-
- relation_result = session.run(
- create_relation_query,
- talent_id=int(talent_id),
- tag_name=tag_name,
- current_time=current_time
- )
-
- if relation_result.single():
- success_count += 1
- else:
- failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name})
-
- except Exception as tag_error:
- logging.error(f"为标签 {tag_name} 创建关系时出错: {str(tag_error)}")
- failed_items.append({'talent_pg_id': talent_id, 'tag': tag_name})
-
- # 返回结果
- total_items = len(data)
- if success_count == total_items:
- return {
- 'code': 200,
- 'success': True,
- 'message': f'成功创建或更新了 {success_count} 个标签关系',
- 'data': {
- 'success_count': success_count,
- 'total_count': total_items,
- 'failed_items': []
- }
- }
- elif success_count > 0:
- return {
- 'code': 206, # Partial Content
- 'success': True,
- 'message': f'部分成功: 创建或更新了 {success_count}/{total_items} 个标签关系',
- 'data': {
- 'success_count': success_count,
- 'total_count': total_items,
- 'failed_items': failed_items
- }
- }
- else:
- return {
- 'code': 500,
- 'success': False,
- 'message': '无法创建任何标签关系',
- 'data': {
- 'success_count': 0,
- 'total_count': total_items,
- 'failed_items': failed_items
- }
- }
-
- except Exception as e:
- error_msg = f"更新人才标签关系失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
-
- return {
- 'code': 500,
- 'success': False,
- 'message': error_msg,
- 'data': None
- }
- def parse_text_with_qwen25VLplus(image_data):
- """
- 使用阿里云的 Qwen VL Max 模型解析图像中的名片信息
-
- Args:
- image_data (bytes): 图像的二进制数据
-
- Returns:
- dict: 解析的名片信息
- """
- try:
- # 将图片数据转为 base64 编码
- base64_image = base64.b64encode(image_data).decode('utf-8')
-
- # 初始化 OpenAI 客户端,配置为阿里云 API
- client = OpenAI(
- api_key=QWEN_API_KEY,
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- )
-
- # 构建优化后的提示语
- prompt = """你是企业名片的信息提取专家。请仔细分析提供的图片,精确提取名片信息。
- ## 提取要求
- - 区分中英文内容,分别提取
- - 保持提取信息的原始格式(如大小写、标点)
- - 对于无法识别或名片中不存在的信息,返回空字符串
- - 名片中没有的信息,请不要猜测
- ## 需提取的字段
- 1. 中文姓名 (name_zh)
- 2. 英文姓名 (name_en)
- 3. 中文职位/头衔 (title_zh)
- 4. 英文职位/头衔 (title_en)
- 5. 中文酒店/公司名称 (hotel_zh)
- 6. 英文酒店/公司名称 (hotel_en)
- 7. 手机号码 (mobile) - 如有多个手机号码,使用逗号分隔,最多提取3个
- 8. 固定电话 (phone) - 如有多个,使用逗号分隔
- 9. 电子邮箱 (email)
- 10. 中文地址 (address_zh)
- 11. 英文地址 (address_en)
- 12. 中文邮政编码 (postal_code_zh)
- 13. 英文邮政编码 (postal_code_en)
- 14. 生日 (birthday) - 格式为YYYY-MM-DD,如1990-01-01
- 15. 年龄 (age) - 数字格式,如30
- 16. 籍贯 (native_place) - 出生地或户籍所在地信息
- 17. 居住地 (residence) - 个人居住地址信息
- 18. 品牌组合 (brand_group) - 如有多个品牌,使用逗号分隔
- 19. 职业轨迹 (career_path) - 如能从名片中推断,以JSON数组格式返回,包含当前日期,公司名称和职位。自动生成当前日期。
- 20. 隶属关系 (affiliation) - 如能从名片中推断,以JSON数组格式返回,包含公司名称和隶属集团名称
- ## 输出格式
- 请以严格的JSON格式返回结果,不要添加任何额外解释文字。JSON格式如下:
- ```json
- {
- "name_zh": "",
- "name_en": "",
- "title_zh": "",
- "title_en": "",
- "hotel_zh": "",
- "hotel_en": "",
- "mobile": "",
- "phone": "",
- "email": "",
- "address_zh": "",
- "address_en": "",
- "postal_code_zh": "",
- "postal_code_en": "",
- "birthday": "",
- "age": 0,
- "native_place": "",
- "residence": "",
- "brand_group": "",
- "career_path": [],
- "affiliation": []
- }
- ```"""
-
- # 调用 Qwen VL Max API
- logging.info("发送请求到 Qwen VL Max 模型")
- completion = client.chat.completions.create(
- # model="qwen-vl-plus",
- model="qwen-vl-max-latest",
- messages=[
- {
- "role": "user",
- "content": [
- {"type": "text", "text": prompt},
- {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
- ]
- }
- ],
- temperature=0.1, # 降低温度增加精确性
- response_format={"type": "json_object"} # 要求输出JSON格式
- )
-
- # 解析响应
- response_content = completion.choices[0].message.content
- logging.info(f"成功从 Qwen 模型获取响应: {response_content}")
-
- # 尝试从响应中提取 JSON
- try:
- extracted_data = json.loads(response_content)
- logging.info("成功解析 Qwen 响应中的 JSON")
- except json.JSONDecodeError:
- logging.warning("无法解析 JSON,尝试从文本中提取信息")
- # 这里可以调用其他的解析函数,但为了简化,先返回错误
- raise Exception("无法解析 Qwen 返回的 JSON 格式")
-
- # 确保所有必要字段存在
- required_fields = [
- 'name_zh', 'name_en', 'title_zh', 'title_en',
- 'hotel_zh', 'hotel_en', 'mobile', 'phone',
- 'email', 'address_zh', 'address_en',
- 'postal_code_zh', 'postal_code_en', 'birthday', 'age', 'native_place', 'residence',
- 'brand_group', 'career_path'
- ]
-
- for field in required_fields:
- if field not in extracted_data:
- if field == 'career_path':
- extracted_data[field] = []
- elif field == 'age':
- extracted_data[field] = 0
- else:
- extracted_data[field] = ""
-
- # 为career_path增加一条记录
- if extracted_data.get('hotel_zh') or extracted_data.get('hotel_en') or extracted_data.get('title_zh') or extracted_data.get('title_en'):
- career_entry = {
- 'date': datetime.now().strftime('%Y-%m-%d'),
- 'hotel_en': extracted_data.get('hotel_en', ''),
- 'hotel_zh': extracted_data.get('hotel_zh', ''),
- 'image_path': '',
- 'source': 'business_card_creation',
- 'title_en': extracted_data.get('title_en', ''),
- 'title_zh': extracted_data.get('title_zh', '')
- }
-
- # 直接清空原有的career_path内容,用career_entry写入
- extracted_data['career_path'] = [career_entry]
- logging.info(f"为解析结果设置了career_path记录: {career_entry}")
-
- return extracted_data
-
- except Exception as e:
- error_msg = f"Qwen VL Max 模型解析失败: {str(e)}"
- logging.error(error_msg, exc_info=True)
- raise Exception(error_msg)
|