Parcourir la source

合并exec python/sql中的目标表检查和建表语句。

wangxq il y a 4 semaines
Parent
commit
88e8fab112

+ 4 - 0
dags/.airflowignore

@@ -0,0 +1,4 @@
+__pycache__/
+*.pyc
+config.py
+utils.py

+ 41 - 19
dataops_scripts/execution_python.py

@@ -236,27 +236,30 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, s
         
         logger.info(f"成功获取脚本内容,长度: {len(script_code)} 字符")
         
-        # 日期计算
-        # try:
-        #     # 直接使用script_utils.get_date_range计算日期范围
-        #     logger.info(f"使用script_utils.get_date_range计算日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
-        #     start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
-        #     logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
-        # except Exception as date_err:
-        #     logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
-        #     # 使用简单的默认日期范围计算
-        #     date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
-        #     if schedule_frequency.lower() == 'daily':
-        #         start_date = date_obj.strftime('%Y-%m-%d')
-        #         end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
-        #     else:
-        #         # 对其他频率使用默认范围
-        #         start_date = exec_date
-        #         end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
-        #     logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
+        # 获取目标表标签,用于决定schema
+        target_table_label = kwargs.get('target_table_label', '')
+        
+        # 根据 target_table_label 决定 schema
+        if target_table_label and 'DataResource' in target_table_label:
+            target_schema = 'ods'
+        else:
+            target_schema = 'ads'
+        
+        logger.info(f"目标表标签: {target_table_label}, 决定使用schema: {target_schema}")
+        
+        # 检查目标表是否存在,如果不存在则尝试创建
+        logger.info(f"检查目标表 '{target_table}' 是否存在,如果不存在则尝试从Neo4j创建")
+        if not script_utils.check_and_create_table(target_table, default_schema=target_schema):
+            logger.error(f"目标表 '{target_table}' 不存在且无法创建,无法继续执行Python脚本")
+            return False
+        
+        logger.info(f"目标表 '{target_table}' 检查完成,可以继续执行")
+        
+        # 初始化日期变量,确保在所有情况下都有值
+        start_date = None
+        end_date = None
         
         # 检查是否开启ETL幂等性
-        target_table_label = kwargs.get('target_table_label', '')
         script_exec_mode = kwargs.get('update_mode', 'append')  # 只使用update_mode
         is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False) 
         
