5 Commits e22e6dc35f ... 08feb8b784

Autore SHA1 Messaggio Data
  wangxq 08feb8b784 统一使用create_time,修改原来使用insert_time的字段。 3 settimane fa
  wangxq 88e8fab112 合并exec python/sql中的目标表检查和建表语句。 4 settimane fa
  wangxq 7d361fb7c8 在加载数据之前建表,根据neo4j生成建表语句和字段注释语句。 1 mese fa
  wangxq 8029197e8c 修改了load_data和load_file时的schema问题。 1 mese fa
  wangxq fbb933c6e1 Add comments for field comment matching 1 mese fa

+ 3 - 1
.gitignore

@@ -25,4 +25,6 @@ Thumbs.db
 node_modules/
 
 # 忽略 JetBrains IDE 配置
-.idea/
+.idea/
+
+/test

+ 4 - 0
dags/.airflowignore

@@ -0,0 +1,4 @@
+__pycache__/
+*.pyc
+config.py
+utils.py

+ 1 - 1
dags/config.py

@@ -38,7 +38,7 @@ DATAOPS_DAGS_PATH = os.path.join(AIRFLOW_BASE_PATH, 'dags')
 SCRIPTS_BASE_PATH = os.path.join(AIRFLOW_BASE_PATH, 'dataops_scripts')
 
 # 上传的CSV/EXCEL文件的基准上传路径
-STRUCTURE_UPLOAD_BASE_PATH ="/data/csv"
+STRUCTURE_UPLOAD_BASE_PATH ="/data/upload"
 STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH ="/data/archive"
 
 # 本地开发环境脚本路径(如果需要区分环境)

+ 4 - 4
dags/dataops_productline_prepare_dag.py

@@ -775,16 +775,16 @@ def check_execution_plan_in_db(**kwargs):
                 if EXECUTION_PLAN_KEEP_COUNT > 0:
                     cursor.execute(f"""
                         WITH to_keep AS (
-                            SELECT dag_id, run_id, exec_date, insert_time
+                            SELECT dag_id, run_id, exec_date, create_time
                             FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                             WHERE dag_id = %s AND exec_date = %s
-                            ORDER BY insert_time DESC
+                            ORDER BY create_time DESC
                             LIMIT %s
                         )
                         DELETE FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                         WHERE dag_id = %s AND exec_date = %s
-                        AND (dag_id, run_id, exec_date, insert_time) NOT IN (
-                            SELECT dag_id, run_id, exec_date, insert_time FROM to_keep
+                        AND (dag_id, run_id, exec_date, create_time) NOT IN (
+                            SELECT dag_id, run_id, exec_date, create_time FROM to_keep
                         )
                     """, (dag_id, ds, EXECUTION_PLAN_KEEP_COUNT, dag_id, ds))
                     

+ 41 - 19
dataops_scripts/execution_python.py

@@ -236,27 +236,30 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, s
         
         logger.info(f"成功获取脚本内容,长度: {len(script_code)} 字符")
         
-        # 日期计算
-        # try:
-        #     # 直接使用script_utils.get_date_range计算日期范围
-        #     logger.info(f"使用script_utils.get_date_range计算日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
-        #     start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
-        #     logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
-        # except Exception as date_err:
-        #     logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
-        #     # 使用简单的默认日期范围计算
-        #     date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
-        #     if schedule_frequency.lower() == 'daily':
-        #         start_date = date_obj.strftime('%Y-%m-%d')
-        #         end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
-        #     else:
-        #         # 对其他频率使用默认范围
-        #         start_date = exec_date
-        #         end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
-        #     logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
+        # 获取目标表标签,用于决定schema
+        target_table_label = kwargs.get('target_table_label', '')
+        
+        # 根据 target_table_label 决定 schema
+        if target_table_label and 'DataResource' in target_table_label:
+            target_schema = 'ods'
+        else:
+            target_schema = 'ads'
+        
+        logger.info(f"目标表标签: {target_table_label}, 决定使用schema: {target_schema}")
+        
+        # 检查目标表是否存在,如果不存在则尝试创建
+        logger.info(f"检查目标表 '{target_table}' 是否存在,如果不存在则尝试从Neo4j创建")
+        if not script_utils.check_and_create_table(target_table, default_schema=target_schema):
+            logger.error(f"目标表 '{target_table}' 不存在且无法创建,无法继续执行Python脚本")
+            return False
+        
+        logger.info(f"目标表 '{target_table}' 检查完成,可以继续执行")
+        
+        # 初始化日期变量,确保在所有情况下都有值
+        start_date = None
+        end_date = None
         
         # 检查是否开启ETL幂等性
