3 Commits 68f974c46e ... 33c7df202d

Auteur SHA1 Bericht Datum
  wangxq 33c7df202d 完成datasource api开发和第一版的测试 1 maand geleden
  wangxq e5338fb015 已经基本完成了对/api/resource/save的改造,不再创建数据源 1 maand geleden
  wangxq 1e5d1a886c 已经完成了data source api的开发,准备修改ddl导入 1 maand geleden

+ 2 - 1
app/__init__.py

@@ -27,6 +27,7 @@ def create_app():
     from app.api.production_line import bp as production_line_bp
     from app.api.graph import bp as graph_bp
     from app.api.system import bp as system_bp
+    from app.api.data_source import bp as data_source_bp
     
     app.register_blueprint(meta_bp, url_prefix='/api/meta')
     app.register_blueprint(resource_bp, url_prefix='/api/resource')
@@ -36,7 +37,7 @@ def create_app():
     app.register_blueprint(production_line_bp, url_prefix='/api/pipeline')
     app.register_blueprint(graph_bp, url_prefix='/api/graph')
     app.register_blueprint(system_bp, url_prefix='/api/system')
-    
+    app.register_blueprint(data_source_bp, url_prefix='/api/datasource')
     # Configure logging
     configure_logging(app)
     

+ 62 - 92
app/api/data_resource/routes.py

@@ -160,39 +160,51 @@ def data_resource_save():
     """保存数据资源"""   
     try:
         # 获取表单数据
-        # 表单以 receiver 开头时使用下面的方法:
-        # receiver = request.json.get('receiver', {})
         receiver = request.get_json()
-         # 检查receiver是否存在
+        # 检查receiver是否存在
         if not receiver:
             return jsonify(failed("参数不完整:缺少receiver"))
         # 检查url是否存在
-        if 'url' not in receiver:
-            return jsonify(failed("参数不完整:缺少url"))
+        if 'url' not in receiver or not receiver['url']:
+            logger.debug(f"url 为空")
 
-        additional_info = receiver['additional_info']
+        additional_info = receiver.get('additional_info')
         if not additional_info:
-                return jsonify(failed("参数不完整: 缺少additional_info"))
-              
+            return jsonify(failed("参数不完整: 缺少additional_info"))
+                      
+        head_data = additional_info.get('head_data') 
 
-        file_extension = receiver['url'].split('.')[-1]
-        head_data = additional_info['head_data']           
+        file_extension = receiver['url'].split('.')[-1] if receiver.get('url') else ""
         
-        if file_extension == 'xlsx' or file_extension == 'xls' or file_extension == 'csv':
-            # 如果文件是excel或csv,则需要检查storage_location是否存在
+        
+        if file_extension in ['xlsx', 'xls', 'csv']:
+            # Excel/CSV文件必须有storage_location
             storage_location = receiver.get('storage_location', '')
             if not storage_location:
                 return jsonify(failed("参数不完整:缺少storage_location或storage_location为空"))
-                        
+            
             # 调用业务逻辑处理数据资源创建,设置resource_type为structure
             resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')
-        elif file_extension == 'sql':
-            data_source = additional_info['data_source']
-            # 如果是ddl,则需要检查data_source是否存在
-            if not data_source or (isinstance(data_source, dict) and not data_source.get("en_name")):
-                return jsonify(failed("数据源信息不完整或无效"))
-            # 调用业务逻辑处理数据资源创建,设置resource_type为ddl
-            resource_id = handle_node(receiver, head_data, data_source, resource_type='ddl')
+            
+        elif file_extension == 'sql' or file_extension == "" or not file_extension:
+            data_source = additional_info.get('data_source', '')
+            storage_location = receiver.get('storage_location', '')
+
+            # 如果有storage_location,按结构化数据处理
+            if storage_location:
+                resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')
+            
+            # 如果有data_source,按DDL处理
+            elif data_source:
+                # 检查data_source格式
+                if not isinstance(data_source, dict) or not data_source.get("en_name"):
+                    return jsonify(failed("数据源信息不完整或无效"))
+                resource_id = handle_node(receiver, head_data, data_source, resource_type='ddl')
+            
+            # 两者都没有
+            else:
+                return jsonify(failed("SQL文件处理需要提供storage_location或有效的data_source信息"))
+                
         else:
             return jsonify(failed("文件格式错误"))
     
@@ -603,82 +615,39 @@ def ddl_identify():
         if not ddl_list:
             return jsonify(failed("未找到有效的CREATE TABLE语句"))
         
-        # 处理结果 - 假设ddl_list已经包含tables结构
-        result = {}
-        data_source = None
-            
-        # 处理数据源和表的存在状态
+        # 处理表的存在状态
         if isinstance(ddl_list, dict):
