Browse Source

基本完成自动调度和手工调度的修改

wangxq 1 week ago
parent
commit
bfc53156ae

+ 372 - 48
dags/dataops_productline_execute_dag.py

@@ -6,12 +6,44 @@
 2. 基于dataops_productline_prepare_dag生成的执行计划执行脚本
 2. 基于dataops_productline_prepare_dag生成的执行计划执行脚本
 3. 支持对脚本执行顺序的优化
 3. 支持对脚本执行顺序的优化
 4. 提供详细的执行日志和错误处理
 4. 提供详细的执行日志和错误处理
+
+预期的执行计划模式:
+{
+    "version": "2.0",
+    "exec_date": "YYYY-MM-DD",
+    "scripts": [
+        {
+            "task_id": "唯一任务ID",
+            "script_id": "唯一脚本ID",
+            "script_name": "脚本文件名或标识符",
+            "script_type": "python|sql|python_script",
+            "target_type": "structure|null",
+            "update_mode": "append|full_refresh",
+            "target_table": "表名",
+            "source_tables": ["表1", "表2"],
+            "schedule_status": true,
+            "storage_location": "/路径/模式" 或 null,
+            "schedule_frequency": "daily|weekly|monthly|quarterly|yearly",
+            "target_table_label": "DataModel|DataResource|DataSource"
+        },
+        ...
+    ],
+    "model_scripts": ["script_id1", "script_id2", ...],
+    "resource_scripts": ["script_id3", "script_id4", ...],
+    "execution_order": ["script_id1", "script_id3", "script_id2", "script_id4", ...],
+    "script_dependencies": {
+        "script_id1": ["script_id3", "script_id4"],
+        "script_id2": [],
+        ...
+    }
+}
 """
 """
 from airflow import DAG
 from airflow import DAG
 from airflow.operators.python import PythonOperator, ShortCircuitOperator
 from airflow.operators.python import PythonOperator, ShortCircuitOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.task_group import TaskGroup
 from datetime import datetime, timedelta, date
 from datetime import datetime, timedelta, date
+from airflow.models import Variable
 import logging
 import logging
 import networkx as nx
 import networkx as nx
 import json
 import json
@@ -72,7 +104,7 @@ class DecimalEncoder(json.JSONEncoder):
 # 脚本执行函数
 # 脚本执行函数
 #############################################
 #############################################
 
 