-        target_table_label = kwargs.get('target_table_label', '')
         script_exec_mode = kwargs.get('update_mode', 'append')  # 只使用update_mode
         is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False) 
         
@@ -361,6 +364,25 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, s
                 logger.info(f"当前更新模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
         else:
             logger.info("未开启ETL幂等性,直接执行Python脚本")
+        
+        # 如果日期变量还没有被设置,则计算默认的日期范围
+        if start_date is None or end_date is None:
+            logger.info(f"计算默认日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
+            try:
+                start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
+                logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+            except Exception as date_err:
+                logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
+                # 使用简单的默认日期范围计算
+                date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
+                if schedule_frequency.lower() == 'daily':
+                    start_date = date_obj.strftime('%Y-%m-%d')
+                    end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
+                else:
+                    # 对其他频率使用默认范围
+                    start_date = exec_date
+                    end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
+                logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
 
         # 准备执行上下文
         conn = get_pg_conn()  # 在外层先拿到连接

+ 19 - 1
dataops_scripts/execution_sql.py

@@ -301,8 +301,26 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
         
         logger.info(f"成功获取脚本内容,长度: {len(sql_content)} 字符")
         
-        # 检查是否开启ETL幂等性
+        # 获取目标表标签,用于决定schema
         target_table_label = kwargs.get('target_table_label', '')
+        
+        # 根据 target_table_label 决定 schema
+        if target_table_label and 'DataResource' in target_table_label:
+            target_schema = 'ods'
+        else:
+            target_schema = 'ads'
+        
+        logger.info(f"目标表标签: {target_table_label}, 决定使用schema: {target_schema}")
+        
+        # 检查目标表是否存在,如果不存在则尝试创建
+        logger.info(f"检查目标表 '{target_table}' 是否存在,如果不存在则尝试从Neo4j创建")
+        if not script_utils.check_and_create_table(target_table, default_schema=target_schema):
+            logger.error(f"目标表 '{target_table}' 不存在且无法创建,无法继续执行SQL脚本")
+            return False
+        
+        logger.info(f"目标表 '{target_table}' 检查完成,可以继续执行")
+        
+        # 检查是否开启ETL幂等性
         script_exec_mode = kwargs.get('update_mode', 'append')  # 只使用update_mode
         is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False)  # 新增:获取是否为手动触发的DAG
         

+ 87 - 36
dataops_scripts/load_data.py

@@ -86,7 +86,7 @@ def get_source_database_info(table_name, script_name=None):
                 "username": record.get("username"),
                 "password": record.get("password"),
                 "db_type": record.get("db_type"),
-                "schema": record.get("schema", "public"),
+                "schema": record.get("schema"),
                 "source_table": record.get("source_table"),
                 "labels": record.get("labels", [])
             }
@@ -139,10 +139,13 @@ def get_target_database_info():
             "username": pg_config.get("user"),
             "password": pg_config.get("password"),
             "database": pg_config.get("database"),
-            "db_type": "postgresql",
-            "schema": "public"
+            "db_type": "postgresql"
         }
         
+        # 如果配置中有schema,则添加到连接信息中
+        if "schema" in pg_config and pg_config["schema"]:
+            database_info["schema"] = pg_config["schema"]
+        
         logger.info(f"成功获取目标数据库连接信息: {database_info['host']}:{database_info['port']}/{database_info['database']}")
         return database_info
     except Exception as e:
@@ -187,7 +190,7 @@ def get_sqlalchemy_engine(db_info):
         logger.error(traceback.format_exc())
         return None
 
