Sfoglia il codice sorgente

修改人才档案和重复记录的处理逻辑,新增酒店职位数据和酒店集团品牌数据同步到Neo4j图数据库程序
创建人才、酒店、职位的知识图谱

maxiaolong 1 settimana fa
parent
commit
dc289f34ef

+ 6 - 2
app/api/data_parse/routes.py

@@ -60,6 +60,8 @@ import base64
 import os
 import urllib.parse
 from minio import Minio
+from app.models.parse_models import ParseTaskRepository
+from app.core.data_parse.parse_system import db
 
 # Define logger
 logger = logging.getLogger(__name__)
@@ -1741,7 +1743,8 @@ def execute_parse_task():
         # 更新parse_task_repository数据库表中的task_source
         if task_id:
             try:
-                from app.core.data_parse.parse_system import ParseTaskRepository, db
+                from app.models.parse_models import ParseTaskRepository
+                from app.core.data_parse.parse_system import db
                 task_record = ParseTaskRepository.query.get(task_id)
                 if task_record:
                     task_record.task_source = task_source
@@ -1794,7 +1797,8 @@ def execute_parse_task():
                 }), 400
             
             # 记录处理结果日志并更新任务状态
-            from app.core.data_parse.parse_system import db, ParseTaskRepository
+            from app.models.parse_models import ParseTaskRepository
+            from app.core.data_parse.parse_system import db
             task_obj = None
             
             if task_id:

+ 2 - 1
app/core/data_parse/parse_card.py

@@ -554,7 +554,8 @@ def batch_process_business_card_images(minio_paths_json, task_id=None, task_type
             }
         
         # 导入数据库模型
-        from app.core.data_parse.parse_system import ParseTaskRepository, db
+        from app.models.parse_models import ParseTaskRepository
+        from app import db
         
         # 查询对应的任务记录
         task_record = ParseTaskRepository.query.get(task_id)

+ 2 - 1
app/core/data_parse/parse_menduner.py

@@ -397,7 +397,8 @@ def batch_process_menduner_data(data_list: List[Dict[str, Any]], task_id=None, t
             }
         
         # 导入数据库模型
-        from app.core.data_parse.parse_system import ParseTaskRepository, db
+        from app.models.parse_models import ParseTaskRepository
+        from app import db
         
         # 查询对应的任务记录
         task_record = ParseTaskRepository.query.get(task_id)

+ 30 - 24
app/core/data_parse/parse_neo4j_process.py

@@ -39,7 +39,6 @@ project_root = os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))
 sys.path.insert(0, project_root)
 
 try:
-    from app.config.config import config, current_env
     from app.services.neo4j_driver import Neo4jDriver
     from sqlalchemy import create_engine, text
     from sqlalchemy.exc import SQLAlchemyError
@@ -75,14 +74,15 @@ class HotelPositionNeo4jProcessor:
     def __init__(self):
         """初始化处理器"""
         self.logger = logging.getLogger(__name__)
-        self.config = config[current_env]()
+        # 直接使用数据库连接信息,不依赖Flask配置
+        self.pg_connection_string = 'postgresql://postgres:postgres@localhost:5432/dataops'
         self.pg_engine = None
         self.neo4j_driver = None
         
     def connect_postgresql(self):
         """连接PostgreSQL数据库"""
         try:
-            self.pg_engine = create_engine(self.config.SQLALCHEMY_DATABASE_URI)
+            self.pg_engine = create_engine(self.pg_connection_string)
             # 测试连接
             with self.pg_engine.connect() as conn:
                 conn.execute(text("SELECT 1"))
@@ -98,7 +98,13 @@ class HotelPositionNeo4jProcessor:
     def connect_neo4j(self):
         """连接Neo4j数据库"""
         try:
-            self.neo4j_driver = Neo4jDriver()
+            # 直接使用Neo4j连接信息
+            self.neo4j_driver = Neo4jDriver(
+                uri="bolt://192.168.3.143:7687",
+                user="neo4j",
+                password="cituneo4j",
+                encrypted=False
+            )
             if self.neo4j_driver.verify_connectivity():
                 self.logger.info("Neo4j数据库连接成功")
                 return True
@@ -194,9 +200,9 @@ class HotelPositionNeo4jProcessor:
             return []
     
     def check_neo4j_node_exists(self, session, name: str) -> bool:
-        """检查Neo4j中是否已存在相同name的DataLabel节点"""
+        """检查Neo4j中是否已存在相同name_zh的DataLabel节点"""
         try:
-            query = "MATCH (n:DataLabel {name: $name}) RETURN n LIMIT 1"
+            query = "MATCH (n:DataLabel {name_zh: $name}) RETURN n LIMIT 1"
             result = session.run(query, name=name)
             return result.single() is not None
         except Exception as e:
@@ -241,8 +247,8 @@ class HotelPositionNeo4jProcessor:
         """创建两个DataLabel节点之间的关系"""
         try:
             query = """
-                MATCH (from:DataLabel {name: $from_name})
-                MATCH (to:DataLabel {name: $to_name})
+                MATCH (from:DataLabel {name_zh: $from_name})
+                MATCH (to:DataLabel {name_zh: $to_name})
                 MERGE (from)-[r:$relationship_type]->(to)
                 RETURN r
             """
@@ -250,15 +256,15 @@ class HotelPositionNeo4jProcessor:
             # 使用参数化查询避免Cypher注入
             if relationship_type == "BELONGS_TO":
                 query = """
-                    MATCH (from:DataLabel {name: $from_name})
-                    MATCH (to:DataLabel {name: $to_name})
+                    MATCH (from:DataLabel {name_zh: $from_name})
+                    MATCH (to:DataLabel {name_zh: $to_name})
                     MERGE (from)-[r:BELONGS_TO]->(to)
                     RETURN r
                 """
             elif relationship_type == "HAS_LEVEL":
                 query = """
-                    MATCH (from:DataLabel {name: $from_name})
-                    MATCH (to:DataLabel {name: $to_name})
+                    MATCH (from:DataLabel {name_zh: $from_name})
+                    MATCH (to:DataLabel {name_zh: $to_name})
                     MERGE (from)-[r:HAS_LEVEL]->(to)
                     RETURN r
                 """
@@ -308,8 +314,8 @@ class HotelPositionNeo4jProcessor:
                     # 处理部门节点
                     if not self.check_neo4j_node_exists(session, department_zh):
                         dept_data = {
-                            'name': department_zh,
-                            'en_name': position['department_en']
+                            'name_zh': department_zh,
+                            'name_en': position['department_en']
                         }
                         if self.create_neo4j_node(session, dept_data, 'department'):
                             self.logger.info(f"成功创建部门节点: {department_zh}")
@@ -323,8 +329,8 @@ class HotelPositionNeo4jProcessor:
                     # 处理职位节点
                     if not self.check_neo4j_node_exists(session, position_zh):
                         pos_data = {
-                            'name': position_zh,
-                            'en_name': position['position_en']
+                            'name_zh': position_zh,
+                            'name_en': position['position_en']
                         }
                         if self.create_neo4j_node(session, pos_data, 'position'):
                             self.logger.info(f"成功创建职位节点: {position_zh}")
@@ -338,8 +344,8 @@ class HotelPositionNeo4jProcessor:
                     # 处理级别节点
                     if not self.check_neo4j_node_exists(session, level_zh):
                         level_data = {
-                            'name': level_zh,
-                            'en_name': position['level_en']
+                            'name_zh': level_zh,
+                            'name_en': position['level_en']
                         }
                         if self.create_neo4j_node(session, level_data, 'position_level'):
                             self.logger.info(f"成功创建级别节点: {level_zh}")
