浏览代码

明早准备继续修复ddl save neo4j的问题

wangxq 1 月之前
父节点
当前提交
4520d95bcd
共有 3 个文件被更改,包括 263 次插入和 42 次删除
  1. +61 −19
      app/api/data_resource/routes.py
  2. +193 −19
      app/core/data_resource/resource.py
  3. +9 −4
      app/core/llm/ddl_parser.py

+ 61 - 19
app/api/data_resource/routes.py

@@ -149,26 +149,38 @@ def data_resource_save():
         # 表单以 receiver 开头时使用下面的方法:
         # receiver = request.json.get('receiver', {})
         receiver = request.get_json()
-        additional_info = receiver['additional_info']
-
-        # 检查receiver是否存在
+         # 检查receiver是否存在
         if not receiver:
             return jsonify(failed("参数不完整:缺少receiver"))
-            
         # 检查url是否存在
         if 'url' not in receiver:
             return jsonify(failed("参数不完整:缺少url"))
 
+        additional_info = receiver['additional_info']
+        if not additional_info:
+                return jsonify(failed("参数不完整: 缺少additional_info"))
+        
+        data_resource = additional_info['data_resource']
+        if not data_resource:
+                return jsonify(failed("参数不完整: 缺少data_resource"))         
+
         file_extension = receiver['url'].split('.')[-1]
+        head_data = additional_info['head_data']           
         
         if file_extension == 'xlsx' or file_extension == 'xls' or file_extension == 'csv':
-            head_data = additional_info['head_data']
-            data_resource = additional_info['data_resource']
-
-            if not receiver or not data_resource:
-                return jsonify(failed("参数不完整"))
+            # 如果文件是excel或csv,则需要检查storage_location是否存在
+            storage_location = receiver.get('storage_location', '')
+            if not storage_location:
+                return jsonify(failed("参数不完整:缺少storage_location或storage_location为空"))
+                        
             # 调用业务逻辑处理数据资源创建
             resource_id = handle_node(receiver, head_data, data_resource)
+        elif file_extension == 'sql':
+            data_source = receiver['data_source']
+            # 如果是ddl,则需要检查data_source是否存在
+            if not data_source or (isinstance(data_source, dict) and not data_source.get("en_name")):
+                return jsonify(failed("数据源信息不完整或无效"))
+            resource_id = handle_node(receiver, head_data, data_resource, data_source)
         else:
             return jsonify(failed("文件格式错误"))
     
@@ -416,35 +428,65 @@ def id_data_save():
             
             # 添加新关系
             for meta in metadata_list:
-                # 创建或获取元数据节点
+                # 创建元数据节点
                 meta_cypher = """
                 MERGE (m:Metadata {name: $name})
-                ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
+                ON CREATE SET m.en_name = $en_name, 
+                            m.createTime = $create_time,
+                            m.type = $type
+                ON MATCH SET m.type = $type
                 RETURN m
                 """
                 
+                create_time = get_formatted_time()
                 meta_result = session.run(
                     meta_cypher,
                     name=meta["name"],
                     en_name=meta["en_name"],
-                    create_time=meta.get("createTime", get_formatted_time())
+                    create_time=create_time,
+                    type=meta["data_type"]
                 )
                 meta_node = meta_result.single()["m"]
+                meta_id = meta_node.id
+                
+                # 打印节点ID信息,便于调试
+                logger.info(f"元数据节点ID: {meta_id}, 类型: {type(meta_id)}")
+                logger.info(f"数据资源节点ID: {resource_id}, 类型: {type(resource_id)}")
                 
-                # 创建关系
+                # 使用明确的属性名匹配而不是ID
                 rel_cypher = """
-                MATCH (n:data_resource), (m:Metadata)
-                WHERE id(n) = $resource_id AND id(m) = $meta_id
-                CREATE (n)-[r:contain]->(m)
+                MATCH (a:data_resource {name: $r_name}), (m:Metadata {name: $m_name})
+                MERGE (a)-[r:contain]->(m)
                 RETURN r
                 """
                 