-            # 处理数据源信息
-            if "data_source" in ddl_list:
-                data_source = ddl_list.pop("data_source", None)
-                
-                if data_source:
-                    # 检查数据源是否包含en_name
-                    if "en_name" not in data_source:
-                        logger.debug(f"data_source内容: {json.dumps(data_source, ensure_ascii=False) if data_source is not None else 'None'}")
-                        return jsonify(failed("数据源信息不完整:缺少en_name字段"))
-                        
-                    try:
-                        # 查询数据源是否存在
-                        data_source_name = data_source["en_name"]
-                        with neo4j_driver.get_session() as session:
-                            source_query = """
-                            MATCH (n:data_source {en_name: $name})
-                            RETURN n IS NOT NULL AS exists
-                            """
-                            source_result = session.run(source_query, name=data_source_name)
-                            source_exists = source_result.single()
-                            if source_exists:
-                                data_source["exist"] = source_exists["exists"]
-                            else:
-                                data_source["exist"] = False
-                    except Exception as e:
-                        logger.error(f"检查数据源存在状态失败: {str(e)}")
-                        data_source["exist"] = False
+            # 获取所有表名
+            table_names = list(ddl_list.keys())
             
-            # 处理表的存在状态 - 假设tables已经在ddl_list中
-            if "tables" in ddl_list and isinstance(ddl_list["tables"], dict):
-                table_names = list(ddl_list["tables"].keys())
-                
-                if table_names:
-                    try:
-                        # 查询表是否存在
-                        with neo4j_driver.get_session() as session:
-                            table_query = """
-                            UNWIND $names AS name
-                            OPTIONAL MATCH (n:data_resource {en_name: name})
-                            RETURN name, n IS NOT NULL AS exists
-                            """
-                            table_results = session.run(table_query, names=table_names)
-                            
-                            # 处理结果
-                            for record in table_results:
-                                table_name = record["name"]
-                                exists = record["exists"]
-                                if table_name in ddl_list["tables"]:
-                                    ddl_list["tables"][table_name]["exist"] = exists
-                            
-                            # 确保所有表都有exist字段
-                            for table_name in table_names:
-                                if "exist" not in ddl_list["tables"][table_name]:
-                                    ddl_list["tables"][table_name]["exist"] = False
-                    except Exception as e:
-                        logger.error(f"检查表存在状态失败: {str(e)}")
-                        # 如果查询失败,所有表默认为不存在
-                        for table_name in table_names:
-                            ddl_list["tables"][table_name]["exist"] = False
+            # 首先为所有表设置默认的exist状态
+            for table_name in table_names:
+                ddl_list[table_name]["exist"] = False
             
-            # 构建最终结果
-            result = ddl_list
-        
-        # 添加数据源信息
-        if data_source:
-            result["data_source"] = data_source
-        
-        logger.debug(f"识别到的DDL语句: {result}")
+            if table_names:
+                try:
+                    # 查询表是否存在
+                    with neo4j_driver.get_session() as session:
+                        table_query = """
+                        UNWIND $names AS name
+                        OPTIONAL MATCH (n:data_resource {en_name: name})
+                        RETURN name, n IS NOT NULL AS exists
+                        """
+                        table_results = session.run(table_query, names=table_names)
+                        
+                        # 更新存在的表的状态
+                        for record in table_results:
+                            table_name = record["name"]
+                            exists = record["exists"]
+                            if table_name in ddl_list:
+                                ddl_list[table_name]["exist"] = exists
+                except Exception as e:
+                    logger.error(f"检查表存在状态失败: {str(e)}")
+                    # 如果查询失败,所有表保持默认的False状态
+        
+        logger.debug(f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}")
             
-        return jsonify(success(result))
+        return jsonify(success(ddl_list))
     except Exception as e:
         logger.error(f"识别DDL语句失败: {str(e)}")
         logger.error(traceback.format_exc())  # 添加详细错误堆栈
@@ -707,6 +676,7 @@ def sql_ddl_identify():
         logger.error(f"识别DDL语句失败: {str(e)}")
         return jsonify(failed(str(e)))
 
+
 @bp.route('/model/list', methods=['POST'])
 def resource_model_list():
     """获取模型资源列表"""

+ 5 - 0
app/api/data_source/__init__.py

@@ -0,0 +1,5 @@
+from flask import Blueprint
+
+bp = Blueprint('data_source', __name__)
+
+from app.api.data_source import routes 

+ 309 - 0
app/api/data_source/routes.py