@@ -431,8 +437,8 @@ class HotelPositionNeo4jProcessor:
                     # 处理集团节点
                     if not self.check_neo4j_node_exists(session, group_name_zh):
                         group_data = {
-                            'name': group_name_zh,
-                            'en_name': brand['group_name_en']
+                            'name_zh': group_name_zh,
+                            'name_en': brand['group_name_en']
                         }
                         if self.create_neo4j_node(session, group_data, 'group'):
                             self.logger.info(f"成功创建集团节点: {group_name_zh}")
@@ -446,8 +452,8 @@ class HotelPositionNeo4jProcessor:
                     # 处理品牌节点
                     if not self.check_neo4j_node_exists(session, brand_name_zh):
                         brand_data = {
-                            'name': brand_name_zh,
-                            'en_name': brand['brand_name_en']
+                            'name_zh': brand_name_zh,
+                            'name_en': brand['brand_name_en']
                         }
                         if self.create_neo4j_node(session, brand_data, 'brand'):
                             self.logger.info(f"成功创建品牌节点: {brand_name_zh}")
@@ -461,8 +467,8 @@ class HotelPositionNeo4jProcessor:
                     # 处理品牌级别节点
                     if not self.check_neo4j_node_exists(session, positioning_level_zh):
                         level_data = {
-                            'name': positioning_level_zh,
-                            'en_name': brand['positioning_level_en']
+                            'name_zh': positioning_level_zh,
+                            'name_en': brand['positioning_level_en']
                         }
                         if self.create_neo4j_node(session, level_data, 'brand_level'):
                             self.logger.info(f"成功创建品牌级别节点: {positioning_level_zh}")

+ 2 - 1
app/core/data_parse/parse_pic.py

@@ -882,7 +882,8 @@ def batch_process_images(image_paths: List[Any], process_type: str = 'table', ta
             }
         
         # 导入数据库模型
-        from app.core.data_parse.parse_system import ParseTaskRepository, db
+        from app.models.parse_models import ParseTaskRepository
+        from app import db
         
         # 查询对应的任务记录
         task_record = ParseTaskRepository.query.get(task_id)

+ 2 - 1
app/core/data_parse/parse_resume.py

@@ -789,7 +789,8 @@ def batch_parse_resumes(file_paths: List[str], task_id=None, task_type=None) ->
             }
         
         # 导入数据库模型
-        from app.core.data_parse.parse_system import ParseTaskRepository, db
+        from app.models.parse_models import ParseTaskRepository
+        from app import db
         
         # 查询对应的任务记录
         task_record = ParseTaskRepository.query.get(task_id)

+ 171 - 160
app/core/data_parse/parse_system.py

@@ -199,38 +199,7 @@ class DuplicateBusinessCard(db.Model):
         }
 
 
-# 解析任务存储库数据模型
-class ParseTaskRepository(db.Model):
-    __tablename__ = 'parse_task_repository'
-    
-    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
-    task_name = db.Column(db.String(100), nullable=False)
-    task_status = db.Column(db.String(10), nullable=False)
-    task_type = db.Column(db.String(50), nullable=False)
-    task_source = db.Column(db.JSON, nullable=False)
-    collection_count = db.Column(db.Integer, nullable=False, default=0)
-    parse_count = db.Column(db.Integer, nullable=False, default=0)
-    parse_result = db.Column(db.JSON)
-    created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
-    created_by = db.Column(db.String(50), nullable=False)
-    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, nullable=False)
-    updated_by = db.Column(db.String(50), nullable=False)
-    
-    def to_dict(self):
-        return {
-            'id': self.id,
-            'task_name': self.task_name,
-            'task_status': self.task_status,
-            'task_type': self.task_type,
-            'task_source': self.task_source,
-            'collection_count': self.collection_count,
-            'parse_count': self.parse_count,
-            'parse_result': self.parse_result,
-            'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
-            'created_by': self.created_by,
-            'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None,
-            'updated_by': self.updated_by
-        }
+
 
 
 # 配置变量
@@ -1690,15 +1659,15 @@ def query_neo4j_graph(query_requirement):
         logging.info("第一步:从Neo4j获取人才类别的标签列表")
         all_labels_query = """
         MATCH (dl:DataLabel)
-        WHERE dl.category CONTAINS '人才' OR dl.category CONTAINS 'talent'
-        RETURN dl.name as name
+        WHERE dl.category CONTAINS '人才地图' OR dl.category CONTAINS 'talentmap'
+        RETURN dl.name_zh as name_zh, dl.name_en as name_en
         """
         
         all_labels = []
         with neo4j_driver.get_session() as session:
             result = session.run(all_labels_query)
             for record in result:
-                all_labels.append(record['name'])
+                all_labels.append(record['name_zh'])
         
         logging.info(f"获取到{len(all_labels)}个人才标签: {all_labels}")
         