@@ -361,6 +364,25 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, s
                 logger.info(f"当前更新模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
         else:
             logger.info("未开启ETL幂等性,直接执行Python脚本")
+        
+        # 如果日期变量还没有被设置,则计算默认的日期范围
+        if start_date is None or end_date is None:
+            logger.info(f"计算默认日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
+            try:
+                start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
+                logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+            except Exception as date_err:
+                logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
+                # 使用简单的默认日期范围计算
+                date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
+                if schedule_frequency.lower() == 'daily':
+                    start_date = date_obj.strftime('%Y-%m-%d')
+                    end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
+                else:
+                    # 对其他频率使用默认范围
+                    start_date = exec_date
+                    end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
+                logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
 
         # 准备执行上下文
         conn = get_pg_conn()  # 在外层先拿到连接

+ 19 - 1
dataops_scripts/execution_sql.py

@@ -301,8 +301,26 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
         
         logger.info(f"成功获取脚本内容,长度: {len(sql_content)} 字符")
         
-        # 检查是否开启ETL幂等性
+        # 获取目标表标签,用于决定schema
         target_table_label = kwargs.get('target_table_label', '')
+        
+        # 根据 target_table_label 决定 schema
+        if target_table_label and 'DataResource' in target_table_label:
+            target_schema = 'ods'
+        else:
+            target_schema = 'ads'
+        
+        logger.info(f"目标表标签: {target_table_label}, 决定使用schema: {target_schema}")
+        
+        # 检查目标表是否存在,如果不存在则尝试创建
+        logger.info(f"检查目标表 '{target_table}' 是否存在,如果不存在则尝试从Neo4j创建")
+        if not script_utils.check_and_create_table(target_table, default_schema=target_schema):
+            logger.error(f"目标表 '{target_table}' 不存在且无法创建,无法继续执行SQL脚本")
+            return False
+        
+        logger.info(f"目标表 '{target_table}' 检查完成,可以继续执行")
+        
+        # 检查是否开启ETL幂等性
         script_exec_mode = kwargs.get('update_mode', 'append')  # 只使用update_mode
         is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False)  # 新增:获取是否为手动触发的DAG
         

+ 84 - 5
dataops_scripts/script_utils.py

@@ -527,12 +527,83 @@ def get_config_param(param_name, default_value=None):
         logger.warning(f"获取配置参数 {param_name} 失败: {str(e)},使用默认值: {default_value}")
         return default_value
 
-def create_table_from_neo4j(en_name: str):
+def check_and_create_table(table_name, default_schema=None):
+    """
+    检查目标表是否存在,如果不存在则尝试从Neo4j创建表
+    
+    参数:
+        table_name (str): 表名,支持schema.table格式
+        default_schema (str, optional): 当表名不包含schema时的默认schema,默认为None
+    
+    返回:
+        bool: 表存在或创建成功返回True,否则返回False
+    """
+    logger.info(f"开始检查目标表 '{table_name}' 是否存在")
+    
+    # 解析表名,支持schema.table格式
+    if '.' in table_name:
+        schema_name, table_name_only = table_name.split('.', 1)
+    else:
+        schema_name = default_schema if default_schema else 'ods'  # 使用传入的默认schema或'ods'
+        table_name_only = table_name
+    
+    logger.info(f"解析表名: schema='{schema_name}', table='{table_name_only}'")
+    
+    conn = None
+    cursor = None
+    try:
+        # 获取数据库配置和连接
+        pg_config = get_pg_config()
+        import psycopg2
+        conn = psycopg2.connect(**pg_config)
+        cursor = conn.cursor()
+        
+        # 检查表是否存在 - 使用EXISTS查询方式
+        check_table_sql = """
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables 
+                WHERE table_schema = %s AND table_name = %s
+            );
+        """
+        
+        cursor.execute(check_table_sql, (schema_name.lower(), table_name_only.lower()))
+        table_exists = cursor.fetchone()[0]
+        
+        logger.info(f"表存在性检查结果: {table_exists}")
+        
+        if not table_exists:
+            logger.warning(f"表 '{table_name}' 不存在,尝试从Neo4j创建表")
+            # 调用create_table_from_neo4j函数,传递schema参数
+            try:
+                if create_table_from_neo4j(table_name_only, default_schema=schema_name):
+                    logger.info(f"成功从Neo4j创建表 '{table_name}'")
+                    return True
+                else:
+                    logger.error(f"从Neo4j创建表 '{table_name}' 失败")
+                    return False
+            except Exception as create_err:
+                logger.error(f"调用create_table_from_neo4j创建表 '{table_name}' 时出错: {str(create_err)}")
+                return False
+        else:
+            logger.info(f"表 '{table_name}' 已存在")
+            return True
+            
+    except Exception as e:
+        logger.error(f"检查表存在性时出错: {str(e)}", exc_info=True)
+        return False
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def create_table_from_neo4j(en_name: str, default_schema: str = None):
     """
     根据Neo4j中的表定义创建PostgreSQL表
     
     参数:
         en_name (str): 表的英文名称
+        default_schema (str, optional): 默认schema,如果提供则优先使用,否则从Neo4j查询Label决定
     
     返回:
         bool: 成功返回True,失败返回False
@@ -568,7 +639,13 @@ def create_table_from_neo4j(en_name: str):
             table_cn_name = record["name"]
             node_id = record["node_id"]
 
-            schema = "ods" if "DataResource" in labels else "ads"
+            # 优先使用传入的 default_schema,如果没有则从 Neo4j Label 判断
+            if default_schema:
+                schema = default_schema
+                logger.info(f"使用传入的 default_schema: {schema}")
+            else:
+                schema = "ods" if "DataResource" in labels else "ads"
+                logger.info(f"从 Neo4j Label {labels} 判断 schema: {schema}")
 
             # 2. 查找所有字段(HAS_COLUMN关系)并按Column节点的系统id排序
             column_result = session.run("""
@@ -586,7 +663,7 @@ def create_table_from_neo4j(en_name: str):
 
             # 3. 构造 DDL
             ddl_lines = []
-            pk_fields = []
+            pk_fields = set()  # 使用 set 自动去重
             existing_fields = set()
 
             for col in columns:
@@ -594,7 +671,7 @@ def create_table_from_neo4j(en_name: str):
                 ddl_lines.append(col_line)
                 existing_fields.add(col["en_name"].lower())
                 if col.get("is_pk", False):
-                    pk_fields.append(f'{col["en_name"]}')
+                    pk_fields.add(col["en_name"])  # 使用 add 方法,自动去重
 
             # 检查并添加 create_time 和 update_time 字段
             if 'create_time' not in existing_fields:
@@ -603,7 +680,9 @@ def create_table_from_neo4j(en_name: str):
                 ddl_lines.append('update_time timestamp')
 
             if pk_fields:
-                ddl_lines.append(f'PRIMARY KEY ({", ".join(pk_fields)})')
+                # 将 set 转换为排序后的列表,确保 DDL 生成的一致性
+                sorted_pk_fields = sorted(list(pk_fields))
+                ddl_lines.append(f'PRIMARY KEY ({", ".join(sorted_pk_fields)})')
 
             full_table_name = f"{schema}.{table_en_name}"
             ddl = f'CREATE SCHEMA IF NOT EXISTS {schema};\n'