-                session.run(
+                rel_result = session.run(
                     rel_cypher,
-                    resource_id=int(resource_id),
-                    meta_id=meta_node.id
+                    r_name=receiver['name'],
+                    m_name=meta["name"]
                 )
                 
+                # 检查关系是否创建成功
+                if rel_result.single():
+                    logger.info(f"成功创建关系: {receiver['name']} -> {meta['name']}")
+                else:
+                    logger.warning(f"关系创建结果为空")
+
+                # 额外验证关系是否创建
+                verify_cypher = """
+                MATCH (a:data_resource {name: $r_name})-[r:contain]->(m:Metadata {name: $m_name})
+                RETURN count(r) as rel_count
+                """
+                
+                verify_result = session.run(
+                    verify_cypher,
+                    r_name=receiver['name'],
+                    m_name=meta["name"]
+                )
+                
+                count = verify_result.single()["rel_count"]
+                logger.info(f"验证关系数量: {count}")
+                
         return jsonify(success({"message": "元数据保存成功"}))
     except Exception as e:
         logger.error(f"保存数据资源关联的元数据失败: {str(e)}")

+ 193 - 19
app/core/data_resource/resource.py

@@ -80,7 +80,7 @@ def update_or_create_node(label, **properties):
         return None
 
 # 数据资源-元数据 关系节点创建、查询
-def handle_node(receiver, head_data, data_resource):
+def handle_node(receiver, head_data, data_resource, data_source=None):
     """处理数据资源节点创建和关系建立"""
     try:
         # 更新属性
@@ -91,6 +91,10 @@ def handle_node(receiver, head_data, data_resource):
         }
         if 'additional_info' in receiver:
             del receiver['additional_info']
+        # 从receiver中移除data_source属性,避免将复杂对象作为节点属性
+        if 'data_source' in receiver:
+            del receiver['data_source']
+            
         tag_list = receiver.get('tag')
         receiver.update(update_attributes)
 
@@ -105,7 +109,7 @@ def handle_node(receiver, head_data, data_resource):
             """
             result = session.run(cypher, **receiver)
             data_resource_node = result.single()["n"]
-            resource_id = data_resource_node.id
+            resource_id = data_resource_node.element_id # 使用element_id属性获取完整ID
 
             # 处理标签关系
             if tag_list:
@@ -117,7 +121,7 @@ def handle_node(receiver, head_data, data_resource):
                     WHERE elementId(a) = $resource_id AND elementId(b) = $tag_id
                     RETURN r
                     """
-                    rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id)
+                    rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.element_id)  # 使用element_id
                     
                     # 如果关系不存在则创建
                     if not rel_result.single():