@@ -1710,27 +1679,43 @@ def query_neo4j_graph(query_requirement):
         
         # 构建匹配标签的提示语
         matching_prompt = f"""
-        请分析以下查询需求,并从标签列表中找出与查询需求相关的标签。
-        
-        ## 查询需求
+        请从上传的查询需求文本中提取以下结构化信息。其中datalabel字段从可用标签列表里进行匹配,匹配结果填写可用标签列表里的标签名称。需要严格按照JSON格式输出:   
+        {{
+         "basic_info": {{
+            "中文姓名": "",
+            "英文姓名": "",
+            "手机号": "",
+            "固定电话": "",
+            "电子邮箱": "",
+            "生日": "",
+            "年龄": "",
+            "居住地": "",
+            "籍贯": ""
+        }},
+         "datalabel": [
+            "标签1","标签2","标签3"
+        ]
+        }}
+        ## 查询需求文本
         {query_requirement}
         
         ## 可用标签列表
         {labels_json}
         
-        ## 输出要求
-        1. 请以JSON数组格式返回匹配的标签名称列表,格式如: ["标签1", "标签2", "标签3"]
-        2. 只返回标签名称数组,不要包含任何解释或其他文本
-        3. 如果没有找到匹配的标签,请返回空数组 []
+        输出要求:
+        1. 中文名称优先,有英文名称也要提取保留
+        2. 年龄字段只需填写数字。
+        3. 标签没有被匹配到,datalabel字段可以为空数组
+        4. 只需返回JSON字符串,不要返回其他信息
         """
         
         # 调用阿里千问API匹配标签
         logging.info("发送请求到阿里千问API匹配标签:"+matching_prompt)
         
         completion = client.chat.completions.create(
-            model=model_name,
+            model="qwen-long-latest",  # 使用qwen-long-latest模型
             messages=[
-                {"role": "system", "content": "你是一个专业的文本分析和匹配专家。"},
+                {"role": "system", "content": "你是一个专业的文本信息提取专家。"},
                 {"role": "user", "content": matching_prompt}
             ],
             temperature=0.1,
@@ -1740,129 +1725,154 @@ def query_neo4j_graph(query_requirement):
         # 解析API响应
         matching_content = completion.choices[0].message.content
         
-        # 提取JSON数组
-        try:
-            # 尝试直接解析返回结果,预期格式为 ["新开酒店经验", "五星级酒店", "总经理"]
-            logging.info(f"阿里千问返回的匹配内容: {matching_content}")
-            
-            # 如果返回的是JSON字符串,先去除可能的前后缀文本
-            if isinstance(matching_content, str):
-                # 查找JSON数组的开始和结束位置
-                start_idx = matching_content.find('[')
-                end_idx = matching_content.rfind(']') + 1
-                
-                if start_idx >= 0 and end_idx > start_idx:
-                    json_str = matching_content[start_idx:end_idx]
-                    matched_labels = json.loads(json_str)
-                else:
-                    matched_labels = []
-            else:
-                matched_labels = []
-                
-            # 确保结果是字符串列表
-            if matched_labels and all(isinstance(item, str) for item in matched_labels):
-                logging.info(f"成功解析到标签列表: {matched_labels}")
-            else:
-                logging.warning("解析结果不是预期的字符串列表格式,将使用空列表")
-                matched_labels = []
-        except json.JSONDecodeError as e:
-            logging.error(f"JSON解析错误: {str(e)}")
-            matched_labels = []
-        except Exception as e:
-            logging.error(f"解析匹配标签时出错: {str(e)}")
-            matched_labels = []
+        # 直接解析JSON响应,提取datalabel字段
+        parsed_content = json.loads(matching_content)
+        matched_labels = parsed_content.get('datalabel', [])
         
         logging.info(f"匹配到的标签: {matched_labels}")
         
-        # 如果没有匹配到标签,返回空结果
-        if not matched_labels:
-            return {
-                'code': 200,
-                'success': True,
-                'message': '未找到与查询需求匹配的标签',
-                'query': '',
-                'data': []
-            }
-        
-        # 步骤3: 构建Cypher生成提示文本
-        logging.info("第三步:构建提示文本生成Cypher查询语句")
-        
-        # 将匹配的标签转换为字符串
-        matched_labels_str = ", ".join([f"'{label}'" for label in matched_labels])
-        
-        # 构建生成Cypher的提示语
-        cypher_prompt = f"""
-        请根据以下Neo4j图数据库结构和已匹配的标签,生成一个Cypher查询脚本。
-        
-        ## 图数据库结构
-        
-        ### 节点
-        1. Talent - 人才节点
-           属性: pg_id(PostgreSQL数据库ID), name_zh(中文姓名), name_en(英文姓名), 
-                mobile(手机号码), email(电子邮箱), updated_at(更新时间)
-        
-        2. DataLabel - 人才标签节点
-                      
-        ### 关系
-        BELONGS_TO - 从属关系
-           (Talent)-[BELONGS_TO]->(DataLabel) - 人才属于某标签
-        
-        ## 匹配的标签列表
-        [{matched_labels_str}]
-        
-        ## 查询需求
-        {query_requirement}
-        
-        ## 输出要求
-        1. 只输出有效的Cypher查询语句,不要包含任何解释或注释
-        2. 确保return语句中包含talent节点属性
-        3. 尽量利用图数据库的特性来优化查询效率
-        4. 使用WITH子句和COLLECT函数收集标签,确保查询到至少拥有一个标签的人才
-        
-        注意:请直接返回Cypher查询语句,无需任何其他文本。
-        
-        以下是一个示例:
-        假设匹配的标签是 ['五星级酒店', '新开酒店经验', '总经理']
-        
-        生成的Cypher查询语句应该是:
-        MATCH (t:Talent)-[:BELONGS_TO]->(dl:DataLabel)  
-        WHERE dl.name IN ['五星级酒店', '新开酒店经验', '总经理']  
-        WITH t, COLLECT(DISTINCT dl.name) AS labels  
-        WHERE size(labels) >= 1  
-        RETURN t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, t.mobile as mobile, t.email as email, t.updated_at as updated_at
-        """
-        
-        # 调用阿里千问API生成Cypher脚本
-        logging.info("发送请求到阿里千问API生成Cypher脚本")
-        
-        completion = client.chat.completions.create(
-            model=model_name,
-            messages=[
-                {"role": "system", "content": "你是一个专业的Neo4j Cypher查询专家。"},
-                {"role": "user", "content": cypher_prompt}
-            ],
-            temperature=0.1
-        )
-        
-        # 解析API响应
-        cypher_script = completion.choices[0].message.content
-        
-        # 清理Cypher脚本,移除不必要的markdown格式或注释
-        cypher_script = cypher_script.strip()
-        if cypher_script.startswith("```cypher"):
-            cypher_script = cypher_script[9:]
-        elif cypher_script.startswith("```"):
-            cypher_script = cypher_script[3:]
-        if cypher_script.endswith("```"):
-            cypher_script = cypher_script[:-3]
-        cypher_script = cypher_script.strip()
+        # 步骤3: 构建查询逻辑和Cypher语句
+        logging.info("第三步:构建查询逻辑和Cypher语句")
+        
+        # 提取basic_info中的非空字段
+        basic_info = parsed_content.get('basic_info', {})
+        non_empty_fields = {k: v for k, v in basic_info.items() if v and str(v).strip()}
+        
+        logging.info(f"提取到的非空字段: {non_empty_fields}")
+        
+        # 构建Talent节点子集查询
+        talent_conditions = []
+        talent_params = {}
+        
+        if non_empty_fields:
+            # 如果有非空字段,构建Talent节点属性匹配条件
+            for field, value in non_empty_fields.items():
+                if field == "中文姓名":
+                    talent_conditions.append("t.name_zh CONTAINS $name_zh")
+                    talent_params['name_zh'] = value
+                elif field == "英文姓名":
+                    talent_conditions.append("t.name_en CONTAINS $name_en")
+                    talent_params['name_en'] = value
+                elif field == "手机号":
+                    talent_conditions.append("t.mobile CONTAINS $mobile")
+                    talent_params['mobile'] = value
+                elif field == "固定电话":
+                    talent_conditions.append("t.phone CONTAINS $phone")
+                    talent_params['phone'] = value
+                elif field == "电子邮箱":
+                    talent_conditions.append("t.email CONTAINS $email")
+                    talent_params['email'] = value
+                elif field == "生日":
+                    # 格式化生日为YYYY-MM-DD格式
+                    try:
+                        from datetime import datetime
+                        # 尝试解析各种可能的日期格式
+                        if isinstance(value, str):
+                            # 处理常见的日期格式
+                            if len(value) == 8 and value.isdigit():  # YYYYMMDD
+                                formatted_birthday = f"{value[:4]}-{value[4:6]}-{value[6:8]}"
+                            elif len(value) == 10 and value.count('-') == 2:  # YYYY-MM-DD
+                                formatted_birthday = value
+                            elif len(value) == 10 and value.count('/') == 2:  # YYYY/MM/DD
+                                date_obj = datetime.strptime(value, '%Y/%m/%d')
+                                formatted_birthday = date_obj.strftime('%Y-%m-%d')
+                            else:
+                                # 尝试其他常见格式
+                                for fmt in ['%Y-%m-%d', '%Y/%m/%d', '%Y.%m.%d', '%Y年%m月%d日']:
+                                    try:
+                                        date_obj = datetime.strptime(value, fmt)
+                                        formatted_birthday = date_obj.strftime('%Y-%m-%d')
+                                        break
+                                    except ValueError:
+                                        continue
+                                else:
+                                    # 如果所有格式都失败,使用原始值
+                                    formatted_birthday = value
+                        else:
+                            formatted_birthday = str(value)
+                        
+                        talent_conditions.append("t.birthday = $birthday")
+                        talent_params['birthday'] = formatted_birthday
+                        logging.info(f"生日字段格式化: {value} -> {formatted_birthday}")
+                    except Exception as e:
+                        logging.warning(f"生日字段格式化失败: {value}, 错误: {str(e)}")
+                        # 如果格式化失败,使用原始值
+                        talent_conditions.append("t.birthday = $birthday")
+                        talent_params['birthday'] = value
+                elif field == "年龄":
+                    talent_conditions.append("t.age = $age")
+                    talent_params['age'] = int(value) if value.isdigit() else 0
+                elif field == "居住地":
+                    talent_conditions.append("t.residence CONTAINS $residence")
+                    talent_params['residence'] = value
+                elif field == "籍贯":
+                    talent_conditions.append("t.origin CONTAINS $origin")
+                    talent_params['origin'] = value
+        
+        # 构建Talent子集查询
+        if talent_conditions:
+            talent_subset_query = f"""
+            MATCH (t:Talent)
+            WHERE {' AND '.join(talent_conditions)}
+            WITH t
+            """
+            logging.info("构建Talent子集查询条件")
+        else:
+            talent_subset_query = """
+            MATCH (t:Talent)
+            WITH t
+            """
+            logging.info("使用所有Talent节点")
+        
+        # 构建条件子集查询(DataLabel节点和Hotel节点)
+        condition_params = {}
+        
+        if matched_labels:
+            condition_params['labels'] = matched_labels
+            logging.info(f"构建DataLabel和Hotel条件查询,标签: {matched_labels}")
+        
+        # 步骤4: 执行查询并返回结果
+        logging.info("第四步:执行查询并返回结果")
+        
+        # 构建完整的Cypher查询语句
+        if matched_labels:
+            # 有标签条件的情况 - 查找与条件子集(DataLabel和Hotel)有关系的Talent节点
+            # 使用OR逻辑:Talent有WORK_FOR关系链路或者有BELONGS_TO关系链路的节点都可以查询出来
+            cypher_script = f"""
+            {talent_subset_query}
+            WHERE EXISTS {{
+              // 条件1:存在WORK_FOR关系链路
+              MATCH (t)-[:WORK_FOR]->(:Hotel)-[:HAS_LABEL]->(dl:DataLabel)
+              WHERE dl.name_zh IN $labels
+            }} OR EXISTS {{
+              // 条件2:存在BELONGS_TO关系链路
+              MATCH (t)-[:BELONGS_TO]->(dl2:DataLabel)
+              WHERE dl2.name_zh  IN $labels
+            }}
+            RETURN DISTINCT 
+              t.pg_id AS pg_id, 
+              t.name_zh AS name_zh, 
+              t.name_en AS name_en,
+              t.mobile AS mobile, 
+              t.email AS email, 
+              t.updated_at AS updated_at
+            """
+        else:
+            # 无标签条件的情况,只根据Talent属性查询
+            cypher_script = f"""
+            {talent_subset_query}
+            RETURN DISTINCT t.pg_id as pg_id, t.name_zh as name_zh, t.name_en as name_en, 
+                   t.mobile as mobile, t.email as email, t.updated_at as updated_at
+            """
         
         logging.info(f"生成的Cypher脚本: {cypher_script}")
         
-        # 步骤4: 执行Cypher脚本
-        logging.info("第四步:执行Cypher脚本并返回结果")
+        # 合并所有参数
+        all_params = {**talent_params, **condition_params}
+        
+        # 执行查询
         with neo4j_driver.get_session() as session:
-            result = session.run(cypher_script)
+            result = session.run(cypher_script, **all_params)
             records = [record.data() for record in result]
             
         # 构建查询结果
@@ -1872,6 +1882,7 @@ def query_neo4j_graph(query_requirement):
             'message': '查询成功执行',
             'query': cypher_script,
             'matched_labels': matched_labels,
+            'non_empty_fields': non_empty_fields,
             'data': records
         }
         

