Ver código fonte

ddl创建data_resource,并从源到目标导入数据,初步打通。

wangxq 1 mês atrás
pai
commit
1c3046bbf5

+ 67 - 96
app/core/data_resource/resource.py

@@ -5,12 +5,12 @@ from py2neo import Relationship
 import pandas as pd
 from app.services.neo4j_driver import neo4j_driver
 from app.services.package_function import create_or_get_node, relationship_exists, get_node
+import time
 
 logger = logging.getLogger("app")
 
 def get_formatted_time():
     """获取格式化的当前时间"""
-    import time
     return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
 
 def get_node_by_id(label, id):
@@ -305,7 +305,7 @@ def handle_id_resource(resource_id):
             
             # 查询关联的标签
             tag_cypher = """
-            MATCH (n:data_resource)-[:label]->(t:data_label)
+            MATCH (n:data_resource)-[r:label]->(t:data_label)
             WHERE id(n) = $resource_id
             RETURN t
             """
@@ -436,7 +436,7 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
                 
                 # 查询关联的标签
                 tag_cypher = """
-                MATCH (n:data_resource)-[:label]->(t:data_label)
+                MATCH (n:data_resource)-[r:label]->(t:data_label)
                 WHERE id(n) = $resource_id
                 RETURN t
                 """
@@ -1194,47 +1194,25 @@ def data_resource_edit(data):
 
 def handle_data_source(data_source):
     """处理数据源的检查和创建
-    
-    参数:
-        data_source: 包含数据源信息的字典,支持两种情况:
-            1. 简单情况(只需查询已有数据源),只要包含en_name即可:
-               {
-                   "en_name": "10-52-31-104_5432_inventory"
-               }
-               或
-               {
-                   "en_name": "10-52-31-104_5432_inventory",
-                   "name": "教师数据库"
-               }
-            2. 完整的数据源信息(用于创建新数据源):
-               {
-                   "en_name": "10-52-31-104_5432_inventory",
-                   "name": "教师数据库",
-                   "type": "postgresql",
-                   "host": "10.52.31.104",
-                   "port": 5432,
-                   "database": "inventory_management",
-                   "username": "app_user",
-                   "password": "pG$ecur3P@ss789"
-               }
-            
-    返回:
-        成功时返回数据源的名称,失败时抛出异常
     """
     try:
-        # 获取数据源名称
+        # 1. 检查en_name是否存在
         ds_en_name = data_source.get("en_name")
         if not ds_en_name:
-            raise ValueError("数据源信息不完整,缺少名称")
+            raise ValueError("数据源信息不完整,缺少名称(en_name)")
+        
+        # 2. 处理name字段
+        if "name" not in data_source or not data_source["name"]:
+            data_source["name"] = ds_en_name
+            logger.debug(f"数据源name为空,使用en_name作为替代: {ds_en_name}")
         
-        # 检查是否为简单查询模式
-        # 判断数据源是否包含创建新数据源所需的必要字段
+        # 3. 检查是否为简单查询模式
         required_fields = ["type", "host", "port", "database", "username"]
         has_required_fields = all(data_source.get(field) for field in required_fields)
         
         with neo4j_driver.get_session() as session:
+            # 简单查询模式:只通过en_name查找已有数据源
             if not has_required_fields:
