2 commits 3df57b8812 ... 6baeeb738c

Author SHA1 Message Date
  wangxq 6baeeb738c Developing the mysql->pg data extraction; the first two steps are tested 1 month ago
  wangxq 6144ac1a8d Preparing to develop the mysql->pg data extraction 1 month ago

+ 8 - 8
app/api/data_resource/routes.py

@@ -173,10 +173,7 @@ def data_resource_save():
         additional_info = receiver['additional_info']
         if not additional_info:
                 return jsonify(failed("参数不完整: 缺少additional_info"))
-        
-        data_resource = additional_info['data_resource']
-        if not data_resource:
-                return jsonify(failed("Incomplete parameters: missing data_resource"))
+              
 
         file_extension = receiver['url'].split('.')[-1]
         head_data = additional_info['head_data']           
@@ -187,20 +184,23 @@ def data_resource_save():
             if not storage_location:
                 return jsonify(failed("参数不完整:缺少storage_location或storage_location为空"))
                         
-            # Call business logic to create the data resource
-            resource_id = handle_node(receiver, head_data, data_resource)
+            # Call business logic to create the data resource, with resource_type set to structure
+            resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')
         elif file_extension == 'sql':
-            data_source = receiver['data_source']
+            data_source = additional_info['data_source']
             # For ddl files, check that data_source exists
             if not data_source or (isinstance(data_source, dict) and not data_source.get("en_name")):
                 return jsonify(failed("Data source information is incomplete or invalid"))
-            resource_id = handle_node(receiver, head_data, data_resource, data_source)
+            # Call business logic to create the data resource, with resource_type set to ddl
+            resource_id = handle_node(receiver, head_data, data_source, resource_type='ddl')
         else:
             return jsonify(failed("文件格式错误"))
     
         return jsonify(success({"id": resource_id}))
     except Exception as e:
         logger.error(f"保存数据资源失败: {str(e)}")
+        error_traceback = traceback.format_exc()
+        logger.error(f"错误详情: {error_traceback}")
         return jsonify(failed(str(e)))
 
 @bp.route('/delete', methods=['POST'])
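
With this change the ddl branch reads `data_source` from `additional_info` instead of the top-level payload. A request-body sketch for the `.sql` path follows; all values are hypothetical, and the `head_data` item shape is assumed to mirror the `en_name`/`data_type` fields the extraction code reads later.

```python
# Hypothetical payload for POST /data_resource/save taking the ddl branch.
payload = {
    "url": "uploads/orders.sql",   # the .sql extension selects the ddl branch
    "en_name": "orders",           # used by handle_node as the node's en_name
    "additional_info": {
        "head_data": [
            {"en_name": "order_id", "data_type": "integer", "name": ""}  # assumed shape
        ],
        # data_source is now nested here, not at the top level of the request
        "data_source": {
            "en_name": "mydatabase_10.52.31.104_5432_myuser",
            "type": "postgresql",
            "host": "10.52.31.104",
            "port": 5432,
            "database": "mydatabase",
            "username": "myuser",
            "password": "mypassword"
        }
    }
}
```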

+ 3 - 0
app/config/config.py

@@ -33,6 +33,9 @@ class BaseConfig:
     # File upload configuration
     ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'xlsx', 'xls', 'csv'}
     
+    # Data extraction configuration
+    DATA_EXTRACT_BATCH_SIZE = 1000  # number of records processed per batch
+    
     # PostgreSQL base configuration
     SQLALCHEMY_ENGINE_OPTIONS = {
         'pool_pre_ping': True,
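
`DATA_EXTRACT_BATCH_SIZE` is read in `extract_data_to_postgres` below via `current_app.config.get('DATA_EXTRACT_BATCH_SIZE', 1000)`, so deployments can tune it without code changes. A sketch of an override, assuming the usual config-subclass pattern (the `ProductionConfig` name is hypothetical):

```python
# Hypothetical per-environment override of the new setting.
class ProductionConfig(BaseConfig):
    # Larger batches mean fewer round trips but more memory per chunk.
    DATA_EXTRACT_BATCH_SIZE = 5000
```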

+ 24 - 9
app/core/data_resource/resource.py

@@ -94,14 +94,20 @@ def update_or_create_node(label, **properties):
         return None
 
# Data resource / metadata: node and relationship creation and lookup
-def handle_node(receiver, head_data, data_resource, data_source=None):
+def handle_node(receiver, head_data, data_source=None, resource_type=None):
     """处理数据资源节点创建和关系建立"""
     try:
+        # Set the type property according to resource_type
+        if resource_type == 'ddl':
+            type_value = 'ddl'
+        else:
+            type_value = 'structure'
+            
         # Attributes to update
         update_attributes = {
-            'en_name': data_resource['en_name'],
+            'en_name': receiver['en_name'],
             'time': get_formatted_time(),
-            'type': 'structure'  # structured files have no type
+            'type': type_value  # type value depends on the resource type
         }
         if 'additional_info' in receiver:
             del receiver['additional_info']
@@ -1242,13 +1248,22 @@ def handle_data_source(data_source):
                 RETURN ds
                 """
                 
+                connection_info = {
+                    "type": data_source.get("type", "").lower(),
+                    "host": data_source.get("host"),
+                    "port": data_source.get("port"),
+                    "database": data_source.get("database"),   # 支持新旧字段名
+                    "username": data_source.get("username"),  # 支持新旧字段名
+                    "password": data_source.get("password")  # 支持新旧字段名
+                }
+                
                 check_result = session.run(
                     check_cypher,
-                    type=data_source["type"],
-                    host=data_source["host"],
-                    port=data_source["port"],
-                    database=data_source["database"],
-                    username=data_source["username"]
+                    type=connection_info["type"],
+                    host=connection_info["host"],
+                    port=connection_info["port"],
+                    database=connection_info["database"],
+                    username=connection_info["username"]
                 )
                 
                 existing_record = check_result.single()
@@ -1268,7 +1283,7 @@ def handle_data_source(data_source):
                 # Add creation time
                 data_source["createTime"] = get_formatted_time()
                 
-                create_result = session.run(create_cypher, properties=data_source)
+                create_result = session.run(create_cypher, properties=connection_info)
                 created_record = create_result.single()
                 
                 if not created_record:
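
One caveat: the comments above claim support for old and new field names, but `connection_info` only reads the new keys; `data_source` dicts produced before the parser change (see `ddl_parser.py` below) still carry `dbname`/`dbuser`/`dbpassword`. A fallback sketch that would honor both spellings:

```python
# Sketch: accept both the new keys and the legacy dbname/dbuser/dbpassword keys.
connection_info = {
    "type": data_source.get("type", "").lower(),
    "host": data_source.get("host"),
    "port": data_source.get("port"),
    "database": data_source.get("database") or data_source.get("dbname"),
    "username": data_source.get("username") or data_source.get("dbuser"),
    "password": data_source.get("password") or data_source.get("dbpassword"),
}
```

Note also that `createTime` is set on `data_source`, while the node is created from `connection_info`, so the timestamp is silently dropped; copying it over (`connection_info["createTime"] = data_source["createTime"]`) would preserve it.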

+ 8 - 4
app/core/llm/ddl_parser.py

@@ -99,10 +99,12 @@ class DDLParser:
 Rules:
 1. Identify every table name in the DDL statements and create an entry for each table in the data object; use lowercase table names; there may be multiple tables.
 2. For each table, extract all field information, including name, data type, and comment.
+   - The Chinese table name must not contain punctuation
 3. Rules for determining a field's Chinese name (name):
    - If a COMMENT exists, use its content directly
    - If there is no comment but the field name has a clear meaning, translate the English name into Chinese
    - If the field name is a meaningless pinyin abbreviation, set name to an empty string
+   - The field name must not contain commas, nor words such as "primary key", "foreign key", or "index"
 4. Database connection string handling:
    - Parse the connection string into: hostname/IP address, port, database name, username, and password.
    - Infer the database type from the connection string format; use lowercase, as in the examples, e.g. mysql/postgresql/sqlserver/oracle/db2/sybase
@@ -110,6 +112,7 @@ class DDLParser:
    - Leave data_source.name empty.
    - If the database type cannot be determined, set type to "unknown"
    - If no database connection string is recognized in the DDL, omit the "data_source" key from the returned JSON
+   - Connection string fields other than database, password, username, en_name, host, port, type, and name go into the param property.
5. The reference format is as follows:
 {
     "users": { //表名
@@ -129,13 +132,14 @@ class DDLParser:
     },
     "data_source": [{
         "en_name": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}
-        "name": "", //这里留空
+        "name": "", //如果没有注释,这里留空
         "type": "postgresql",
         "host": "10.52.31.104",
         "port": 5432,
-        "dbname": "mydatabase",
-        "dbuser": "myuser",
-        "dbpassword": "mypassword"
+        "database": "mydatabase",
+        "username": "myuser",
+        "password": "mypassword",
+        "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
     }]
 }
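
To illustrate the new `param` rule: known fields are lifted into their own keys and the remaining query parameters stay in `param`. A hypothetical input/output pair (LLM output is not deterministic, so treat this as the intended shape rather than a guaranteed result):

```python
# Hypothetical connection string embedded in a DDL comment:
ddl_url = ("jdbc:mysql://10.52.31.104:3306/mydatabase"
           "?user=myuser&password=mypassword"
           "&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC")

# Intended data_source entry under the amended rules:
expected_data_source = {
    "en_name": "mydatabase_10.52.31.104_3306_myuser",
    "name": "",                       # no comment, so left empty
    "type": "mysql",
    "host": "10.52.31.104",
    "port": 3306,
    "database": "mydatabase",
    "username": "myuser",
    "password": "mypassword",
    # everything outside the named fields ends up here
    "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC",
}
```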
 

+ 352 - 2
app/core/production_line/production_line.py

@@ -608,6 +608,63 @@ def execute_production_line(resource_id):
     """
     Execute production line data loading
     
+    Args:
+        resource_id: data resource ID
+        
+    Returns:
+        dict: execution result
+    """
+    try:
+        # First fetch the resource info to determine its type
+        resource_type = get_resource_type(resource_id)
+        
+        # Execute different loading logic depending on the resource type
+        if resource_type == 'ddl':
+            # DDL resource: run database extraction
+            return execute_ddl_extraction(resource_id)
+        else:
+            # Other types (structure, etc.): run Excel data loading
+            return execute_excel_loading(resource_id)
+    except Exception as e:
+        logger.error(f"执行生产线失败: {str(e)}", exc_info=True)
+        return {
+            "status": "error",
+            "message": str(e)
+        }
+
+def get_resource_type(resource_id):
+    """
+    Get the resource type
+    
+    Args:
+        resource_id: data resource ID
+        
+    Returns:
+        str: resource type, e.g. 'ddl' or 'structure'
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # Query the resource type
+            cypher = """
+            MATCH (n:data_resource)
+            WHERE id(n) = $resource_id
+            RETURN n.type as type
+            """
+            result = session.run(cypher, resource_id=int(resource_id))
+            record = result.single()
+            
+            if not record:
+                raise ValueError(f"找不到ID为{resource_id}的数据资源")
+            
+            return record["type"] or 'structure'  # 默认为structure类型
+    except Exception as e:
+        logger.error(f"获取资源类型失败: {str(e)}")
+        raise
+
+def execute_excel_loading(resource_id):
+    """
+    Execute Excel file data loading (the original loading logic)
+    
     Args:
         resource_id: data resource ID
         
@@ -708,7 +765,7 @@ def execute_production_line(resource_id):
         }
         
     except Exception as e:
-        logger.error(f"执行失败: {str(e)}", exc_info=True)
+        logger.error(f"执行Excel加载失败: {str(e)}", exc_info=True)
         return {
             "status": "error",
             "message": str(e)
@@ -775,4 +832,297 @@ def extract_metadata_from_excel(file_path, table_name):
         return metadata_list
     except Exception as e:
         logger.error(f"从Excel文件提取元数据失败: {str(e)}")
-        return [] 
+        return []
+
+def execute_ddl_extraction(resource_id):
+    """
+    Execute data extraction for a DDL resource
+    
+    Args:
+        resource_id: data resource ID
+        
+    Returns:
+        dict: execution result
+    """
+    try:
+        from sqlalchemy import create_engine, text
+        import pandas as pd
+        
+        logger.info(f"开始执行DDL资源数据抽取,ID: {resource_id}")
+        
+        # 1. Get the resource details
+        resource_data = get_resource_details(resource_id)
+        if not resource_data:
+            return {"status": "error", "message": f"Resource does not exist, ID: {resource_id}"}
+            
+        # 2. Get the resource metadata
+        metadata_list = resource_data.get('meta_list', [])
+        if not metadata_list:
+            return {"status": "error", "message": "Resource has no metadata; cannot create the table"}
+            
+        # 3. Get the resource table name
+        target_table_name = resource_data.get('en_name')
+        if not target_table_name:
+            return {"status": "error", "message": "Resource has no English name; cannot determine the target table name"}
+            
+        # 4. Get the associated data source info
+        data_source_info = get_resource_data_source(resource_id)
+        if not data_source_info:
+            return {"status": "error", "message": "Unable to get the associated data source info"}
+            
+        # 5. Create the target table in PostgreSQL
+        create_result = create_target_table(target_table_name, metadata_list)
+        if not create_result["success"]:
+            return {"status": "error", "message": f"Failed to create the target table: {create_result['message']}"}
+            
+        # 6. Run the data extraction
+        extract_result = extract_data_to_postgres(data_source_info, target_table_name, metadata_list)
+        
+        return {
+            "status": "success",
+            "message": f"Data extraction succeeded: extracted from table {extract_result['source_table']} into {extract_result['target_table']}, {extract_result['total_records']} records processed",
+            "total_records": extract_result['total_records'],
+            "source_table": extract_result['source_table'],
+            "target_table": extract_result['target_table']
+        }
+        
+    except Exception as e:
+        logger.error(f"DDL数据抽取失败: {str(e)}", exc_info=True)
+        return {
+            "status": "error",
+            "message": str(e)
+        }
+
+def get_resource_details(resource_id):
+    """
+    Get detailed resource information
+    
+    Args:
+        resource_id: data resource ID
+        
+    Returns:
+        dict: resource details
+    """
+    from app.core.data_resource.resource import handle_id_resource
+    return handle_id_resource(resource_id)
+
+def get_resource_data_source(resource_id):
+    """
+    Get the data source info associated with a data resource
+    
+    Args:
+        resource_id: data resource ID
+        
+    Returns:
+        dict: data source connection info
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # Query the data_source node associated with the data resource
+            cypher = """
+            MATCH (n:data_resource)-[:isbelongto]->(ds:data_source)
+            WHERE id(n) = $resource_id
+            RETURN ds
+            """
+            result = session.run(cypher, resource_id=int(resource_id))
+            record = result.single()
+            
+            if not record:
+                logger.warning(f"资源 {resource_id} 没有关联的数据源节点")
+                return None
+                
+            data_source = dict(record["ds"])
+            
+            # Build the connection info
+            connection_info = {
+                "type": data_source.get("type", "").lower(),
+                "host": data_source.get("host"),
+                "port": data_source.get("port"),
+                "database": data_source.get("database"),
+                "username": data_source.get("username"),
+                "password": data_source.get("password")
+            }
+            
+            # Validate the required fields
+            if not all([connection_info["type"], connection_info["host"], 
+                        connection_info["database"], connection_info["username"]]):
+                logger.error(f"Incomplete data source info: {connection_info}")
+                return None
+                
+            return connection_info
+    except Exception as e:
+        logger.error(f"获取数据源信息失败: {str(e)}")
+        return None
+
+def create_target_table(table_name, metadata_list):
+    """
+    Create the target table in PostgreSQL
+    
+    Args:
+        table_name: table name
+        metadata_list: metadata list
+        
+    Returns:
+        dict: {"success": bool, "message": str}
+    """
+    try:
+        import psycopg2
+        from flask import current_app
+        
+        # Get the PostgreSQL configuration
+        pg_config = get_pg_config()
+        
+        conn = psycopg2.connect(**pg_config)
+        cur = conn.cursor()
+        
+        # Ensure the ods schema exists
+        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
+        
+        # Check whether the table already exists
+        cur.execute("""
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables 
+                WHERE table_schema = 'ods' 
+                AND table_name = %s
+            );
+        """, (table_name,))
+        
+        table_exists = cur.fetchone()[0]
+        
+        if table_exists:
+            logger.info(f"表 ods.{table_name} 已存在,将跳过创建")
+            return {"success": True, "message": f"表 ods.{table_name} 已存在"}
+            
+        # Build the column definitions
+        columns = []
+        for meta in metadata_list:
+            column_name = meta.get('en_name')
+            data_type = meta.get('data_type')
+            
+            if column_name and data_type:
+                columns.append(f"{column_name} {data_type}")
+        
+        if not columns:
+            return {"success": False, "message": "没有有效的列定义"}
+            
+        # Build the CREATE TABLE SQL
+        sql = f"""
+        CREATE TABLE ods.{table_name} (
+            id SERIAL PRIMARY KEY,
+            {", ".join(columns)},
+            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+        )
+        """
+        
+        logger.info(f"创建表SQL: {sql}")
+        cur.execute(sql)
+        conn.commit()
+        logger.info(f"表 ods.{table_name} 创建成功")
+        
+        return {"success": True, "message": f"表 ods.{table_name} 创建成功"}
+        
+    except Exception as e:
+        logger.error(f"创建目标表失败: {str(e)}")
+        if 'conn' in locals() and conn:
+            conn.rollback()
+        return {"success": False, "message": str(e)}
+    finally:
+        if 'cur' in locals() and cur:
+            cur.close()
+        if 'conn' in locals() and conn:
+            conn.close()
+
+def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
+    """
+    Extract data from the source database into PostgreSQL
+    
+    Args:
+        source_conn_info: source database connection info
+        target_table: target table name
+        metadata_list: metadata list
+        
+    Returns:
+        dict: extraction result
+    """
+    try:
+        from sqlalchemy import create_engine, text
+        import pandas as pd
+        from flask import current_app
+        
+        # The source table name is the same as the target table
+        source_table = target_table
+        
+        # Batch size
+        batch_size = current_app.config.get('DATA_EXTRACT_BATCH_SIZE', 1000)
+        
+        # Build the source database connection string
+        db_type = source_conn_info["type"]
+        if db_type == "mysql":
+            connection_string = f"mysql+pymysql://{source_conn_info['username']}:{source_conn_info['password']}@{source_conn_info['host']}:{source_conn_info['port']}/{source_conn_info['database']}"
+        elif db_type == "postgresql":
+            connection_string = f"postgresql://{source_conn_info['username']}:{source_conn_info['password']}@{source_conn_info['host']}:{source_conn_info['port']}/{source_conn_info['database']}"
+        else:
+            raise ValueError(f"不支持的数据库类型: {db_type}")
+            
+        # 目标数据库连接参数
+        pg_config = get_pg_config()
+        target_connection_string = f"postgresql://{pg_config['user']}:{pg_config['password']}@{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
+        
+        # Connect to the source database
+        source_engine = create_engine(connection_string)
+        
+        # Connect to the target database
+        target_engine = create_engine(target_connection_string)
+        
+        # Get the column names from the metadata and build the select list
+        column_names = [meta.get('en_name') for meta in metadata_list if meta.get('en_name')]
+        if not column_names:
+            raise ValueError("没有有效的列名")
+            
+        # 构建查询语句
+        select_columns = ", ".join(column_names)
+        query = f"SELECT {select_columns} FROM {source_table}"
+        
+        # Get the total record count
+        with source_engine.connect() as conn:
+            count_result = conn.execute(text(f"SELECT COUNT(*) FROM {source_table}"))
+            total_count = count_result.scalar()
+            
+        # Extract the data in batches
+        total_records = 0
+        offset = 0
+        
+        while offset < total_count:
+            # Build a paginated query
+            paginated_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
+            
+            # Read one batch of data
+            df = pd.read_sql(paginated_query, source_engine)
+            batch_count = len(df)
+            
+            if batch_count == 0:
+                break
+                
+            # Write to the target database
+            df.to_sql(
+                target_table, 
+                target_engine, 
+                schema='ods', 
+                if_exists='append', 
+                index=False
+            )
+            
+            total_records += batch_count
+            offset += batch_size
+            
+            logger.info(f"已抽取 {total_records}/{total_count} 条记录")
+            
+        return {
+            "total_records": total_records,
+            "source_table": source_table,
+            "target_table": f"ods.{target_table}"
+        }
+        
+    except Exception as e:
+        logger.error(f"数据抽取失败: {str(e)}")
+        raise 
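
The dispatcher keeps the old call shape, so existing callers of `execute_production_line` work unchanged. A minimal usage sketch (the resource ID is hypothetical; an app context is assumed for the batch-size config lookup):

```python
# Minimal driver sketch; 123 stands in for a real data_resource node ID.
from app.core.production_line.production_line import execute_production_line

result = execute_production_line(123)
if result["status"] == "success":
    # the DDL branch also returns total_records, source_table, target_table
    print(result["message"])
else:
    print("Production line failed:", result["message"])
```

Design note: the batching uses LIMIT/OFFSET, whose cost grows with the offset because skipped rows are re-scanned on large source tables; keyset pagination over an indexed column would keep each batch proportional to batch_size, at the price of requiring a stable sort key.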

+ 2 - 1
requirements.txt

@@ -16,4 +16,5 @@ python-dateutil>=2.8.0
 psutil>=6.0.0
 flask_sqlalchemy>=3.1.1
 openpyxl>=3.1.5
-requests>=2.32.3
+requests>=2.32.3
+pymysql>=1.1.1
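
pymysql backs the `mysql+pymysql://` URLs that `extract_data_to_postgres` builds. A quick standalone connectivity check (all connection details hypothetical, reusing the parser example's values):

```python
# Sanity-check the new MySQL driver; all credentials are hypothetical.
from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://myuser:mypassword@10.52.31.104:3306/mydatabase")
with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())  # prints 1 when reachable
```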