-def create_table_if_not_exists(source_engine, target_engine, source_table, target_table, schema="public"):
+def create_table_if_not_exists(source_engine, target_engine, source_table, target_table, schema=None):
     """
     如果目标表不存在,则从源表复制表结构创建目标表
     
@@ -196,7 +199,7 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
         target_engine: 目标数据库引擎
         source_table: 源表名
         target_table: 目标表名
-        schema: 模式名称
+        schema: 模式名称,如果为None或空字符串则使用"ods"
         
     返回:
         bool: 操作是否成功
@@ -207,12 +210,26 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
     logger.info(f"检查目标表 {target_table} 是否存在,不存在则创建")
     
     try:
+        # 处理schema参数
+        if schema == "" or schema is None:
+            # 如果传递的schema为空,使用"ods"
+            schema = "ods"
+            logger.info(f"schema参数为空,使用默认schema: {schema}")
+        else:
+            # 如果传递的schema不为空,使用传递的schema
+            if schema != "ods":
+                logger.warning(f"使用非标准schema: {schema},建议使用'ods'作为目标schema")
+            logger.info(f"使用传递的schema: {schema}")
+        
+        table_display_name = f"{schema}.{target_table}"
+        logger.info(f"目标表完整名称: {table_display_name}")
+        
         # 检查目标表是否存在
         target_inspector = inspect(target_engine)
         target_exists = target_inspector.has_table(target_table, schema=schema)
         
         if target_exists:
-            logger.info(f"目标表 {target_table} 已存在,无需创建")
+            logger.info(f"目标表 {table_display_name} 已存在,无需创建")
             return True
         
         # 目标表不存在,从源表获取表结构
@@ -253,15 +270,16 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
         if not has_create_time:
             from sqlalchemy import TIMESTAMP
             columns.append(Column('create_time', TIMESTAMP, nullable=True))
-            logger.info(f"为表 {target_table} 添加 create_time 字段")
+            logger.info(f"为表 {table_display_name} 添加 create_time 字段")
         
         # 如果不存在update_time字段,则添加
         if not has_update_time:
             from sqlalchemy import TIMESTAMP
             columns.append(Column('update_time', TIMESTAMP, nullable=True))
-            logger.info(f"为表 {target_table} 添加 update_time 字段")
+            logger.info(f"为表 {table_display_name} 添加 update_time 字段")
         
         # 定义目标表结构,让SQLAlchemy处理数据类型映射
+        # 现在schema总是有值(至少是"ods")
         table_def = Table(
             target_table,
             metadata,
@@ -271,7 +289,7 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
         
         # 在目标数据库中创建表
         metadata.create_all(target_engine)
-        logger.info(f"成功在目标数据库中创建表 {schema}.{target_table}")
+        logger.info(f"成功在目标数据库中创建表 {table_display_name}")
         
         return True
     except Exception as e:
@@ -315,23 +333,43 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
         if not source_engine or not target_engine:
             raise Exception("无法创建数据库引擎,无法加载数据")
 
-        # 获取源表名
+        # 获取源表名和源schema
         source_table = source_db_info.get("source_table", table_name) or table_name
+        source_schema = source_db_info.get("schema")
+        
+        # 构建完整的源表名
+        if source_schema:
+            full_source_table_name = f"{source_schema}.{source_table}"
+        else:
+            full_source_table_name = source_table
+        
+        logger.info(f"源表完整名称: {full_source_table_name}")
 
-        # 确保目标表存在
-        if not create_table_if_not_exists(source_engine, target_engine, source_table, table_name):
-            raise Exception(f"无法创建目标表 {table_name},无法加载数据")
+        # 获取目标schema
+        target_schema = target_db_info.get("schema")
+        
+        # 构建完整的目标表名
+        if target_schema:
+            full_table_name = f"{target_schema}.{table_name}"
+        else:
+            full_table_name = table_name
+        
+        logger.info(f"目标表完整名称: {full_table_name}")
+        
+        # 确保目标表存在 - create_table_if_not_exists必须使用"ods"作为schema
+        if not create_table_if_not_exists(source_engine, target_engine, full_source_table_name, table_name, "ods"):
+            raise Exception(f"无法创建目标表 {full_table_name},无法加载数据")
 
         # 根据更新模式处理数据
         if update_mode == "full_refresh":
             # 执行全量刷新,清空表
-            logger.info(f"执行全量刷新,清空表 {table_name}")
+            logger.info(f"执行全量刷新,清空表 {full_table_name}")
             with target_engine.begin() as conn:  # 使用begin()自动管理事务
-                conn.execute(f"TRUNCATE TABLE {table_name}")
-            logger.info(f"成功清空表 {table_name}")
+                conn.execute(text(f"TRUNCATE TABLE {full_table_name}"))
+            logger.info(f"成功清空表 {full_table_name}")
 
             # 构建全量查询
-            query = f"SELECT * FROM {source_table}"
+            query = f"SELECT * FROM {full_source_table_name}"
         else:
             # 增量更新,需要获取目标日期列和日期范围
             target_dt_column = get_target_dt_column(table_name, script_name)
@@ -348,13 +386,13 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
                     
                     # 执行删除操作
                     delete_sql = f"""
-                        DELETE FROM {table_name}
+                        DELETE FROM {full_table_name}
                         WHERE {target_dt_column} >= '{start_date}'
                         AND {target_dt_column} < '{end_date}'
                     """
                     with target_engine.begin() as conn:  # 使用begin()自动管理事务
-                        conn.execute(delete_sql)
-                    logger.info(f"成功删除表 {table_name} 中 {target_dt_column} 从 {start_date} 到 {end_date} 的数据")
+                        conn.execute(text(delete_sql))
+                    logger.info(f"成功删除表 {full_table_name} 中 {target_dt_column} 从 {start_date} 到 {end_date} 的数据")
                 else:
                     # 自动调度
                     start_datetime, end_datetime = get_one_day_range(exec_date)
@@ -364,14 +402,14 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
                     
                     # 执行删除操作
                     delete_sql = f"""
