
Largely finished refactoring /api/resource/save; it no longer creates data sources

wangxq, 1 month ago
parent commit e5338fb015
3 files changed, 99 insertions and 146 deletions
  1. app/api/data_resource/routes.py (+56 -88)
  2. app/core/data_resource/resource.py (+4 -58)
  3. app/core/llm/ddl_parser.py (+39 -0)

+ 56 - 88
app/api/data_resource/routes.py

@@ -160,39 +160,49 @@ def data_resource_save():
     """保存数据资源"""   
     try:
         # 获取表单数据
-        # 表单以 receiver 开头时使用下面的方法:
-        # receiver = request.json.get('receiver', {})
         receiver = request.get_json()
-         # 检查receiver是否存在
+        # 检查receiver是否存在
         if not receiver:
             return jsonify(failed("参数不完整:缺少receiver"))
         # 检查url是否存在
         if 'url' not in receiver:
             return jsonify(failed("参数不完整:缺少url"))
 
-        additional_info = receiver['additional_info']
+        additional_info = receiver.get('additional_info')
         if not additional_info:
-                return jsonify(failed("参数不完整: 缺少additional_info"))
+            return jsonify(failed("参数不完整: 缺少additional_info"))
               
-
         file_extension = receiver['url'].split('.')[-1]
-        head_data = additional_info['head_data']           
+        head_data = additional_info.get('head_data')           
         
-        if file_extension == 'xlsx' or file_extension == 'xls' or file_extension == 'csv':
-            # 如果文件是excel或csv,则需要检查storage_location是否存在
+        if file_extension in ['xlsx', 'xls', 'csv']:
+            # Excel/CSV文件必须有storage_location
             storage_location = receiver.get('storage_location', '')
             if not storage_location:
                 return jsonify(failed("参数不完整:缺少storage_location或storage_location为空"))
-                        
+            
             # 调用业务逻辑处理数据资源创建,设置resource_type为structure
             resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')
+            
         elif file_extension == 'sql':
-            data_source = additional_info['data_source']
-            # 如果是ddl,则需要检查data_source是否存在
-            if not data_source or (isinstance(data_source, dict) and not data_source.get("en_name")):
-                return jsonify(failed("数据源信息不完整或无效"))
-            # 调用业务逻辑处理数据资源创建,设置resource_type为ddl
-            resource_id = handle_node(receiver, head_data, data_source, resource_type='ddl')
+            data_source = additional_info.get('data_source', '')
+            storage_location = receiver.get('storage_location', '')
+
+            # 如果有storage_location,按结构化数据处理
+            if storage_location:
+                resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')
+            
+            # 如果有data_source,按DDL处理
+            elif data_source:
+                # 检查data_source格式
+                if not isinstance(data_source, dict) or not data_source.get("en_name"):
+                    return jsonify(failed("数据源信息不完整或无效"))
+                resource_id = handle_node(receiver, head_data, data_source, resource_type='ddl')
+            
+            # 两者都没有
+            else:
+                return jsonify(failed("SQL文件处理需要提供storage_location或有效的data_source信息"))
+                
         else:
             return jsonify(failed("文件格式错误"))
     
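For reference, the routing this hunk introduces in data_resource_save can be read as a small dispatch function. The sketch below is illustrative only: it assumes the receiver payload shown above and reuses handle_node from app/core/data_resource/resource.py; the wrapper name route_resource_save and the ValueError signalling (instead of jsonify(failed(...))) are inventions for the sketch.

from app.core.data_resource.resource import handle_node

def route_resource_save(receiver: dict):
    """Illustrative dispatch: which resource_type a saved file ends up with."""
    additional_info = receiver.get('additional_info') or {}
    head_data = additional_info.get('head_data')
    file_extension = receiver['url'].split('.')[-1]

    if file_extension in ('xlsx', 'xls', 'csv'):
        # Excel/CSV must carry storage_location and is always saved as 'structure'
        if not receiver.get('storage_location'):
            raise ValueError("storage_location is required for Excel/CSV files")
        return handle_node(receiver, head_data, data_source=None, resource_type='structure')

    if file_extension == 'sql':
        data_source = additional_info.get('data_source', '')
        if receiver.get('storage_location'):
            # storage_location wins: the SQL file is treated as structured data
            return handle_node(receiver, head_data, data_source=None, resource_type='structure')
        if isinstance(data_source, dict) and data_source.get('en_name'):
            # otherwise fall back to DDL mode with an already existing data source
            return handle_node(receiver, head_data, data_source, resource_type='ddl')
        raise ValueError("SQL files need storage_location or a valid data_source")

    raise ValueError("unsupported file extension")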
@@ -603,82 +613,39 @@ def ddl_identify():
         if not ddl_list:
             return jsonify(failed("未找到有效的CREATE TABLE语句"))
         
-        # 处理结果 - 假设ddl_list已经包含tables结构
-        result = {}
-        data_source = None
-            
-        # 处理数据源和表的存在状态
+        # 处理表的存在状态
         if isinstance(ddl_list, dict):
-            # 处理数据源信息
-            if "data_source" in ddl_list:
-                data_source = ddl_list.pop("data_source", None)
-                
-                if data_source:
-                    # 检查数据源是否包含en_name
-                    if "en_name" not in data_source:
-                        logger.debug(f"data_source内容: {json.dumps(data_source, ensure_ascii=False) if data_source is not None else 'None'}")
-                        return jsonify(failed("数据源信息不完整:缺少en_name字段"))
-                        
-                    try:
-                        # 查询数据源是否存在
-                        data_source_name = data_source["en_name"]
-                        with neo4j_driver.get_session() as session:
-                            source_query = """
-                            MATCH (n:data_source {en_name: $name})
-                            RETURN n IS NOT NULL AS exists
-                            """
-                            source_result = session.run(source_query, name=data_source_name)
-                            source_exists = source_result.single()
-                            if source_exists:
-                                data_source["exist"] = source_exists["exists"]
-                            else:
-                                data_source["exist"] = False
-                    except Exception as e:
-                        logger.error(f"检查数据源存在状态失败: {str(e)}")
-                        data_source["exist"] = False
+            # 获取所有表名
+            table_names = list(ddl_list.keys())
             
-            # 处理表的存在状态 - 假设tables已经在ddl_list中
-            if "tables" in ddl_list and isinstance(ddl_list["tables"], dict):
-                table_names = list(ddl_list["tables"].keys())
-                
-                if table_names:
-                    try:
-                        # 查询表是否存在
-                        with neo4j_driver.get_session() as session:
-                            table_query = """
-                            UNWIND $names AS name
-                            OPTIONAL MATCH (n:data_resource {en_name: name})
-                            RETURN name, n IS NOT NULL AS exists
-                            """
-                            table_results = session.run(table_query, names=table_names)
-                            
-                            # 处理结果
-                            for record in table_results:
-                                table_name = record["name"]
-                                exists = record["exists"]
-                                if table_name in ddl_list["tables"]:
-                                    ddl_list["tables"][table_name]["exist"] = exists
-                            
-                            # 确保所有表都有exist字段
-                            for table_name in table_names:
-                                if "exist" not in ddl_list["tables"][table_name]:
-                                    ddl_list["tables"][table_name]["exist"] = False
-                    except Exception as e:
-                        logger.error(f"检查表存在状态失败: {str(e)}")
-                        # 如果查询失败,所有表默认为不存在
-                        for table_name in table_names:
-                            ddl_list["tables"][table_name]["exist"] = False
+            # 首先为所有表设置默认的exist状态
+            for table_name in table_names:
+                ddl_list[table_name]["exist"] = False
             
-            # 构建最终结果
-            result = ddl_list
-        
-        # 添加数据源信息
-        if data_source:
-            result["data_source"] = data_source
-        
-        logger.debug(f"识别到的DDL语句: {result}")
+            if table_names:
+                try:
+                    # 查询表是否存在
+                    with neo4j_driver.get_session() as session:
+                        table_query = """
+                        UNWIND $names AS name
+                        OPTIONAL MATCH (n:data_resource {en_name: name})
+                        RETURN name, n IS NOT NULL AS exists
+                        """
+                        table_results = session.run(table_query, names=table_names)
+                        
+                        # 更新存在的表的状态
+                        for record in table_results:
+                            table_name = record["name"]
+                            exists = record["exists"]
+                            if table_name in ddl_list:
+                                ddl_list[table_name]["exist"] = exists
+                except Exception as e:
+                    logger.error(f"检查表存在状态失败: {str(e)}")
+                    # 如果查询失败,所有表保持默认的False状态
+        
+        logger.debug(f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}")
             
-        return jsonify(success(result))
+        return jsonify(success(ddl_list))
     except Exception as e:
         logger.error(f"识别DDL语句失败: {str(e)}")
         logger.error(traceback.format_exc())  # 添加详细错误堆栈
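The rewritten branch above defaults every table's "exist" flag to False and then flips it with one batched Cypher query. A minimal standalone sketch of that check, using the same UNWIND / OPTIONAL MATCH query and the neo4j Python driver session API; the function name is a hypothetical wrapper, not code from the commit.

def mark_existing_tables(session, ddl_list: dict) -> dict:
    """Set ddl_list[table]['exist'] by batch-checking data_resource nodes in Neo4j."""
    table_names = list(ddl_list.keys())
    for name in table_names:
        ddl_list[name]["exist"] = False  # pessimistic default

    if table_names:
        query = """
        UNWIND $names AS name
        OPTIONAL MATCH (n:data_resource {en_name: name})
        RETURN name, n IS NOT NULL AS exists
        """
        for record in session.run(query, names=table_names):
            if record["name"] in ddl_list:
                ddl_list[record["name"]]["exist"] = record["exists"]
    return ddl_list

Called inside a with neo4j_driver.get_session() as session block, this mirrors the query in the hunk; the real handler additionally wraps the query in try/except, logs failures, and leaves the False defaults in place when the query cannot run.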
@@ -707,6 +674,7 @@ def sql_ddl_identify():
         logger.error(f"识别DDL语句失败: {str(e)}")
         return jsonify(failed(str(e)))
 
+
 @bp.route('/model/list', methods=['POST'])
 def resource_model_list():
     """获取模型资源列表"""

+ 4 - 58
app/core/data_resource/resource.py

@@ -222,10 +222,11 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
                         logger.error(f"未能创建或获取元数据节点: {item['name']}")
             
             # 处理数据源关系
-            if data_source:
+            if data_source and resource_type == 'ddl':
                 try:
                     # 创建或获取数据源节点
-                    data_source_en_name = handle_data_source(data_source)
+                #    data_source_en_name = handle_data_source(data_source)
+                    data_source_en_name = data_source['en_name']
                     
                     # 创建数据资源与数据源的关系
                     if data_source_en_name:
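The behavioural point of this hunk is that handle_node now links a resource to a data source only for DDL resources, and takes the en_name straight from the payload instead of calling handle_data_source (which used to create nodes). A hedged, self-contained restatement of that guard; the helper name is invented for illustration:

def resolve_data_source_en_name(data_source, resource_type):
    """Sketch: only DDL resources get a data-source relationship, and nothing is created."""
    if not data_source or resource_type != 'ddl':
        return None  # structured resources are saved without a data-source link
    # the data_source node is assumed to already exist; only its en_name is read
    return data_source['en_name']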
@@ -1230,62 +1231,7 @@ def handle_data_source(data_source):
                     # 数据源不存在,抛出异常
                     raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
             
-            # 完整的数据源信息模式:创建或获取数据源
-            # 检查是否已存在相同数据源,只使用type/host/port/database/username字段
-            check_cypher = """
-            MATCH (ds:data_source)
-            WHERE ds.type = $type AND 
-                  ds.host = $host AND 
-                  ds.port = $port AND 
-                  ds.database = $database AND 
-                  ds.username = $username
-            RETURN ds
-            """
-            
-            # 准备查询参数
-            check_params = {
-                "type": data_source.get("type", "").lower(),
-                "host": data_source.get("host"),
-                "port": data_source.get("port"),
-                "database": data_source.get("database"),
-                "username": data_source.get("username")
-            }
-            
-            check_result = session.run(check_cypher, **check_params)
-            existing_record = check_result.single()
-            
-            # 数据源已存在
-            if existing_record:
-                existing_data_source = dict(existing_record["ds"])
-                logger.info(f"找到现有数据源: {existing_data_source.get('en_name')}")
-                return existing_data_source.get("en_name")
-            
-            # 数据源不存在,创建新节点
-            # 创建数据源属性对象,包含data_source中的所有字段
-            connection_info = dict(data_source)  # 复制所有字段
-            
-            # 确保type字段为小写
-            if "type" in connection_info:
-                connection_info["type"] = connection_info["type"].lower()
-                
-            # 添加创建时间
-            connection_info["createTime"] = get_formatted_time()
-            
-            create_cypher = """
-            CREATE (ds:data_source $properties)
-            RETURN ds
-            """
-            
-            create_result = session.run(create_cypher, properties=connection_info)
-            created_record = create_result.single()
-            
-            if not created_record:
-                raise RuntimeError("创建数据源节点失败")
-            
-            new_data_source = dict(created_record["ds"])
-            logger.info(f"创建新数据源: {new_data_source.get('en_name')}")
-            return new_data_source.get("en_name")
-            
+              
     except Exception as e:
         logger.error(f"处理数据源失败: {str(e)}")
         raise RuntimeError(f"处理数据源失败: {str(e)}")

+ 39 - 0
app/core/llm/ddl_parser.py

@@ -168,6 +168,45 @@ class DDLParser:
         return """
 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
 
+规则说明:
+1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
+2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
+   - 中文表名中不要出现标点符号
+3. 字段中文名称(name)的确定规则:
+   - 如有COMMENT注释,直接使用注释内容
+   - 如无注释但字段名有明确含义,将英文名翻译为中文
+   - 如字段名是无意义的拼音缩写,则name为空字符串
+   - 字段名中不要出现逗号,以及"主键"、"外键"、"索引"等字样
+4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
+5. 忽略sql文件中除了表的定义和注释信息以外的内容。比如,忽略sql中的数据库的连接字符串。
+6. 参考格式如下:
+{
+    "users_table": { //表名
+        "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
+        "schema": "public",
+        "meta": [{
+                "en_name": "id",
+                "data_type": "integer",
+                "name": "用户ID"
+            },
+            {
+                "en_name": "username",
+                "data_type": "varchar",
+                "name": "用户名"
+            }
+        ]
+    }    
+}
+
+请仅返回JSON格式结果,不要包含任何其他解释文字。
+"""
+
+
+    def _optimize_ddl_source_prompt(self):
+        """返回优化后的提示词模板"""
+        return """
+请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
+
 规则说明:
 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
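Taken together with the rewritten ddl_identify above, the example format in these prompts implies that the parser output (the ddl_list the route consumes) is keyed directly by lowercase table name. An illustrative result for a one-table DDL, with the exist flag that ddl_identify attaches after the Neo4j check; the values below are made up:

# hypothetical parse result for: CREATE TABLE users_table (id integer, username varchar)
sample_ddl_list = {
    "users_table": {
        "name": "用户表",   # from a COMMENT or an LLM translation, empty string if unknown
        "schema": "public",
        "meta": [
            {"en_name": "id", "data_type": "integer", "name": "用户ID"},
            {"en_name": "username", "data_type": "varchar", "name": "用户名"},
        ],
        "exist": False,    # added later by ddl_identify, not by the parser itself
    }
}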