-                # 简单查询模式:只通过en_name查找已有数据源
                 logger.info(f"简单数据源查询模式,查找en_name为: {ds_en_name}")
                 check_name_cypher = """
                 MATCH (ds:data_source {en_name: $en_name})
@@ -1251,69 +1229,62 @@ def handle_data_source(data_source):
                 else:
                     # 数据源不存在,抛出异常
                     raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
-            else:
-                # 完整的数据源信息模式:创建或获取数据源
-                
-                # 确保name不为空,如果为空则使用en_name
-                if "name" not in data_source or not data_source["name"]:
-                    data_source["name"] = ds_en_name
-                    logger.info(f"数据源name为空,使用en_name作为替代: {ds_en_name}")
-                
-                # 检查是否已存在相同数据源(除名称和密码外属性相同)
-                check_cypher = """
-                MATCH (ds:data_source)
-                WHERE ds.type = $type AND 
-                      ds.host = $host AND 
-                      ds.port = $port AND 
-                      ds.database = $database AND 
-                      ds.username = $username
-                RETURN ds
-                """
-                # 这里列出的字段将会作为数据源的属性
-                connection_info = {
-                    "type": data_source.get("type", "").lower(),
-                    "host": data_source.get("host"),
-                    "port": data_source.get("port"),
-                    "database": data_source.get("database"),   # 支持新旧字段名
-                    "username": data_source.get("username"),  # 支持新旧字段名
-                    "password": data_source.get("password"),  # 支持新旧字段名
-                    "en_name": ds_en_name,  # 确保包含en_name属性
-                    "name": data_source["name"],  # 确保包含name属性
-                    "createTime": get_formatted_time()  # 添加创建时间
-                }
-                
-                check_result = session.run(
-                    check_cypher,
-                    type=connection_info["type"],
-                    host=connection_info["host"],
-                    port=connection_info["port"],
-                    database=connection_info["database"],
-                    username=connection_info["username"]
-                )
-                
-                existing_record = check_result.single()
-                
-                if existing_record:
-                    # 数据源已存在,返回其名称
-                    existing_data_source = dict(existing_record["ds"])
-                    logger.info(f"找到现有数据源: {existing_data_source.get('en_name')}")
-                    return existing_data_source.get("en_name")
-                
-                # 数据源不存在,创建新节点
-                create_cypher = """
-                CREATE (ds:data_source $properties)
-                RETURN ds
-                """
-                
-                create_result = session.run(create_cypher, properties=connection_info)
-                created_record = create_result.single()
-                
-                if not created_record:
-                    raise RuntimeError("创建数据源节点失败")
+            
+            # 完整的数据源信息模式:创建或获取数据源
+            # 检查是否已存在相同数据源,只使用type/host/port/database/username字段
+            check_cypher = """
+            MATCH (ds:data_source)
+            WHERE ds.type = $type AND 
+                  ds.host = $host AND 
+                  ds.port = $port AND 
+                  ds.database = $database AND 
+                  ds.username = $username
+            RETURN ds
+            """
+            
+            # 准备查询参数
+            check_params = {
+                "type": data_source.get("type", "").lower(),
+                "host": data_source.get("host"),
+                "port": data_source.get("port"),
+                "database": data_source.get("database"),
+                "username": data_source.get("username")
+            }
+            
+            check_result = session.run(check_cypher, **check_params)
+            existing_record = check_result.single()
+            
+            # 数据源已存在
+            if existing_record:
+                existing_data_source = dict(existing_record["ds"])
+                logger.info(f"找到现有数据源: {existing_data_source.get('en_name')}")
+                return existing_data_source.get("en_name")
+            
+            # 数据源不存在,创建新节点
+            # 创建数据源属性对象,包含data_source中的所有字段
+            connection_info = dict(data_source)  # 复制所有字段
+            
+            # 确保type字段为小写
+            if "type" in connection_info:
+                connection_info["type"] = connection_info["type"].lower()
                 
-                new_data_source = dict(created_record["ds"])
-                logger.info(f"创建新数据源: {new_data_source.get('en_name')}")
-                return new_data_source.get("en_name")
+            # 添加创建时间
+            connection_info["createTime"] = get_formatted_time()
+            
+            create_cypher = """
+            CREATE (ds:data_source $properties)
+            RETURN ds
+            """
+            
+            create_result = session.run(create_cypher, properties=connection_info)
+            created_record = create_result.single()
+            
+            if not created_record:
+                raise RuntimeError("创建数据源节点失败")
+            
+            new_data_source = dict(created_record["ds"])
+            logger.info(f"创建新数据源: {new_data_source.get('en_name')}")
+            return new_data_source.get("en_name")
             
     except Exception as e:
         logger.error(f"处理数据源失败: {str(e)}")

+ 27 - 3
app/core/production_line/production_line.py

@@ -10,6 +10,7 @@ from app.services.neo4j_driver import neo4j_driver
 import shutil
 import re
 from psycopg2.extras import execute_values
+import time
 
 def production_draw_graph(id, type):
     """
@@ -880,10 +881,11 @@ def execute_ddl_extraction(resource_id):
         
         return {
             "status": "success",
-            "message": f"数据抽取成功,从{extract_result['source_table']}表抽取到{extract_result['target_table']}表,共处理 {extract_result['total_records']} 条记录",
+            "message": f"数据抽取成功,从{extract_result['source_table']}表抽取到{extract_result['target_table']}表,共处理 {extract_result['total_records']} 条记录,执行了 {extract_result['execution_time']:.2f} 秒",
             "total_records": extract_result['total_records'],
             "source_table": extract_result['source_table'],
-            "target_table": extract_result['target_table']
+            "target_table": extract_result['target_table'],
+            "execution_time": extract_result['execution_time']
         }
         
     except Exception as e:
@@ -1059,6 +1061,16 @@ def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
         db_type = source_conn_info["type"]
         if db_type == "mysql":
             connection_string = f"mysql+pymysql://{source_conn_info['username']}:{source_conn_info['password']}@{source_conn_info['host']}:{source_conn_info['port']}/{source_conn_info['database']}"
+            
+            # 检查是否存在param参数,如存在则添加到连接字符串中
+            if 'param' in source_conn_info and source_conn_info['param']:
+                # 确保param参数以&开头
+                param = source_conn_info['param']
+                if not param.startswith('&'):
+                    param = '&' + param
+                connection_string = f"{connection_string}?{param[1:]}"
+                logger.debug(f"添加了数据源的param参数: {param}")
+                
         elif db_type == "postgresql":
             connection_string = f"postgresql://{source_conn_info['username']}:{source_conn_info['password']}@{source_conn_info['host']}:{source_conn_info['port']}/{source_conn_info['database']}"
         else:
@@ -1068,6 +1080,9 @@ def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
         pg_config = get_pg_config()
         target_connection_string = f"postgresql://{pg_config['user']}:{pg_config['password']}@{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
         
+        # 记录最终连接字符串
+        logger.debug(f"python连接源表的最终连接字符串: {connection_string}")
+        
         # 连接源数据库
         source_engine = create_engine(connection_string)
         
@@ -1092,6 +1107,9 @@ def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
         total_records = 0
         offset = 0
         
+        # 计算开始时间
+        start_time = time.time()
+        
         while offset < total_count:
             # 构建带有分页的查询
             paginated_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
@@ -1117,10 +1135,16 @@ def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
             
             logger.info(f"已抽取 {total_records}/{total_count} 条记录")
             
+        # 计算执行时间
+        end_time = time.time()
+        execution_time = end_time - start_time
+        logger.info(f"作业抽取了 {total_records} 条记录,执行了 {execution_time:.2f} 秒")
+            
         return {
             "total_records": total_records,
             "source_table": source_table,
-            "target_table": f"ods.{target_table}"
+            "target_table": f"ods.{target_table}",
+            "execution_time": execution_time
         }
         
     except Exception as e: