
Manual scheduling changes are mostly complete, but there are still some issues with the idempotency settings

wangxq 1 week ago
parent
commit
bbbd7d6206
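
For context on the commit message: the idempotency handling it refers to is the pre-processing step in execution_python.py and execution_sql.py further down, which clears the target date range before an append run and truncates the target table before a full_refresh run. A minimal sketch of that pattern (the helper name clear_target_range and the cursor handling are illustrative, not code from this repository):

def clear_target_range(conn, table, dt_column, start_date, end_date, update_mode):
    # Illustrative idempotency pre-step: delete the date range for append,
    # truncate for full_refresh, so reruns do not duplicate rows.
    with conn.cursor() as cur:
        if update_mode == "append":
            cur.execute(
                f"DELETE FROM {table} WHERE {dt_column} >= %s AND {dt_column} < %s",
                (start_date, end_date),
            )
        elif update_mode == "full_refresh":
            cur.execute(f"TRUNCATE TABLE {table}")
    conn.commit()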

+ 1 - 1
dags/config.py

@@ -4,7 +4,7 @@ import os
 
 # PostgreSQL connection settings
 PG_CONFIG = {
-    "host": "localhost",
+    "host": "192.168.67.10",
     "port": 5432,
     "user": "postgres",
     "password": "postgres",

+ 55 - 274
dags/dataops_productline_manual_trigger_dag.py

@@ -66,7 +66,8 @@ from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
 import traceback
 import pendulum
 import pytz
-from utils import get_pg_conn, get_cn_exec_date, check_script_exists
+from utils import get_pg_conn, get_cn_exec_date, check_script_exists, get_complete_script_info
+from airflow.exceptions import AirflowException
 
 # Set up the logger
 logger = logging.getLogger(__name__)
@@ -82,73 +83,6 @@ default_args = {
     'retry_delay': timedelta(minutes=1),
 }
 
-def get_execution_mode(table_name):
-    """
-    Get the execution mode of a table from Neo4j
-    
-    Parameters:
-        table_name (str): table name
-        
-    Returns:
-        str: execution mode; returns "append" as the default if not found
-    """
-    try:
-        driver = GraphDatabase.driver(
-            NEO4J_CONFIG['uri'], 
-            auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
-        )
-        
-        # First check whether it is a structure-type DataResource
-        with driver.session() as session:
-            query_structure = """
-                MATCH (n:DataResource {en_name: $table_name})
-                RETURN n.type AS type
-            """
-            
-            result = session.run(query_structure, table_name=table_name)
-            record = result.single()
-            
-            if record and record.get("type") == "structure":
-                logger.info(f"Table {table_name} is a structure-type DataResource; using default execution mode 'append'")
-                return "append"
-        
-        # Query the execution mode, trying DataModel and DataResource in turn
-        with driver.session() as session:
-            # First check DataModel tables
-            query_model = """
-                MATCH (n:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->()
-                RETURN rel.exec_mode AS execution_mode LIMIT 1
-            """
-            
-            result = session.run(query_model, table_name=table_name)
-            record = result.single()
-            
-            if record and record.get("execution_mode"):
-                return record.get("execution_mode")
-            
-            # Then check DataResource tables
-            query_resource = """
-                MATCH (n:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->()
-                RETURN rel.exec_mode AS execution_mode LIMIT 1
-            """
-            
-            result = session.run(query_resource, table_name=table_name)
-            record = result.single()
-            
-            if record and record.get("execution_mode"):
-                return record.get("execution_mode")
-            
-            # Fall back to the default if neither query finds anything
-            logger.warning(f"Execution mode for table {table_name} not found in Neo4j; using default 'append'")
-            return "append"
-            
-    except Exception as e:
-        logger.error(f"Error getting the execution mode for table {table_name}: {str(e)}")
-        return "append"
-    finally:
-        if driver:
-            driver.close()
-
 def get_dag_params(**context):
     """获取DAG运行参数"""
     #params = context.get('params', {})
@@ -304,19 +238,19 @@ def find_scripts_for_table(table_name):
                     return scripts
                 
             if table_label == "DataModel":
-                # Query all script relationships of the DataModel
+                # Query the script relationships on the DataModel table
                 query = """
                     MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                     RETURN rel.script_name AS script_name, rel.script_type AS script_type
                 """
             elif table_label == "DataResource":
-                # Query all script relationships of the DataResource
+                # Query the script relationships on the DataResource table
                 query = """
                     MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                     RETURN rel.script_name AS script_name, rel.script_type AS script_type
                 """
             else:
-                logger.warning(f"Table {table_name} is not a DataModel or DataResource type")
+                logger.warning(f"Table {table_name} has an unknown or unsupported type; cannot look up scripts")
                 return scripts
             
             result = session.run(query, table_name=table_name)
@@ -330,178 +264,27 @@ def find_scripts_for_table(table_name):
                         "script_name": script_name,
                         "script_type": script_type
                     })
-            
-            # If no script is found, generate a default script name from the table name
-            if not scripts:
-                logger.warning(f"Table {table_name} has no associated script; trying the default script name")
-                
-                # Look for a possible default script file
-                default_script_name = f"{table_name}_process.py"
-                script_path = os.path.join(SCRIPTS_BASE_PATH, default_script_name)
-                
-                if os.path.exists(script_path):
-                    logger.info(f"Found default script file: {default_script_name}")
-                    scripts.append({
-                        "script_name": default_script_name,
-                        "script_type": "python_script"
-                    })
-    except Exception as e:
-        logger.error(f"Error finding scripts for table {table_name}: {str(e)}")
-    finally:
-        driver.close()
-    
-    logger.info(f"Scripts associated with table {table_name}: {scripts}")
-    return scripts
-
-def get_script_info_from_neo4j(script_name, target_table):
-    """
-    Get detailed script and table information from Neo4j
-    
-    Parameters:
-        script_name: script name
-        target_table: target table name
-        
-    Returns:
-        dict: detailed script and table information
-    """
-    driver = GraphDatabase.driver(
-        NEO4J_CONFIG['uri'], 
-        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
-    )
-    
-    # Get the table's label type
-    table_label = get_table_label(target_table)
-    
-    script_info = {
-        'script_name': script_name,
-        'target_table': target_table,
-        'script_id': f"{script_name.replace('.', '_')}_{target_table}",
-        'target_table_label': table_label,
-        'source_tables': [],
-        'script_type': 'python_script'  # Default type changed to python_script, meaning a physical script file
-    }
-    
-    # Check whether it is a structure-type DataResource
-    try:
-        with driver.session() as session:
-            if table_label == 'DataResource':
-                query_structure = """
-                    MATCH (n:DataResource {en_name: $table_name})
-                    RETURN n.type AS type, n.storage_location AS storage_location, n.frequency AS frequency
-                """
-                result = session.run(query_structure, table_name=target_table)
-                record = result.single()
-                
-                if record and record.get("type") == "structure":
-                    logger.info(f"Table {target_table} is a structure-type DataResource")
-                    
-                    # Set the special attributes
-                    script_info['target_type'] = 'structure'
-                    storage_location = record.get("storage_location")
-                    frequency = record.get("frequency", "daily")
-                    
-                    if storage_location:
-                        script_info['storage_location'] = storage_location
-                    script_info['frequency'] = frequency
-                    
-                    # If no script name is given, or it is 'default', use load_file.py
-                    if not script_name or script_name.lower() == 'default' or script_name == 'load_file.py':
-                        script_info['script_name'] = 'load_file.py'
-                        script_info['script_id'] = f"load_file_py_{target_table}"
-                        script_info['execution_mode'] = "append"
-                        logger.info(f"Using default script 'load_file.py' for structure-type DataResource table {target_table}")
-                        return script_info
-                    
-    except Exception as e:
-        logger.error(f"Error checking whether table {target_table} is structure type: {str(e)}")
-        logger.error(traceback.format_exc())
-    
-    # Query the script information and dependencies according to the table's label type
-    try:
-        with driver.session() as session:
-            if script_info['target_table_label'] == 'DataModel':
-                # Query the upstream dependencies of the DataModel
-                query = """
-                    MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
-                    RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
-                """
-                result = session.run(query, table_name=target_table)
-                
-                for record in result:
-                    source_table = record.get("source_table")
-                    source_labels = record.get("source_labels", [])
-                    db_script_name = record.get("script_name")
-                    script_type = record.get("script_type", "python_script")
                     
-                    # Verify that the script name matches
-                    if db_script_name and db_script_name == script_name:
-                        if source_table and source_table not in script_info['source_tables']:
-                            script_info['source_tables'].append(source_table)
-                        script_info['script_type'] = script_type
+        # If no script was found, try a lookup that uses the table name as part of the script name
+        if not scripts:
+            logger.info(f"No script for table {table_name} found in the relationships; trying a lookup by table name")
             
-            elif script_info['target_table_label'] == 'DataResource':
-                # Query the upstream dependencies of the DataResource
-                query = """
-                    MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                    RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
-                """
-                result = session.run(query, table_name=target_table)
-                
-                for record in result:
-                    source_table = record.get("source_table")
-                    source_labels = record.get("source_labels", [])
-                    db_script_name = record.get("script_name")
-                    script_type = record.get("script_type", "python_script")
-                    
-                    # Verify that the script name matches
-                    if db_script_name and db_script_name == script_name:
-                        if source_table and source_table not in script_info['source_tables']:
-                            script_info['source_tables'].append(source_table)
-                        script_info['script_type'] = script_type
+            default_script_name = f"{table_name}_process.py"
+            script_path = os.path.join(SCRIPTS_BASE_PATH, default_script_name)
             
-            # If no dependency was found, log a warning
-            if not script_info['source_tables']:
-                logger.warning(f"No dependency found between script {script_name} and table {target_table}")
-                
-            # Get the special attributes (for structure type)
-            if script_info['target_table_label'] == 'DataResource':
-                query = """
-                    MATCH (n:DataResource {en_name: $table_name})
-                    RETURN n.type AS target_type, n.storage_location AS storage_location, n.frequency AS frequency
-                """
-                result = session.run(query, table_name=target_table)
-                record = result.single()
-                
-                if record:
-                    target_type = record.get("target_type")
-                    storage_location = record.get("storage_location")
-                    frequency = record.get("frequency")
-                    
-                    if target_type:
-                        script_info['target_type'] = target_type
-                    if storage_location:
-                        script_info['storage_location'] = storage_location
-                    if frequency:
-                        script_info['frequency'] = frequency
-                    
-                    # If structure type, check again whether the default script should be used
-                    if target_type == 'structure' and (not script_name or script_name.lower() == 'default' or script_name == 'load_file.py'):
-                        script_info['script_name'] = 'load_file.py'
-                        script_info['script_id'] = f"load_file_py_{target_table}"
-                        script_info['execution_mode'] = "append"
-                        logger.info(f"Using default script 'load_file.py' for structure-type DataResource table {target_table}")
-    
+            if os.path.exists(script_path):
+                logger.info(f"Found default script: {default_script_name}")
+                scripts.append({
+                    "script_name": default_script_name,
+                    "script_type": "python_script"
+                })
     except Exception as e:
-        logger.error(f"Error getting information for script {script_name} and table {target_table} from Neo4j: {str(e)}")
+        logger.error(f"Error looking up script information for table {table_name}: {str(e)}")
         logger.error(traceback.format_exc())
     finally:
         driver.close()
-    
-    # Get the table's execution mode
-    script_info['execution_mode'] = get_execution_mode(target_table)
-    
-    logger.info(f"Retrieved script info: {script_info}")
-    return script_info
+        
+    return scripts
 
 def get_upstream_script_dependencies(script_info, dependency_level='resource'):
     """
@@ -569,7 +352,7 @@ def get_upstream_script_dependencies(script_info, dependency_level='resource'):
                     if table_label == 'DataResource':
                         query_structure = """
                             MATCH (n:DataResource {en_name: $table_name})
-                            RETURN n.type AS type, n.storage_location AS storage_location, n.frequency AS frequency
+                            RETURN n.type AS type
                         """
                         result = session.run(query_structure, table_name=source_table)
                         record = result.single()
@@ -577,6 +360,15 @@ def get_upstream_script_dependencies(script_info, dependency_level='resource'):
                         if record and record.get("type") == "structure":
                             logger.info(f"上游表 {source_table} 是structure类型的DataResource,使用默认脚本'load_file.py'")
                             
+                            # 获取structure类型表的基本信息
+                            structure_query = """
+                                MATCH (n:DataResource {en_name: $table_name})
+                                RETURN n.type AS type, n.storage_location AS storage_location, 
+                                      n.schedule_frequency AS schedule_frequency
+                            """
+                            structure_result = session.run(structure_query, table_name=source_table)
+                            structure_record = structure_result.single()
+                            
                             # Build the script info for the structure-type table
                             upstream_script_name = "load_file.py"
                             upstream_target_table = source_table
@@ -585,21 +377,7 @@ def get_upstream_script_dependencies(script_info, dependency_level='resource'):
                             # If this script has not been processed yet
                             if upstream_id not in visited:
                                 # Create the script info
-                                upstream_info = {
-                                    'script_name': upstream_script_name,
-                                    'target_table': upstream_target_table,
-                                    'script_id': upstream_id,
-                                    'target_table_label': 'DataResource',
-                                    'target_type': 'structure',
-                                    'source_tables': [],
-                                    'script_type': 'python_script',
-                                    'execution_mode': 'append',
-                                    'frequency': record.get("frequency", "daily")
-                                }
-                                # Add storage_location if it exists
-                                # 添加storage_location如果存在
-                                if record.get("storage_location"):
-                                    upstream_info["storage_location"] = record.get("storage_location")
+                                upstream_info = get_complete_script_info(upstream_script_name, upstream_target_table)
                                 
                                 # Add it to the graph
                                 G.add_node(upstream_id, **upstream_info)
@@ -650,7 +428,7 @@ def get_upstream_script_dependencies(script_info, dependency_level='resource'):
                         
                         if upstream_id not in visited:
                             # Get the complete upstream script info
-                            upstream_info = get_script_info_from_neo4j(upstream_script_name, upstream_target_table)
+                            upstream_info = get_complete_script_info(upstream_script_name, upstream_target_table)
                             
                             # Add it to the graph
                             G.add_node(upstream_id, **upstream_info)
@@ -715,7 +493,7 @@ def execute_python_script(script_info):
     """
     script_name = script_info.get('script_name')
     target_table = script_info.get('target_table')
-    execution_mode = script_info.get('execution_mode', 'append')
+    update_mode = script_info.get('update_mode', 'append')
     target_table_label = script_info.get('target_table_label')
     source_tables = script_info.get('source_tables', [])
     frequency = script_info.get('frequency', 'daily')
@@ -725,7 +503,7 @@ def execute_python_script(script_info):
     # Log the start of execution
     logger.info(f"===== Start executing physical Python script file: {script_name} =====")
     logger.info(f"Target table: {target_table}")
-    logger.info(f"Execution mode: {execution_mode}")
+    logger.info(f"Update mode: {update_mode}")
     logger.info(f"Table label: {target_table_label}")
     logger.info(f"Source tables: {source_tables}")
     logger.info(f"Frequency: {frequency}")
@@ -752,7 +530,7 @@ def execute_python_script(script_info):
             # Build the function arguments
             run_kwargs = {
                 "table_name": target_table,
-                "execution_mode": execution_mode,
+                "update_mode": update_mode,
                 "frequency": frequency,
                 "exec_date": exec_date  # Use the supplied execution date rather than the current date
             }
@@ -800,7 +578,7 @@ def execute_sql(script_info):
     """
     script_name = script_info.get('script_name')
     target_table = script_info.get('target_table')
-    execution_mode = script_info.get('execution_mode', 'append')
+    update_mode = script_info.get('update_mode', 'append')
     target_table_label = script_info.get('target_table_label')
     frequency = script_info.get('frequency', 'daily')
     # Use the supplied execution date if given; otherwise use the current date
@@ -809,7 +587,7 @@ def execute_sql(script_info):
     # Log the start of execution
     logger.info(f"===== Start executing SQL script: {script_name} =====")
     logger.info(f"Target table: {target_table}")
-    logger.info(f"Execution mode: {execution_mode}")
+    logger.info(f"Update mode: {update_mode}")
     logger.info(f"Table label: {target_table_label}")
     logger.info(f"Frequency: {frequency}")
     logger.info(f"Execution date: {exec_date}")
@@ -841,7 +619,7 @@ def execute_sql(script_info):
                 "exec_date": exec_date,  # 使用传入的执行日期而不是当前日期
                 "frequency": frequency,
                 "target_table_label": target_table_label,
-                "execution_mode": execution_mode
+                "update_mode": update_mode
             }
             
             # If structure type, add the special parameters
@@ -887,7 +665,7 @@ def execute_python(script_info):
     """
     script_name = script_info.get('script_name')
     target_table = script_info.get('target_table')
-    execution_mode = script_info.get('execution_mode', 'append')
+    update_mode = script_info.get('update_mode', 'append')
     target_table_label = script_info.get('target_table_label')
     frequency = script_info.get('frequency', 'daily')
     # Use the supplied execution date if given; otherwise use the current date
@@ -896,7 +674,7 @@ def execute_python(script_info):
     # Log the start of execution
     logger.info(f"===== Start executing Python script (data_transform_scripts): {script_name} =====")
     logger.info(f"Target table: {target_table}")
-    logger.info(f"Execution mode: {execution_mode}")
+    logger.info(f"Update mode: {update_mode}")
     logger.info(f"Table label: {target_table_label}")
     logger.info(f"Frequency: {frequency}")
     logger.info(f"Execution date: {exec_date}")
@@ -928,7 +706,7 @@ def execute_python(script_info):
                 "exec_date": exec_date,  # 使用传入的执行日期而不是当前日期
                 "frequency": frequency,
                 "target_table_label": target_table_label,
-                "execution_mode": execution_mode
+                "update_mode": update_mode
             }
             
             # If structure type, add the special parameters
@@ -1015,7 +793,7 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
     # Case 1: both a script name and a target table are provided
     if script_name and target_table:
         logger.info(f"Scenario 1: both a script name and a target table were provided")
-        script_info = get_script_info_from_neo4j(script_name, target_table)
+        script_info = get_complete_script_info(script_name, target_table)
         if script_info:
             all_script_infos.append(script_info)
     
@@ -1025,7 +803,7 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
         target_table = find_target_table_for_script(script_name)
         if target_table:
             logger.info(f"找到脚本 {script_name} 对应的目标表: {target_table}")
-            script_info = get_script_info_from_neo4j(script_name, target_table)
+            script_info = get_complete_script_info(script_name, target_table)
             if script_info:
                 all_script_infos.append(script_info)
         else:
@@ -1055,7 +833,7 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
                     
                     if record and record.get("type") == "structure":
                         logger.info(f"表 {target_table} 是structure类型的DataResource,使用默认脚本'load_file.py'")
-                        script_info = get_script_info_from_neo4j('load_file.py', target_table)
+                        script_info = get_complete_script_info('load_file.py', target_table)
                         if script_info:
                             all_script_infos.append(script_info)
                             return all_script_infos
@@ -1071,7 +849,7 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
             # If the table is a DataResource, check again whether it is structure type
             if table_label == 'DataResource':
                 logger.info(f"Trying the default script 'load_file.py' for table {target_table}")
-                script_info = get_script_info_from_neo4j('load_file.py', target_table)
+                script_info = get_complete_script_info('load_file.py', target_table)
                 if script_info:
                     all_script_infos.append(script_info)
             
@@ -1084,7 +862,7 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
             # If there is only one distinct script name, handle it as a single script
             single_script_name = next(iter(script_names))
             logger.info(f"Table {target_table} is associated with only one script: {single_script_name}")
-            script_info = get_script_info_from_neo4j(single_script_name, target_table)
+            script_info = get_complete_script_info(single_script_name, target_table)
             if script_info:
                 all_script_infos.append(script_info)
         else:
@@ -1092,7 +870,7 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
             logger.info(f"表 {target_table} 关联了多个不同脚本: {script_names}")
             for script in scripts:
                 script_name = script['script_name']
-                script_info = get_script_info_from_neo4j(script_name, target_table)
+                script_info = get_complete_script_info(script_name, target_table)
                 if script_info:
                     all_script_infos.append(script_info)
     
@@ -1183,7 +961,7 @@ def execute_script_chain(**context):
     
     if not dependency_chain:
         logger.error("没有找到依赖链,无法执行脚本")
-        return False
+        raise AirflowException("没有找到依赖链,无法执行脚本")
     
     # 记录依赖链信息
     logger.info(f"准备执行依赖链中的 {len(dependency_chain)} 个脚本")
@@ -1220,11 +998,14 @@ def execute_script_chain(**context):
         }
         results.append(result)
         
-        # If any script fails, mark the whole run as failed
+        # If any script fails, mark the whole run as failed and raise an exception
         if not success:
             all_success = False
-            logger.error(f"Script {script_name} failed; aborting the execution chain")
-            break
+            error_message = f"Script {script_name} failed; aborting the execution chain"
+            logger.error(error_message)
+            # Push the execution results before raising
+            ti.xcom_push(key='execution_results', value=results)
+            raise AirflowException(error_message)
     
     # Save the execution results
     ti.xcom_push(key='execution_results', value=results)
@@ -1330,13 +1111,13 @@ with DAG(
         task_id='generate_execution_report',
         python_callable=generate_execution_report,
         provide_context=True,
-        trigger_rule='all_done'  # Generate the report regardless of upstream success or failure
+        trigger_rule='all_success'  # Changed to run only when upstream tasks succeed
     )
     
     # Task 4: completion marker
     completed_task = EmptyOperator(
         task_id='execution_completed',
-        trigger_rule='all_done'  # Mark completion regardless of upstream success or failure
+        trigger_rule='all_success'  # Changed to run only when upstream tasks succeed
     )
     
     # Set the task dependencies
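
After this rename, every script the DAG invokes is expected to expose a run() entry point that accepts update_mode instead of execution_mode (see the run_kwargs built above). A minimal sketch of such a script, under a hypothetical file name my_table_process.py:

import logging

logger = logging.getLogger(__name__)

def run(table_name, update_mode="append", frequency="daily", exec_date=None, **kwargs):
    # Standard entry point; the DAG calls it via module.run(**run_kwargs)
    # and treats the return value as a success flag.
    logger.info(f"Processing {table_name} ({update_mode}, {frequency}) for {exec_date}")
    # ... actual ETL work would go here ...
    return True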

+ 322 - 116
dags/utils.py

@@ -21,18 +21,18 @@ def get_pg_conn():
 
 
 
-def execute_script(script_name=None, table_name=None, execution_mode=None, script_path=None, script_exec_mode=None, args=None):
+def execute_script(script_name=None, table_name=None, update_mode=None, script_path=None, script_exec_mode=None, args=None):
     """
     根据脚本名称动态导入并执行对应的脚本
     支持两种调用方式:
-    1. execute_script(script_name, table_name, execution_mode) - 原始实现
+    1. execute_script(script_name, table_name, update_mode) - 原始实现
     2. execute_script(script_path, script_name, script_exec_mode, args={}) - 来自common.py的实现
         
     返回:
         bool: 执行成功返回True,否则返回False
     """
     # 第一种调用方式 - 原始函数实现
-    if script_name and table_name and execution_mode is not None and script_path is None and script_exec_mode is None:
+    if script_name and table_name and update_mode is not None and script_path is None and script_exec_mode is None:
         if not script_name:
             logger.error("未提供脚本名称,无法执行")
             return False
@@ -50,7 +50,7 @@ def execute_script(script_name=None, table_name=None, execution_mode=None, scrip
             # Use the standard entry function run
             if hasattr(module, "run"):
                 logger.info(f"Executing the standard entry function run() of script {script_name}")
-                module.run(table_name=table_name, execution_mode=execution_mode)
+                module.run(table_name=table_name, update_mode=update_mode)
                 return True
             else:
                 logger.warning(f"脚本 {script_name} 未定义标准入口函数 run(),无法执行")
@@ -66,10 +66,10 @@ def execute_script(script_name=None, table_name=None, execution_mode=None, scrip
             # Second calling convention - all parameters provided explicitly
             if args is None:
                 args = {}
-        elif script_name and table_name and execution_mode is not None:
+        elif script_name and table_name and update_mode is not None:
             # Second calling convention - but using the first convention's parameter names
             script_path = os.path.join(SCRIPTS_BASE_PATH, f"{script_name}.py")
-            script_exec_mode = execution_mode
+            script_exec_mode = update_mode
             args = {"table_name": table_name}
         else:
             logger.error("Invalid arguments; cannot execute the script")
@@ -105,7 +105,7 @@ def execute_script(script_name=None, table_name=None, execution_mode=None, scrip
             # Execute the script
             if table_name is not None:
                 # Call with the table_name parameter
-                exec_result = module.run(table_name=table_name, execution_mode=script_exec_mode)
+                exec_result = module.run(table_name=table_name, update_mode=script_exec_mode)
             else:
                 # Call with script_exec_mode and args
                 exec_result = module.run(script_exec_mode, args)
@@ -160,10 +160,19 @@ def get_dependency_resource_tables(enabled_tables: list) -> list:
 
 # Get enabled tables from PostgreSQL, filtered by schedule frequency (daily/weekly/monthly)
 def get_enabled_tables(frequency: str) -> list:
+    """
+    Get enabled tables from PostgreSQL, filtered by schedule frequency (daily/weekly/monthly)
+    
+    Parameters:
+        frequency (str): schedule frequency, e.g. daily, weekly, monthly
+        
+    Returns:
+        list: a list of table names with their update modes
+    """
     conn = get_pg_conn()
     cursor = conn.cursor()
     cursor.execute("""
-        SELECT table_name, execution_mode
+        SELECT table_name, update_mode
         FROM table_schedule
         WHERE is_enabled = TRUE AND schedule_frequency = %s
     """, (frequency,))
@@ -173,7 +182,7 @@ def get_enabled_tables(frequency: str) -> list:
 
     output = []
     for r in result:
-        output.append({"table_name": r[0], "execution_mode": r[1]})
+        output.append({"table_name": r[0], "update_mode": r[1]})
     return output
 
 # Determine whether the given table name is a DataResource type in Neo4j
@@ -293,66 +302,55 @@ def check_script_exists(script_name):
             
         return False, script_path_str
 
-def run_model_script(table_name, execution_mode):
+def run_model_script(table_name, update_mode):
     """
-    Find and execute the model script for the given table
+    Execute the script associated with a table
     
     Parameters:
-        table_name (str): name of the table to process
-        execution_mode (str): execution mode (append/full_refresh)
-    
+        table_name (str): table name
+        update_mode (str): update mode, e.g. append, full_refresh
+        
     Returns:
         bool: True on success, False otherwise
-        
-    Raises:
-        AirflowFailException: if the script does not exist or execution fails
     """
-    # Get the script name from Neo4j
-    script_name = get_script_name_from_neo4j(table_name)
-    if not script_name:
-        error_msg = f"Script name for table {table_name} not found; task failed"
-        logger.error(error_msg)
-        raise AirflowFailException(error_msg)
+    logger.info(f"Executing the script associated with table {table_name}")
     
-    logger.info(f"Got the script name for table {table_name} from Neo4j: {script_name}")
     
+    # Check the table type
     
-    # Check whether the script file exists
-    exists, script_path = check_script_exists(script_name)
-    if not exists:
-        error_msg = f"Script file {script_name} for table {table_name} does not exist; task failed"
-        logger.error(error_msg)
-        raise AirflowFailException(error_msg)
-    
-    # Execute the script
-    logger.info(f"Start executing script: {script_path}")
-    try:
-        # Dynamically import the module
-        import importlib.util
-        import sys
+    if is_model:
+        # Get the script name from Neo4j
+        script_name = get_script_name_from_neo4j(table_name)
+        if not script_name:
+            logger.error(f"No script associated with table {table_name} was found")
+            return False
+            
+        logger.info(f"Found the script associated with table {table_name}: {script_name}")
         
-        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
-        module = importlib.util.module_from_spec(spec)
-        spec.loader.exec_module(module)
+        # Check whether the script file exists
+        script_exists, script_path = check_script_exists(script_name)
         
-        # Check for and call the standard entry function run
-        if hasattr(module, "run"):
-            logger.info(f"Calling the standard entry function run() of script {script_name}")
-            module.run(table_name=table_name, execution_mode=execution_mode)
-            logger.info(f"Script {script_name} executed successfully")
-            return True
-        else:
-            error_msg = f"Script {script_name} does not define the standard entry function run(); task failed"
-            logger.error(error_msg)
-            raise AirflowFailException(error_msg)
-    except AirflowFailException:
-        # Re-raise the Airflow exception directly
-        raise
-    except Exception as e:
-        error_msg = f"Error executing script {script_name}: {str(e)}"
-        logger.error(error_msg)
-        import traceback
-        logger.error(traceback.format_exc())
-        raise AirflowFailException(error_msg)
+        if not script_exists:
+            logger.error(f"Script file {script_name} does not exist")
+            return False
+        
+        logger.info(f"Script file path: {script_path}")
+        
+        # Execute the script
+        try:
+            # Make sure the full file name, including the .py extension, is used
+            if not script_name.endswith('.py'):
+                script_name = f"{script_name}.py"
+                
+            return execute_script(script_name=script_name, table_name=table_name, update_mode=update_mode)
+        except Exception as e:
+            logger.error(f"Error while executing script {script_name}: {str(e)}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return False
+    else:
+        logger.warning(f"Table {table_name} is not a DataModel table; skipping script execution")
+        return True
 
 
 def get_model_dependency_graph(table_names: list) -> dict:
@@ -637,7 +635,7 @@ def create_task_dict(optimized_table_order, model_tables, dag, execution_type, *
                 task_params = {
                     "task_id": f"process_{execution_type}_{table_name}",
                     "python_callable": run_model_script,
-                    "op_kwargs": {"table_name": table_name, "execution_mode": table_config['execution_mode']},
+                    "op_kwargs": {"table_name": table_name, "update_mode": table_config['update_mode']},
                     "dag": dag
                 }
                 
@@ -911,78 +909,96 @@ def update_task_completion(exec_date, target_table, script_name, success, end_ti
         logger.info("数据库连接已关闭")
         logger.info("===== 更新任务完成信息完成 =====")
 
-def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
-    """Execute the script and monitor the execution"""
-
-    # Add detailed logging
-    logger.info(f"===== Start monitored execution =====")
-    logger.info(f"target_table: {target_table}, type: {type(target_table)}")
-    logger.info(f"script_name: {script_name}, type: {type(script_name)}")
-    logger.info(f"script_exec_mode: {script_exec_mode}, type: {type(script_exec_mode)}")
-    logger.info(f"exec_date: {exec_date}, type: {type(exec_date)}")
-
-    # Check whether script_name is empty
-    if not script_name:
-        logger.error(f"script_name for table {target_table} is empty; cannot execute")
-        # Record the execution failure
-        now = datetime.now()
-        update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
-        return False
-    # Record the execution start time
+def execute_with_monitoring(target_table, script_name, update_mode, exec_date, **kwargs):
+    """
+    Execute the script and monitor its status, updating the airflow_exec_plans table
+    
+    Parameters:
+        target_table: target table name
+        script_name: script name
+        update_mode: update mode (append/full_refresh)
+        exec_date: execution date
+        **kwargs: other parameters
+        
+    Returns:
+        bool: True on success, False otherwise
+    """
+    conn = None
+    conn = None
     start_time = datetime.now()
     
-    # Try to update the start time and record the result
     try:
+        # Record that the task has started
         update_task_start_time(exec_date, target_table, script_name, start_time)
-        logger.info(f"Successfully updated the task start time: {start_time}")
-    except Exception as e:
-        logger.error(f"Failed to update the task start time: {str(e)}")
-    
-    try:
-        # Execute the actual script
-        logger.info(f"Start executing script: {script_name}")
-        result = execute_script(script_name, target_table, script_exec_mode)
-        logger.info(f"Script execution finished; raw return value: {result}, type: {type(result)}")
         
-        # Make sure result is a boolean
-        if result is None:
-            logger.warning(f"Script returned None; converting to False")
-            result = False
-        elif not isinstance(result, bool):
-            original_result = result
-            result = bool(result)
-            logger.warning(f"Script returned a non-boolean {original_result}; converted to boolean: {result}")
+        # Execute the script
+        script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
+        
+        # Build the execution arguments
+        exec_kwargs = {
+            "table_name": target_table,
+            "update_mode": update_mode,
+            "exec_date": exec_date,
+        }
+        
+        # Add any other supplied parameters
+        exec_kwargs.update(kwargs)
+        
+        # Check whether the script exists
+        if not os.path.exists(script_path):
+            logger.error(f"Script file does not exist: {script_path}")
+            success = False
+        else:
+            # Execute the script
+            try:
+                # Dynamically import the module
+                import importlib.util
+                import sys
+                
+                spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
+                module = importlib.util.module_from_spec(spec)
+                spec.loader.exec_module(module)
+                
+                # Check for and call the standard entry function run
+                if hasattr(module, "run"):
+                    logger.info(f"Calling the standard entry function run() of script {script_name}")
+                    result = module.run(**exec_kwargs)
+                    success = bool(result)  # make sure the result is a boolean
+                else:
+                    logger.error(f"Script {script_name} does not define the standard entry function run()")
+                    success = False
+            except Exception as e:
+                logger.error(f"Error executing script {script_name}: {str(e)}")
+                import traceback
+                logger.error(traceback.format_exc())
+                success = False
         
-        # Record the end time and the result
+        # Record the end time
         end_time = datetime.now()
+        
+        # Compute the execution duration
         duration = (end_time - start_time).total_seconds()
         
-        # Try to update the completion status and record the result
-        try:
-            logger.info(f"Trying to update the completion status: result={result}, end_time={end_time}, duration={duration}")
-            update_task_completion(exec_date, target_table, script_name, result, end_time, duration)
-            logger.info(f"Successfully updated the task completion status; result: {result}")
-        except Exception as e:
-            logger.error(f"Failed to update the task completion status: {str(e)}")
+        # Update the task execution result
+        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
         
-        logger.info(f"===== Monitored execution finished =====")
-        return result
+        return success
     except Exception as e:
-        # Handle the exception
-        logger.error(f"Error executing the task: {str(e)}")
+        # Record the end time
         end_time = datetime.now()
+        
+        # Compute the execution duration
         duration = (end_time - start_time).total_seconds()
         
-        # Try to update the failure status and record the result
+        # Mark the task execution as failed
         try:
-            logger.info(f"Trying to update the failure status: end_time={end_time}, duration={duration}")
             update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
-            logger.info(f"Successfully updated the task failure status")
-        except Exception as update_e:
-            logger.error(f"Failed to update the task failure status: {str(update_e)}")
+        except Exception as update_err:
+            logger.error(f"Failed to update the task status: {str(update_err)}")
         
-        logger.info(f"===== Monitored execution ended with an exception =====")
-        raise e
+        logger.error(f"Unhandled exception while executing script {script_name}: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        return False
 
 def ensure_boolean_result(func):
     """装饰器:确保函数返回布尔值"""
@@ -1032,4 +1048,194 @@ def get_cn_exec_date(logical_date):
     # Get the logical execution date
     local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
     exec_date = local_logical_date.strftime('%Y-%m-%d')
-    return exec_date, local_logical_date
+    return exec_date, local_logical_date
+
+def get_table_label(table_name):
+    """Determine the label type of a table (DataModel or DataResource)"""
+    driver = GraphDatabase.driver(
+        NEO4J_CONFIG['uri'], 
+        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+    )
+    query = """
+        MATCH (n {en_name: $table_name})
+        RETURN labels(n) AS labels
+    """
+    try:
+        with driver.session() as session:
+            result = session.run(query, table_name=table_name)
+            record = result.single()
+            if record and record.get("labels"):
+                labels = record.get("labels")
+                if "DataModel" in labels:
+                    return "DataModel"
+                elif "DataResource" in labels:
+                    return "DataResource"
+                elif "DataSource" in labels:
+                    return "DataSource"
+            return None
+    except Exception as e:
+        logger.error(f"Error getting the labels for table {table_name}: {str(e)}")
+        return None
+    finally:
+        driver.close()
+
+def get_complete_script_info(script_name=None, target_table=None):
+    """
+    Fetch the complete script and table information from Neo4j in a single pass, including update_mode, schedule_frequency, etc.
+    
+    Parameters:
+        script_name (str, optional): script name
+        target_table (str): target table name
+        
+    Returns:
+        dict: a dictionary with the complete script information
+    """
+    if not target_table:
+        return None
+        
+    logger.info(f"Fetching complete information for table {target_table} from Neo4j")
+    
+    # Connect to Neo4j
+    driver = GraphDatabase.driver(
+        NEO4J_CONFIG['uri'], 
+        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+    )
+    
+    # Get the table's label type
+    table_label = get_table_label(target_table)
+    
+    script_info = {
+        'script_name': script_name,
+        'target_table': target_table,
+        'script_id': f"{script_name.replace('.', '_') if script_name else ''}_{target_table}",
+        'target_table_label': table_label,
+        'source_tables': [],
+        'script_type': 'python_script',  # default type
+        'update_mode': 'append',  # default update mode
+        'schedule_frequency': 'daily',  # default schedule frequency
+        'schedule_status': 'enabled'  # default schedule status
+    }
+    
+    try:
+        with driver.session() as session:
+            # Check whether it is a structure-type DataResource
+            if table_label == 'DataResource':
+                query_structure = """
+                    MATCH (n:DataResource {en_name: $table_name})
+                    RETURN n.type AS type, n.storage_location AS storage_location, 
+                           n.schedule_frequency AS schedule_frequency,
+                           n.update_mode AS update_mode,
+                           n.schedule_status AS schedule_status
+                """
+                
+                result = session.run(query_structure, table_name=target_table)
+                record = result.single()
+                
+                if record and record.get("type") == "structure":
+                    logger.info(f"Table {target_table} is a structure-type DataResource")
+                    
+                    # Set the special attributes
+                    script_info['target_type'] = 'structure'
+                    
+                    # Read the information from the node attributes
+                    if record.get("storage_location"):
+                        script_info['storage_location'] = record.get("storage_location")
+                    
+                    # Get the schedule frequency
+                    if record.get("schedule_frequency"):
+                        script_info['schedule_frequency'] = record.get("schedule_frequency")
+                    
+                    # Get the update mode
+                    if record.get("update_mode"):
+                        script_info['update_mode'] = record.get("update_mode")
+                    
+                    # Get the schedule status
+                    if record.get("schedule_status"):
+                        script_info['schedule_status'] = record.get("schedule_status")
+                    
+                    # If no script name was given, or it is 'default', use load_file.py
+                    if not script_name or script_name.lower() == 'default' or script_name == 'load_file.py':
+                        script_info['script_name'] = 'load_file.py'
+                        script_info['script_id'] = f"load_file_py_{target_table}"
+                        return script_info
+            
+            # Not structure type, or structure type with an explicit script name
+            # Query the script info and dependencies according to the table's label type
+            if script_info['target_table_label'] == 'DataModel':
+                # Query all attributes and dependencies of the DataModel
+                query = """
+                    MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
+                    RETURN source.en_name AS source_table, 
+                           rel.script_name AS script_name, 
+                           rel.script_type AS script_type,
+                           rel.update_mode AS update_mode,
+                           rel.schedule_frequency AS schedule_frequency,
+                           rel.schedule_status AS schedule_status
+                """
+                result = session.run(query, table_name=target_table)
+                
+                for record in result:
+                    source_table = record.get("source_table")
+                    db_script_name = record.get("script_name")
+                    
+                    # The script name matches, or no script name was specified
+                    if not script_name or (db_script_name and db_script_name == script_name):
+                        if source_table and source_table not in script_info['source_tables']:
+                            script_info['source_tables'].append(source_table)
+                        
+                        # Only update these attributes when the script name matches
+                        if db_script_name and db_script_name == script_name:
+                            # Update the script info
+                            script_info['script_type'] = record.get("script_type", script_info['script_type'])
+                            script_info['update_mode'] = record.get("update_mode", script_info['update_mode'])
+                            script_info['schedule_frequency'] = record.get("schedule_frequency", script_info['schedule_frequency'])
+                            script_info['schedule_status'] = record.get("schedule_status", script_info['schedule_status'])
+                            
+                            # If no script name was specified, use the one found by the query
+                            if not script_info['script_name'] and db_script_name:
+                                script_info['script_name'] = db_script_name
+                                script_info['script_id'] = f"{db_script_name.replace('.', '_')}_{target_table}"
+            
+            elif script_info['target_table_label'] == 'DataResource':
+                # Query all attributes and dependencies of the DataResource
+                query = """
+                    MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                    RETURN source.en_name AS source_table, 
+                           rel.script_name AS script_name, 
+                           rel.script_type AS script_type,
+                           rel.update_mode AS update_mode,
+                           rel.schedule_frequency AS schedule_frequency,
+                           rel.schedule_status AS schedule_status
+                """
+                result = session.run(query, table_name=target_table)
+                
+                for record in result:
+                    source_table = record.get("source_table")
+                    db_script_name = record.get("script_name")
+                    
+                    # The script name matches, or no script name was specified
+                    if not script_name or (db_script_name and db_script_name == script_name):
+                        if source_table and source_table not in script_info['source_tables']:
+                            script_info['source_tables'].append(source_table)
+                        
+                        # Only update these attributes when the script name matches
+                        if db_script_name and db_script_name == script_name:
+                            # Update the script info
+                            script_info['script_type'] = record.get("script_type", script_info['script_type'])
+                            script_info['update_mode'] = record.get("update_mode", script_info['update_mode'])
+                            script_info['schedule_frequency'] = record.get("schedule_frequency", script_info['schedule_frequency'])
+                            script_info['schedule_status'] = record.get("schedule_status", script_info['schedule_status'])
+                            
+                            # If no script name was specified, use the one found by the query
+                            if not script_info['script_name'] and db_script_name:
+                                script_info['script_name'] = db_script_name
+                                script_info['script_id'] = f"{db_script_name.replace('.', '_')}_{target_table}"
+    
+    except Exception as e:
+        logger.error(f"Error getting information for table {target_table} from Neo4j: {str(e)}")
+    finally:
+        if driver:
+            driver.close()
+    
+    logger.info(f"Retrieved complete script info: {script_info}")
+    return script_info
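
A minimal usage sketch of the new helper (the table and script names below are illustrative); the returned dict carries the defaults initialized above unless Neo4j overrides them:

from utils import get_complete_script_info

info = get_complete_script_info("book_sale_amt_monthly_process.py", "book_sale_amt_monthly")
# Expected keys: script_name, target_table, script_id, target_table_label,
# source_tables, script_type, update_mode, schedule_frequency, schedule_status
print(info["update_mode"], info["source_tables"])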

+ 6 - 6
dataops_scripts/book_sale_amt_monthly_process.py

@@ -42,29 +42,29 @@ def process_monthly_book_sales():
         logger.error(f"处理月度图书销售额时出错: {str(e)}")
         return False
 
-def run(table_name, execution_mode, **kwargs):
+def run(table_name, update_mode='append', **kwargs):
     """
     统一入口函数,符合Airflow动态脚本调用规范
     
     参数:
         table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
+        update_mode (str): 更新模式 (append/full_refresh)
         **kwargs: 其他可能的参数
     
     返回:
         bool: 执行成功返回True,否则返回False
     """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {update_mode}")
     
     # Get the current script's file name
     script_name = os.path.basename(__file__)
     
     # Log detailed execution information
-    logger.info(f"[unified entry] script {script_name} is processing table {table_name}, mode: {execution_mode}")
+    logger.info(f"[unified entry] script {script_name} is processing table {table_name}, mode: {update_mode}")
     logger.info(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
     
     # Decide the processing logic based on the mode
-    if execution_mode == "full_refresh":
+    if update_mode == "full_refresh":
         logger.info("Full-refresh mode - all historical data will be processed")
     else:  # append
         logger.info("Incremental mode - only the latest data will be processed")
@@ -79,4 +79,4 @@ def run(table_name, execution_mode, **kwargs):
 
 if __name__ == "__main__":
     # Call the unified entry function when executed directly
-    run(table_name="book_sale_amt_monthly", execution_mode="append") 
+    run(table_name="book_sale_amt_monthly", update_mode="append") 

+ 39 - 58
dataops_scripts/execution_python.py

@@ -3,7 +3,7 @@
 import sys
 import os
 import logging
-from datetime import datetime
+from datetime import datetime, timedelta
 import psycopg2
 import textwrap
 from airflow.exceptions import AirflowException
@@ -16,68 +16,38 @@ logging.basicConfig(
 )
 logger = logging.getLogger("execution_python")
 
-# Add the sibling directory to the Python search path
+# Explicitly prepend the current directory to the Python module search path so imports resolve from it first
 current_dir = os.path.dirname(os.path.abspath(__file__))
 if current_dir not in sys.path:
-    sys.path.append(current_dir)
+    sys.path.insert(0, current_dir)
+    logger.info(f"Added the current directory to the Python module search path: {current_dir}")
 
-# Try importing script_utils with a multi-level import strategy
+# Try importing script_utils directly
 try:
     import script_utils
-    logger.info("Successfully imported the script_utils module")
+    from script_utils import get_pg_config
+    logger.info(f"Successfully imported the script_utils module; module path: {script_utils.__file__}")
 except ImportError as e:
-    logger.error(f"Could not import script_utils directly: {str(e)}")
-    
-    # Fallback 1: import via the full path
-    try:
-        sys.path.append(os.path.dirname(current_dir))  # add the parent directory
-        import dataops.scripts.script_utils as script_utils
-        logger.info("Successfully imported the script_utils module via the full path")
-    except ImportError as e2:
-        logger.error(f"Import via the full path failed: {str(e2)}")
-        
-        # Fallback 2: dynamic import
-        try:
-            import importlib.util
-            script_utils_path = os.path.join(current_dir, "script_utils.py")
-            logger.info(f"Trying a dynamic import from path: {script_utils_path}")
-            
-            spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
-            script_utils = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(script_utils)
-            logger.info("Successfully loaded the script_utils module via dynamic import")
-        except Exception as e3:
-            logger.error(f"Dynamic import failed as well: {str(e3)}")
-            raise ImportError(f"Could not import the script_utils module; all methods failed")
+    logger.error(f"Failed to import the script_utils module: {str(e)}")
+    logger.error(f"Current Python module search path: {sys.path}")
+    logger.error(f"Current directory contents: {os.listdir(current_dir)}")
+    raise ImportError(f"Could not import the script_utils module; make sure script_utils.py is in the current directory")
 
-# Dynamically import config
-def get_config():
-    """
-    Import configuration from the config module
-    
-    Returns:
-        dict: PG_CONFIG database connection configuration
-    """
+# Get the configuration via script_utils
+try:
+    # Get the PostgreSQL configuration
+    PG_CONFIG = get_pg_config()
+    logger.info(f"Successfully obtained the PostgreSQL configuration via script_utils.get_pg_config()")
+except Exception as e:
+    logger.error(f"Failed to obtain the configuration; using defaults: {str(e)}")
     # Default configuration
-    default_pg_config = {
+    PG_CONFIG = {
         "host": "localhost",
         "port": 5432,
         "user": "postgres",
         "password": "postgres",
         "database": "dataops"
     }
-    try:
-        config = __import__('config')
-        logger.info("Imported the configuration directly from the config module")
-        pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
-        return pg_config
-    except ImportError:
-        logger.warning("config.py not found; using the default database configuration")
-        return default_pg_config
-
-# Import the configuration
-PG_CONFIG = get_config()
-logger.info(f"Configuration loaded: database connection={PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}")
 
 def get_pg_conn():
     """获取PostgreSQL连接"""
@@ -261,17 +231,28 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, f
         
         # Date calculation
         try:
+            # Compute the date range directly with script_utils.get_date_range
+            logger.info(f"Computing the date range with script_utils.get_date_range, args: exec_date={exec_date}, frequency={frequency}")
             start_date, end_date = script_utils.get_date_range(exec_date, frequency)
             logger.info(f"Computed date range: start_date={start_date}, end_date={end_date}")
         except Exception as date_err:
             logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
-            return False
+            # 使用简单的默认日期范围计算
+            date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
+            if frequency.lower() == 'daily':
+                start_date = date_obj.strftime('%Y-%m-%d')
+                end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
+            else:
+                # 对其他频率使用默认范围
+                start_date = exec_date
+                end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
+            logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
         
         # Check whether ETL idempotency is enabled
         target_table_label = kwargs.get('target_table_label', '')
-        script_exec_mode = kwargs.get('execution_mode', 'append')  # default is append
+        script_exec_mode = kwargs.get('update_mode', 'append')  # only update_mode is used now
         
-        logger.info(f"Script execution mode: {script_exec_mode}")
+        logger.info(f"Script update mode: {script_exec_mode}")
         
         # Import the config module to get the idempotency switch
         try:
@@ -338,7 +319,7 @@ WHERE {target_dt_column} >= '{start_date}'
                     logger.warning("TRUNCATE失败,继续执行原始Python脚本")
             
             else:
-                logger.info(f"当前执行模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
+                logger.info(f"当前更新模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
         else:
             logger.info("未开启ETL幂等性,直接执行Python脚本")
 
@@ -410,16 +391,16 @@ WHERE {target_dt_column} >= '{start_date}'
 
 if __name__ == "__main__":
     import argparse
-    parser = argparse.ArgumentParser(description='Execute a Python script snippet')
+    parser = argparse.ArgumentParser(description='Execute a Python script')
     parser.add_argument('--target-table', type=str, required=True, help='Target table name')
     parser.add_argument('--script-name', type=str, required=True, help='Script name')
    parser.add_argument('--exec-date', type=str, required=True, help='Execution date (YYYY-MM-DD)')
     parser.add_argument('--frequency', type=str, required=True, 
                         choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'], 
                         help='Frequency: daily, weekly, monthly, quarterly, yearly')
-    parser.add_argument('--execution-mode', type=str, default='append', 
-                        choices=['append', 'full_refresh'], 
-                        help='Execution mode: append, full_refresh')
+    parser.add_argument('--update-mode', type=str, default='append',
+                       choices=['append', 'full_refresh'],
+                       help='Update mode: append, full_refresh')
     
     args = parser.parse_args()
 
@@ -429,7 +410,7 @@ if __name__ == "__main__":
         "script_name": args.script_name,
         "exec_date": args.exec_date,
         "frequency": args.frequency,
-        "execution_mode": args.execution_mode
+        "update_mode": args.update_mode
     }
     
     logger.info("命令行测试执行参数: " + str(run_kwargs))

+ 19 - 33
dataops_scripts/execution_sql.py

@@ -29,15 +29,16 @@ if current_dir not in sys.path:
 # Try to import script_utils
 try:
     import script_utils
-    logger.info("Successfully imported the script_utils module")
+    from script_utils import get_pg_config, logger as utils_logger
+    logger.info("Successfully imported the get_pg_config method from the script_utils module")
 except ImportError as e:
-    logger.error(f"Could not import script_utils directly: {str(e)}")
+    logger.error(f"Could not import the script_utils methods directly: {str(e)}")
     
     # Fallback 1: import via the full path
     try:
         sys.path.append(os.path.dirname(current_dir))  # add the parent directory
-        import dataops_scripts.script_utils as script_utils
-        logger.info("Successfully imported the script_utils module via the full path")
+        from dataops_scripts.script_utils import get_pg_config
+        logger.info("Successfully imported the script_utils methods via the full path")
     except ImportError as e2:
         logger.error(f"使用完整路径导入失败: {str(e2)}")
         
@@ -50,42 +51,27 @@ except ImportError as e:
             spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
             script_utils = importlib.util.module_from_spec(spec)
             spec.loader.exec_module(script_utils)
-            logger.info("通过动态导入成功加载script_utils模块")
+            get_pg_config = script_utils.get_pg_config
+            logger.info("通过动态导入成功加载script_utils模块的方法")
         except Exception as e3:
             logger.error(f"动态导入也失败: {str(e3)}")
-            raise ImportError(f"无法导入script_utils模块,所有方法都失败")
+            raise ImportError(f"无法导入script_utils模块的方法,所有方法都失败")
 
-# Robust import mechanism
-def get_config():
-    """
-    Import configuration from the config module
-    
-    Returns:
-        dict: PG_CONFIG database connection configuration
-    """
+# Fetch configuration via the script_utils helpers
+try:
+    # Fetch the PostgreSQL configuration
+    PG_CONFIG = get_pg_config()
+    logger.info(f"Fetched the PostgreSQL configuration via script_utils.get_pg_config()")
+except Exception as e:
+    logger.error(f"Failed to fetch configuration, using defaults: {str(e)}")
     # Default configuration
-    default_pg_config = {
+    PG_CONFIG = {
         "host": "localhost",
         "port": 5432,
         "user": "postgres",
         "password": "postgres",
         "database": "dataops",
     }
-    
-    try:
-        # Dynamic import, to avoid IDE warnings
-        config = __import__('config')
-        logger.info("Imported configuration directly from the config module")
-        pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
-        return pg_config
-    except ImportError:
-        # Use the default configuration
-        logger.warning("Could not import the config module; using defaults")
-        return default_pg_config
-
-# Load configuration
-PG_CONFIG = get_config()
-logger.info(f"Configuration loaded: database connection={PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}")
 
 def get_pg_conn():
     """获取PostgreSQL连接"""
@@ -324,9 +310,9 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
         
         # Check whether ETL idempotency is enabled
         target_table_label = kwargs.get('target_table_label', '')
-        script_exec_mode = kwargs.get('execution_mode', 'append')  # defaults to append
+        script_exec_mode = kwargs.get('update_mode', 'append')  # use update_mode only
         
-        logger.info(f"脚本执行模式: {script_exec_mode}")
+        logger.info(f"脚本更新模式: {script_exec_mode}")
         
         # Import the config module to read the idempotency switch
         try:
@@ -403,7 +389,7 @@ WHERE {target_dt_column} >= '{{{{ start_date }}}}'
                     logger.warning("TRUNCATE失败,继续执行原始SQL")
             
             else:
-                logger.info(f"当前执行模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
+                logger.info(f"当前更新模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
         else:
             logger.info("未满足ETL幂等性处理条件,直接执行原始SQL")
         

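A minimal sketch of driving the updated entry point directly; it assumes run() forwards extra keywords into kwargs (as the kwargs.get calls above suggest), and the table and script names are placeholders:

    from execution_sql import run

    ok = run(
        script_type="sql",
        target_table="dim_customer",     # placeholder table
        script_name="dim_customer.sql",  # placeholder script
        exec_date="2024-01-01",
        update_mode="append",            # read via kwargs.get('update_mode', 'append')
        target_table_label="DataModel",  # consulted by the idempotency check
    )
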
+ 40 - 51
dataops_scripts/load_file.py

@@ -10,6 +10,17 @@ import csv
 import glob
 import shutil
 import re
+import argparse
+
+# Adjust the Python import path so the script_utils module in the same directory can be found
+current_dir = os.path.dirname(os.path.abspath(__file__))
+if current_dir not in sys.path:
+    sys.path.insert(0, current_dir)
+
+# Import the whole module first so the script_utils object is available in the global scope
+import script_utils
+# Then import the specific helpers
+from script_utils import get_pg_config, get_upload_paths, logger as utils_logger
 
 # Configure the logger
 logging.basicConfig(
@@ -22,41 +33,25 @@ logging.basicConfig(
 
 logger = logging.getLogger("load_file")
 
-# Robust import mechanism
-def get_config():
-    """
-    Import configuration from the config module
-    
-    Returns:
-        tuple: (PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH)
-    """
+# Fetch configuration via the script_utils helpers
+try:
+    # Fetch the PostgreSQL configuration
+    PG_CONFIG = get_pg_config()
+    # Fetch the upload and archive paths
+    STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = get_upload_paths()
+    logger.info(f"Fetched configuration via script_utils: upload path={STRUCTURE_UPLOAD_BASE_PATH}, archive path={STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
+except Exception as e:
+    logger.error(f"Failed to fetch configuration, using defaults: {str(e)}")
     # Default configuration
-    default_pg_config = {
+    PG_CONFIG = {
         "host": "localhost",
         "port": 5432,
         "user": "postgres",
         "password": "postgres",
         "database": "dataops",
     }
-    default_upload_path = '/tmp/uploads'
-    default_archive_path = '/tmp/uploads/archive'
-    
-    try:
-        # Dynamic import, to avoid IDE warnings
-        config = __import__('config')
-        logger.info("Imported configuration directly from the config module")
-        pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
-        upload_path = getattr(config, 'STRUCTURE_UPLOAD_BASE_PATH', default_upload_path)
-        archive_path = getattr(config, 'STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH', default_archive_path)
-        return pg_config, upload_path, archive_path
-    except ImportError:
-        # Use the default configuration
-        logger.warning("Could not import the config module; using defaults")
-        return default_pg_config, default_upload_path, default_archive_path
-
-# Load configuration
-PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = get_config()
-logger.info(f"Configuration loaded: upload path={STRUCTURE_UPLOAD_BASE_PATH}, archive path={STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
+    STRUCTURE_UPLOAD_BASE_PATH = '/tmp/uploads'
+    STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = '/tmp/uploads/archive'
 
 def get_pg_conn():
     """获取PostgreSQL连接"""
@@ -302,14 +297,14 @@ def load_dataframe_to_table(df, file_path, table_name):
         if conn:
             conn.close()
 
-def load_file_to_table(file_path, table_name, execution_mode='append'):
+def load_file_to_table(file_path, table_name, update_mode='append'):
     """
-    Load file data into the target according to the file type
+    Load a file into a table, supporting different file types
     
     Parameters:
         file_path (str): File path
         table_name (str): Target table name
-        execution_mode (str): Execution mode, 'append' or 'full_refresh'
+        update_mode (str): Update mode, 'append' or 'full_refresh'
     
     Returns:
         bool: True on success, False on failure
@@ -338,7 +333,7 @@ def load_file_to_table(file_path, table_name, execution_mode='append'):
         logger.error(f"处理文件 {file_path} 时发生意外错误", exc_info=True)
         return False
 
-def run(table_name, execution_mode='append', exec_date=None, target_type=None, 
+def run(table_name, update_mode='append', exec_date=None, target_type=None, 
         storage_location=None, frequency=None, script_name=None, **kwargs):
     """
     Unified entry point; supports wildcard paths, processes files and archives them
@@ -347,10 +342,10 @@ def run(table_name, execution_mode='append', exec_date=None, target_type=None,
         script_name = os.path.basename(__file__)
 
     # Fix the earlier log-format error
-    exec_mode_str = 'full refresh' if execution_mode == 'full_refresh' else 'incremental append'
-    logger.info(f"===== Starting {script_name} ({exec_mode_str}) =====")
+    update_mode_str = 'full refresh' if update_mode == 'full_refresh' else 'incremental append'
+    logger.info(f"===== Starting {script_name} ({update_mode_str}) =====")
     logger.info(f"Table name: {table_name}")
-    logger.info(f"Execution mode: {execution_mode}")
+    logger.info(f"Update mode: {update_mode}")
     logger.info(f"Execution date: {exec_date}")
     logger.info(f"Target type: {target_type}")
     logger.info(f"Resource type: {target_type}, relative file-path pattern: {storage_location}")
@@ -435,7 +430,7 @@ def run(table_name, execution_mode='append', exec_date=None, target_type=None,
     logger.info(f"总共找到 {len(found_files)} 个匹配文件: {found_files}")
 
     # For a full refresh, truncate the table before processing any files
-    if execution_mode == 'full_refresh':
+    if update_mode == 'full_refresh':
         conn = None
         cursor = None
         try:
@@ -547,29 +542,23 @@ def run(table_name, execution_mode='append', exec_date=None, target_type=None,
 
 if __name__ == "__main__":
     # Test code for direct execution
-    import argparse
-    
-    parser = argparse.ArgumentParser(description='Load data into a table from CSV or Excel files (wildcards supported)')
+    parser = argparse.ArgumentParser(description='Load a file into a table')
     parser.add_argument('--table', type=str, required=True, help='Target table name')
-    parser.add_argument('--pattern', type=str, required=True, help='File search pattern (relative to the base upload path, e.g. data/*.csv, data/*.xlsx, or data/*)')
-    parser.add_argument('--mode', type=str, default='append', choices=['append', 'full_refresh'], help='Execution mode: append or full_refresh')
+    parser.add_argument('--file', type=str, required=True, help='File path')
+    parser.add_argument('--update-mode', type=str, default='append', choices=['append', 'full_refresh'], help='Update mode: append or full_refresh')
     
     args = parser.parse_args()
     
-    # Build the required kwargs
-    run_kwargs = {
+    # Build the argument dict
+    run_args = {
         "table_name": args.table,
-        "execution_mode": args.mode,
-        "storage_location": args.pattern,
-        "target_type": 'structure',
-        "exec_date": datetime.now().strftime('%Y-%m-%d'),
-        "frequency": "manual",
-        "script_name": os.path.basename(__file__)
+        "update_mode": args.update_mode,
+        "storage_location": args.file
     }
 
-    logger.info("命令行测试执行参数: " + str(run_kwargs))
+    logger.info("命令行测试执行参数: " + str(run_args))
 
-    success = run(**run_kwargs)
+    success = run(**run_args)
     
     if success:
         print("文件加载任务执行完毕,所有文件处理成功。")

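A minimal sketch of calling the refactored loader programmatically; the table name and glob pattern below are placeholders, and the pattern is resolved relative to STRUCTURE_UPLOAD_BASE_PATH:

    from load_file import run

    ok = run(
        table_name="ods_orders",        # placeholder target table
        update_mode="full_refresh",     # truncates the table before any file is loaded
        exec_date="2024-01-01",
        target_type="structure",
        storage_location="data/*.csv",  # relative pattern under the upload base path
        frequency="manual",
    )
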
+ 9 - 9
dataops_scripts/load_file_test.py

@@ -16,7 +16,7 @@ logging.basicConfig(
 
 logger = logging.getLogger("load_file_mock") # 使用 mock 后缀以区分
 
-def mock_load_file(table_name=None, execution_mode='append', exec_date=None, 
+def mock_load_file(table_name=None, update_mode='append', exec_date=None, 
                    target_type=None, storage_location=None, frequency=None, script_name=None, **kwargs):
     """模拟加载文件数据,仅打印参数"""
     # 获取当前脚本的文件名(如果没有传入)
@@ -26,7 +26,7 @@ def mock_load_file(table_name=None, execution_mode='append', exec_date=None,
     # 打印所有传入的参数
     logger.info(f"===== 传入参数信息 (模拟处理函数内) =====")
     logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"update_mode: {update_mode}")
     logger.info(f"exec_date: {exec_date}")
     logger.info(f"target_type: {target_type}")
     logger.info(f"storage_location: {storage_location}")
@@ -46,8 +46,8 @@ def mock_load_file(table_name=None, execution_mode='append', exec_date=None,
         else:
             logger.info(f"模拟检查文件是否存在: {storage_location}")
 
-        logger.info(f"模拟执行模式: {execution_mode}")
-        if execution_mode == 'full_refresh':
+        logger.info(f"模拟更新模式: {update_mode}")
+        if update_mode == 'full_refresh':
             logger.info(f"模拟: 如果是全量刷新,将清空表 {table_name}")
         
         logger.info("模拟读取和处理文件...")
@@ -58,14 +58,14 @@ def mock_load_file(table_name=None, execution_mode='append', exec_date=None,
         logger.error(f"模拟加载文件时出错: {str(e)}")
         return False
 
-def run(table_name, execution_mode='append', exec_date=None, target_type=None, 
+def run(table_name, update_mode='append', exec_date=None, target_type=None, 
         storage_location=None, frequency=None, script_name=None, **kwargs):
     """
     Unified entry point that follows the Airflow dynamic-script invocation convention (mock version)
 
     Parameters:
         table_name (str): Name of the table to process
-        execution_mode (str): Execution mode (append/full_refresh)
+        update_mode (str): Update mode (append/full_refresh)
         exec_date: Execution date
         target_type: Target type
         storage_location: File path
@@ -79,7 +79,7 @@ def run(table_name, execution_mode='append', exec_date=None, target_type=None,
     # Print all received parameters again in the entry function to keep the log complete
     logger.info(f"===== Received parameters (inside the entry function run) =====")
     logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"update_mode: {update_mode}")
     logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)}) ")
     logger.info(f"target_type: {target_type}")
     logger.info(f"storage_location: {storage_location}")
@@ -101,7 +101,7 @@ def run(table_name, execution_mode='append', exec_date=None, target_type=None,
     # Call the actual handler (mock version)
     result = mock_load_file(
         table_name=table_name,
-        execution_mode=execution_mode,
+        update_mode=update_mode,
         exec_date=exec_date,
         target_type=target_type,
         storage_location=storage_location,
@@ -121,7 +121,7 @@ if __name__ == "__main__":
     # Provide some defaults so the script can be tested by running it directly
     test_params = {
         "table_name": "sample_table",
-        "execution_mode": "full_refresh",
+        "update_mode": "full_refresh",
         "exec_date": datetime.now().strftime('%Y-%m-%d'),
         "target_type": "structure",
         "storage_location": "/path/to/mock/file.csv",

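A minimal sketch of driving the mock with the renamed keyword to check the parameter plumbing (all values are placeholders; nothing touches a database):

    from load_file_test import run

    run(
        table_name="sample_table",
        update_mode="append",
        exec_date="2024-01-01",
        target_type="structure",
        storage_location="/path/to/mock/file.csv",
    )
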
+ 122 - 0
dataops_scripts/script_utils.py

@@ -3,10 +3,28 @@
 # This file lives in the dataops_scripts directory - used to verify that the path change works
 import logging
 import sys
+import os
+
+# Add the parent directory to the Python path so the config module under dags can be imported
+parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
+if parent_dir not in sys.path:
+    sys.path.insert(0, parent_dir)
+
+import importlib.util
 from datetime import datetime, timedelta
 import pytz
 import re  # re is needed for regular-expression support
 
+# Import Airflow packages
+try:
+    from airflow.models import Variable
+except ImportError:
+    # Handle running outside an Airflow environment
+    class Variable:
+        @staticmethod
+        def get(key, default_var=None):
+            return default_var
+
 # Configure the logger
 logging.basicConfig(
     level=logging.INFO,
@@ -18,6 +36,110 @@ logging.basicConfig(
 
 logger = logging.getLogger("script_utils")
 
+def get_config_path():
+    """
+    Get DATAOPS_DAGS_PATH from an Airflow Variable
+    
+    Returns:
+        str: full path to config.py
+    """
+    try:
+        # Read DATAOPS_DAGS_PATH from the Airflow Variable store
+        dags_path = Variable.get("DATAOPS_DAGS_PATH", "/opt/airflow/dags")
+        logger.info(f"Got DATAOPS_DAGS_PATH from the Airflow Variable: {dags_path}")
+        
+        # Build the full path to config.py
+        config_path = os.path.join(dags_path, "config.py")
+        
+        if not os.path.exists(config_path):
+            logger.warning(f"Config file path does not exist: {config_path}, falling back to the default path")
+            # Try a relative path instead
+            alt_config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../dags/config.py"))
+            if os.path.exists(alt_config_path):
+                logger.info(f"Using alternative config path: {alt_config_path}")
+                return alt_config_path
+        
+        return config_path
+    except Exception as e:
+        logger.error(f"Error while getting the config path: {str(e)}")
+        # Fall back to the default path
+        return os.path.abspath(os.path.join(os.path.dirname(__file__), "../dags/config.py"))
+
+def load_config_module():
+    """
+    Dynamically load the config.py module
+    
+    Returns:
+        module: the loaded config module
+    """
+    try:
+        config_path = get_config_path()
+        logger.info(f"Loading config file: {config_path}")
+        
+        # Dynamically load the config.py module
+        spec = importlib.util.spec_from_file_location("config", config_path)
+        config_module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(config_module)
+        
+        return config_module
+    except Exception as e:
+        logger.error(f"Error loading the config module: {str(e)}")
+        raise ImportError(f"Could not load the config module: {str(e)}")
+
+def get_pg_config():
+    """
+    Get the PostgreSQL database configuration from config.py
+    
+    Returns:
+        dict: PostgreSQL configuration dictionary
+    """
+    try:
+        config_module = load_config_module()
+        pg_config = getattr(config_module, "PG_CONFIG", None)
+        
+        if pg_config is None:
+            logger.warning("PG_CONFIG not found in the config module")
+            # Return the default configuration
+            return {
+                "host": "localhost",
+                "port": 5432,
+                "user": "postgres",
+                "password": "postgres",
+                "database": "dataops"
+            }
+        
+        logger.info(f"Got PostgreSQL configuration: {pg_config}")
+        return pg_config
+    except Exception as e:
+        logger.error(f"Error getting the PostgreSQL configuration: {str(e)}")
+        # Return the default configuration
+        return {
+            "host": "localhost",
+            "port": 5432,
+            "user": "postgres",
+            "password": "postgres",
+            "database": "dataops"
+        }
+
+def get_upload_paths():
+    """
+    Get the file upload and archive paths from config.py
+    
+    Returns:
+        tuple: (upload path, archive path)
+    """
+    try:
+        config_module = load_config_module()
+        upload_path = getattr(config_module, "STRUCTURE_UPLOAD_BASE_PATH", "/data/csv")
+        archive_path = getattr(config_module, "STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH", "/data/archive")
+        
+        logger.info(f"Got upload path: {upload_path}, archive path: {archive_path}")
+        return upload_path, archive_path
+    except Exception as e:
+        logger.error(f"Error getting the upload paths: {str(e)}")
+        # Return the default paths
+        return "/data/csv", "/data/archive"
+
 def get_date_range(exec_date, frequency):
     """
     Compute the start and end dates based on the execution date and frequency