+ 142 - 58
app/core/data_parse/parse_task.py

@@ -7,7 +7,7 @@ import boto3
 from botocore.config import Config
 from io import BytesIO
 import json
-from .parse_system import ParseTaskRepository
+from app.models.parse_models import ParseTaskRepository
 from app.config.config import DevelopmentConfig, ProductionConfig
 
 # 配置变量
@@ -173,18 +173,65 @@ def process_career_path(career_path, talent_node_id, talent_name_zh):
                 
                 # 使用千问大模型判断酒店所属品牌
                 try:
-                    from app.core.llm.llm_service import llm_client
+                    from openai import OpenAI
+                    
+                    # 配置千问模型参数
+                    QWEN_API_KEY = os.environ.get('QWEN_API_KEY', 'sk-8f2320dafc9e4076968accdd8eebd8e9')
+                    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+                    model = "qwen-long-latest"
+                    
+                    # 创建OpenAI客户端
+                    client = OpenAI(
+                        api_key=QWEN_API_KEY,
+                        base_url=base_url,
+                    )
                     
                     # 构建提示词
-                    prompt = f"请根据酒店名称'{hotel_zh}'判断该酒店所属的品牌。请只返回JSON格式结果,格式为{{\"brand\":\"品牌名称\"}}。"
+                    prompt = f"""请根据酒店名称'{hotel_zh}'判断该酒店所属的品牌。通常酒店名称前半部分是地名,后半部分是品牌名称。
+                    例如:扬州瘦西湖夜泊君亭酒店,品牌名称为'夜泊君亭'。
+                    要求:
+                    1. 必须返回标准的JSON格式
+                    2. 格式必须为:{{"brand": "品牌名称"}}
+                    3. 不要包含任何其他文本、说明或markdown格式
+                    4. 如果无法确定品牌,返回:{{"brand": ""}}
+                    示例正确格式:
+                    {{"brand": "万豪"}}
+                    {{"brand": "希尔顿"}}
+                    {{"brand": "洲际"}}
+                    {{"brand": "夜泊君亭"}}
+                    请直接返回JSON,不要有其他内容。"""
                     
                     # 调用千问大模型
-                    brand_response = llm_client(prompt)
+                    logging.info(f"######################发送给千问大模型的prompt: '{prompt}'")
+                    
+                    response = client.chat.completions.create(
+                        model=model,
+                        messages=[
+                            {"role": "user", "content": prompt}
+                        ],
+                        temperature=0.1,
+                        max_tokens=200
+                    )
+                    
+                    brand_response = response.choices[0].message.content
                     
                     if brand_response and isinstance(brand_response, str):
+                        # 记录原始响应内容
+                        logging.info(f"###################千问大模型返回的原始响应: '{brand_response}'")
+                        
                         # 尝试解析JSON响应
                         try:
-                            brand_data = json.loads(brand_response)
+                            # 清理响应内容,移除可能的markdown格式和多余文本
+                            cleaned_response = brand_response.strip()
+                            if cleaned_response.startswith('```json'):
+                                cleaned_response = cleaned_response[7:]
+                            if cleaned_response.endswith('```'):
+                                cleaned_response = cleaned_response[:-3]
+                            cleaned_response = cleaned_response.strip()
+                            
+                            logging.info(f"清理后的响应内容: '{cleaned_response}'")
+                            
+                            brand_data = json.loads(cleaned_response)
                             brand_name = brand_data.get('brand', '')
                             
                             if brand_name:
@@ -221,39 +268,44 @@ def process_career_path(career_path, talent_node_id, talent_name_zh):
                                                         result['brand_relationships_created'] += 0  # 不增加计数,因为关系已存在
                                                     else:
                                                         # 关系不存在,创建新的BELONGS_TO关系
-                                                        from app.core.graph.graph_operations import create_relationship
-                                                        
-                                                        relationship_created = create_relationship(
-                                                            hotel_node_id, 
-                                                            label_node_id, 
-                                                            'BELONGS_TO'
-                                                        )
-                                                        
-                                                        if relationship_created:
-                                                            logging.info(f"成功创建Hotel节点与品牌标签的关系: {hotel_zh} BELONGS_TO {brand_name}")
-                                                            result['brand_relationships_created'] += 1
-                                                        else:
-                                                            logging.warning(f"创建Hotel节点与品牌标签关系失败: {hotel_zh} -> {brand_name}")
-                                                            result['brand_relationships_failed'] += 1
+                                                        # 直接使用Cypher创建关系,避免使用create_relationship函数
+                                                        with connect_graph().session() as session:
+                                                            create_rel_query = """
+                                                            MATCH (h:Hotel), (d:DataLabel)
+                                                            WHERE id(h) = $hotel_node_id AND id(d) = $label_node_id
+                                                            MERGE (h)-[r:BELONGS_TO]->(d)
+                                                            RETURN r
+                                                            """
+                                                            rel_result = session.run(create_rel_query, 
+                                                                                    hotel_node_id=hotel_node_id,
+                                                                                    label_node_id=label_node_id)
+                                                            if rel_result.single():
+                                                                logging.info(f"成功创建Hotel节点与品牌标签的关系: {hotel_zh} BELONGS_TO {brand_name}")
+                                                                result['brand_relationships_created'] += 1
+                                                            else:
+                                                                logging.warning(f"创建Hotel节点与品牌标签关系失败: {hotel_zh} -> {brand_name}")
+                                                                result['brand_relationships_failed'] += 1
                                                             
                                             except Exception as check_error:
                                                 logging.error(f"检查Hotel节点与品牌标签关系失败: {str(check_error)}")
                                                 result['errors'].append(f"检查关系失败: {hotel_zh} -> {brand_name}, 错误: {str(check_error)}")
                                                 # 即使检查失败,也尝试创建关系