@@ -0,0 +1,309 @@
+from flask import request, jsonify
+import logging
+import json
+from datetime import datetime
+from app.models.result import success, failed
+from app.api.data_source import bp
+from app.core.graph.graph_operations import create_or_get_node, execute_cypher_query
+from flask_sqlalchemy import SQLAlchemy
+from sqlalchemy import create_engine, text, URL
+logger = logging.getLogger(__name__)
+
+# 创建数据源时使用此api
+@bp.route('/save', methods=['POST'])
+def data_source_save():
+    """保存数据源"""
+    try:
+        # 获取表单数据
+        data = request.json
+        logger.debug(f"保存数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
+
+        # 检查必填参数
+        required_fields = ['database', 'host', 'port', 'username', 'password', 'en_name', 'type']
+        missing_fields = [field for field in required_fields if not data.get(field)]
+        
+        if missing_fields:
+            error_msg = f"缺少必填参数: {', '.join(missing_fields)}"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+
+        # 检查en_name是否已存在
+        check_query = """
+        MATCH (n:DataSource)
+        WHERE n.en_name = $en_name
+        RETURN n
+        """
+        result = execute_cypher_query(check_query, {'en_name': data['en_name']})
+        
+        # 添加创建时间
+        data['create_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        
+        if result:
+            # 如果存在,更新节点
+            node = result[0]['n']
+            node_id = node['_id']
+            # 更新属性
+            update_query = """
+            MATCH (n:DataSource)
+            WHERE id(n) = $node_id
+            SET n += $properties
+            RETURN id(n) as node_id
+            """
+            result = execute_cypher_query(update_query, {
+                'node_id': node_id,
+                'properties': data
+            })
+            message = "数据源更新成功"
+        else:
+            # 如果不存在,创建新节点
+            node_id = create_or_get_node('DataSource', **data)
+            message = "数据源创建成功"
+        
+        # 返回成功结果
+        return jsonify(success({
+            "id": node_id,
+            "message": message
+        }))
+    except Exception as e:
+        logger.error(f"保存数据源失败: {str(e)}")
+        return jsonify(failed(str(e)))
+    
+    
+# 获取数据源列表 或根据id获取数据源信息
+@bp.route('/list', methods=['POST'])
+def data_source_list():
+    """获取数据源列表或指定数据源信息"""
+    try:
+        # 获取请求参数
+        data = request.json
+        
+        # 构建查询条件
+        where_conditions = []
+        params = {}
+        
+        # 如果指定了id
+        if data and 'id' in data:
+            where_conditions.append("id(n) = $id")
+            params['id'] = int(data['id'])
+        # 如果有其他属性
+        elif data:
+            for key, value in data.items():
+                if value:  # 只处理非空值
+                    where_conditions.append(f"n.{key} = ${key}")
+                    params[key] = value
+        
+        # 构建WHERE子句
+        where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
+        
+        # 构建查询语句
+        cypher = f"""
+        MATCH (n:DataSource)
+        {where_clause}
+        RETURN n
+        """
+        
+        # 执行查询
+        result = execute_cypher_query(cypher, params)
+        
+        # 格式化结果
+        data_sources = []
+        for record in result:
+            node = record['n']
+            node['id'] = node['_id']
+            data_sources.append(node)
+        
+        # 返回结果
+        return jsonify(success({
+            "data_source": data_sources,
+            "total": len(data_sources)
+        }))
+            
+    except Exception as e:
+        logger.error(f"获取数据源列表失败: {str(e)}")
+        return jsonify(failed(str(e)))
+    
+
+@bp.route('/delete', methods=['POST'])
+def data_source_delete():
+    """删除数据源"""
+    try:
+        # 获取请求参数
+        data = request.json
+        logger.debug(f"删除数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
+
+        # 检查参数
+        if not data or ('id' not in data and 'en_name' not in data):
+            error_msg = "必须提供id或en_name参数"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+
+        # 构建删除条件
+        if 'id' in data:
+            where_clause = "id(n) = $id"
+            params = {'id': int(data['id'])}
+        else:
+            where_clause = "n.en_name = $en_name"
+            params = {'en_name': data['en_name']}
+
+        # 构建删除语句
+        delete_query = f"""
+        MATCH (n:DataSource)
+        WHERE {where_clause}
+        WITH n
+        OPTIONAL MATCH (n)-[r]-()
+        DELETE r, n
+        RETURN count(n) as deleted_count
+        """
+        
+        # 执行删除
+        result = execute_cypher_query(delete_query, params)
+        
+        if result and result[0]['deleted_count'] > 0:
+            return jsonify(success({
+                "message": "数据源删除成功",
+                "deleted_count": result[0]['deleted_count']
+            }))
+        else:
+            error_msg = "未找到指定的数据源"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+            
+    except Exception as e:
+        logger.error(f"删除数据源失败: {str(e)}")
+        return jsonify(failed(str(e)))
+    
+
+
+@bp.route('/parse', methods=['POST'])
+def data_source_connstr_parse():
+    """解析数据源连接字符串"""
+    try:
+        # 获取请求参数
+        data = request.json
+        logger.debug(f"解析连接字符串请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
+
+        # 检查参数
+        if not data or 'conn_str' not in data:
+            error_msg = "缺少连接字符串参数"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+
+        # 创建DDLParser实例并解析连接字符串
+        from app.core.llm.ddl_parser import DDLParser
+        parser = DDLParser()
+        result = parser.parse_db_conn_str(data['conn_str'])
+
+        # 检查解析结果
+        if isinstance(result, dict) and 'code' in result and result['code'] == 500:
+            error_msg = f"解析连接字符串失败: {result.get('message', '未知错误')}"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+
+        # 返回成功结果
+        return jsonify(success(result))
+
+    except Exception as e:
+        logger.error(f"解析连接字符串失败: {str(e)}")
+        return jsonify(failed(str(e)))
+    
+
+@bp.route('/valid', methods=['POST'])
+def data_source_connstr_valid():
+    """验证数据源连接信息"""
+    try:
+        # 获取请求参数
+        data = request.json
+        logger.debug(f"验证连接信息请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
+
+        # 检查参数
+        if not data:
+            error_msg = "缺少连接信息参数"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+
+        # 检查密码是否为空
+        if not data.get('password'):
+            error_msg = "密码不能为空"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+
+        # 创建DDLParser实例并验证连接信息
+        from app.core.llm.ddl_parser import DDLParser
+        parser = DDLParser()
+        result = parser.valid_db_conn_str(data)
+
+        # 根据验证结果返回响应
+        if result == "success":
+            # 检查数据源是否已存在
+            check_query = """
+            MATCH (n:DataSource)
+            WHERE n.en_name = $en_name
+            RETURN n
+            """
+            existing_source = execute_cypher_query(check_query, {'en_name': data['en_name']})
+            
+            if existing_source:
+                return jsonify(success("连接信息验证通过,但该数据源的定义已经存在,如果保存则会更新该数据源"))
+            else:
+                return jsonify(success("连接信息验证通过"))
+        else:
+            return jsonify(failed("连接信息验证失败"))
+
+    except Exception as e:
+        logger.error(f"验证连接信息失败: {str(e)}")
+        return jsonify(failed(str(e)))
+    
+
+@bp.route('/conntest', methods=['POST'])
+def data_source_conn_test():
+    """测试数据源连接"""
+    try:
+        # 获取请求参数
+        data = request.json
+        logger.debug(f"测试连接请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
+
+        # 检查必需参数
+        required_fields = ['type', 'username', 'host', 'port', 'database', 'password']
+        missing_fields = [field for field in required_fields if not data.get(field)]
+        
+        if missing_fields:
+            error_msg = f"缺少必需参数: {', '.join(missing_fields)}"
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
+
+        # 构建数据库URL
+        db_url = URL.create(
+            drivername=data['type'],
+            username=data['username'],
+            password=data.get('password', ''),
+            host=data['host'],
+            port=data['port'],
+            database=data['database']
+        )
+
+        # 创建数据库引擎
+        engine = create_engine(db_url, connect_args={'connect_timeout': 5})
+        
+        # 测试连接
+        try:
+            with engine.connect() as conn:
+                # 执行一个简单的查询来测试连接
+                conn.execute(text("SELECT 1"))
+                return jsonify(success({
+                    "message": f"{data['type']}连接测试成功",
+                    "connected": True
+                }))
+        except Exception as e:
+            return jsonify(failed({
+                "message": f"连接测试失败: {str(e)}",
+                "connected": False
+            }))
+
+    except Exception as e:
+        logger.error(f"测试连接失败: {str(e)}")
+        return jsonify(failed(str(e)))
+
+
+@bp.route('/graph', methods=['POST'])
+def data_source_graph_relationship():
+    pass

+ 29 - 87
app/core/data_resource/resource.py

@@ -105,7 +105,7 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
             
         # 更新属性
         update_attributes = {
-            'en_name': receiver['en_name'],
+            'en_name': receiver.get('en_name', receiver.get('name', '')),
             'time': get_formatted_time(),
             'type': type_value  # 根据资源类型设置不同的type值
         }
@@ -222,16 +222,17 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
                         logger.error(f"未能创建或获取元数据节点: {item['name']}")
             
             # 处理数据源关系
-            if data_source:
+            if data_source and resource_type == 'ddl':
                 try:
                     # 创建或获取数据源节点
-                    data_source_en_name = handle_data_source(data_source)
+                #    data_source_en_name = handle_data_source(data_source)
+                    data_source_en_name = data_source['en_name']
                     
                     # 创建数据资源与数据源的关系
                     if data_source_en_name:
                         # 创建 originates_from 关系
                         rel_data_source_cypher = """
-                        MATCH (a:data_resource), (b:data_source)
+                        MATCH (a:data_resource), (b:DataSource)
                         WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
                         MERGE (a)-[r:originates_from]->(b)
                         RETURN r
@@ -251,7 +252,7 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
                             logger.error(error_msg)
                             
                             # 检查数据源节点是否存在
-                            check_ds_cypher = "MATCH (b:data_source) WHERE b.en_name = $ds_en_name RETURN b"
+                            check_ds_cypher = "MATCH (b:DataSource) WHERE b.en_name = $ds_en_name RETURN b"
                             check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
                             if not check_ds_result.single():
                                 logger.error(f"数据源节点不存在: en_name={data_source_en_name}")
@@ -1193,98 +1194,39 @@ def data_resource_edit(data):
         raise
 
 def handle_data_source(data_source):
-    """处理数据源的检查和创建
-    """
+    """处理数据源信息,创建或获取数据源节点"""
     try:
-        # 1. 检查en_name是否存在
-        ds_en_name = data_source.get("en_name")
-        if not ds_en_name:
-            raise ValueError("数据源信息不完整,缺少名称(en_name)")
-        
-        # 2. 处理name字段
-        if "name" not in data_source or not data_source["name"]:
-            data_source["name"] = ds_en_name
-            logger.debug(f"数据源name为空,使用en_name作为替代: {ds_en_name}")
-        
-        # 3. 检查是否为简单查询模式
-        required_fields = ["type", "host", "port", "database", "username"]
-        has_required_fields = all(data_source.get(field) for field in required_fields)
-        
         with neo4j_driver.get_session() as session:
-            # 简单查询模式:只通过en_name查找已有数据源
-            if not has_required_fields:
-                logger.info(f"简单数据源查询模式,查找en_name为: {ds_en_name}")
-                check_name_cypher = """
-                MATCH (ds:data_source {en_name: $en_name})
-                RETURN ds
-                """
-                check_result = session.run(check_name_cypher, en_name=ds_en_name)
-                existing_record = check_result.single()
+            # 获取英文名称作为唯一标识
+            ds_en_name = data_source.get("en_name")
+            if not ds_en_name:
+                logger.error("数据源缺少必要的en_name属性")
+                return None
                 
-                if existing_record:
-                    # 数据源已存在,返回其名称
-                    existing_data_source = dict(existing_record["ds"])
-                    logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
-                    return existing_data_source.get("en_name")
-                else:
-                    # 数据源不存在,抛出异常
-                    raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
-            
-            # 完整的数据源信息模式:创建或获取数据源
-            # 检查是否已存在相同数据源,只使用type/host/port/database/username字段
-            check_cypher = """
-            MATCH (ds:data_source)
-            WHERE ds.type = $type AND 
-                  ds.host = $host AND 
-                  ds.port = $port AND 
-                  ds.database = $database AND 
-                  ds.username = $username
+            # 如果没有设置name,使用en_name作为name
+            if "name" not in data_source or not data_source["name"]:
+                data_source["name"] = ds_en_name
+            
+            # 检查必填字段
+            required_fields = ["type", "host", "port", "database", "username"]
+            has_required_fields = all(data_source.get(field) for field in required_fields)
+            
+            # 查询是否已存在相同en_name的数据源
+            existing_cypher = """
+            MATCH (ds:DataSource {en_name: $en_name})
             RETURN ds
             """
             
-            # 准备查询参数
-            check_params = {
-                "type": data_source.get("type", "").lower(),
-                "host": data_source.get("host"),
-                "port": data_source.get("port"),
-                "database": data_source.get("database"),
-                "username": data_source.get("username")
-            }
-            
-            check_result = session.run(check_cypher, **check_params)
-            existing_record = check_result.single()
+            existing_result = session.run(existing_cypher, en_name=ds_en_name)
+            existing_record = existing_result.single()
             
-            # 数据源已存在
             if existing_record:
                 existing_data_source = dict(existing_record["ds"])
-                logger.info(f"找到现有数据源: {existing_data_source.get('en_name')}")
+                logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
                 return existing_data_source.get("en_name")
-            
-            # 数据源不存在,创建新节点
-            # 创建数据源属性对象,包含data_source中的所有字段
-            connection_info = dict(data_source)  # 复制所有字段
-            
-            # 确保type字段为小写
-            if "type" in connection_info:
-                connection_info["type"] = connection_info["type"].lower()
-                
-            # 添加创建时间
-            connection_info["createTime"] = get_formatted_time()
-            
-            create_cypher = """
-            CREATE (ds:data_source $properties)
-            RETURN ds
-            """
-            
-            create_result = session.run(create_cypher, properties=connection_info)
-            created_record = create_result.single()
-            
-            if not created_record:
-                raise RuntimeError("创建数据源节点失败")
-            
-            new_data_source = dict(created_record["ds"])
-            logger.info(f"创建新数据源: {new_data_source.get('en_name')}")
-            return new_data_source.get("en_name")
+            else:
+                # 数据源不存在,抛出异常
+                raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
             
     except Exception as e:
         logger.error(f"处理数据源失败: {str(e)}")

+ 222 - 2
app/core/llm/ddl_parser.py

@@ -2,8 +2,11 @@ import os
 import requests
 import re
 import json
+import logging
 from flask import current_app
 
+logger = logging.getLogger(__name__)
+
 class DDLParser:
     def __init__(self, api_key=None):
         """
@@ -34,7 +37,7 @@ class DDLParser:
         返回:
             解析结果的JSON对象
         """
-        prompt = self._optimize_prompt()
+        prompt = self._optimize_ddl_prompt()
         payload = {
             "model": self.model_name,
             "messages": [
@@ -91,7 +94,117 @@ class DDLParser:
                 "message": f"API请求失败: {str(e)}"
             }
 
-    def _optimize_prompt(self):
+
+    def parse_db_conn_str(self, conn_str):
+        """
+        解析数据库连接字符串
+        
+        参数:
+            conn_str: 要解析的数据库连接字符串
+            
+        返回:
+            解析结果的JSON对象
+        """
+        prompt = self._optimize_connstr_parse_prompt()
+        payload = {
+            "model": self.model_name,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": "你是一个专业的数据库连接字符串解析专家,擅长解析各种数据库的连接字符串并提取关键信息。"
+                },
+                {
+                    "role": "user", 
+                    "content": f"{prompt}\n\n{conn_str}"
+                }
+            ]
+        }
+        
+        try:
+            response = requests.post(
+                f"{self.base_url}/chat/completions",
+                headers=self.headers,
+                json=payload,
+                timeout=30
+            )
+            response.raise_for_status()
+            
+            result = response.json()
+            
+            if "choices" in result and len(result["choices"]) > 0:
+                content = result["choices"][0]["message"]["content"]
+                
+                try:
+                    json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
+                    if json_match:
+                        json_content = json_match.group(1)
+                    else:
+                        json_content = content
+                        
+                    parsed_result = json.loads(json_content)
+                    return parsed_result
+                except json.JSONDecodeError as e:
+                    return {
+                        "code": 500,
+                        "message": f"无法解析返回的JSON: {str(e)}",
+                        "original_response": content
+                    }
+            
+            return {
+                "code": 500,
+                "message": "无法获取有效响应",
+                "original_response": result
+            }
+            
+        except requests.RequestException as e:
+            return {
+                "code": 500,
+                "message": f"API请求失败: {str(e)}"
+            }
+
+
+    def _optimize_ddl_prompt(self):
+        """返回优化后的提示词模板"""
+        return """
+请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
+
+规则说明:
+1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
+2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
+   - 中文表名中不要出现标点符号
+   - 表中的字段对应输出json中的meta对象,en_name对应表的字段名,data_type对应表的字段类型.
+3. 返回结果的中文名称(name)的确定规则:
+   - 对于COMMENT注释,直接使用注释内容作为name
+   - 如sql中无注释但字段名en_name有明确含义,将英文名en_name翻译为中文
+   - 如字段名en_name是无意义的拼音缩写,则name为空字符串
+   - 中文字段名name中不要出现逗号,以及"主键"、"外键"、"索引"等字样
+4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
+   - 对于每个表的字段都要检查它的en_name和name,name不能为空,首选字段的注释,如果没有注释,则尝试翻译en_name作为name。
+5. 忽略sql文件中除了表的定义和注释信息COMMIT以外的内容。比如,忽略sql中的数据库的连接字符串。
+6. 参考格式如下:
+{
+    "users_table": { //表名
+        "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
+        "schema": "public",
+        "meta": [{
+                "en_name": "id", //表的字段名
+                "data_type": "integer", //表的字段类型
+                "name": "用户ID" //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
+            },
+            {
+                "en_name": "username",
+                "data_type": "varchar",
+                "name": "用户名"
+            }
+        ]
+    }    
+}
+
+请仅返回JSON格式结果,不要包含任何其他解释文字。
+"""
+
+
+    def _optimize_ddl_source_prompt(self):
         """返回优化后的提示词模板"""
         return """
 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
@@ -150,3 +263,110 @@ class DDLParser:
 请仅返回JSON格式结果,不要包含任何其他解释文字。
 """
 
+    def _optimize_connstr_parse_prompt(self):
+        """Return the optimized prompt template for connection-string parsing.
+
+        The template asks the LLM to split a raw DB connection string into
+        host/port/database/username/password, infer the lower-case database
+        type, build `en_name` as "{database}_{host}_{port}_{username}", and
+        reply with JSON only.  Sent verbatim to the model — do not edit the
+        wording.
+        """
+        return """
+请解析以下数据库连接字符串,并按照指定的JSON格式返回结果:
+
+规则说明:
+1. 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
+2. 根据连接串格式识别数据库类型,数据库类型请使用小写,如 mysql/postgresql/sqlserver/oracle/db2/sybase
+3. data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加
+4. data_source.name留空
+5. 无法确定数据库类型时,type设为"unknown"
+6. 除了database,password,username,en_name,host,port,type,name 之外,连接串的其它字段放在param属性中
+
+返回格式示例:
+{
+    "data_source": {
+        "en_name": "mydatabase_10.52.31.104_5432_myuser",
+        "name": "",
+        "type": "postgresql",
+        "host": "10.52.31.104",
+        "port": 5432,
+        "database": "mydatabase",
+        "username": "myuser",
+        "password": "mypassword",
+        "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
+    }
+}
+
+请仅返回JSON格式结果,不要包含任何其他解释文字。
+"""
+
+
+    def _optimize_connstr_valid_prompt(self):
+        """Return the optimized prompt template for connection-info validation.
+
+        The template lists the required/optional fields and format rules and
+        instructs the LLM to answer with exactly "success" or "failure"
+        (consumed by `valid_db_conn_str`).  Sent verbatim to the model — do
+        not edit the wording.
+        """
+        return """
+请验证以下数据库连接信息是否符合规则:
+
+规则说明:
+1. 必填字段检查:
+   - database: 数据库名称,不能为空,符合数据库名称的命名规范。
+   - en_name: 格式必须为 "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}"
+   - host: 主机名或IP地址,不能为空
+   - port: 端口号,必须为数字
+   - type: 数据库类型,必须为以下之一:mysql/postgresql/sqlserver/oracle/db2/sybase
+   - username: 用户名,不能为空,名称中间不能有空格。
+
+2. 字段格式检查:
+   - en_name中的各个部分必须与对应的字段值匹配
+   - port必须是有效的端口号(1-65535)
+   - type必须是小写的数据库类型名称
+   - param中的参数格式必须正确(key=value格式)
+
+3. 可选字段:
+   - password: 密码(可选)
+   - name: 中文名称(可选)
+   - desc: 描述(可选)
+
+请检查提供的连接信息是否符合以上规则,如果符合则返回"success",否则返回"failure"。
+
+请仅返回"success"或"failure",不要包含任何其他解释文字。
+"""
+
+    def valid_db_conn_str(self, conn_str):
+        """Validate database connection info against the rules via the LLM.
+
+        Args:
+            conn_str: connection info to validate (JSON-serializable dict;
+                presumably the structure produced by the connstr-parse prompt
+                — confirm against the caller).
+
+        Returns:
+            "success" if the model judges the info valid, otherwise
+            "failure".  Any request/parsing error also yields "failure"
+            (best-effort: this method never raises).
+        """
+        prompt = self._optimize_connstr_valid_prompt()
+        payload = {
+            "model": self.model_name,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": "你是一个专业的数据库连接信息验证专家,擅长验证数据库连接信息的完整性和正确性。"
+                },
+                {
+                    "role": "user", 
+                    "content": f"{prompt}\n\n{json.dumps(conn_str, ensure_ascii=False)}"
+                }
+            ]
+        }
+        
+        try:
+            response = requests.post(
+                f"{self.base_url}/chat/completions",
+                headers=self.headers,
+                json=payload,
+                timeout=30
+            )
+            response.raise_for_status()
+            
+            result = response.json()
+            
+            # Read the first choice; lower-case so "Success"/"SUCCESS" also count.
+            if "choices" in result and len(result["choices"]) > 0:
+                content = result["choices"][0]["message"]["content"].strip().lower()
+                return "success" if content == "success" else "failure"
+            
+            # No choices in the response -> treat as failed validation.
+            return "failure"
+            
+        except Exception as e:
+            # Best-effort: network/HTTP/JSON errors degrade to "failure" rather than raising.
+            logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}")
+            return "failure"

+ 11 - 27
app/core/production_line/production_line.py

@@ -221,7 +221,7 @@ def get_resource_storage_info(resource_id):
             metadata_query = """
             MATCH (n:data_resource)-[:contain]->(m:meta_data)
             WHERE id(n) = $resource_id
-            RETURN m.name as name, m.en_name as en_name, m.data_type as type
+            RETURN m.name as name, m.en_name as en_name, m.data_type as data_type
             """
             result = session.run(metadata_query, resource_id=int(resource_id))
             metadata_list = [dict(record) for record in result]
@@ -909,51 +909,35 @@ def get_resource_details(resource_id):
     return handle_id_resource(resource_id)
 
 def get_resource_data_source(resource_id):
-    """
-    获取数据资源关联的数据源信息
-    
-    Args:
-        resource_id: 数据资源ID
-        
-    Returns:
-        dict: 数据源连接信息
-    """
+    """Return connection info of the DataSource linked to a data resource, or None."""
     try:
         with neo4j_driver.get_session() as session:
-            # 查询数据资源关联的数据源节点
+            # Find the DataSource node the data_resource originates from.
             cypher = """
-            MATCH (n:data_resource)-[:originates_from]->(ds:data_source)
+            MATCH (n:data_resource)-[:originates_from]->(ds:DataSource)
             WHERE id(n) = $resource_id
             RETURN ds
             """
+            
             result = session.run(cypher, resource_id=int(resource_id))
             record = result.single()
             
             if not record:
-                logger.warning(f"资源 {resource_id} 没有关联的数据源节点")
+                logger.warning(f"资源ID {resource_id} 没有关联的数据源")
                 return None
-                
-            data_source = dict(record["ds"])
             
-            # 构建连接信息
-            connection_info = {
+            # Build the connection-info dict from the node's properties.
+            data_source = dict(record["ds"])
+            return {
                 "type": data_source.get("type", "").lower(),
                 "host": data_source.get("host"),
                 "port": data_source.get("port"),
                 "database": data_source.get("database"),
                 "username": data_source.get("username"),
                 "password": data_source.get("password")
-                # 添加param参数,但是由于pyMySQL不支持一些参数,所以暂时不使用参数。
-               # "param": data_source.get("param")  
+                # Extra driver params could be added here if ever needed.
+                # "param": data_source.get("param")
             }
-            
-            # 验证必要字段
-            if not all([connection_info["type"], connection_info["host"], 
-                        connection_info["database"], connection_info["username"]]):
-                logger.error(f"数据源信息不完整: {connection_info}")
-                return None
-                
-            return connection_info
     except Exception as e:
         logger.error(f"获取数据源信息失败: {str(e)}")
         return None

+ 88 - 0
migrate_meta_data_type.py

@@ -0,0 +1,88 @@
+"""
+元数据属性迁移脚本:将meta_data节点的type属性改为data_type属性
+运行方式:python migrate_meta_data_type.py
+"""
+
+import logging
+import sys
+import time
+from app.services.neo4j_driver import neo4j_driver
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("meta_data_migration")
+
+def migrate_type_to_data_type():
+    """
+    将meta_data节点的type属性迁移到data_type属性
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # 查询拥有type属性的meta_data节点数量
+            count_query = """
+            MATCH (n:meta_data)
+            WHERE EXISTS(n.type)
+            RETURN COUNT(n) as count
+            """
+            count_result = session.run(count_query)
+            node_count = count_result.single()["count"]
+            
+            if node_count == 0:
+                logger.info("没有找到带有type属性的meta_data节点,无需迁移")
+                return
+                
+            logger.info(f"找到 {node_count} 个需要迁移的meta_data节点")
+            
+            # 执行迁移:复制type属性到data_type,然后删除type属性
+            migrate_query = """
+            MATCH (n:meta_data)
+            WHERE EXISTS(n.type)
+            SET n.data_type = n.type
+            REMOVE n.type
+            RETURN COUNT(n) as migrated_count
+            """
+            
+            migrate_result = session.run(migrate_query)
+            migrated_count = migrate_result.single()["migrated_count"]
+            
+            logger.info(f"成功迁移 {migrated_count} 个meta_data节点的type属性到data_type属性")
+            
+            # 验证迁移结果
+            verify_query = """
+            MATCH (n:meta_data)
+            WHERE EXISTS(n.type)
+            RETURN COUNT(n) as remaining_count
+            """
+            verify_result = session.run(verify_query)
+            remaining_count = verify_result.single()["remaining_count"]
+            
+            if remaining_count > 0:
+                logger.warning(f"仍有 {remaining_count} 个meta_data节点保留了type属性")
+            else:
+                logger.info("所有meta_data节点的type属性已成功迁移到data_type属性")
+                
+    except Exception as e:
+        logger.error(f"迁移过程中发生错误: {str(e)}")
+        raise
+
+# Script entry point: run the migration, log the elapsed time, and exit
+# with status 1 if the migration raised.
+if __name__ == "__main__":
+    try:
+        logger.info("开始迁移meta_data节点的type属性到data_type属性...")
+        start_time = time.time()
+        
+        migrate_type_to_data_type()
+        
+        end_time = time.time()
+        execution_time = end_time - start_time
+        logger.info(f"迁移完成,耗时: {execution_time:.2f} 秒")
+        
+    except Exception as e:
+        logger.error(f"迁移失败: {str(e)}")
+        sys.exit(1)