@@ -128,8 +132,9 @@ def handle_node(receiver, head_data, data_resource):
                         RETURN r
                         """
                         session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
-
-            # 处理头部数据(元数据)
+            
+            
+            # 处理头部数据(元数据,字段)
             if head_data:
                 for item in head_data:
                     # 创建元数据节点
@@ -150,27 +155,82 @@ def handle_node(receiver, head_data, data_resource):
                         create_time=create_time,
                         type=item['data_type']  # 使用data_type作为type属性
                     )
-                    meta_node = meta_result.single()["m"]
+                    meta_record = meta_result.single()
                     
-                    # 创建关系
-                    rel_cypher = """
-                    MATCH (a:data_resource), (m:Metadata)
-                    WHERE elementId(a) = $resource_id AND elementId(m) = $meta_id
-                    MERGE (a)-[r:contain]->(m)
-                    RETURN r
-                    """
+                    if meta_record and meta_record["m"]:
+                        meta_node = meta_record["m"]
+                        meta_id = meta_node.element_id
+                        
+                        # 打印日志确认节点创建成功和ID
+                        logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")
+                        
+                        # 确认数据资源节点是否可以正确查询到
+                        check_resource_cypher = """
+                        MATCH (n:data_resource) 
+                        WHERE elementId(n) = $resource_id 
+                        RETURN n
+                        """
+                        check_resource = session.run(check_resource_cypher, resource_id=resource_id)
+                        if check_resource.single():
+                            logger.info(f"找到数据资源节点: ID={resource_id}")
+                        else:
+                            logger.error(f"无法找到数据资源节点: ID={resource_id}")
+                            continue
+                        
+                        # 创建关系
+                        rel_cypher = """
+                        MATCH (a:data_resource), (m:Metadata)
+                        WHERE elementId(a) = $resource_id AND elementId(m) = $meta_id
+                        MERGE (a)-[r:contain]->(m)
+                        RETURN r
+                        """
+                        
+                        rel_result = session.run(
+                            rel_cypher,
+                            resource_id=resource_id,
+                            meta_id=meta_id
+                        )
+                        
+                        rel_record = rel_result.single()
+                        if rel_record:
+                            logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
+                        else:
+                            logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
+                    else:
+                        logger.error(f"未能创建或获取元数据节点: {item['name']}")
+            
+            # 处理数据源关系
+            if data_source:
+                try:
+                    # 创建或获取数据源节点
+                    data_source_en_name = handle_data_source(data_source)
                     
-                    session.run(
-                        rel_cypher,
-                        resource_id=resource_id,
-                        meta_id=meta_node.id
-                    )
+                    # 创建数据资源与数据源的关系
+                    if data_source_en_name:
+                        # 创建 isbelongto 关系
+                        rel_data_source_cypher = """
+                        MATCH (a:data_resource), (b:data_source)
+                        WHERE elementId(a) = $resource_id AND b.en_name = $ds_en_name
+                        MERGE (a)-[r:isbelongto]->(b)
+                        RETURN r
+                        """
+                        session.run(
+                            rel_data_source_cypher,
+                            resource_id=resource_id,
+                            ds_en_name=data_source_en_name
+                        )
+                        logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
+                except Exception as e:
+                    logger.error(f"处理数据源关系失败: {str(e)}")
+                    raise RuntimeError(f"处理数据源关系失败: {str(e)}")
             
             return resource_id
     except Exception as e:
         logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
         raise
 
+    
+
 def handle_id_resource(resource_id):
     """处理单个数据资源查询"""
     try:
@@ -1048,4 +1108,118 @@ def data_resource_edit(data):
             return dict(updated_node["n"])
     except Exception as e:
         logger.error(f"编辑数据资源失败: {str(e)}")
-        raise
+        raise
+
+def handle_data_source(data_source):
+    """处理数据源的检查和创建
+    
+    参数:
+        data_source: 包含数据源信息的字典,支持两种情况:
+            1. 只包含名称的情况,如:
+               {
+                   "en_name": "10-52-31-104_5432_inventory"
+               }
+            2. 完整的数据源信息,如:
+               {
+                   "en_name": "10-52-31-104_5432_inventory",
+                   "name": "",
+                   "type": "postgresql",
+                   "host": "10.52.31.104",
+                   "port": 5432,
+                   "database": "inventory_management",
+                   "username": "app_user",
+                   "password": "pG$ecur3P@ss789"
+               }
+            
+    返回:
+        成功时返回数据源的名称,失败时抛出异常
+    """
+    try:
+        # 获取数据源名称
+        ds_en_name = data_source.get("en_name")
+        if not ds_en_name:
+            raise ValueError("数据源信息不完整,缺少名称")
+        
+        # 检查是否只有名称参数
+        only_en_name = len(data_source) == 1 and "en_name" in data_source
+        
+        with neo4j_driver.get_session() as session:
+            if only_en_name:
+                # 仅有英文名称的情况:查询是否存在该名称的数据源
+                check_name_cypher = """
+                MATCH (ds:data_source {en_name: $en_name})
+                RETURN ds
+                """
+                check_result = session.run(check_name_cypher, en_name=ds_en_name)
+                existing_record = check_result.single()
+                
+                if existing_record:
+                    # 数据源已存在,返回其名称
+                    existing_data_source = dict(existing_record["ds"])
+                    logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
+                    return existing_data_source.get("en_name")
+                else:
+                    # 数据源不存在,抛出异常
+                    raise ValueError(f"未找到名称为 {ds_en_name} 的数据源")
+            else:
+                # 完整的数据源信息:提取其它属性
+                ds_type = data_source.get("type")
+                ds_host = data_source.get("host")
+                ds_port = data_source.get("port")
+                ds_database = data_source.get("database")
+                ds_username = data_source.get("username")
+                
+                # 检查必要的属性是否存在
+                if not all([ds_type, ds_host, ds_port, ds_database, ds_username]):
+                    raise ValueError("数据源信息不完整,缺少必要的属性")
+                
+                # 检查是否已存在相同数据源(除名称和密码外属性相同)
+                check_cypher = """
+                MATCH (ds:data_source)
+                WHERE ds.type = $type AND 
+                      ds.host = $host AND 
+                      ds.port = $port AND 
+                      ds.database = $database AND 
+                      ds.username = $username
+                RETURN ds
+                """
+                
+                check_result = session.run(
+                    check_cypher,
+                    type=ds_type,
+                    host=ds_host,
+                    port=ds_port,
+                    database=ds_database,
+                    username=ds_username
+                )
+                
+                existing_record = check_result.single()
+                
+                if existing_record:
+                    # 数据源已存在,返回其名称
+                    existing_data_source = dict(existing_record["ds"])
+                    logger.info(f"找到现有数据源: {existing_data_source.get('en_name')}")
+                    return existing_data_source.get("en_name")
+                
+                # 数据源不存在,创建新节点
+                create_cypher = """
+                CREATE (ds:data_source $properties)
+                RETURN ds
+                """
+                
+                # 添加创建时间
+                data_source["createTime"] = get_formatted_time()
+                
+                create_result = session.run(create_cypher, properties=data_source)
+                created_record = create_result.single()
+                
+                if not created_record:
+                    raise RuntimeError("创建数据源节点失败")
+                
+                new_data_source = dict(created_record["ds"])
+                logger.info(f"创建新数据源: {new_data_source.get('en_name')}")
+                return new_data_source.get("en_name")
+            
+    except Exception as e:
+        logger.error(f"处理数据源失败: {str(e)}")
+        raise RuntimeError(f"处理数据源失败: {str(e)}")

+ 9 - 4
app/core/llm/ddl_parser.py

@@ -105,8 +105,9 @@ class DDLParser:
    - 如字段名是无意义的拼音缩写,则name为空字符串
 4. 数据库连接串处理:
    - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
-   - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2
-   - data_source.name格式为: "{hostname或ip地址(点替换为中划线)}_{端口}_{数据库名称}",如某个元素无法识别,则跳过不添加到data_source.name
+   - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2/sybase
+   - data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加.
+   - data_source.name留空.
    - 无法确定数据库类型时,type设为"unknown"
    - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签
 5. 参考格式如下:
@@ -127,10 +128,14 @@ class DDLParser:
         ]
     },
     "data_source": [{
-        "name": "10-52-31-104_5432_inventory_management", //hostname或ip地址(点替换为中划线)_端口_数据库名称
+        "en_name": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}
+        "name": "", //这里留空
         "type": "postgresql",
         "host": "10.52.31.104",
-        "port": 5432 "database": "inventory_management"
+        "port": 5432,
+        "dbname": "mydatabase",
+        "dbuser": "myuser",
+        "dbpassword": "mypassword"
     }]
 }