-                                                from app.core.graph.graph_operations import create_relationship
-                                                
-                                                relationship_created = create_relationship(
-                                                    hotel_node_id, 
-                                                    label_node_id, 
-                                                    'BELONGS_TO'
-                                                )
-                                                
-                                                if relationship_created:
-                                                    logging.info(f"成功创建Hotel节点与品牌标签的关系: {hotel_zh} BELONGS_TO {brand_name}")
-                                                    result['brand_relationships_created'] += 1
-                                                else:
-                                                    logging.warning(f"创建Hotel节点与品牌标签关系失败: {hotel_zh} -> {brand_name}")
-                                                    result['brand_relationships_failed'] += 1
+                                                with connect_graph().session() as session:
+                                                    create_rel_query = """
+                                                    MATCH (h:Hotel), (d:DataLabel)
+                                                    WHERE id(h) = $hotel_node_id AND id(d) = $label_node_id
+                                                    MERGE (h)-[r:BELONGS_TO]->(d)
+                                                    RETURN r
+                                                    """
+                                                    rel_result = session.run(create_rel_query, 
+                                                                            hotel_node_id=hotel_node_id,
+                                                                            label_node_id=label_node_id)
+                                                    if rel_result.single():
+                                                        logging.info(f"成功创建Hotel节点与品牌标签的关系: {hotel_zh} BELONGS_TO {brand_name}")
+                                                        result['brand_relationships_created'] += 1
+                                                    else:
+                                                        logging.warning(f"创建Hotel节点与品牌标签关系失败: {hotel_zh} -> {brand_name}")
+                                                        result['brand_relationships_failed'] += 1
                                         else:
                                             logging.warning(f"未找到品牌标签节点: {brand_name}")
                                             
@@ -261,12 +313,32 @@ def process_career_path(career_path, talent_node_id, talent_name_zh):
                                     logging.error(f"查询品牌标签节点失败: {str(query_error)}")
                                     result['errors'].append(f"查询品牌标签节点失败: {brand_name}, 错误: {str(query_error)}")
                             else:
-                                logging.warning(f"千问大模型返回的品牌名称为空: {hotel_zh}")
+                                logging.warning(f"千问大模型返回的品牌名称为空,酒店: {hotel_zh}, 响应: '{brand_response}'")
                                 
                         except json.JSONDecodeError as json_error:
-                            logging.warning(f"解析千问大模型返回的JSON失败: {brand_response}, 错误: {json_error}")
+                            logging.warning(f"解析千问大模型返回的JSON失败,原始响应内容: '{brand_response}', 错误: {json_error}")
+                            # 尝试从响应中提取品牌名称(非JSON格式的fallback)
+                            brand_name = ''
+                            
+                            # 首先尝试提取JSON格式的品牌名称
+                            import re
+                            # 尝试匹配 {"brand": "品牌名称"} 或 {"brand":"品牌名称"} 格式
+                            json_brand_match = re.search(r'["\']?brand["\']?\s*:\s*["\']?([^"\']+)["\']?', brand_response, re.IGNORECASE)
+                            if json_brand_match:
+                                brand_name = json_brand_match.group(1).strip()
+                                logging.info(f"从非标准JSON响应中提取到品牌名称: {brand_name}")
+                            # 如果上面没匹配到,尝试匹配包含"品牌"的文本
+                            elif 'brand' in brand_response.lower() or '品牌' in brand_response:
+                                brand_match = re.search(r'["\']?([^"\']*品牌[^"\']*)["\']?', brand_response)
+                                if brand_match:
+                                    brand_name = brand_match.group(1).strip()
+                                    logging.info(f"从非JSON响应中提取到品牌名称: {brand_name}")
+                                else:
+                                    logging.warning(f"无法从响应中提取品牌名称: {brand_response}")
+                            else:
+                                logging.warning(f"响应中未包含品牌相关信息: {brand_response}")
                     else:
-                        logging.warning(f"千问大模型返回结果无效: {brand_response}")
+                        logging.warning(f"千问大模型返回结果无效: '{brand_response}'")
                         
                 except Exception as brand_error:
                     logging.error(f"调用千问大模型判断品牌失败: {str(brand_error)}")
@@ -274,8 +346,6 @@ def process_career_path(career_path, talent_node_id, talent_name_zh):
                 
                 # 创建Talent节点到Hotel节点的WORK_FOR关系
                 try:
-                    from app.core.graph.graph_operations import create_relationship
-                    
                     # 获取职业轨迹信息
                     title_zh = career_item.get('title_zh', '')
                     date = career_item.get('date', '')
@@ -287,20 +357,28 @@ def process_career_path(career_path, talent_node_id, talent_name_zh):
                     if date:
                         work_for_properties['date'] = date
                     
-                    # 创建Talent节点到Hotel节点的WORK_FOR关系
-                    work_for_relationship_created = create_relationship(
-                        talent_node_id,  # Talent节点ID
-                        hotel_node_id,   # Hotel节点ID
-                        'WORK_FOR',
-                        work_for_properties
-                    )
+                    # 直接使用Cypher创建WORK_FOR关系,避免使用create_relationship函数
+                    from app.core.graph.graph_operations import connect_graph
                     
-                    if work_for_relationship_created:
-                        logging.info(f"成功创建Talent到Hotel的WORK_FOR关系: Talent({talent_name_zh}) WORK_FOR Hotel({hotel_zh})")
-                        result['work_for_relationships_created'] += 1
-                    else:
-                        logging.warning(f"创建Talent到Hotel的WORK_FOR关系失败: Talent({talent_name_zh}) -> Hotel({hotel_zh})")
-                        result['work_for_relationships_failed'] += 1
+                    with connect_graph().session() as session:
+                        work_for_query = """
+                        MATCH (t:Talent), (h:Hotel)
+                        WHERE id(t) = $talent_node_id AND id(h) = $hotel_node_id
+                        MERGE (t)-[r:WORK_FOR]->(h)
+                        SET r += $properties
+                        RETURN r
+                        """
+                        work_for_result = session.run(work_for_query, 
+                                                    talent_node_id=talent_node_id,
+                                                    hotel_node_id=hotel_node_id,
+                                                    properties=work_for_properties)
+                        
+                        if work_for_result.single():
+                            logging.info(f"成功创建Talent到Hotel的WORK_FOR关系: Talent({talent_name_zh}) WORK_FOR Hotel({hotel_zh})")
+                            result['work_for_relationships_created'] += 1
+                        else:
+                            logging.warning(f"创建Talent到Hotel的WORK_FOR关系失败: Talent({talent_name_zh}) -> Hotel({hotel_zh})")
+                            result['work_for_relationships_failed'] += 1
                     
                     # 创建Talent节点到DataLabel节点的WORK_AS关系
                     try:
@@ -343,14 +421,20 @@ def process_career_path(career_path, talent_node_id, talent_name_zh):
                                     work_as_properties['date'] = date
                                 
                                 # 创建Talent节点到DataLabel节点的WORK_AS关系