-                        DELETE FROM {table_name}
+                        DELETE FROM {full_table_name}
                         WHERE create_time >= '{start_date}'
                         AND create_time < '{end_date}'
                     """
                     try:
                         with target_engine.begin() as conn:  # 使用begin()自动管理事务
-                            conn.execute(delete_sql)
-                        logger.info(f"成功删除表 {table_name} 中 create_time 从 {start_date} 到 {end_date} 的数据")
+                            conn.execute(text(delete_sql))
+                        logger.info(f"成功删除表 {full_table_name} 中 create_time 从 {start_date} 到 {end_date} 的数据")
                     except Exception as del_err:
                         logger.error(f"删除数据时出错: {str(del_err)}")
                         logger.warning("继续执行数据加载")
@@ -383,19 +421,23 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
                 
                 # 检查源表是否含有目标日期列
                 source_inspector = inspect(source_engine)
-                source_columns = [col['name'].lower() for col in source_inspector.get_columns(source_table)]
+                # 处理源表的schema信息用于检查列
+                if source_schema:
+                    source_columns = [col['name'].lower() for col in source_inspector.get_columns(source_table, schema=source_schema)]
+                else:
+                    source_columns = [col['name'].lower() for col in source_inspector.get_columns(source_table)]
                 
                 if target_dt_column.lower() in source_columns:
                     # 源表含有目标日期列,构建包含日期条件的查询
                     query = f"""
-                        SELECT * FROM {source_table}
+                        SELECT * FROM {full_source_table_name}
                         WHERE {target_dt_column} >= '{start_date}'
                         AND {target_dt_column} < '{end_date}'
                     """
                 else:
                     # 源表不含目标日期列,构建全量查询
-                    logger.warning(f"源表 {source_table} 没有目标日期列 {target_dt_column},将加载全部数据")
-                    query = f"SELECT * FROM {source_table}"
+                    logger.warning(f"源表 {full_source_table_name} 没有目标日期列 {target_dt_column},将加载全部数据")
+                    query = f"SELECT * FROM {full_source_table_name}"
                 
             except Exception as date_err:
                 logger.error(f"计算日期范围时出错: {str(date_err)}")
@@ -435,17 +477,26 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
                 logger.info(f"为数据添加 update_time 字段,初始值为: NULL (数据加载时不设置更新时间)")
             
             # 写入数据到目标表
-            logger.info(f"开始写入数据到目标表 {table_name},共 {len(df)} 行")
+            logger.info(f"开始写入数据到目标表 {full_table_name},共 {len(df)} 行")
             with target_engine.connect() as connection:
-                df.to_sql(
-                    name=table_name,
-                    con=connection,
-                    if_exists='append',
-                    index=False,
-                    schema=target_db_info.get("schema", "public")
-                )
+                # 处理schema参数,如果为空则不传递schema参数
+                if target_schema:
+                    df.to_sql(
+                        name=table_name,
+                        con=connection,
+                        if_exists='append',
+                        index=False,
+                        schema=target_schema
+                    )
+                else:
+                    df.to_sql(
+                        name=table_name,
+                        con=connection,
+                        if_exists='append',
+                        index=False
+                    )
             
-            logger.info(f"成功写入数据到目标表 {table_name}")
+            logger.info(f"成功写入数据到目标表 {full_table_name}")
             return True
             
         except Exception as query_err:

+ 119 - 27
dataops_scripts/load_file.py

@@ -52,6 +52,7 @@ def get_pg_conn():
 def get_table_columns(table_name):
     """
     获取表的列信息,包括列名和注释
+    如果表不存在,则尝试从Neo4j创建表
     
     返回:
         dict: {列名: 列注释} 的字典
@@ -59,36 +60,99 @@ def get_table_columns(table_name):
     conn = get_pg_conn()
     cursor = conn.cursor()
     try:
