3 Commits f4671db9fa ... 68f974c46e

Autore SHA1 Messaggio Data
  wangxq 68f974c46e 准备开发data_source相关的API 1 mese fa
  wangxq 3f01403ff0 完成对ddl加载界面的修改,增加tables标签,未确定是否在其它页面增加tables标签 1 mese fa
  wangxq d447f85872 准备修改 /ddl/parse 接口 1 mese fa

+ 89 - 48
app/api/data_resource/routes.py

@@ -573,11 +573,28 @@ def sql_test():
 def ddl_identify():
     """识别DDL语句"""
     try:
-        # 获取参数
-        sql_content = request.json.get('sql', '')
-        
+        # 获取参数 - 支持两种方式:上传文件或JSON
+        sql_content = ''
+        
+        # 检查是否有文件上传
+        if 'file' in request.files:
+            file = request.files['file']
+            # 检查文件是否存在且文件名不为空
+            if file and file.filename:
+                # 检查是否是SQL文件
+                if not file.filename.lower().endswith('.sql'):
+                    return jsonify(failed("只接受SQL文件"))
+                
+                # 读取文件内容
+                sql_content = file.read().decode('utf-8')
+                logger.info(f"从上传的文件中读取SQL内容,文件名: {file.filename}")
+        # 如果没有文件上传,检查是否有JSON输入
+        elif request.is_json:
+            sql_content = request.json.get('sql', '')
+            
+        # 如果两种方式都没有提供SQL内容,则返回错误
         if not sql_content:
-            return jsonify(failed("SQL内容不能为空"))
+            return jsonify(failed("SQL内容不能为空,请上传SQL文件或提供SQL内容"))
         
         parser = DDLParser()        
         # 提取创建表的DDL语句
@@ -586,58 +603,82 @@ def ddl_identify():
         if not ddl_list:
             return jsonify(failed("未找到有效的CREATE TABLE语句"))
         
-        # 为每个表名添加exist字段
+        # 处理结果 - 假设ddl_list已经包含tables结构
+        result = {}
+        data_source = None
+            
+        # 处理数据源和表的存在状态
         if isinstance(ddl_list, dict):
-            # 检查是否有data_source键
-            data_source = None
+            # 处理数据源信息
             if "data_source" in ddl_list:
-                # 临时移除data_source,以便只遍历表
                 data_source = ddl_list.pop("data_source", None)
-            
-            # 获取所有表名 - 过滤掉可能的非表结构键
-            table_names = []
-            for key, value in list(ddl_list.items()):
-                # 检查值是否是字典且包含meta键,这表明它是一个表结构
-                if isinstance(value, dict) and "meta" in value:
-                    table_names.append(key)
-                # 如果不是表结构,则不处理
-            
-            # 只有在有表名时才调用status_query
-            if table_names:
-                try:
-                    # 调用status_query获取表的存在状态
-                    status_results = status_query(table_names)
-                    
-                    # status_query返回的可能是单个值或嵌套列表,需要平展处理
-                    flat_results = []
-                    if status_results:
-                        # 如果是嵌套列表(通常只有一层嵌套),则拍平
-                        if isinstance(status_results, list):
-                            if len(status_results) == 1 and isinstance(status_results[0], list):
-                                flat_results = status_results[0]  # 拍平一层嵌套
+                
+                if data_source:
+                    # 检查数据源是否包含en_name
+                    if "en_name" not in data_source:
+                        logger.debug(f"data_source内容: {json.dumps(data_source, ensure_ascii=False) if data_source is not None else 'None'}")
+                        return jsonify(failed("数据源信息不完整:缺少en_name字段"))
+                        
+                    try:
+                        # 查询数据源是否存在
+                        data_source_name = data_source["en_name"]
+                        with neo4j_driver.get_session() as session:
+                            source_query = """
+                            MATCH (n:data_source {en_name: $name})
+                            RETURN n IS NOT NULL AS exists
+                            """
+                            source_result = session.run(source_query, name=data_source_name)
+                            source_exists = source_result.single()
+                            if source_exists:
+                                data_source["exist"] = source_exists["exists"]
                             else:
-                                flat_results = status_results  # 已经是平的列表
-                    
-                    # 将状态添加到每个表
-                    for i, table_name in enumerate(table_names):
-                        if i < len(flat_results):
-                            ddl_list[table_name]["exist"] = flat_results[i]
-                        else:
-                            ddl_list[table_name]["exist"] = False
+                                data_source["exist"] = False
+                    except Exception as e:
+                        logger.error(f"检查数据源存在状态失败: {str(e)}")
+                        data_source["exist"] = False
+            
+            # 处理表的存在状态 - 假设tables已经在ddl_list中
+            if "tables" in ddl_list and isinstance(ddl_list["tables"], dict):
+                table_names = list(ddl_list["tables"].keys())
+                
+                if table_names:
+                    try:
+                        # 查询表是否存在
+                        with neo4j_driver.get_session() as session:
+                            table_query = """
+                            UNWIND $names AS name
+                            OPTIONAL MATCH (n:data_resource {en_name: name})
+                            RETURN name, n IS NOT NULL AS exists
+                            """
+                            table_results = session.run(table_query, names=table_names)
                             