-                                work_as_relationship_created = create_relationship(
-                                    talent_node_id,  # Talent节点ID
-                                    label_node_id,   # DataLabel节点ID
-                                    'WORK_AS',
-                                    work_as_properties
-                                )
+                                # 直接使用Cypher创建WORK_AS关系,避免使用create_relationship函数
+                                work_as_query = """
+                                MATCH (t:Talent), (d:DataLabel)
+                                WHERE id(t) = $talent_node_id AND id(d) = $label_node_id
+                                MERGE (t)-[r:WORK_AS]->(d)
+                                SET r += $properties
+                                RETURN r
+                                """
+                                work_as_result = session.run(work_as_query, 
+                                                            talent_node_id=talent_node_id,
+                                                            label_node_id=label_node_id,
+                                                            properties=work_as_properties)
                                 
-                                if work_as_relationship_created:
+                                if work_as_result.single():
                                     logging.info(f"成功创建Talent到职位标签的WORK_AS关系: Talent({talent_name_zh}) WORK_AS DataLabel({title_zh})")
                                     result['work_as_relationships_created'] += 1
                                 else:

+ 4 - 2
app/core/data_parse/parse_web.py

@@ -16,9 +16,10 @@ from app.config.config import DevelopmentConfig, ProductionConfig
 from app.core.data_parse.parse_system import (
     BusinessCard, check_duplicate_business_card, 
     create_main_card_with_duplicates, update_career_path,
-    normalize_mobile_numbers, ParseTaskRepository,
+    normalize_mobile_numbers,
     create_origin_source_entry, update_origin_source
 )
+from app.models.parse_models import ParseTaskRepository
 from app import db
 
 # 使用配置变量,缺省认为在生产环境运行
@@ -864,7 +865,8 @@ def batch_process_md(markdown_file_list, publish_time=None, task_id=None, task_t
             }
         
         # 导入数据库模型
-        from app.core.data_parse.parse_system import ParseTaskRepository, db
+        from app.models.parse_models import ParseTaskRepository
+        from app import db
         
         # 查询对应的任务记录
         task_record = ParseTaskRepository.query.get(task_id)

+ 4 - 1
app/models/__init__.py

@@ -1 +1,4 @@
-# Models package initialization 
+# Models package initialization
+from .parse_models import ParseTaskRepository
+
+__all__ = ['ParseTaskRepository'] 

+ 34 - 0
app/models/parse_models.py

@@ -0,0 +1,34 @@
+from app import db
+from datetime import datetime
+
+class ParseTaskRepository(db.Model):
+    """ORM mapping for the ``parse_task_repository`` table.
+
+    Each row describes one parse task: its name, status, type, JSON source
+    description, counters, JSON result and audit columns.
+    """
+
+    __tablename__ = 'parse_task_repository'
+
+    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
+    task_name = db.Column(db.String(100), nullable=False)
+    task_status = db.Column(db.String(10), nullable=False)
+    task_type = db.Column(db.String(50), nullable=False)
+    # JSON column, chosen to match the jsonb column type.
+    task_source = db.Column(db.JSON, nullable=False)
+    collection_count = db.Column(db.Integer, default=0, nullable=False)
+    parse_count = db.Column(db.Integer, default=0, nullable=False)
+    # JSON column, chosen to match the jsonb column type.
+    parse_result = db.Column(db.JSON)
+    # utcnow is used for both audit timestamps to match CURRENT_TIMESTAMP.
+    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
+    created_by = db.Column(db.String(50), nullable=False)
+    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
+    updated_by = db.Column(db.String(50), nullable=False)
+
+    def to_dict(self):
+        """Return the row as a plain dict.
+
+        Timestamps are rendered as ``YYYY-MM-DD HH:MM:SS`` strings, or
+        ``None`` when the column is empty.
+        """
+        def _render(moment):
+            if not moment:
+                return None
+            return moment.strftime('%Y-%m-%d %H:%M:%S')
+
+        record = {}
+        record['id'] = self.id
+        record['task_name'] = self.task_name
+        record['task_status'] = self.task_status
+        record['task_type'] = self.task_type
+        record['task_source'] = self.task_source
+        record['collection_count'] = self.collection_count
+        record['parse_count'] = self.parse_count
+        record['parse_result'] = self.parse_result
+        record['created_at'] = _render(self.created_at)
+        record['created_by'] = self.created_by
+        record['updated_at'] = _render(self.updated_at)
+        record['updated_by'] = self.updated_by
+        return record

+ 45 - 5
app/services/neo4j_driver.py

@@ -1,17 +1,57 @@
-from flask import current_app
 from neo4j import GraphDatabase
 from neo4j.exceptions import ServiceUnavailable
+import os
 
 class Neo4jDriver:
-    def __init__(self):
+    def __init__(self, uri=None, user=None, password=None, encrypted=None):
+        """Resolve the connection settings; the driver itself is created lazily.
+
+        Args:
+            uri: Bolt URI. When omitted it is resolved as described below.
+            user: Neo4j user name; falls back to ``NEO4J_USER``.
+            password: Neo4j password; falls back to ``NEO4J_PASSWORD``.
+            encrypted: Whether to encrypt the connection. ``None`` (the
+                default) means "not specified", in which case
+                ``NEO4J_ENCRYPTED`` is consulted and defaults to false.
+
+        The default for ``encrypted`` is ``None`` rather than ``False`` so
+        that "not passed" can be told apart from an explicit ``False``; with a
+        ``False`` default the ``is not None`` checks below were always true
+        and the configuration lookups were unreachable.
+        """
         self._driver = None
         
+        # Explicit arguments always win.
+        if uri is not None:
+            self.uri = uri
+        elif user is not None or password is not None or encrypted is not None:
+            # Only some arguments were given: fill the URI from the
+            # environment or the built-in default.
+            self.uri = os.environ.get('NEO4J_URI', "bolt://192.168.3.143:7687")
+        else:
+            # No arguments at all: try the Flask config first, then the
+            # environment, then the built-in default.
+            self.uri = self._get_config_value('NEO4J_URI', "bolt://192.168.3.143:7687")
+
+        if user is not None:
+            self.user = user
+        else:
+            self.user = self._get_config_value('NEO4J_USER', "neo4j")
+
+        if password is not None:
+            self.password = password
+        else:
+            # NOTE(review): a real-looking password is hard-coded as the
+            # fallback; consider requiring it from configuration instead.
+            self.password = self._get_config_value('NEO4J_PASSWORD', "cituneo4j")
+
+        if encrypted is not None:
+            self.encrypted = encrypted
+        else:
+            configured = self._get_config_value('NEO4J_ENCRYPTED', 'false')
+            # Environment values arrive as strings; config values may already
+            # be booleans.
+            if isinstance(configured, str):
+                self.encrypted = configured.lower() == 'true'
+            else:
+                self.encrypted = bool(configured)
+    
+    def _get_config_value(self, key, default_value):
+        """Look up a setting: Flask config first, then environment, then default.
+
+        Args:
+            key: Name of the setting (also used as the environment variable name).
+            default_value: Returned when neither source defines ``key``.
+
+        Returns:
+            The Flask config value when an application context is active and
+            the key is present there; otherwise the environment variable, or
+            ``default_value`` when that is unset too.
+        """
+        try:
+            # Imported lazily so the driver also works where Flask is absent.
+            from flask import current_app
+            if current_app and hasattr(current_app, 'config'):
+                flask_config = current_app.config
+                # Only short-circuit when the key is really configured, so a
+                # missing key still falls through to the environment.
+                if key in flask_config:
+                    return flask_config[key]
+        except (ImportError, RuntimeError):
+            # Flask is not installed, or there is no active application context.
+            pass
+
+        return os.environ.get(key, default_value)
+        
     def connect(self):
+        """Return the driver, creating and caching it on the first call.
+
+        The driver is built from ``self.uri``, ``self.user``,
+        ``self.password`` and ``self.encrypted``; later calls return the
+        instance stored in ``self._driver``.
+        """
         if not self._driver:
             self._driver = GraphDatabase.driver(
-                current_app.config['NEO4J_URI'],
-                auth=(current_app.config['NEO4J_USER'], current_app.config['NEO4J_PASSWORD']),
-                encrypted=current_app.config['NEO4J_ENCRYPTED']
+                self.uri,
+                auth=(self.user, self.password),
+                encrypted=self.encrypted
             )
         return self._driver
     