+        # 首先检查表是否存在
+        # 解析表名,可能包含schema(如ods.table_name)
+        if '.' in table_name:
+            schema_name, actual_table_name = table_name.split('.', 1)
+            cursor.execute("""
+                SELECT EXISTS (
+                    SELECT FROM information_schema.tables 
+                    WHERE table_schema = %s AND table_name = %s
+                );
+            """, (schema_name.lower(), actual_table_name.lower()))
+        else:
+            cursor.execute("""
+                SELECT EXISTS (
+                    SELECT FROM information_schema.tables 
+                    WHERE table_name = %s
+                );
+            """, (table_name.lower(),))
+        
+        table_exists = cursor.fetchone()[0]
+        
+        if not table_exists:
+            logger.warning(f"表 '{table_name}' 不存在,尝试从Neo4j创建表")
+            # 调用script_utils中的create_table_from_neo4j函数
+            try:
+                if script_utils.create_table_from_neo4j(table_name):
+                    logger.info(f"成功从Neo4j创建表 '{table_name}'")
+                else:
+                    logger.error(f"从Neo4j创建表 '{table_name}' 失败")
+                    return {}
+            except Exception as create_err:
+                logger.error(f"调用create_table_from_neo4j创建表 '{table_name}' 时出错: {str(create_err)}")
+                return {}
+        
         # 查询表列信息
-        cursor.execute("""
-            SELECT 
-                column_name, 
-                col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
-            FROM 
-                information_schema.columns
-            WHERE 
-                table_schema = 'public' -- 明确指定 schema,如果需要
-                AND table_name = %s
-            ORDER BY 
-                ordinal_position
-        """, (table_name.lower(),))
+        # 根据表名是否包含schema来构建查询
+        if '.' in table_name:
+            schema_name, actual_table_name = table_name.split('.', 1)
+            cursor.execute("""
+                SELECT 
+                    column_name, 
+                    col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
+                FROM 
+                    information_schema.columns
+                WHERE 
+                    table_schema = %s AND table_name = %s
+                ORDER BY 
+                    ordinal_position
+            """, (schema_name.lower(), actual_table_name.lower()))
+        else:
+            cursor.execute("""
+                SELECT 
+                    column_name, 
+                    col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
+                FROM 
+                    information_schema.columns
+                WHERE 
+                    table_name = %s
+                ORDER BY 
+                    ordinal_position
+            """, (table_name.lower(),))
         
         columns = {}
+        empty_comment_columns = []  # 记录注释为空的列
+        
         for row in cursor.fetchall():
             col_name = row[0]
-            col_comment = row[1] if row[1] else col_name  # 如果注释为空,使用列名
+            col_comment = row[1]
+            
+            # 检查注释是否为空
+            if not col_comment:
+                empty_comment_columns.append(col_name)
+                col_comment = col_name  # 如果注释为空,使用列名
+            
             columns[col_name] = col_comment
             
         if not columns:
              logger.warning(f"未能获取到表 '{table_name}' 的列信息,请检查表是否存在、schema是否正确以及权限。")
+        else:
+            # 检查是否有注释为空的列,如果有则发出告警
+            if empty_comment_columns:
+                logger.warning(f"表 '{table_name}' 中以下列的注释为空: {empty_comment_columns}")
+            else:
+                logger.info(f"表 '{table_name}' 的所有列都有注释,数据匹配将更加准确")
 
         return columns
     except Exception as e:
         logger.error(f"获取表 '{table_name}' 的列信息时出错: {str(e)}")
         return {}
     finally:
-        cursor.close()
-        conn.close()
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
 
 def match_file_columns(file_headers, table_columns):
     """
@@ -119,24 +183,45 @@ def match_file_columns(file_headers, table_columns):
     # 预处理文件headers
     processed_file_headers_lower = {str(header).lower(): header for header in file_headers}
 
+    # 添加调试日志
+    logger.info(f"开始匹配过程,文件列数: {len(file_headers)}, 表列数: {len(table_columns)}")
+    logger.info(f"表列及其注释: {table_columns}")
+    logger.info(f"处理后的注释到列名映射: {processed_comment_to_column_lower}")
+
     # 1. 通过注释匹配 (忽略大小写)
+    logger.info("=== 开始通过注释匹配 ===")
     for processed_header, original_header in processed_file_headers_lower.items():
+        logger.info(f"尝试匹配文件列 '{original_header}' (处理后: '{processed_header}')")
         if processed_header in processed_comment_to_column_lower:
             table_col_original_case = processed_comment_to_column_lower[processed_header]
             if table_col_original_case not in matched_table_cols:
                 mapping[original_header] = table_col_original_case
                 matched_table_cols.add(table_col_original_case)
                 logger.info(f"通过注释匹配: 文件列 '{original_header}' -> 表列 '{table_col_original_case}'")
+            else:
+                logger.warning(f"表列 '{table_col_original_case}' 已被匹配,跳过文件列 '{original_header}'")
+        else:
+            logger.info(f"文件列 '{original_header}' 在注释中未找到匹配")
 
     # 2. 通过名称直接匹配 (忽略大小写),仅匹配尚未映射的列
+    logger.info("=== 开始通过名称直接匹配 ===")
     for processed_header, original_header in processed_file_headers_lower.items():
          if original_header not in mapping: # 仅当此文件列尚未映射时才进行名称匹配
+            logger.info(f"尝试名称匹配文件列 '{original_header}' (处理后: '{processed_header}')")
             if processed_header in processed_table_columns_lower:
                 table_col_original_case = processed_table_columns_lower[processed_header]
                 if table_col_original_case not in matched_table_cols:
                     mapping[original_header] = table_col_original_case
                     matched_table_cols.add(table_col_original_case)
                     logger.info(f"通过名称匹配: 文件列 '{original_header}' -> 表列 '{table_col_original_case}'")
+                else:
+                    logger.warning(f"表列 '{table_col_original_case}' 已被匹配,跳过文件列 '{original_header}'")
+            else:
+                logger.info(f"文件列 '{original_header}' 在表列名中未找到匹配")
+         else:
+            logger.info(f"文件列 '{original_header}' 已通过注释匹配,跳过名称匹配")
+
+    logger.info(f"=== 匹配完成,成功匹配 {len(mapping)} 个列 ===")
 
     unmapped_file = [h for h in file_headers if h not in mapping]
     if unmapped_file:
@@ -267,13 +352,20 @@ def load_dataframe_to_table(df, file_path, table_name):
         else:
             logger.info(f"数据行数: {final_row_count}")
 
-        # 检查目标表是否有create_time字段,如果有则添加当前时间
+        # 检查目标表是否有时间戳字段,优先使用create_time,其次使用created_at
+        current_time = datetime.now()
+        timestamp_field_added = False
+
         if 'create_time' in table_columns:
-            current_time = datetime.now()
             df_mapped['create_time'] = current_time
             logger.info(f"目标表有 create_time 字段,设置值为: {current_time}")
+            timestamp_field_added = True
+        elif 'created_at' in table_columns:
+            df_mapped['created_at'] = current_time
+            logger.info(f"目标表有 created_at 字段,设置值为: {current_time}")
+            timestamp_field_added = True
         else:
-            logger.warning(f"目标表 '{table_name}' 没有 create_time 字段,跳过添加时间戳")
+            logger.warning(f"目标表 '{table_name}' 没有 create_time 字段也没有 created_at 字段,跳过添加时间戳")
 
         # 连接数据库
         conn = get_pg_conn()
@@ -282,7 +374,7 @@ def load_dataframe_to_table(df, file_path, table_name):
         # 构建INSERT语句
         columns = ', '.join([f'"{col}"' for col in df_mapped.columns])
         placeholders = ', '.join(['%s'] * len(df_mapped.columns))
-        insert_sql = f'INSERT INTO public."{table_name}" ({columns}) VALUES ({placeholders})'
+        insert_sql = f'INSERT INTO "{table_name}" ({columns}) VALUES ({placeholders})'
         
         # 批量插入数据
         rows = [tuple(row) for row in df_mapped.values]
@@ -553,12 +645,12 @@ def run(table_name, update_mode='append', exec_date=None, target_type=None,
                 conn = get_pg_conn()
                 cursor = conn.cursor()
                 # 假设表在 public schema,并为表名加引号
-                logger.info(f"执行全量刷新,清空表 public.\"{table_name}\"")
-                cursor.execute(f'TRUNCATE TABLE public.\"{table_name}\"')
+                logger.info(f"执行全量刷新,清空表 \"{table_name}\"")
+                cursor.execute(f'TRUNCATE TABLE \"{table_name}\"')
                 conn.commit()
-                logger.info("表 public.\"" + table_name + "\" 已清空。")
+                logger.info("表 \"" + table_name + "\" 已清空。")
             except Exception as e:
-                logger.error("清空表 public.\"" + table_name + "\" 时出错: " + str(e))
+                logger.error("清空表 \"" + table_name + "\" 时出错: " + str(e))
                 if conn:
                     conn.rollback()
                 return False # 清空失败则直接失败退出
@@ -579,12 +671,12 @@ def run(table_name, update_mode='append', exec_date=None, target_type=None,
                 conn = get_pg_conn()
                 cursor = conn.cursor()
                 # 假设表在 public schema,并为表名加引号
-                logger.info(f"执行全量刷新,清空表 public.\"{table_name}\"")
-                cursor.execute(f'TRUNCATE TABLE public.\"{table_name}\"')
+                logger.info(f"执行全量刷新,清空表 \"{table_name}\"")
+                cursor.execute(f'TRUNCATE TABLE \"{table_name}\"')
                 conn.commit()
-                logger.info("表 public.\"" + table_name + "\" 已清空。")
+                logger.info("表 \"" + table_name + "\" 已清空。")
             except Exception as e:
-                logger.error("清空表 public.\"" + table_name + "\" 时出错: " + str(e))
+                logger.error("清空表 \"" + table_name + "\" 时出错: " + str(e))
                 if conn:
                     conn.rollback()
                 return False # 清空失败则直接失败退出

+ 231 - 1
dataops_scripts/script_utils.py

@@ -525,4 +525,234 @@ def get_config_param(param_name, default_value=None):
         return getattr(config_module, param_name)
     except Exception as e:
         logger.warning(f"获取配置参数 {param_name} 失败: {str(e)},使用默认值: {default_value}")
-        return default_value
+        return default_value
+
+def check_and_create_table(table_name, default_schema=None):
+    """
+    检查目标表是否存在,如果不存在则尝试从Neo4j创建表
+    
+    参数:
+        table_name (str): 表名,支持schema.table格式
+        default_schema (str, optional): 当表名不包含schema时的默认schema,默认为None
+    
+    返回:
+        bool: 表存在或创建成功返回True,否则返回False
+    """
+    logger.info(f"开始检查目标表 '{table_name}' 是否存在")
+    
+    # 解析表名,支持schema.table格式
+    if '.' in table_name:
+        schema_name, table_name_only = table_name.split('.', 1)
+    else:
+        schema_name = default_schema if default_schema else 'ods'  # 使用传入的默认schema或'ods'
+        table_name_only = table_name
+    
+    logger.info(f"解析表名: schema='{schema_name}', table='{table_name_only}'")
+    
+    conn = None
+    cursor = None
+    try:
+        # 获取数据库配置和连接
+        pg_config = get_pg_config()
+        import psycopg2
+        conn = psycopg2.connect(**pg_config)
+        cursor = conn.cursor()
+        
+        # 检查表是否存在 - 使用EXISTS查询方式
+        check_table_sql = """
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables 
+                WHERE table_schema = %s AND table_name = %s
+            );
+        """
+        
+        cursor.execute(check_table_sql, (schema_name.lower(), table_name_only.lower()))
+        table_exists = cursor.fetchone()[0]
+        
+        logger.info(f"表存在性检查结果: {table_exists}")
+        
+        if not table_exists:
+            logger.warning(f"表 '{table_name}' 不存在,尝试从Neo4j创建表")
+            # 调用create_table_from_neo4j函数,传递schema参数
+            try:
+                if create_table_from_neo4j(table_name_only, default_schema=schema_name):
+                    logger.info(f"成功从Neo4j创建表 '{table_name}'")
+                    return True
+                else:
+                    logger.error(f"从Neo4j创建表 '{table_name}' 失败")
+                    return False
+            except Exception as create_err:
+                logger.error(f"调用create_table_from_neo4j创建表 '{table_name}' 时出错: {str(create_err)}")
+                return False
+        else:
+            logger.info(f"表 '{table_name}' 已存在")
+            return True
+            
+    except Exception as e:
+        logger.error(f"检查表存在性时出错: {str(e)}", exc_info=True)
+        return False
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def create_table_from_neo4j(en_name: str, default_schema: str = None):
+    """
+    根据Neo4j中的表定义创建PostgreSQL表
+    
+    参数:
+        en_name (str): 表的英文名称
+        default_schema (str, optional): 默认schema,如果提供则优先使用,否则从Neo4j查询Label决定
+    
+    返回:
+        bool: 成功返回True,失败返回False
+    """
+    driver = None
+    conn = None
+    cur = None
+    
+    try:
+        # 使用script_utils中的方法获取连接
+        driver = get_neo4j_driver()
+        pg_config = get_pg_config()
+        
+        import psycopg2
+        conn = psycopg2.connect(**pg_config)
+        cur = conn.cursor()
+
+        with driver.session() as session:
+            # 1. 查找目标表节点(DataResource/DataModel/DataMetric)
+            result = session.run("""
+                MATCH (t)
+                WHERE t.en_name = $en_name AND (t:DataResource OR t:DataModel OR t:DataMetric)
+                RETURN labels(t) AS labels, t.en_name AS en_name, t.name AS name, id(t) AS node_id
+            """, en_name=en_name)
+
+            record = result.single()
+            if not record:
+                logger.error(f"未找到名为 {en_name} 的表节点")
+                return False
+
+            labels = record["labels"]
+            table_en_name = record["en_name"]
+            table_cn_name = record["name"]
+            node_id = record["node_id"]
+
+            # 优先使用传入的 default_schema,如果没有则从 Neo4j Label 判断
+            if default_schema:
+                schema = default_schema
+                logger.info(f"使用传入的 default_schema: {schema}")
+            else:
+                schema = "ods" if "DataResource" in labels else "ads"
+                logger.info(f"从 Neo4j Label {labels} 判断 schema: {schema}")
+
+            # 2. 查找所有字段(HAS_COLUMN关系)并按Column节点的系统id排序
+            column_result = session.run("""
+                MATCH (t)-[:HAS_COLUMN]->(c:Column)
+                WHERE id(t) = $node_id
+                RETURN c.en_name AS en_name, c.data_type AS data_type, 
+                       c.name AS name, c.is_pk AS is_pk, id(c) AS column_id
+                ORDER BY id(c) ASC
+            """, node_id=node_id)
+
+            columns = column_result.data()
+            if not columns:
+                logger.error(f"未找到表 {en_name} 的字段信息")
+                return False
+
+            # 3. 构造 DDL
+            ddl_lines = []
+            pk_fields = set()  # 使用 set 自动去重
+            existing_fields = set()
+
+            for col in columns:
+                col_line = f'{col["en_name"]} {col["data_type"]}'
+                ddl_lines.append(col_line)
+                existing_fields.add(col["en_name"].lower())
+                if col.get("is_pk", False):
+                    pk_fields.add(col["en_name"])  # 使用 add 方法,自动去重
+
+            # 检查并添加 create_time 和 update_time 字段
+            if 'create_time' not in existing_fields:
+                ddl_lines.append('create_time timestamp')
+            if 'update_time' not in existing_fields:
+                ddl_lines.append('update_time timestamp')
+
+            if pk_fields:
+                # 将 set 转换为排序后的列表,确保 DDL 生成的一致性
+                sorted_pk_fields = sorted(list(pk_fields))
+                ddl_lines.append(f'PRIMARY KEY ({", ".join(sorted_pk_fields)})')
+
+            full_table_name = f"{schema}.{table_en_name}"
+            ddl = f'CREATE SCHEMA IF NOT EXISTS {schema};\n'
+            ddl += f'CREATE TABLE IF NOT EXISTS {full_table_name} (\n  '
+            ddl += ",\n  ".join(ddl_lines)
+            ddl += "\n);"
+
+            # 生成表注释SQL
+            table_comment_sql = f"COMMENT ON TABLE {full_table_name} IS '{table_cn_name}';"
+            
+            # 生成字段注释SQL
+            column_comment_sqls = []
+            for col in columns:
+                if col["name"]:  # 如果有中文名称
+                    column_comment_sql = f"COMMENT ON COLUMN {full_table_name}.{col['en_name']} IS '{col['name']}';"
+                    column_comment_sqls.append(column_comment_sql)
+
+            logger.info(f"DDL: {ddl}")
+            logger.info(f"表注释SQL: {table_comment_sql}")
+            if column_comment_sqls:
+                logger.info("字段注释SQL:")
+                for comment_sql in column_comment_sqls:
+                    logger.info(f"  {comment_sql}")
+
+            # 4. 执行 DDL
+            try:
+                # 先检查表是否已经存在
+                check_table_sql = """
+                    SELECT EXISTS (
+                        SELECT FROM information_schema.tables 
+                        WHERE table_schema = %s AND table_name = %s
+                    );
+                """
+                cur.execute(check_table_sql, (schema, table_en_name))
+                table_exists = cur.fetchone()[0]
+                
+                if table_exists:
+                    logger.info(f"表 {full_table_name} 已存在,跳过创建")
+                    return True
+                else:
+                    # 执行创建表的DDL
+                    cur.execute(ddl)
+                    logger.info(f"成功创建新表: {full_table_name}")
+                    
+                    # 执行表注释
+                    cur.execute(table_comment_sql)
+                    logger.info(f"已添加表注释")
+                    
+                    # 执行字段注释
+                    for comment_sql in column_comment_sqls:
+                        cur.execute(comment_sql)
+                    logger.info(f"已添加 {len(column_comment_sqls)} 个字段注释")
+                    
+                    conn.commit()
+                    return True
+                    
+            except Exception as e:
+                logger.error(f"执行DDL失败: {e}")
+                conn.rollback()
+                return False
+
+    except Exception as e:
+        logger.error(f"创建表 {en_name} 时发生错误: {str(e)}")
+        if conn:
+            conn.rollback()
+        return False
+    finally:
+        if cur:
+            cur.close()
+        if conn:
+            conn.close()
+        if driver:
+            driver.close()