Jelajahi Sumber

已经完成了对手工执行作业的修改,准备修改config,使其自适应生产和开发环境

wangxq 3 bulan lalu
induk
melakukan
35641fdbad

+ 12 - 4
app/core/data_resource/resource.py

@@ -135,7 +135,10 @@ def handle_node(receiver, head_data, data_resource):
                     # 创建元数据节点
                     meta_cypher = """
                     MERGE (m:Metadata {name: $name})
-                    ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
+                    ON CREATE SET m.en_name = $en_name, 
+                                m.createTime = $create_time,
+                                m.type = $type
+                    ON MATCH SET m.type = $type
                     RETURN m
                     """
                     
@@ -144,7 +147,8 @@ def handle_node(receiver, head_data, data_resource):
                         meta_cypher,
                         name=item['name'],
                         en_name=item['en_name'],
-                        create_time=create_time
+                        create_time=create_time,
+                        type=item['data_type']  # 使用data_type作为type属性
                     )
                     meta_node = meta_result.single()["m"]
                     
@@ -650,9 +654,13 @@ def select_create_ddl(sql_content):
                                 current_table = None
                     # 列注释处理
                     elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
-                        column_comment_match = re.search(r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.', stmt, re.IGNORECASE)
+                        column_comment_match = re.search(
+                            r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
+                            stmt,
+                            re.IGNORECASE
+                        )
                         if column_comment_match:
-                            comment_table = column_comment_match.group(1).strip('"\'')
+                            comment_table = column_comment_match.group(1)
                             if comment_table == current_table:
                                 current_block += " " + stmt
                             else:

+ 50 - 24
app/core/production_line/production_line.py

@@ -189,20 +189,20 @@ def get_resource_storage_info(resource_id):
     获取数据资源的存储位置和元数据信息
     
     Returns:
-        tuple: (storage_location, table_name, db_table_name, metadata_list)
+        tuple: (storage_location, cn_name, en_name, metadata_list)
         - storage_location: 存储位置
-        - table_name: 用于查找Excel文件的名称(优先使用中文名
-        - db_table_name: 数据库表名(使用英文名)
+        - cn_name: 资源中文名(用于查找Excel文件)
+        - en_name: 资源英文名(用于数据库表名)
         - metadata_list: 元数据列表
     """
     try:
         with neo4j_driver.get_session() as session:
-            # 修改查询,增加name(中文名)的获取
+            # 获取资源基本信息
             resource_query = """
             MATCH (n:data_resource)
             WHERE id(n) = $resource_id
-            RETURN n.storage_location as storage_location, 
-                   n.name as name,
+            RETURN n.location as storage_location, 
+                   n.name as cn_name,
                    n.en_name as en_name
             """
             result = session.run(resource_query, resource_id=int(resource_id))
@@ -211,7 +211,7 @@ def get_resource_storage_info(resource_id):
             if not resource_data or not resource_data['storage_location']:
                 raise ValueError("存储位置未配置")
                 
-            # 查询元数据节点,同时获取中文名和英文名
+            # 查询元数据节点
             metadata_query = """
             MATCH (n:data_resource)-[:contain]->(m:Metadata)
             WHERE id(n) = $resource_id
@@ -220,15 +220,16 @@ def get_resource_storage_info(resource_id):
             result = session.run(metadata_query, resource_id=int(resource_id))
             metadata_list = [dict(record) for record in result]
             
-            # 用于查找Excel文件的名称(优先使用中文名)
-            table_name = resource_data['name'] if resource_data['name'] else resource_data['en_name']
-            # 数据库表名(使用英文名)
-            db_table_name = resource_data['en_name']
-            
-            if not db_table_name:
+            # 检查英文名是否存在
+            if not resource_data['en_name']:
                 raise ValueError("数据资源的英文名不能为空")
             
-            return resource_data['storage_location'], table_name, db_table_name, metadata_list
+            return (
+                resource_data['storage_location'],
+                resource_data['cn_name'],
+                resource_data['en_name'],
+                metadata_list
+            )
     except Exception as e:
         logger.error(f"获取资源存储信息失败: {str(e)}")
         raise
@@ -260,6 +261,9 @@ def check_and_create_table(table_name, metadata_list):
         table_exists = cur.fetchone()[0]
         
         if not table_exists:
+            # 打印元数据列表用于调试
+            logger.info(f"元数据列表: {metadata_list}")
+            
             # 构建建表SQL
             columns = [
                 f"{meta['en_name']} {meta['type']}"
@@ -276,6 +280,10 @@ def check_and_create_table(table_name, metadata_list):
                 sql.SQL(', ').join(map(sql.SQL, columns))
             )
             
+            # 打印完整的建表SQL
+            formatted_sql = create_table_sql.as_string(conn)
+            logger.info(f"建表SQL: {formatted_sql}")
+            
             cur.execute(create_table_sql)
             logger.info(f"表 ods.{table_name} 创建成功")
             
@@ -424,10 +432,10 @@ def execute_production_line(resource_id):
         pg_config = get_pg_config()
         
         # 1. 获取存储信息
-        storage_location, table_name, db_table_name, metadata_list = get_resource_storage_info(resource_id)
+        storage_location, cn_name, en_name, metadata_list = get_resource_storage_info(resource_id)
         
         # 2. 检查并创建表
-        check_and_create_table(db_table_name, metadata_list)
+        check_and_create_table(en_name, metadata_list)
         
         # 3. 获取完整的存储路径并扫描Excel文件
         full_storage_path = get_full_storage_path(storage_location)
@@ -439,17 +447,35 @@ def execute_production_line(resource_id):
                 logger.info(f"创建目录: {full_storage_path}")
             except Exception as e:
                 raise ValueError(f"无法创建存储路径: {full_storage_path}, 错误: {str(e)}")
-            
-        excel_files = [
-            f for f in os.listdir(full_storage_path)
-            if f.startswith(table_name) and f.endswith(('.xlsx', '.xls'))
-        ]
         
+        # 首先使用中文名查找文件
+        excel_files = []
+        if cn_name:
+            excel_files = [
+                f for f in os.listdir(full_storage_path)
+                if f.startswith(cn_name) and f.endswith(('.xlsx', '.xls'))
+            ]
+            if excel_files:
+                logger.info(f"使用中文名'{cn_name}'找到Excel文件: {excel_files}")
+        
+        # 如果使用中文名没找到文件,尝试使用英文名
+        if not excel_files and en_name:
+            excel_files = [
+                f for f in os.listdir(full_storage_path)
+                if f.startswith(en_name) and f.endswith(('.xlsx', '.xls'))
+            ]
+            if excel_files:
+                logger.info(f"使用英文名'{en_name}'找到Excel文件: {excel_files}")
+        
+        # 如果两种方式都没找到文件,报错
         if not excel_files:
             error_msg = (
-                f"未找到匹配的Excel文件: {table_name}*.xlsx/xls\n"
+                f"未找到匹配的Excel文件\n"
                 f"搜索路径: {full_storage_path}\n"
-                f"请确认文件已上传到正确位置,且文件名以'{table_name}'开头"
+                f"尝试查找的文件名模式:\n"
+                f"1. {cn_name}*.xlsx/xls (中文名)\n"
+                f"2. {en_name}*.xlsx/xls (英文名)\n"
+                f"请确认文件已上传到正确位置,且文件名以资源的中文名或英文名开头"
             )
             logger.error(error_msg)
             raise ValueError(error_msg)
@@ -463,7 +489,7 @@ def execute_production_line(resource_id):
             file_path = os.path.join(full_storage_path, excel_file)
             try:
                 # 加载数据到PostgreSQL
-                records = load_excel_to_postgresql(file_path, db_table_name, metadata_list)
+                records = load_excel_to_postgresql(file_path, en_name, metadata_list)
                 total_records += records
                 processed_files.append(excel_file)