+ 305 - 49
run_parse_neo4j.sh

@@ -1,53 +1,309 @@
 #!/bin/bash
 
-# 酒店职位数据Neo4j同步程序运行脚本
-# 适用于Linux和Mac环境
-
-echo "========================================"
-echo "酒店职位数据Neo4j同步程序"
-echo "========================================"
-echo
-
-# 检查Python是否安装
-if ! command -v python3 &> /dev/null; then
-    if ! command -v python &> /dev/null; then
-        echo "错误: 未找到Python,请先安装Python 3.7+"
+# =============================================================================
+# Neo4j同步程序运行脚本
+# 适用于Linux生产环境
+# =============================================================================
+
+# Script configuration: all paths are derived from the directory containing
+# this script, which is treated as the project root.
+SCRIPT_NAME="run_parse_neo4j.sh"
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_ROOT="$SCRIPT_DIR"
+PYTHON_SCRIPT="$PROJECT_ROOT/app/core/data_parse/parse_neo4j_process.py"
+LOG_DIR="$PROJECT_ROOT/logs"
+LOG_FILE="$LOG_DIR/parse_neo4j_$(date +%Y%m%d_%H%M%S).log"
+PID_FILE="$PROJECT_ROOT/parse_neo4j.pid"
+LOCK_FILE="$PROJECT_ROOT/parse_neo4j.lock"
+
+# Environment variables: put the project root on PYTHONPATH and set
+# PYTHONUNBUFFERED for the Python process.
+export PYTHONPATH="$PROJECT_ROOT:$PYTHONPATH"
+export PYTHONUNBUFFERED=1
+
+# Color definitions (ANSI escape sequences, expanded by `echo -e`)
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+# Logging helpers: each prints a colored, timestamped line to stdout and
+# appends the same line to $LOG_FILE.
+_log_emit() {
+    local tint="$1"
+    local level="$2"
+    local text="$3"
+    echo -e "${tint}[$(date '+%Y-%m-%d %H:%M:%S')] [${level}]${NC} ${text}" | tee -a "$LOG_FILE"
+}
+
+log_info() {
+    _log_emit "$GREEN" "INFO" "$1"
+}
+
+log_warn() {
+    _log_emit "$YELLOW" "WARN" "$1"
+}
+
+log_error() {
+    _log_emit "$RED" "ERROR" "$1"
+}
+
+log_debug() {
+    _log_emit "$BLUE" "DEBUG" "$1"
+}
+
+# Cleanup handler, run from the EXIT trap.
+# Removes the PID and lock files only when they record this process's PID.
+# The trap also fires on early exits (--help, "already running"), and
+# deleting the files unconditionally would remove the guard of another,
+# still-running instance.
+cleanup() {
+    log_info "执行清理操作..."
+
+    # Remove the PID file if this process wrote it
+    if [ -f "$PID_FILE" ] && [ "$(cat "$PID_FILE" 2>/dev/null)" = "$$" ]; then
+        rm -f "$PID_FILE"
+        log_info "已删除PID文件: $PID_FILE"
+    fi
+
+    # Remove the lock file if this process wrote it
+    if [ -f "$LOCK_FILE" ] && [ "$(cat "$LOCK_FILE" 2>/dev/null)" = "$$" ]; then
+        rm -f "$LOCK_FILE"
+        log_info "已删除锁文件: $LOCK_FILE"
+    fi
+
+    log_info "清理操作完成"
+}
+
+# Signal handling: run cleanup on every exit; on INT/TERM log an error and
+# exit with status 1.
+trap cleanup EXIT
+trap 'log_error "收到中断信号,正在退出..."; exit 1' INT TERM
+
+# Check system dependencies.
+# Activates the project's virtual environment first (when present) so that the
+# python3/pip3 probes and the logged version describe the interpreter that
+# will actually run the program. Returns 1 when python3 or pip3 is missing.
+check_dependencies() {
+    log_info "检查系统依赖..."
+
+    # Activate the virtual environment, if there is one
+    if [ -d "$PROJECT_ROOT/venv" ]; then
+        log_info "检测到虚拟环境: $PROJECT_ROOT/venv"
+        if [ -f "$PROJECT_ROOT/venv/bin/activate" ]; then
+            source "$PROJECT_ROOT/venv/bin/activate"
+            log_info "已激活虚拟环境"
+        else
+            log_warn "虚拟环境缺少激活脚本: $PROJECT_ROOT/venv/bin/activate"
+        fi
+    fi
+
+    # Check Python
+    if ! command -v python3 &> /dev/null; then
+        log_error "Python3 未安装或不在PATH中"
+        return 1
+    fi
+
+    PYTHON_VERSION=$(python3 --version 2>&1)
+    log_info "Python版本: $PYTHON_VERSION"
+
+    # Check pip
+    if ! command -v pip3 &> /dev/null; then
+        log_error "pip3 未安装或不在PATH中"
+        return 1
+    fi
+
+    return 0
+}
+
+# Check that the required Python packages can be imported.
+# $1: pass "--install" to attempt `pip3 install` for each missing package.
+# Each entry is "pip-name:import-name": the name given to pip is not always
+# the importable module name (psycopg2-binary -> psycopg2, Pillow -> PIL), so
+# deriving one from the other reports installed packages as missing.
+check_python_packages() {
+    log_info "检查Python包依赖..."
+
+    local required_packages=(
+        "psycopg2-binary:psycopg2"
+        "neo4j:neo4j"
+        "openai:openai"
+        "boto3:boto3"
+        "Pillow:PIL"
+        "pytesseract:pytesseract"
+        "requests:requests"
+    )
+
+    local entry package module
+    for entry in "${required_packages[@]}"; do
+        package="${entry%%:*}"
+        module="${entry##*:}"
+        if ! python3 -c "import ${module}" 2>/dev/null; then
+            log_warn "Python包未安装: $package"
+            if [ "$1" = "--install" ]; then
+                log_info "尝试安装包: $package"
+                pip3 install "$package" || log_error "安装包失败: $package"
+            fi
+        else
+            log_debug "Python包已安装: $package"
+        fi
+    done
+}
+
+# Create the log directory when it does not exist yet.
+create_log_dir() {
+    if [ -d "$LOG_DIR" ]; then
+        return 0
+    fi
+
+    mkdir -p "$LOG_DIR"
+    log_info "创建日志目录: $LOG_DIR"
+}
+
+# Check whether another instance is already running.
+# Returns 1 when a live process is recorded in the PID file, or when a lock
+# file exists that cannot be proven stale; returns 0 otherwise.
+# Stale files (recorded PID no longer alive) are removed.
+check_running() {
+    local pid
+
+    if [ -f "$PID_FILE" ]; then
+        pid=$(cat "$PID_FILE" 2>/dev/null)
+        # An empty PID file is treated as stale rather than handed to ps.
+        if [ -n "$pid" ] && ps -p "$pid" > /dev/null 2>&1; then
+            log_error "程序已在运行,PID: $pid"
+            return 1
+        else
+            log_warn "发现过期的PID文件,正在清理..."
+            rm -f "$PID_FILE"
+        fi
+    fi
+
+    if [ -f "$LOCK_FILE" ]; then
+        pid=$(cat "$LOCK_FILE" 2>/dev/null)
+        # The lock file records its creator's PID; if that process is gone
+        # the lock is left over from an abnormal exit and can be dropped.
+        if [[ "$pid" =~ ^[0-9]+$ ]] && ! ps -p "$pid" > /dev/null 2>&1; then
+            log_warn "发现过期的锁文件,正在清理..."
+            rm -f "$LOCK_FILE"
+        else
+            log_error "发现锁文件,可能程序正在运行或上次异常退出"
+            return 1
+        fi
+    fi
+
+    return 0
+}
+
+# Record this shell's PID in both the lock file and the PID file.
+create_lock() {
+    local marker
+    for marker in "$LOCK_FILE" "$PID_FILE"; do
+        echo "$$" > "$marker"
+    done
+    log_info "创建锁文件和PID文件"
+}
+
+# Check database connectivity.
+# Placeholder: currently only logs a start and a finish message.
+check_database_connection() {
+    log_info "检查数据库连接..."
+    
+    # Connection checks can be added here,
+    # e.g. verifying that PostgreSQL and Neo4j are reachable.
+    
+    log_info "数据库连接检查完成"
+}
+
+# Run the Python sync program from the project root, mirroring its output
+# into the log file. Returns the Python process's exit status.
+run_main_program() {
+    local exit_code
+
+    log_info "开始运行Neo4j同步程序..."
+    log_info "工作目录: $PROJECT_ROOT"
+    log_info "Python脚本: $PYTHON_SCRIPT"
+    log_info "日志文件: $LOG_FILE"
+
+    # Run the Python script
+    if ! cd "$PROJECT_ROOT"; then
+        log_error "无法进入工作目录: $PROJECT_ROOT"
+        return 1
+    fi
+    python3 "$PYTHON_SCRIPT" 2>&1 | tee -a "$LOG_FILE"
+
+    # $? would be tee's status; PIPESTATUS[0] is the Python process's status.
+    exit_code=${PIPESTATUS[0]}
+
+    if [ "$exit_code" -eq 0 ]; then
+        log_info "程序执行成功"
+    else
+        log_error "程序执行失败,退出码: $exit_code"
+    fi
+
+    return "$exit_code"
+}
+
+# Print usage information.
+show_help() {
+    printf '%s\n' \
+        "用法: $0 [选项]" \
+        "" \
+        "选项:" \
+        "  -h, --help          显示此帮助信息" \
+        "  -v, --version       显示版本信息" \
+        "  -i, --install       自动安装缺失的Python包" \
+        "  -c, --check         只检查依赖,不运行程序" \
+        "  -f, --force         强制运行(忽略锁文件检查)" \
+        "  -l, --log-dir DIR   指定日志目录" \
+        "" \
+        "示例:" \
+        "  $0                   正常运行程序" \
+        "  $0 --install         安装依赖后运行程序" \
+        "  $0 --check           只检查依赖" \
+        "  $0 --force           强制运行程序"
+}
+
+# Print version information.
+show_version() {
+    printf '%s\n' \
+        "Neo4j同步程序运行脚本 v1.0.0" \
+        "适用于Linux生产环境"
+}
+
+# Main entry: parse options, check dependencies, take the lock and run the
+# sync program. Exits 0 on success and 1 on any failure.
+main() {
+    local force_run=false
+    local check_only=false
+    local install_packages=false
+    
+    # Parse command-line arguments
+    while [[ $# -gt 0 ]]; do
+        case $1 in
+            -h|--help)
+                show_help
+                exit 0
+                ;;
+            -v|--version)
+                show_version
+                exit 0
+                ;;
+            -i|--install)
+                install_packages=true
+                shift
+                ;;
+            -c|--check)
+                check_only=true
+                shift
+                ;;
+            -f|--force)
+                force_run=true
+                shift
+                ;;
+            -l|--log-dir)
+                # Without a value `shift 2` fails and shifts nothing, which
+                # would make this loop spin forever.
+                if [[ $# -lt 2 || -z "$2" ]]; then
+                    log_error "选项 $1 需要指定日志目录"
+                    show_help
+                    exit 1
+                fi
+                LOG_DIR="$2"
+                # LOG_FILE was computed from the default LOG_DIR at load time;
+                # recompute it so the option actually takes effect.
+                LOG_FILE="$LOG_DIR/parse_neo4j_$(date +%Y%m%d_%H%M%S).log"
+                shift 2
+                ;;
+            *)
+                log_error "未知参数: $1"
+                show_help
+                exit 1
+                ;;
+        esac
+    done
+    
+    # Create the log directory
+    create_log_dir
+    
+    # Record script start
+    log_info "=========================================="
+    log_info "Neo4j同步程序运行脚本启动"
+    log_info "脚本路径: $0"
+    log_info "启动时间: $(date)"
+    log_info "=========================================="
+    
+    # Check system dependencies
+    if ! check_dependencies; then
+        log_error "依赖检查失败,程序退出"
+        exit 1
+    fi
+    
+    # Check Python packages
+    check_python_packages $([ "$install_packages" = true ] && echo "--install")
+    
+    # In check-only mode, stop here
+    if [ "$check_only" = true ]; then
+        log_info "依赖检查完成,程序退出"
+        exit 0
+    fi
+    
+    # Refuse to start when another instance is running (unless forced)
+    if [ "$force_run" = false ] && ! check_running; then
+        log_error "程序已在运行或存在锁文件,程序退出"
         exit 1
+    fi
+    
+    # Create the lock file
+    create_lock
+    
+    # Check database connectivity
+    check_database_connection
+    
+    # Run the main program
+    if run_main_program; then
+        log_info "程序执行完成"
+        exit 0
     else
-        PYTHON_CMD="python"
-    fi
-else
-    PYTHON_CMD="python3"
-fi
-
-echo "Python环境检查通过: $($PYTHON_CMD --version)"
-echo
-
-# 检查程序文件是否存在
-if [ ! -f "app/core/data_parse/parse_neo4j_process.py" ]; then
-    echo "错误: 找不到程序文件 app/core/data_parse/parse_neo4j_process.py"
-    exit 1
-fi
-
-echo "程序文件检查通过"
-echo
-
-echo "开始执行数据同步程序..."
-echo "时间: $(date)"
-echo
-
-# 运行Python程序
-$PYTHON_CMD app/core/data_parse/parse_neo4j_process.py
-
-# 检查执行结果
-if [ $? -eq 0 ]; then
-    echo
-    echo "✅ 程序执行成功"
-else
-    echo
-    echo "❌ 程序执行失败"
-    echo "请检查日志文件 logs/parse_neo4j_process.log 获取详细错误信息"
-fi
-
-echo
-echo "程序执行完成" 
+        log_error "程序执行失败"
+        exit 1
+    fi
+}
+
+# Script entry point: call main only when the file is executed directly,
+# not when it is sourced.
+if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
+    main "$@"
+fi 

+ 48 - 0
test_neo4j_connection.py

@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试Neo4j连接脚本
+用于验证修改后的Neo4jDriver类是否可以在没有Flask的情况下工作
+"""
+
+import sys
+import os
+
+# Put the project root (this file's directory) at the front of sys.path so
+# that the `app` package can be imported.
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = current_dir
+sys.path.insert(0, project_root)
+
+try:
+    from app.services.neo4j_driver import Neo4jDriver
+    print("✅ 成功导入Neo4jDriver")
+
+    # Build the driver from explicit connection settings
+    print("正在测试Neo4j连接...")
+    connection_settings = {
+        "uri": "bolt://192.168.3.143:7687",
+        "user": "neo4j",
+        "password": "cituneo4j",
+        "encrypted": False,
+    }
+    neo4j_driver = Neo4jDriver(**connection_settings)
+
+    if not neo4j_driver.verify_connectivity():
+        print("❌ Neo4j连接失败")
+    else:
+        print("✅ Neo4j连接成功!")
+
+        # Open a session and run a trivial query
+        try:
+            with neo4j_driver.get_session() as session:
+                record = session.run("RETURN 1 as test").single()
+                print(f"✅ 测试查询成功: {record['test']}")
+        except Exception as e:
+            print(f"❌ 测试查询失败: {e}")
+
+except ImportError as e:
+    print(f"❌ 导入失败: {e}")
+except Exception as e:
+    print(f"❌ 其他错误: {e}")
+
+print("测试完成")