-                except Exception as e:
-                    logger.error(f"检查表存在状态失败: {str(e)}")
-                    # 如果status_query失败,所有表默认为不存在
-                    for table_name in table_names:
-                        ddl_list[table_name]["exist"] = False
+                            # 处理结果
+                            for record in table_results:
+                                table_name = record["name"]
+                                exists = record["exists"]
+                                if table_name in ddl_list["tables"]:
+                                    ddl_list["tables"][table_name]["exist"] = exists
+                            
+                            # 确保所有表都有exist字段
+                            for table_name in table_names:
+                                if "exist" not in ddl_list["tables"][table_name]:
+                                    ddl_list["tables"][table_name]["exist"] = False
+                    except Exception as e:
+                        logger.error(f"检查表存在状态失败: {str(e)}")
+                        # 如果查询失败,所有表默认为不存在
+                        for table_name in table_names:
+                            ddl_list["tables"][table_name]["exist"] = False
             
-            # 恢复data_source
-            if data_source:
-                ddl_list["data_source"] = data_source
+            # 构建最终结果
+            result = ddl_list
+        
+        # 添加数据源信息
+        if data_source:
+            result["data_source"] = data_source
         
-        logger.debug(f"识别到的DDL语句: {ddl_list}")
+        logger.debug(f"识别到的DDL语句: {result}")
             
-        return jsonify(success(ddl_list))
+        return jsonify(success(result))
     except Exception as e:
         logger.error(f"识别DDL语句失败: {str(e)}")
         logger.error(traceback.format_exc())  # 添加详细错误堆栈

+ 10 - 0
app/api/meta_data/routes.py

@@ -270,6 +270,11 @@ def unstructure_text_query():
         if not object_name:
             return jsonify(failed("文档路径不存在"))
             
+        # 获取 MinIO 配置
+        minio_client = get_minio_client()
+        config = get_minio_config()
+        bucket_name = config['bucket_name']
+            
         # 从MinIO获取文件内容
         file_content = get_file_content(minio_client, bucket_name, object_name)
         
@@ -560,6 +565,11 @@ def processing_unstructured_data():
         if not node_id:
             return jsonify(failed("节点ID不能为空"))
             
+        # 获取 MinIO 配置
+        minio_client = get_minio_client()
+        config = get_minio_config()
+        prefix = config['prefix']
+            
         # 调用处理逻辑
         result = solve_unstructured_data(node_id, minio_client, prefix)
         

+ 6 - 6
app/config/config.py

@@ -31,7 +31,7 @@ class BaseConfig:
     PLATFORM = platform.system().lower()
     
     # 文件上传配置
-    ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'xlsx', 'xls', 'csv'}
+    ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'xlsx', 'xls', 'csv', 'sql', 'dll'}
     
     # 数据抽取配置
     DATA_EXTRACT_BATCH_SIZE = 1000  # 每批处理的记录数
@@ -62,7 +62,7 @@ class DevelopmentConfig(BaseConfig):
     PORT = 5500
     
     # 开发环境 MinIO 配置
-    MINIO_HOST = '192.168.67.138:9000'
+    MINIO_HOST = 'localhost:9000'
     MINIO_USER = 'citu-test'
     MINIO_PASSWORD = 'citu-test'
     MINIO_SECURE = False
@@ -70,13 +70,13 @@ class DevelopmentConfig(BaseConfig):
     PREFIX = ''
     
     # 开发环境 PostgreSQL 配置
-    SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:postgres@192.168.67.138:5432/dataops'
+    SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:postgres@localhost:5432/dataops'
     
     # 开发环境 Neo4j 配置
-    NEO4J_URI = "bolt://192.168.67.138:7687"
-    NEO4J_HTTP_URI = "http://192.168.67.138:7474"
+    NEO4J_URI = "bolt://localhost:7687"
+    NEO4J_HTTP_URI = "http://localhost:7474"
     NEO4J_USER = "neo4j"
-    NEO4J_PASSWORD = "password"
+    NEO4J_PASSWORD = "Passw0rd"
     NEO4J_ENCRYPTED = False
     
     # 开发环境文件路径配置

+ 20 - 16
app/core/llm/ddl_parser.py

@@ -105,7 +105,9 @@ class DDLParser:
    - 如无注释但字段名有明确含义,将英文名翻译为中文
    - 如字段名是无意义的拼音缩写,则name为空字符串
    - 字段名中不要出现逗号,以及"主键"、"外键"、"索引"等字样
-4. 数据库连接串处理:
+4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
+5. data_source对象,请放在data_source标签中,它与tables对象同级。
+6. 数据库连接串处理:
    - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
    - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2/sybase
    - data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加.
@@ -113,22 +115,24 @@ class DDLParser:
    - 无法确定数据库类型时,type设为"unknown"
    - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签
    - 除了database,password,username,en_name,host,port,type,name 之外,连接串的其它字段放在param属性中。
-5. 参考格式如下:
+7. 参考格式如下:
 {
-    "users": { //表名
-        "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
-        "schema": "public",
-        "meta": [{
-                "en_name": "id",
-                "data_type": "integer",
-                "name": "用户ID"
-            },
-            {
-                "en_name": "username",
-                "data_type": "varchar",
-                "name": "用户名"
-            }
-        ]
+    "tables": {
+        "users": { //表名
+            "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
+            "schema": "public",
+            "meta": [{
+                    "en_name": "id",
+                    "data_type": "integer",
+                    "name": "用户ID"
+                },
+                {
+                    "en_name": "username",
+                    "data_type": "varchar",
+                    "name": "用户名"
+                }
+            ]
+        }
     },
     "data_source": [{
         "en_name": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}