-def execute_python_script(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
+def execute_python_script(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
     """
     """
     执行Python脚本文件并返回执行结果
     执行Python脚本文件并返回执行结果
     
     
@@ -80,8 +112,8 @@ def execute_python_script(script_id, script_name, target_table, script_exec_mode
         script_id: 脚本ID
         script_id: 脚本ID
         script_name: 脚本文件名(.py文件)
         script_name: 脚本文件名(.py文件)
         target_table: 目标表名
         target_table: 目标表名
-        script_exec_mode: 执行模式
-        frequency: 执行频率
+        update_mode: 执行模式
+        schedule_frequency: 执行频率
         **kwargs: 其他参数,如source_tables、target_type等
         **kwargs: 其他参数,如source_tables、target_type等
     
     
     返回:
     返回:
@@ -96,8 +128,8 @@ def execute_python_script(script_id, script_name, target_table, script_exec_mode
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
-    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
+    logger.info(f"update_mode: {update_mode}, 类型: {type(update_mode)}")
+    logger.info(f"schedule_frequency: {schedule_frequency}, 类型: {type(schedule_frequency)}")
     logger.info(f"【时间参数】execute_python_script: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info(f"【时间参数】execute_python_script: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
 
 
     # 记录额外参数
     # 记录额外参数
@@ -133,9 +165,9 @@ def execute_python_script(script_id, script_name, target_table, script_exec_mode
             # 构建完整的参数字典
             # 构建完整的参数字典
             run_params = {
             run_params = {
                 "table_name": target_table,
                 "table_name": target_table,
-                "execution_mode": script_exec_mode,
+                "execution_mode": update_mode,
                 "exec_date": exec_date,
                 "exec_date": exec_date,
-                "frequency": frequency
+                "schedule_frequency": schedule_frequency
             }
             }
 
 
             ## 添加可能的额外参数
             ## 添加可能的额外参数
@@ -180,7 +212,7 @@ def execute_python_script(script_id, script_name, target_table, script_exec_mode
         return False
         return False
 
 
 # 使用execute_sql函数代替之前的execute_sql_script
 # 使用execute_sql函数代替之前的execute_sql_script
-def execute_sql(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
+def execute_sql(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
     """
     """
     执行SQL脚本并返回执行结果
     执行SQL脚本并返回执行结果
     
     
@@ -188,8 +220,8 @@ def execute_sql(script_id, script_name, target_table, script_exec_mode, frequenc
         script_id: 脚本ID
         script_id: 脚本ID
         script_name: 脚本名称(数据库中的名称)
         script_name: 脚本名称(数据库中的名称)
         target_table: 目标表名
         target_table: 目标表名
-        script_exec_mode: 执行模式
-        frequency: 执行频率
+        update_mode: 执行模式
+        schedule_frequency: 执行频率
         **kwargs: 其他参数
         **kwargs: 其他参数
     
     
     返回:
     返回:
@@ -204,8 +236,8 @@ def execute_sql(script_id, script_name, target_table, script_exec_mode, frequenc
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
-    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
+    logger.info(f"update_mode: {update_mode}, 类型: {type(update_mode)}")
+    logger.info(f"schedule_frequency: {schedule_frequency}, 类型: {type(schedule_frequency)}")
     logger.info(f"【时间参数】execute_sql: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info(f"【时间参数】execute_sql: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
 
 
     # 记录额外参数
     # 记录额外参数
@@ -249,9 +281,9 @@ def execute_sql(script_id, script_name, target_table, script_exec_mode, frequenc
                 "target_table": target_table,
                 "target_table": target_table,
                 "script_name": script_name,
                 "script_name": script_name,
                 "exec_date": exec_date,
                 "exec_date": exec_date,
-                "frequency": frequency,
+                "schedule_frequency": schedule_frequency,
                 "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
                 "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
-                "execution_mode": script_exec_mode  # 传递执行模式参数
+                "update_mode": update_mode  # 传递执行模式参数
             }
             }
 
 
             # 添加可能的额外参数
             # 添加可能的额外参数
@@ -297,7 +329,7 @@ def execute_sql(script_id, script_name, target_table, script_exec_mode, frequenc
         return False
         return False
 
 
 # 使用execute_python函数代替之前的execute_python_script
 # 使用execute_python函数代替之前的execute_python_script
-def execute_python(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
+def execute_python(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
     """
     """
     执行Python脚本并返回执行结果
     执行Python脚本并返回执行结果
     
     
@@ -305,8 +337,8 @@ def execute_python(script_id, script_name, target_table, script_exec_mode, frequ
         script_id: 脚本ID
         script_id: 脚本ID
         script_name: 脚本名称(数据库中的名称)
         script_name: 脚本名称(数据库中的名称)
         target_table: 目标表名
         target_table: 目标表名
-        script_exec_mode: 执行模式
-        frequency: 执行频率
+        update_mode: 执行模式
+        schedule_frequency: 执行频率
         **kwargs: 其他参数
         **kwargs: 其他参数
     
     
     返回:
     返回:
@@ -321,8 +353,8 @@ def execute_python(script_id, script_name, target_table, script_exec_mode, frequ
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
-    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
+    logger.info(f"update_mode: {update_mode}, 类型: {type(update_mode)}")
+    logger.info(f"schedule_frequency: {schedule_frequency}, 类型: {type(schedule_frequency)}")
     logger.info(f"【时间参数】execute_python: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info(f"【时间参数】execute_python: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
 
 
     # 记录额外参数
     # 记录额外参数
@@ -366,9 +398,9 @@ def execute_python(script_id, script_name, target_table, script_exec_mode, frequ
                 "target_table": target_table,
                 "target_table": target_table,
                 "script_name": script_name,
                 "script_name": script_name,
                 "exec_date": exec_date,
                 "exec_date": exec_date,
-                "frequency": frequency,
+                "schedule_frequency": schedule_frequency,
                 "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签
                 "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签
-                "execution_mode": script_exec_mode  # 传递执行模式参数
+                "update_mode": update_mode  # 传递执行模式参数
             }
             }
 
 
             # 添加可能的额外参数
             # 添加可能的额外参数
@@ -538,53 +570,344 @@ def check_execution_plan(**kwargs):
         logger.error("执行计划的scripts字段无效")
         logger.error("执行计划的scripts字段无效")
         return False
         return False
         
         
-    if not isinstance(execution_plan.get("script_dependencies", {}), dict):
-        logger.error("执行计划的script_dependencies字段无效")
+    if not isinstance(execution_plan.get("resource_scripts", []), list):
+        logger.error("执行计划的resource_scripts字段无效")
+        return False
+
+    if not isinstance(execution_plan.get("model_scripts", []), list):
+        logger.error("执行计划的model_scripts字段无效")
         return False
         return False
     
     
     # 检查是否有脚本数据
     # 检查是否有脚本数据
     scripts = execution_plan.get("scripts", [])
     scripts = execution_plan.get("scripts", [])
+    resource_scripts = execution_plan.get("resource_scripts", [])
+    model_scripts = execution_plan.get("model_scripts", [])
     
     
-    if not scripts:
-        logger.warning("执行计划不包含任何脚本")
-        # 如果没有脚本,则阻止下游任务执行
-        return False
-    
-    logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本")
+    logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本,{len(resource_scripts)} 个资源脚本和 {len(model_scripts)} 个模型脚本")
     
     
     # 保存执行计划到XCom以便下游任务使用
     # 保存执行计划到XCom以便下游任务使用
     kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
     kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
     
     
     return True
     return True
 
 
-def optimize_execution_order(scripts, script_dependencies):
+def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
     """
     """
-    使用NetworkX优化脚本执行顺序
+    将执行计划保存到airflow_exec_plans表
+    
+    参数:
+        execution_plan (dict): 执行计划字典
+        dag_id (str): DAG的ID
+        run_id (str): DAG运行的ID
+        logical_date (datetime): 逻辑日期
+        ds (str): 日期字符串,格式为YYYY-MM-DD
+    
+    返回:
+        bool: 操作是否成功
+    """
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        try:
+            # 将执行计划转换为JSON字符串
+            plan_json = json.dumps(execution_plan)
+            
+            # 获取本地时间
+            local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+            
+            # 插入记录
+            cursor.execute("""
+                INSERT INTO airflow_exec_plans
+                (dag_id, run_id, logical_date, local_logical_date, exec_date, plan)
+                VALUES (%s, %s, %s, %s, %s, %s)
+            """, (dag_id, run_id, logical_date, local_logical_date, ds, plan_json))
+            
+            conn.commit()
+            logger.info(f"成功将执行计划保存到airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, exec_date={ds}")
+            return True
+        except Exception as e:
+            logger.error(f"保存执行计划到数据库时出错: {str(e)}")
+            conn.rollback()
+            raise Exception(f"PostgreSQL保存执行计划失败: {str(e)}")
+        finally:
+            cursor.close()
+            conn.close()
+    except Exception as e:
+        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
+        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+
+def generate_task_id(script_name, source_tables, target_table):
+    """
+    根据脚本名和表名生成唯一任务ID
+    
+    参数:
+        script_name (str): 脚本文件名
+        source_tables (list): 源表列表
+        target_table (str): 目标表名
+        
+    返回:
+        str: 唯一的任务ID
+    """
+    # 移除脚本名的文件扩展名
+    script_base = os.path.splitext(script_name)[0]
+    
+    # 对于特殊脚本如load_file.py,直接使用目标表名
+    if script_name.lower() in ['load_file.py']:
+        return f"{script_base}_{target_table}"
+    
+    # 处理源表部分
+    if source_tables:
+        # 将所有源表按字母顺序排序并连接
+        source_part = "_".join(sorted(source_tables))
+        # 生成任务ID: 脚本名_源表_to_目标表
+        return f"{script_base}_{source_part}_to_{target_table}"
+    else:
+        # 没有源表时,只使用脚本名和目标表
+        return f"{script_base}_{target_table}"
+
+def prepare_scripts_from_tables(tables_info):
+    """
+    将表信息转换为脚本信息
+    
+    参数:
+        tables_info (list): 表信息列表
+        
+    返回:
+        list: 脚本信息列表
+    """
+    scripts = []
+    
+    for table in tables_info:
+        target_table = table['target_table']
+        target_table_label = table.get('target_table_label')
+        schedule_frequency = table.get('schedule_frequency')
+        
+        # 处理表的脚本信息
+        if 'scripts_info' in table and table['scripts_info']:
+            # 表有多个脚本
+            for script_name, script_info in table['scripts_info'].items():
+                source_tables = script_info.get('sources', [])
+                script_type = script_info.get('script_type', 'python')
+                update_mode = script_info.get('script_exec_mode', 'append')
+                
+                # 生成任务ID
+                task_id = generate_task_id(script_name, source_tables, target_table)
+                
+                # 创建脚本信息
+                script = {
+                    "script_id": task_id,
+                    "script_name": script_name,
+                    "source_tables": source_tables,
+                    "target_table": target_table,
+                    "target_table_label": target_table_label,
+                    "script_type": script_type,
+                    "update_mode": update_mode,
+                    "schedule_frequency": schedule_frequency,
+                    "task_id": task_id
+                }
+                
+                # 为structure类型添加特殊属性
+                if table.get('target_type') == "structure":
+                    script["target_type"] = "structure"
+                    script["storage_location"] = table.get('storage_location')
+                
+                scripts.append(script)
+                logger.info(f"为表 {target_table} 创建脚本 {script_name},任务ID: {task_id}")
+        else:
+            # 表只有单个脚本或没有明确指定脚本信息
+            script_name = table.get('script_name')
+            
+            # 如果没有script_name,使用默认值
+            if not script_name:
+                script_name = f"{target_table}_script.py"
+                logger.warning(f"表 {target_table} 没有指定脚本名,使用默认值: {script_name}")
+            
+            source_tables = table.get('source_tables', [])
+            script_type = table.get('script_type', 'python')
+            update_mode = table.get('update_mode', 'append')
+            
+            # 生成任务ID
+            task_id = generate_task_id(script_name, source_tables, target_table)
+            
+            # 创建脚本信息
+            script = {
+                "script_id": task_id,
+                "script_name": script_name,
+                "source_tables": source_tables,
+                "target_table": target_table,
+                "target_table_label": target_table_label,
+                "script_type": script_type,
+                "update_mode": update_mode,
+                "schedule_frequency": schedule_frequency,
+                "task_id": task_id
+            }
+            
+            # 为structure类型添加特殊属性
+            if table.get('target_type') == "structure":
+                script["target_type"] = "structure"
+                script["storage_location"] = table.get('storage_location')
+            
+            scripts.append(script)
+            logger.info(f"为表 {target_table} 创建脚本 {script_name},任务ID: {task_id}")
+    
+    return scripts
+
+def build_script_dependency_graph(scripts):
+    """
+    处理脚本间的依赖关系
     
     
     参数:
     参数:
         scripts (list): 脚本信息列表
         scripts (list): 脚本信息列表
-        script_dependencies (dict): 脚本依赖关系字典
         
         
     返回:
     返回:
-        list: 优化后的脚本执行顺序(脚本ID列表)
+        tuple: (依赖关系字典, 图对象)
     """
     """
-    logger.info("开始使用NetworkX优化脚本执行顺序")
+    # 打印所有脚本的源表信息,用于调试
+    logger.info("构建脚本依赖图,当前脚本信息:")
+    for script in scripts:
+        script_id = script['script_id']
+        script_name = script['script_name']
+        target_table = script['target_table']
+        source_tables = script['source_tables']
+        logger.info(f"脚本: {script_id} ({script_name}), 目标表: {target_table}, 源表: {source_tables}")
+    
+    # 创建目标表到脚本ID的映射
+    table_to_scripts = {}
+    for script in scripts:
+        target_table = script['target_table']
+        if target_table not in table_to_scripts:
+            table_to_scripts[target_table] = []
+        table_to_scripts[target_table].append(script['script_id'])
+    
+    # 记录表到脚本的映射关系
+    logger.info("表到脚本的映射关系:")
+    for table, script_ids in table_to_scripts.items():
+        logger.info(f"表 {table} 由脚本 {script_ids} 生成")
+    
+    # 创建脚本依赖关系
+    script_dependencies = {}
+    for script in scripts:
+        script_id = script['script_id']
+        source_tables = script['source_tables']
+        target_table = script['target_table']
+        
+        # 初始化依赖列表
+        script_dependencies[script_id] = []
+        
+        # 查找源表对应的脚本
+        if source_tables:
+            logger.info(f"处理脚本 {script_id} 的依赖关系,源表: {source_tables}")
+            for source_table in source_tables:
+                if source_table in table_to_scripts:
+                    # 添加所有生成源表的脚本作为依赖
+                    for source_script_id in table_to_scripts[source_table]:
+                        if source_script_id != script_id:  # 避免自我依赖
+                            script_dependencies[script_id].append(source_script_id)
+                            logger.info(f"添加依赖: {script_id} 依赖于 {source_script_id} (表 {target_table} 依赖于表 {source_table})")
+                else:
+                    logger.warning(f"源表 {source_table} 没有对应的脚本,无法为脚本 {script_id} 创建依赖")
+        else:
+            logger.info(f"脚本 {script_id} 没有源表依赖")
+    
+    # 尝试从Neo4j额外查询依赖关系(如果脚本没有显式的source_tables)
+    try:
+        driver = get_neo4j_driver()
+    except Exception as e:
+        logger.error(f"连接Neo4j数据库失败: {str(e)}")
+        raise Exception(f"无法连接Neo4j数据库: {str(e)}")
+    
+    try:
+        with driver.session() as session:
+            # 验证连接
+            try:
+                test_result = session.run("RETURN 1 as test")
+                test_record = test_result.single()
+                if not test_record or test_record.get("test") != 1:
+                    logger.error("Neo4j连接测试失败")
+                    raise Exception("Neo4j连接测试失败")
+            except Exception as e:
+                logger.error(f"Neo4j连接测试失败: {str(e)}")
+                raise Exception(f"Neo4j连接测试失败: {str(e)}")
+                
+            for script in scripts:
+                script_id = script['script_id']
+                target_table = script['target_table']
+                
+                # 只处理没有源表的脚本
+                if not script['source_tables'] and not script_dependencies[script_id]:
+                    logger.info(f"脚本 {script_id} 没有源表,尝试从Neo4j直接查询表 {target_table} 的依赖")
+                    
+                    # 查询表的直接依赖
+                    query = """
+                        MATCH (target {en_name: $table_name})-[rel]->(dep)
+                        RETURN dep.en_name AS dep_name
+                    """
+                    
+                    try:
+                        result = session.run(query, table_name=target_table)
+                        records = list(result)
+                        
+                        for record in records:
+                            dep_name = record.get("dep_name")
+                            if dep_name and dep_name in table_to_scripts:
+                                for dep_script_id in table_to_scripts[dep_name]:
+                                    if dep_script_id != script_id:  # 避免自我依赖
+                                        script_dependencies[script_id].append(dep_script_id)
+                                        logger.info(f"从Neo4j添加额外依赖: {script_id} 依赖于 {dep_script_id} (表 {target_table} 依赖于表 {dep_name})")
+                    except Exception as e:
+                        logger.warning(f"从Neo4j查询表 {target_table} 依赖时出错: {str(e)}")
+                        raise Exception(f"Neo4j查询表依赖失败: {str(e)}")
+    except Exception as e:
+        if "Neo4j" in str(e):
+            # 已经处理过的错误,直接抛出
+            raise
+        else:
+            logger.error(f"访问Neo4j获取额外依赖时出错: {str(e)}")
+            raise Exception(f"Neo4j依赖查询失败: {str(e)}")
+    finally:
+        driver.close()
     
     
     # 构建依赖图
     # 构建依赖图
     G = nx.DiGraph()
     G = nx.DiGraph()
     
     
     # 添加所有脚本作为节点
     # 添加所有脚本作为节点
     for script in scripts:
     for script in scripts:
-        script_id = script['script_id']
-        G.add_node(script_id)
+        G.add_node(script['script_id'])
     
     
     # 添加依赖边
     # 添加依赖边
     for script_id, dependencies in script_dependencies.items():
     for script_id, dependencies in script_dependencies.items():
-        for dep_id in dependencies:
-            # 添加从script_id到dep_id的边,表示script_id依赖于dep_id
-            G.add_edge(script_id, dep_id)
-            logger.debug(f"添加依赖边: {script_id} -> {dep_id}")
+        if dependencies:
+            for dep_id in dependencies:
+                # 添加从script_id到dep_id的边,表示script_id依赖于dep_id
+                G.add_edge(script_id, dep_id)
+                logger.debug(f"添加依赖边: {script_id} -> {dep_id}")
+        else:
+            logger.info(f"脚本 {script_id} 没有依赖的上游脚本")
+    
+    # 确保所有脚本ID都在依赖关系字典中
+    for script in scripts:
+        script_id = script['script_id']
+        if script_id not in script_dependencies:
+            script_dependencies[script_id] = []
     
     
+    # 记录每个脚本的依赖数量
+    for script_id, deps in script_dependencies.items():
+        logger.info(f"脚本 {script_id} 有 {len(deps)} 个依赖: {deps}")
+    
+    return script_dependencies, G
+
+def optimize_script_execution_order(scripts, script_dependencies, G):
+    """
+    使用NetworkX优化脚本执行顺序
+    
+    参数:
+        scripts (list): 脚本信息列表
+        script_dependencies (dict): 脚本依赖关系字典
+        G (nx.DiGraph): 依赖图对象
+        
+    返回:
+        list: 优化后的脚本执行顺序(脚本ID列表)
+    """
     # 检查是否有循环依赖
     # 检查是否有循环依赖
     try:
     try:
         cycles = list(nx.simple_cycles(G))
         cycles = list(nx.simple_cycles(G))
@@ -608,12 +931,13 @@ def optimize_execution_order(scripts, script_dependencies):
         # 反转结果,使上游任务先执行
         # 反转结果,使上游任务先执行
         execution_order.reverse()
         execution_order.reverse()
         
         
-        logger.info(f"NetworkX优化后的脚本执行顺序: {execution_order}")
+        logger.info(f"生成优化的脚本执行顺序: {execution_order}")
         return execution_order
         return execution_order
     except Exception as e:
     except Exception as e:
         logger.error(f"生成脚本执行顺序时出错: {str(e)}")
         logger.error(f"生成脚本执行顺序时出错: {str(e)}")
         # 出错时返回原始脚本ID列表,不进行优化
         # 出错时返回原始脚本ID列表,不进行优化
-        return [script['script_id'] for script in scripts]
+        return [script['script_id'] for script in scripts] 
+    
 
 
 def create_execution_plan(**kwargs):
 def create_execution_plan(**kwargs):
     """
     """
@@ -652,7 +976,7 @@ def create_execution_plan(**kwargs):
         # 如果执行计划中没有execution_order或为空,使用NetworkX优化
         # 如果执行计划中没有execution_order或为空,使用NetworkX优化
         if not execution_order:
         if not execution_order:
             logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
             logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
-            execution_order = optimize_execution_order(scripts, script_dependencies)
+            execution_order = optimize_script_execution_order(scripts, script_dependencies)
             execution_plan["execution_order"] = execution_order
             execution_plan["execution_order"] = execution_order
         
         
         # 保存完整的执行计划到XCom
         # 保存完整的执行计划到XCom
@@ -740,7 +1064,7 @@ with DAG(
             # 如果执行计划中没有execution_order或为空,使用NetworkX优化
             # 如果执行计划中没有execution_order或为空,使用NetworkX优化
             if not execution_order:
             if not execution_order:
                 logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
                 logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
-                execution_order = optimize_execution_order(scripts, script_dependencies)
+                execution_order = optimize_script_execution_order(scripts, script_dependencies, nx.DiGraph())
             
             
             logger.info(f"执行计划: exec_date={exec_date}, scripts数量={len(scripts)}")
             logger.info(f"执行计划: exec_date={exec_date}, scripts数量={len(scripts)}")
             
             
@@ -769,7 +1093,7 @@ with DAG(
                 script_name = script.get("script_name")
                 script_name = script.get("script_name")
                 target_table = script.get("target_table")
                 target_table = script.get("target_table")
                 script_type = script.get("script_type", "python")
                 script_type = script.get("script_type", "python")
-                script_exec_mode = script.get("script_exec_mode", "append")
+                update_mode = script.get("update_mode", "append")
                 source_tables = script.get("source_tables", [])
                 source_tables = script.get("source_tables", [])
                 target_table_label = script.get("target_table_label", "")
                 target_table_label = script.get("target_table_label", "")
                 
                 
@@ -792,9 +1116,9 @@ with DAG(
                     "script_id": script_id,
                     "script_id": script_id,
                     "script_name": script_name,
                     "script_name": script_name,
                     "target_table": target_table,
                     "target_table": target_table,
-                    "script_exec_mode": script_exec_mode,
+                    "update_mode": update_mode,
                     "source_tables": source_tables,
                     "source_tables": source_tables,
-                    "frequency": script.get("frequency", "daily"),  # 显式添加frequency参数
+                    "schedule_frequency": script.get("schedule_frequency", "daily"),
                     "target_table_label": target_table_label,
                     "target_table_label": target_table_label,
                     # logical_date会在任务执行时由Airflow自动添加
                     # logical_date会在任务执行时由Airflow自动添加
                 }
                 }
@@ -897,4 +1221,4 @@ with DAG(
     # 设置依赖关系,确保执行阶段完成后触发finalize DAG
     # 设置依赖关系,确保执行阶段完成后触发finalize DAG
     execution_group >> trigger_finalize_dag
     execution_group >> trigger_finalize_dag
 
 
-logger.info(f"DAG dataops_productline_execute_dag 定义完成") 
+logger.info(f"DAG dataops_productline_execute_dag 定义完成")

+ 1 - 30
dags/dataops_productline_manual_trigger_dag.py

@@ -66,7 +66,7 @@ from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
 import traceback
 import traceback
 import pendulum
 import pendulum
 import pytz
 import pytz
-from utils import get_pg_conn, get_cn_exec_date, check_script_exists, get_complete_script_info
+from utils import get_pg_conn, get_cn_exec_date, check_script_exists, get_complete_script_info, get_table_label
 from airflow.exceptions import AirflowException
 from airflow.exceptions import AirflowException
 
 
 # 设置logger
 # 设置logger
@@ -118,35 +118,6 @@ def get_dag_params(**context):
     logger.info(f"最终使用的参数 - 脚本名称: {script_name}, 目标表: {target_table}, 依赖级别: {dependency_level}, 执行日期: {exec_date}")
     logger.info(f"最终使用的参数 - 脚本名称: {script_name}, 目标表: {target_table}, 依赖级别: {dependency_level}, 执行日期: {exec_date}")
     return script_name, target_table, dependency_level, exec_date, logical_date
     return script_name, target_table, dependency_level, exec_date, logical_date
 
 
-def get_table_label(table_name):
-    """确定表的标签类型(DataModel or DataResource)"""
-    driver = GraphDatabase.driver(
-        NEO4J_CONFIG['uri'], 
-        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
-    )
-    query = """
-        MATCH (n {en_name: $table_name})
-        RETURN labels(n) AS labels
-    """
-    try:
-        with driver.session() as session:
-            result = session.run(query, table_name=table_name)
-            record = result.single()
-            if record and record.get("labels"):
-                labels = record.get("labels")
-                if "DataModel" in labels:
-                    return "DataModel"
-                elif "DataResource" in labels:
-                    return "DataResource"
-                elif "DataSource" in labels:
-                    return "DataSource"
-            return None
-    except Exception as e:
-        logger.error(f"获取表 {table_name} 的标签时出错: {str(e)}")
-        return None
-    finally:
-        driver.close()
-
 def find_target_table_for_script(script_name):
 def find_target_table_for_script(script_name):
     """
     """
     根据脚本名称查找对应的目标表
     根据脚本名称查找对应的目标表

+ 287 - 159
dags/dataops_productline_prepare_dag.py

@@ -15,7 +15,8 @@ import hashlib
 import pendulum
 import pendulum
 from utils import (
 from utils import (
     get_pg_conn, 
     get_pg_conn, 
-    get_neo4j_driver
+    get_neo4j_driver,
+    get_cn_exec_date
 )
 )
 from config import PG_CONFIG, NEO4J_CONFIG, DATAOPS_DAGS_PATH
 from config import PG_CONFIG, NEO4J_CONFIG, DATAOPS_DAGS_PATH
 
 
@@ -23,58 +24,135 @@ from config import PG_CONFIG, NEO4J_CONFIG, DATAOPS_DAGS_PATH
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
 
 
 def get_enabled_tables():
 def get_enabled_tables():
-    """获取所有启用的表"""
+    """获取所有启用调度的表"""
     try:
     try:
-        conn = get_pg_conn()
-        cursor = conn.cursor()
-        try:
-            cursor.execute("""
-                SELECT owner_id, table_name 
-                FROM schedule_status 
-                WHERE schedule_is_enabled = TRUE
-            """)
-            result = cursor.fetchall()
-            return [row[1] for row in result]  # 只返回表名
-        except Exception as e:
-            logger.error(f"获取启用表失败: {str(e)}")
-            raise Exception(f"PostgreSQL数据库查询失败: {str(e)}")
-        finally:
-            cursor.close()
-            conn.close()
+        # 使用Neo4j查询所有schedule_status为true的关系
+        driver = get_neo4j_driver()
+        
+        with driver.session() as session:
+            # 查询DataModel表中有schedule_status为true的关系
+            query_datamodel = """
+                MATCH (target:DataModel)-[rel:DERIVED_FROM]->()
+                WHERE rel.schedule_status = true
+                RETURN target.en_name AS table_name
+            """
+            
+            # 查询DataResource表中有schedule_status为true的关系
+            query_dataresource = """
+                MATCH (target:DataResource)-[rel:ORIGINATES_FROM]->()
+                WHERE rel.schedule_status = true
+                RETURN target.en_name AS table_name
+            """
+            
+            # 查询structure类型的DataResource表中有schedule_status为true的节点
+            query_structure = """
+                MATCH (target:DataResource)
+                WHERE target.type = 'structure' AND target.schedule_status = true
+                RETURN target.en_name AS table_name
+            """
+            
+            try:
+                # 获取结果
+                result_datamodel = session.run(query_datamodel)
+                result_dataresource = session.run(query_dataresource)
+                result_structure = session.run(query_structure)
+                
+                # 合并结果
+                tables = []
+                for result in [result_datamodel, result_dataresource, result_structure]:
+                    for record in result:
+                        table_name = record.get("table_name")
+                        if table_name and table_name not in tables:
+                            tables.append(table_name)
+                
+                logger.info(f"从Neo4j找到 {len(tables)} 个启用的表: {tables}")
+                return tables
+                
+            except Exception as e:
+                logger.error(f"Neo4j查询启用的表失败: {str(e)}")
+                raise Exception(f"Neo4j查询启用的表失败: {str(e)}")
     except Exception as e:
     except Exception as e:
-        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
-        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+        logger.error(f"连接Neo4j数据库失败: {str(e)}")
+        raise Exception(f"无法连接Neo4j数据库: {str(e)}")
+    
 
 
 def check_table_directly_subscribed(table_name):
 def check_table_directly_subscribed(table_name):
-    """检查表是否在schedule_status表中直接调度"""
+    """检查表是否在节点关系中有schedule_status为True的脚本,若有则直接调度"""
     try:
     try:
-        conn = get_pg_conn()
-        cursor = conn.cursor()
-        try:
-            cursor.execute("""
-                SELECT schedule_is_enabled
-                FROM schedule_status 
-                WHERE table_name = %s
-            """, (table_name,))
-            result = cursor.fetchone()
-            return result and result[0] is True
-        except Exception as e:
-            logger.error(f"检查表订阅状态失败: {str(e)}")
-            raise Exception(f"PostgreSQL查询表订阅状态失败: {str(e)}")
-        finally:
-            cursor.close()
-            conn.close()
+        driver = get_neo4j_driver()
     except Exception as e:
     except Exception as e:
-        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
-        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+        logger.error(f"连接Neo4j数据库失败: {str(e)}")
+        raise Exception(f"无法连接Neo4j数据库: {str(e)}")
+    
+    try:
+        with driver.session() as session:
+            # 查询是否有直接调度的脚本
+            query_datamodel = """
+                MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
+                WHERE rel.schedule_status = true
+                RETURN count(rel) > 0 AS directly_subscribed
+            """
+            
+            query_dataresource = """
+                MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                WHERE rel.schedule_status = true
+                RETURN count(rel) > 0 AS directly_subscribed
+            """
+            
+            # 获取类型
+            labels_query = """
+                MATCH (n {en_name: $table_name})
+                RETURN labels(n) AS labels
+            """
+            
+            result = session.run(labels_query, table_name=table_name)
+            record = result.single()
+            
+            if not record:
+                logger.warning(f"在Neo4j中未找到表 {table_name} 的标签信息")
+                return False
+                
+            labels = record.get("labels", [])
+            
+            # 根据不同标签类型执行不同查询
+            if "DataModel" in labels:
+                result = session.run(query_datamodel, table_name=table_name)
+            elif "DataResource" in labels:
+                # 检查是否是structure类型
+                structure_query = """
+                    MATCH (n:DataResource {en_name: $table_name})
+                    RETURN n.type AS type, n.schedule_status AS schedule_status
+                """
+                result = session.run(structure_query, table_name=table_name)
+                record = result.single()
+                
+                if record and record.get("type") == "structure":
+                    # structure类型,从节点获取schedule_status
+                    return record.get("schedule_status", False)
+                
+                # 非structure类型,继续查询关系
+                result = session.run(query_dataresource, table_name=table_name)
+            else:
+                logger.warning(f"表 {table_name} 不是DataModel或DataResource类型")
+                return False
+            
+            record = result.single()
+            return record and record.get("directly_subscribed", False)
+            
+    except Exception as e:
+        logger.error(f"检查表订阅状态失败: {str(e)}")
+        raise Exception(f"Neo4j查询表订阅状态失败: {str(e)}")
+    finally:
+        driver.close()
+
 
 
-def should_execute_today(table_name, frequency, exec_date):
+def should_execute_today(table_name, schedule_frequency, exec_date):
     """
     """
     判断指定频率的表在给定执行日期是否应该执行
     判断指定频率的表在给定执行日期是否应该执行
     
     
     参数:
     参数:
         table_name (str): 表名,用于日志记录
         table_name (str): 表名,用于日志记录
-        frequency (str): 调度频率,如'daily'、'weekly'、'monthly'、'yearly',为None时默认为'daily'
+        schedule_frequency (str): 调度频率,如'daily'、'weekly'、'monthly',为None时默认为'daily'
         exec_date (str): 执行日期,格式为'YYYY-MM-DD'
         exec_date (str): 执行日期,格式为'YYYY-MM-DD'
     
     
     返回:
     返回:
@@ -91,40 +169,41 @@ def should_execute_today(table_name, frequency, exec_date):
     next_date = exec_date_obj.add(days=1)
     next_date = exec_date_obj.add(days=1)
     
     
     # 如果频率为None或空字符串,默认为daily
     # 如果频率为None或空字符串,默认为daily
-    if not frequency:
+    if not schedule_frequency:
         logger.info(f"表 {table_name} 未指定调度频率,默认为daily")
         logger.info(f"表 {table_name} 未指定调度频率,默认为daily")
         return True
         return True
     
     
-    frequency = frequency.lower() if isinstance(frequency, str) else 'daily'
+    schedule_frequency = schedule_frequency.lower() if isinstance(schedule_frequency, str) else 'daily'
     
     
-    if frequency == 'daily':
+    if schedule_frequency == 'daily':
         # 日任务每天都执行
         # 日任务每天都执行
         return True
         return True
-    elif frequency == 'weekly':
+    elif schedule_frequency == 'weekly':
         # 周任务只在周日执行(因为exec_date+1是周一时才执行)
         # 周任务只在周日执行(因为exec_date+1是周一时才执行)
         is_sunday = next_date.day_of_week == 1  # 1表示周一
         is_sunday = next_date.day_of_week == 1  # 1表示周一
         logger.info(f"表 {table_name} 是weekly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否周日: {is_sunday}")
         logger.info(f"表 {table_name} 是weekly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否周日: {is_sunday}")
         return is_sunday
         return is_sunday
-    elif frequency == 'monthly':
+    elif schedule_frequency == 'monthly':
         # 月任务只在每月最后一天执行(因为exec_date+1是月初时才执行)
         # 月任务只在每月最后一天执行(因为exec_date+1是月初时才执行)
         is_month_end = next_date.day == 1
         is_month_end = next_date.day == 1
         logger.info(f"表 {table_name} 是monthly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否月末: {is_month_end}")
         logger.info(f"表 {table_name} 是monthly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否月末: {is_month_end}")
         return is_month_end
         return is_month_end
-    elif frequency == 'quarterly':
+    elif schedule_frequency == 'quarterly':
         # 季度任务只在每季度最后一天执行(因为exec_date+1是季度初时才执行)
         # 季度任务只在每季度最后一天执行(因为exec_date+1是季度初时才执行)
         is_quarter_end = next_date.day == 1 and next_date.month in [1, 4, 7, 10]
         is_quarter_end = next_date.day == 1 and next_date.month in [1, 4, 7, 10]
         logger.info(f"表 {table_name} 是quarterly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否季末: {is_quarter_end}")
         logger.info(f"表 {table_name} 是quarterly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否季末: {is_quarter_end}")
         return is_quarter_end
         return is_quarter_end
-    elif frequency == 'yearly':
+    elif schedule_frequency == 'yearly':
         # 年任务只在每年最后一天执行(因为exec_date+1是年初时才执行)
         # 年任务只在每年最后一天执行(因为exec_date+1是年初时才执行)
         is_year_end = next_date.day == 1 and next_date.month == 1
         is_year_end = next_date.day == 1 and next_date.month == 1
         logger.info(f"表 {table_name} 是yearly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否年末: {is_year_end}")
         logger.info(f"表 {table_name} 是yearly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否年末: {is_year_end}")
         return is_year_end
         return is_year_end
     else:
     else:
         # 未知频率,默认执行
         # 未知频率,默认执行
-        logger.warning(f"表 {table_name} 使用未知的调度频率: {frequency},默认执行")
+        logger.warning(f"表 {table_name} 使用未知的调度频率: {schedule_frequency},默认执行")
         return True
         return True
 
 
+
 def get_table_info_from_neo4j(table_name):
 def get_table_info_from_neo4j(table_name):
     """从Neo4j获取表的详细信息,保留完整的scripts_info并确保正确获取源表依赖"""
     """从Neo4j获取表的详细信息,保留完整的scripts_info并确保正确获取源表依赖"""
     try:
     try:
@@ -157,8 +236,9 @@ def get_table_info_from_neo4j(table_name):
             # 查询表标签和状态
             # 查询表标签和状态
             query_table = """
             query_table = """
                 MATCH (t {en_name: $table_name})
                 MATCH (t {en_name: $table_name})
-                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency,
-                       t.type AS type, t.storage_location AS storage_location
+                RETURN labels(t) AS labels, t.status AS status,
+                       t.type AS type, t.storage_location AS storage_location,
+                       t.update_mode as update_mode
             """
             """
             try:
             try:
                 result = session.run(query_table, table_name=table_name)
                 result = session.run(query_table, table_name=table_name)
@@ -171,7 +251,6 @@ def get_table_info_from_neo4j(table_name):
                 labels = record.get("labels", [])
                 labels = record.get("labels", [])
                 table_info['target_table_label'] = [label for label in labels if label in ["DataResource", "DataModel", "DataSource"]][0] if labels else None
                 table_info['target_table_label'] = [label for label in labels if label in ["DataResource", "DataModel", "DataSource"]][0] if labels else None
                 table_info['target_table_status'] = record.get("status", True)  # 默认为True
                 table_info['target_table_status'] = record.get("status", True)  # 默认为True
-                table_info['frequency'] = record.get("frequency")
                 table_info['target_type'] = record.get("type")  # 获取type属性
                 table_info['target_type'] = record.get("type")  # 获取type属性
                 table_info['storage_location'] = record.get("storage_location")  # 获取storage_location属性
                 table_info['storage_location'] = record.get("storage_location")  # 获取storage_location属性
                 
                 
@@ -182,18 +261,23 @@ def get_table_info_from_neo4j(table_name):
                         # 对于structure类型,设置默认值,不查询关系
                         # 对于structure类型,设置默认值,不查询关系
                         table_info['source_tables'] = []  # 使用空数组表示无源表
                         table_info['source_tables'] = []  # 使用空数组表示无源表
                         table_info['script_name'] = "load_file.py"
                         table_info['script_name'] = "load_file.py"
-                        table_info['script_type'] = "python"
+                        table_info['script_type'] = "python_script"
                         
                         
-                        # csv类型的DataResource没有上游,使用默认的append模式
-                        table_info['script_exec_mode'] = "append"
-                        logger.info(f"表 {table_name} 为structure类型,使用默认执行模式: append")
+                        # 从节点属性中获取update_mode,如果不存在则使用默认值
+                        table_info['script_update_mode'] = record.get("update_mode", "append")
+                        table_info['schedule_frequency'] = record.get("schedule_frequency", "daily")
+                        table_info['schedule_status'] = record.get("schedule_status", True)
+                        
+                        logger.info(f"表 {table_name} 为structure类型,使用执行模式: {table_info['script_update_mode']}")
 
 
                         # 添加脚本信息
                         # 添加脚本信息
                         table_info['scripts_info'] = {
                         table_info['scripts_info'] = {
                             "load_file.py": {
                             "load_file.py": {
                                 "sources": [],
                                 "sources": [],
-                                "script_type": "python",
-                                "script_exec_mode": "append"
+                                "script_type": "python_script",
+                                "script_update_mode": table_info['script_update_mode'],
+                                "schedule_frequency": table_info['schedule_frequency'],
+                                "schedule_status": table_info['schedule_status']
                             }
                             }
                         }
                         }
 
 
@@ -202,32 +286,29 @@ def get_table_info_from_neo4j(table_name):
                         # 查询源表关系和脚本信息
                         # 查询源表关系和脚本信息
                         query_rel = """
                         query_rel = """
                             MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                             MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                            WITH source, rel, 
-                                 CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
-                                 CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
-                            RETURN source.en_name AS source_table, script_name AS script_name,
-                                   script_type AS script_type, 'append' AS script_exec_mode
+                            RETURN source.en_name AS source_table, rel.script_name AS script_name,
+                                  rel.script_type AS script_type, rel.update_mode AS script_update_mode,
+                                  rel.schedule_frequency AS schedule_frequency, 
+                                  rel.schedule_status AS schedule_status
                         """
                         """
                 elif "DataModel" in labels:
                 elif "DataModel" in labels:
                     # 查询源表关系和脚本信息
                     # 查询源表关系和脚本信息
                     query_rel = """
                     query_rel = """
                         MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                         MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
-                        WITH source, rel, 
-                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
-                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
-                        RETURN source.en_name AS source_table, script_name AS script_name,
-                               script_type AS script_type, 'append' AS script_exec_mode
+                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
+                              rel.script_type AS script_type, rel.update_mode AS script_update_mode,
+                              rel.schedule_frequency AS schedule_frequency, 
+                              rel.schedule_status AS schedule_status
                     """
                     """
                 else:
                 else:
                     logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
                     logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
                     # 即使不是这两种类型,也尝试查询其源表依赖关系
                     # 即使不是这两种类型,也尝试查询其源表依赖关系
                     query_rel = """
                     query_rel = """
                         MATCH (target {en_name: $table_name})-[rel]->(source)
                         MATCH (target {en_name: $table_name})-[rel]->(source)
-                        WITH source, rel, 
-                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
-                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
-                        RETURN source.en_name AS source_table, script_name AS script_name,
-                               script_type AS script_type, 'append' AS script_exec_mode
+                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
+                              rel.script_type AS script_type, rel.update_mode AS script_update_mode,
+                              rel.schedule_frequency AS schedule_frequency, 
+                              rel.schedule_status AS schedule_status
                     """
                     """
                 
                 
                 # 收集所有关系记录
                 # 收集所有关系记录
@@ -256,7 +337,8 @@ def get_table_info_from_neo4j(table_name):
                 logger.info(f"表 {table_name} 查询到 {len(records)} 条关系记录")
                 logger.info(f"表 {table_name} 查询到 {len(records)} 条关系记录")
                 for idx, rec in enumerate(records):
                 for idx, rec in enumerate(records):
                     logger.info(f"关系记录[{idx}]: source_table={rec.get('source_table')}, script_name={rec.get('script_name')}, " 
                     logger.info(f"关系记录[{idx}]: source_table={rec.get('source_table')}, script_name={rec.get('script_name')}, " 
-                                f"script_type={rec.get('script_type')}, script_exec_mode={rec.get('script_exec_mode')}")
+                                f"script_type={rec.get('script_type')}, script_update_mode={rec.get('script_update_mode')}, "
+                                f"schedule_frequency={rec.get('schedule_frequency')}, schedule_status={rec.get('schedule_status')}")
                 
                 
                 if records:
                 if records:
                     # 按脚本名称分组源表
                     # 按脚本名称分组源表
@@ -264,8 +346,10 @@ def get_table_info_from_neo4j(table_name):
                     for record in records:
                     for record in records:
                         script_name = record.get("script_name")
                         script_name = record.get("script_name")
                         source_table = record.get("source_table")
                         source_table = record.get("source_table")
-                        script_type = record.get("script_type", "python")
-                        script_exec_mode = record.get("script_exec_mode", "append")
+                        script_type = record.get("script_type")
+                        script_update_mode = record.get("script_update_mode")
+                        schedule_frequency = record.get("schedule_frequency")
+                        schedule_status = record.get("schedule_status")
                         
                         
                         logger.info(f"处理记录: source_table={source_table}, script_name={script_name}")
                         logger.info(f"处理记录: source_table={source_table}, script_name={script_name}")
                         
                         
@@ -278,7 +362,9 @@ def get_table_info_from_neo4j(table_name):
                             scripts_info[script_name] = {
                             scripts_info[script_name] = {
                                 "sources": [],
                                 "sources": [],
                                 "script_type": script_type,
                                 "script_type": script_type,
-                                "script_exec_mode": script_exec_mode
+                                "script_update_mode": script_update_mode,
+                                "schedule_frequency": schedule_frequency,
+                                "schedule_status": schedule_status
                             }
                             }
                         
                         
                         # 确保source_table有值且不为None才添加到sources列表中
                         # 确保source_table有值且不为None才添加到sources列表中
@@ -299,7 +385,9 @@ def get_table_info_from_neo4j(table_name):
                             table_info['source_tables'] = script_info["sources"]  # 使用数组
                             table_info['source_tables'] = script_info["sources"]  # 使用数组
                             table_info['script_name'] = script_name
                             table_info['script_name'] = script_name
                             table_info['script_type'] = script_info["script_type"]
                             table_info['script_type'] = script_info["script_type"]
-                            table_info['script_exec_mode'] = script_info["script_exec_mode"]
+                            table_info['script_update_mode'] = script_info["script_update_mode"]
+                            table_info['schedule_frequency'] = script_info["schedule_frequency"]
+                            table_info['schedule_status'] = script_info["schedule_status"]
                             logger.info(f"表 {table_name} 有单个脚本 {script_name},源表: {script_info['sources']}")
                             logger.info(f"表 {table_name} 有单个脚本 {script_name},源表: {script_info['sources']}")
                         else:
                         else:
                             # 如果有多个不同脚本,记录多脚本信息
                             # 如果有多个不同脚本,记录多脚本信息
@@ -309,7 +397,9 @@ def get_table_info_from_neo4j(table_name):
                             table_info['source_tables'] = scripts_info[first_script]["sources"]
                             table_info['source_tables'] = scripts_info[first_script]["sources"]
                             table_info['script_name'] = first_script
                             table_info['script_name'] = first_script
                             table_info['script_type'] = scripts_info[first_script]["script_type"]
                             table_info['script_type'] = scripts_info[first_script]["script_type"]
-                            table_info['script_exec_mode'] = scripts_info[first_script]["script_exec_mode"]
+                            table_info['script_update_mode'] = scripts_info[first_script]["script_update_mode"]
+                            table_info['schedule_frequency'] = scripts_info[first_script]["schedule_frequency"]
+                            table_info['schedule_status'] = scripts_info[first_script]["schedule_status"]
                     else:
                     else:
                         logger.warning(f"表 {table_name} 未找到有效的脚本信息")
                         logger.warning(f"表 {table_name} 未找到有效的脚本信息")
                         table_info['source_tables'] = []  # 使用空数组
                         table_info['source_tables'] = []  # 使用空数组
@@ -338,15 +428,19 @@ def get_table_info_from_neo4j(table_name):
                         script_name = f"{table_name}_script.py"
                         script_name = f"{table_name}_script.py"
                         table_info['source_tables'] = source_tables
                         table_info['source_tables'] = source_tables
                         table_info['script_name'] = script_name
                         table_info['script_name'] = script_name
-                        table_info['script_type'] = "python"
-                        table_info['script_exec_mode'] = "append"
+                        table_info['script_type'] = "python_script"
+                        table_info['script_update_mode'] = "append"
+                        table_info['schedule_frequency'] = "daily"
+                        table_info['schedule_status'] = True
                         
                         
                         # 创建scripts_info
                         # 创建scripts_info
                         table_info['scripts_info'] = {
                         table_info['scripts_info'] = {
                             script_name: {
                             script_name: {
                                 "sources": source_tables,
                                 "sources": source_tables,
-                                "script_type": "python",
-                                "script_exec_mode": "append"
+                                "script_type": "python_script",
+                                "script_update_mode": "append",
+                                "schedule_frequency": "daily",
+                                "schedule_status": True
                             }
                             }
                         }
                         }
                         
                         
@@ -355,30 +449,38 @@ def get_table_info_from_neo4j(table_name):
                         logger.warning(f"直接查询表 {table_name} 的依赖关系时出错: {str(e)}")
                         logger.warning(f"直接查询表 {table_name} 的依赖关系时出错: {str(e)}")
                         table_info['source_tables'] = []  # 使用空数组
                         table_info['source_tables'] = []  # 使用空数组
                         table_info['script_name'] = f"{table_name}_script.py"
                         table_info['script_name'] = f"{table_name}_script.py"
-                        table_info['script_type'] = "python"
-                        table_info['script_exec_mode'] = "append"
+                        table_info['script_type'] = "python_script"
+                        table_info['script_update_mode'] = "append"
+                        table_info['schedule_frequency'] = "daily"
+                        table_info['schedule_status'] = True
                         
                         
                         # 创建空的scripts_info
                         # 创建空的scripts_info
                         table_info['scripts_info'] = {
                         table_info['scripts_info'] = {
                             table_info['script_name']: {
                             table_info['script_name']: {
                                 "sources": [],
                                 "sources": [],
-                                "script_type": "python",
-                                "script_exec_mode": "append"
+                                "script_type": "python_script",
+                                "script_update_mode": "append",
+                                "schedule_frequency": "daily",
+                                "schedule_status": True
                             }
                             }
                         }
                         }
             else:
             else:
                 logger.warning(f"在Neo4j中找不到表 {table_name} 的信息,设置默认值")
                 logger.warning(f"在Neo4j中找不到表 {table_name} 的信息,设置默认值")
                 table_info['source_tables'] = []
                 table_info['source_tables'] = []
                 table_info['script_name'] = f"{table_name}_script.py"
                 table_info['script_name'] = f"{table_name}_script.py"
-                table_info['script_type'] = "python"
-                table_info['script_exec_mode'] = "append"
+                table_info['script_type'] = "python_script"
+                table_info['script_update_mode'] = "append"
+                table_info['schedule_frequency'] = "daily"
+                table_info['schedule_status'] = True
                 
                 
                 # 创建空的scripts_info
                 # 创建空的scripts_info
                 table_info['scripts_info'] = {
                 table_info['scripts_info'] = {
                     table_info['script_name']: {
                     table_info['script_name']: {
                         "sources": [],
                         "sources": [],
-                        "script_type": "python",
-                        "script_exec_mode": "append"
+                        "script_type": "python_script",
+                        "script_update_mode": "append",
+                        "schedule_frequency": "daily",
+                        "schedule_status": True
                     }
                     }
                 }
                 }
     except Exception as e:
     except Exception as e:
@@ -393,6 +495,7 @@ def get_table_info_from_neo4j(table_name):
     
     
     return table_info
     return table_info
 
 
+
 def process_dependencies(tables_info):
 def process_dependencies(tables_info):
     """处理表间依赖关系,添加被动调度的表"""
     """处理表间依赖关系,添加被动调度的表"""
     # 存储所有表信息的字典
     # 存储所有表信息的字典
@@ -420,9 +523,11 @@ def process_dependencies(tables_info):
                 if table_info.get('target_table_label') == 'DataModel':
                 if table_info.get('target_table_label') == 'DataModel':
                     # 查询其依赖表
                     # 查询其依赖表
                     query = """
                     query = """
-                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
+                        MATCH (dm {en_name: $table_name})-[rel:DERIVED_FROM]->(dep)
                         RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels, 
                         RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels, 
-                               dep.status AS dep_status, dep.frequency AS dep_frequency
+                               dep.status AS dep_status, rel.schedule_frequency AS schedule_frequency,
+                               rel.update_mode AS update_mode, rel.schedule_status AS schedule_status,
+                               rel.script_name AS script_name, rel.script_type AS script_type
                     """
                     """
                     try:
                     try:
                         result = session.run(query, table_name=table_name)
                         result = session.run(query, table_name=table_name)
@@ -434,7 +539,11 @@ def process_dependencies(tables_info):
                         dep_name = record.get("dep_name")
                         dep_name = record.get("dep_name")
                         dep_labels = record.get("dep_labels", [])
                         dep_labels = record.get("dep_labels", [])
                         dep_status = record.get("dep_status", True)
                         dep_status = record.get("dep_status", True)
-                        dep_frequency = record.get("dep_frequency")
+                        schedule_frequency = record.get("schedule_frequency")
+                        update_mode = record.get("update_mode")
+                        schedule_status = record.get("schedule_status", False)
+                        script_name = record.get("script_name")
+                        script_type = record.get("script_type")
                         
                         
                         # 处理未被直接调度的依赖表
                         # 处理未被直接调度的依赖表
                         if dep_name and dep_name not in all_tables:
                         if dep_name and dep_name not in all_tables:
@@ -444,9 +553,17 @@ def process_dependencies(tables_info):
                             dep_info = get_table_info_from_neo4j(dep_name)
                             dep_info = get_table_info_from_neo4j(dep_name)
                             dep_info['is_directly_schedule'] = False
                             dep_info['is_directly_schedule'] = False
                             
                             
-                            # 处理调度频率继承
-                            if not dep_info.get('frequency'):
-                                dep_info['frequency'] = table_info.get('frequency')
+                            # 手动更新一些可能从关系中获取到的属性
+                            if schedule_frequency:
+                                dep_info['schedule_frequency'] = schedule_frequency
+                            if update_mode:
+                                dep_info['script_update_mode'] = update_mode
+                            if schedule_status is not None:
+                                dep_info['schedule_status'] = schedule_status
+                            if script_name:
+                                dep_info['script_name'] = script_name
+                            if script_type:
+                                dep_info['script_type'] = script_type
                             
                             
                             all_tables[dep_name] = dep_info
                             all_tables[dep_name] = dep_info
     except Exception as e:
     except Exception as e:
@@ -461,6 +578,7 @@ def process_dependencies(tables_info):
     
     
     return list(all_tables.values())
     return list(all_tables.values())
 
 
+
 def filter_invalid_tables(tables_info):
 def filter_invalid_tables(tables_info):
     """过滤无效表及其依赖,使用NetworkX构建依赖图"""
     """过滤无效表及其依赖,使用NetworkX构建依赖图"""
     # 构建表名到索引的映射
     # 构建表名到索引的映射
@@ -550,6 +668,7 @@ def filter_invalid_tables(tables_info):
     
     
     return valid_tables
     return valid_tables
 
 
+
 def touch_product_scheduler_file():
 def touch_product_scheduler_file():
     """
     """
     更新产品线调度器DAG文件的修改时间,触发重新解析
     更新产品线调度器DAG文件的修改时间,触发重新解析
@@ -574,30 +693,30 @@ def touch_product_scheduler_file():
         logger.error(f"触发DAG重新解析时出错: {str(e)}")
         logger.error(f"触发DAG重新解析时出错: {str(e)}")
         return False
         return False
 
 
-def get_subscription_state_hash():
-    """获取订阅表状态的哈希值"""
-    try:
-        conn = get_pg_conn()
-        cursor = conn.cursor()
-        try:
-            cursor.execute("""
-                SELECT table_name, schedule_is_enabled
-                FROM schedule_status
-                ORDER BY table_name
-            """)
-            rows = cursor.fetchall()
-            # 将所有行拼接成一个字符串,然后计算哈希值
-            data_str = '|'.join(f"{row[0]}:{row[1]}" for row in rows)
-            return hashlib.md5(data_str.encode()).hexdigest()
-        except Exception as e:
-            logger.error(f"计算订阅表状态哈希值时出错: {str(e)}")
-            raise Exception(f"PostgreSQL查询订阅表状态失败: {str(e)}")
-        finally:
-            cursor.close()
-            conn.close()
-    except Exception as e:
-        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
-        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+# def get_subscription_state_hash():
+#     """获取订阅表状态的哈希值"""
+#     try:
+#         conn = get_pg_conn()
+#         cursor = conn.cursor()
+#         try:
+#             cursor.execute("""
+#                 SELECT table_name, schedule_is_enabled
+#                 FROM schedule_status
+#                 ORDER BY table_name
+#             """)
+#             rows = cursor.fetchall()
+#             # 将所有行拼接成一个字符串,然后计算哈希值
+#             data_str = '|'.join(f"{row[0]}:{row[1]}" for row in rows)
+#             return hashlib.md5(data_str.encode()).hexdigest()
+#         except Exception as e:
+#             logger.error(f"计算订阅表状态哈希值时出错: {str(e)}")
+#             raise Exception(f"PostgreSQL查询订阅表状态失败: {str(e)}")
+#         finally:
+#             cursor.close()
+#             conn.close()
+#     except Exception as e:
+#         logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
+#         raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
 
 
 def check_execution_plan_in_db(**kwargs):
 def check_execution_plan_in_db(**kwargs):
     """
     """
@@ -607,8 +726,7 @@ def check_execution_plan_in_db(**kwargs):
     # 获取执行日期
     # 获取执行日期
     dag_run = kwargs.get('dag_run')
     dag_run = kwargs.get('dag_run')
     logical_date = dag_run.logical_date
     logical_date = dag_run.logical_date
-    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
-    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    exec_date, local_logical_date = get_cn_exec_date(logical_date)
     logger.info(f"logical_date: {logical_date} ")
     logger.info(f"logical_date: {logical_date} ")
     logger.info(f"local_logical_date {local_logical_date} ")
     logger.info(f"local_logical_date {local_logical_date} ")
     logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
     logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
@@ -749,6 +867,7 @@ def generate_task_id(script_name, source_tables, target_table):
         # 没有源表时,只使用脚本名和目标表
         # 没有源表时,只使用脚本名和目标表
         return f"{script_base}_{target_table}"
         return f"{script_base}_{target_table}"
 
 
+
 def prepare_scripts_from_tables(tables_info):
 def prepare_scripts_from_tables(tables_info):
     """
     """
     将表信息转换为脚本信息
     将表信息转换为脚本信息
@@ -764,15 +883,17 @@ def prepare_scripts_from_tables(tables_info):
     for table in tables_info:
     for table in tables_info:
         target_table = table['target_table']
         target_table = table['target_table']
         target_table_label = table.get('target_table_label')
         target_table_label = table.get('target_table_label')
-        frequency = table.get('frequency')
+        schedule_frequency = table.get('schedule_frequency')
         
         
         # 处理表的脚本信息
         # 处理表的脚本信息
         if 'scripts_info' in table and table['scripts_info']:
         if 'scripts_info' in table and table['scripts_info']:
             # 表有多个脚本
             # 表有多个脚本
             for script_name, script_info in table['scripts_info'].items():
             for script_name, script_info in table['scripts_info'].items():
                 source_tables = script_info.get('sources', [])
                 source_tables = script_info.get('sources', [])
-                script_type = script_info.get('script_type', 'python')
-                script_exec_mode = script_info.get('script_exec_mode', 'append')
+                script_type = script_info.get('script_type', 'python_script')
+                script_update_mode = script_info.get('script_update_mode', 'append')
+                script_schedule_frequency = script_info.get('schedule_frequency', schedule_frequency)
+                script_schedule_status = script_info.get('schedule_status', True)
                 
                 
                 # 生成任务ID
                 # 生成任务ID
                 task_id = generate_task_id(script_name, source_tables, target_table)
                 task_id = generate_task_id(script_name, source_tables, target_table)
@@ -785,8 +906,9 @@ def prepare_scripts_from_tables(tables_info):
                     "target_table": target_table,
                     "target_table": target_table,
                     "target_table_label": target_table_label,
                     "target_table_label": target_table_label,
                     "script_type": script_type,
                     "script_type": script_type,
-                    "script_exec_mode": script_exec_mode,
-                    "frequency": frequency,
+                    "update_mode": script_update_mode,  # 使用update_mode代替script_update_mode
+                    "schedule_frequency": script_schedule_frequency,
+                    "schedule_status": script_schedule_status,
                     "task_id": task_id
                     "task_id": task_id
                 }
                 }
                 
                 
@@ -807,8 +929,10 @@ def prepare_scripts_from_tables(tables_info):
                 logger.warning(f"表 {target_table} 没有指定脚本名,使用默认值: {script_name}")
                 logger.warning(f"表 {target_table} 没有指定脚本名,使用默认值: {script_name}")
             
             
             source_tables = table.get('source_tables', [])
             source_tables = table.get('source_tables', [])
-            script_type = table.get('script_type', 'python')
-            script_exec_mode = table.get('script_exec_mode', 'append')
+            script_type = table.get('script_type', 'python_script')
+            script_update_mode = table.get('script_update_mode', 'append')
+            table_schedule_frequency = table.get('schedule_frequency', 'daily')
+            table_schedule_status = table.get('schedule_status', True)
             
             
             # 生成任务ID
             # 生成任务ID
             task_id = generate_task_id(script_name, source_tables, target_table)
             task_id = generate_task_id(script_name, source_tables, target_table)
@@ -821,8 +945,9 @@ def prepare_scripts_from_tables(tables_info):
                 "target_table": target_table,
                 "target_table": target_table,
                 "target_table_label": target_table_label,
                 "target_table_label": target_table_label,
                 "script_type": script_type,
                 "script_type": script_type,
-                "script_exec_mode": script_exec_mode,
-                "frequency": frequency,
+                "update_mode": script_update_mode,  # 使用update_mode代替script_update_mode
+                "schedule_frequency": table_schedule_frequency,
+                "schedule_status": table_schedule_status,
                 "task_id": task_id
                 "task_id": task_id
             }
             }
             
             
@@ -836,6 +961,7 @@ def prepare_scripts_from_tables(tables_info):
     
     
     return scripts
     return scripts
 
 
+
 def build_script_dependency_graph(scripts):
 def build_script_dependency_graph(scripts):
     """
     """
     处理脚本间的依赖关系
     处理脚本间的依赖关系
@@ -980,6 +1106,7 @@ def build_script_dependency_graph(scripts):
     
     
     return script_dependencies, G
     return script_dependencies, G
 
 
+
 def optimize_script_execution_order(scripts, script_dependencies, G):
 def optimize_script_execution_order(scripts, script_dependencies, G):
     """
     """
     使用NetworkX优化脚本执行顺序
     使用NetworkX优化脚本执行顺序
@@ -1035,6 +1162,7 @@ def set_dataops_dags_path_variable():
         logger.error(f"设置Airflow变量DATAOPS_DAGS_PATH失败: {str(e)}")
         logger.error(f"设置Airflow变量DATAOPS_DAGS_PATH失败: {str(e)}")
         return False
         return False
 
 
+
 def prepare_productline_dag_schedule(**kwargs):
 def prepare_productline_dag_schedule(**kwargs):
     """准备产品线DAG调度任务的主函数"""
     """准备产品线DAG调度任务的主函数"""
     # 添加更严格的异常处理
     # 添加更严格的异常处理
@@ -1069,23 +1197,23 @@ def prepare_productline_dag_schedule(**kwargs):
             raise Exception(f"检查执行计划失败,可能是数据库连接问题: {str(e)}")
             raise Exception(f"检查执行计划失败,可能是数据库连接问题: {str(e)}")
         
         
         # 条件2: schedule_status表中的数据发生了变更
         # 条件2: schedule_status表中的数据发生了变更
-        if not need_create_plan:
-            # 计算当前哈希值
-            current_hash = get_subscription_state_hash()
-            # 读取上次记录的哈希值
-            hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
-            last_hash = None
-            if os.path.exists(hash_file):
-                try:
-                    with open(hash_file, 'r') as f:
-                        last_hash = f.read().strip()
-                except Exception as e:
-                    logger.warning(f"读取上次订阅状态哈希值失败: {str(e)}")
+        # if not need_create_plan:
+        #     # 计算当前哈希值
+        #     current_hash = get_subscription_state_hash()
+        #     # 读取上次记录的哈希值
+        #     hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
+        #     last_hash = None
+        #     if os.path.exists(hash_file):
+        #         try:
+        #             with open(hash_file, 'r') as f:
+        #                 last_hash = f.read().strip()
+        #         except Exception as e:
+        #             logger.warning(f"读取上次订阅状态哈希值失败: {str(e)}")
             
             
-            # 如果哈希值不同,表示数据发生了变更
-            if current_hash != last_hash:
-                logger.info(f"检测到schedule_status表数据变更。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
-                need_create_plan = True
+        #     # 如果哈希值不同,表示数据发生了变更
+        #     if current_hash != last_hash:
+        #         logger.info(f"检测到schedule_status表数据变更。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
+        #         need_create_plan = True
         
         
         # 手动触发模式覆盖以上判断
         # 手动触发模式覆盖以上判断
         if is_manual_trigger:
         if is_manual_trigger:
@@ -1100,7 +1228,7 @@ def prepare_productline_dag_schedule(**kwargs):
         # 继续处理,创建新的执行计划
         # 继续处理,创建新的执行计划
         # 1. 获取启用的表
         # 1. 获取启用的表
         enabled_tables = get_enabled_tables()
         enabled_tables = get_enabled_tables()
-        logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
+        logger.info(f"获取到 {len(enabled_tables)} 个启用的表")
         
         
         if not enabled_tables:
         if not enabled_tables:
             logger.warning("没有找到启用的表,准备工作结束")
             logger.warning("没有找到启用的表,准备工作结束")
@@ -1119,13 +1247,13 @@ def prepare_productline_dag_schedule(**kwargs):
         filtered_tables_info = []
         filtered_tables_info = []
         for table_info in tables_info:
         for table_info in tables_info:
             table_name = table_info['target_table']
             table_name = table_info['target_table']
-            frequency = table_info.get('frequency')
+            schedule_frequency = table_info.get('schedule_frequency')
             
             
-            if should_execute_today(table_name, frequency, exec_date):
+            if should_execute_today(table_name, schedule_frequency, exec_date):
                 filtered_tables_info.append(table_info)
                 filtered_tables_info.append(table_info)
-                logger.info(f"表 {table_name} (频率: {frequency}) 将在今天{exec_date}执行")
+                logger.info(f"表 {table_name} (频率: {schedule_frequency}) 将在今天{exec_date}执行")
             else:
             else:
-                logger.info(f"表 {table_name} (频率: {frequency}) 今天{exec_date}不执行,已过滤")
+                logger.info(f"表 {table_name} (频率: {schedule_frequency}) 今天{exec_date}不执行,已过滤")
         
         
         logger.info(f"按调度频率过滤后,今天{exec_date}需要执行的表有 {len(filtered_tables_info)} 个")
         logger.info(f"按调度频率过滤后,今天{exec_date}需要执行的表有 {len(filtered_tables_info)} 个")
 
 
@@ -1262,11 +1390,11 @@ def prepare_productline_dag_schedule(**kwargs):
             }
             }
             
             
             # 10. 更新订阅表状态哈希值
             # 10. 更新订阅表状态哈希值
-            current_hash = get_subscription_state_hash()
-            hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
-            with open(hash_file, 'w') as f:
-                f.write(current_hash)
-            logger.info(f"已更新订阅表状态哈希值: {current_hash}")
+            # current_hash = get_subscription_state_hash()
+            # hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
+            # with open(hash_file, 'w') as f:
+            #     f.write(current_hash)
+            # logger.info(f"已更新订阅表状态哈希值: {current_hash}")
             
             
             # 11. 触发产品线执行DAG重新解析
             # 11. 触发产品线执行DAG重新解析
             touch_product_scheduler_file()
             touch_product_scheduler_file()
@@ -1325,7 +1453,7 @@ def prepare_productline_dag_schedule(**kwargs):
 # 创建DAG
 # 创建DAG
 with DAG(
 with DAG(
     "dataops_productline_prepare_dag",
     "dataops_productline_prepare_dag",
-    start_date=datetime(2024, 1, 1),
+    start_date=datetime(2025, 1, 1),
     # 每小时执行一次
     # 每小时执行一次
     schedule_interval="0 * * * *",
     schedule_interval="0 * * * *",
     catchup=False,
     catchup=False,