@@ -2,7 +2,6 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
 from datetime import datetime, timedelta, date
 import logging
 import networkx as nx
@@ -14,7 +13,7 @@ from common import (
     execute_with_monitoring,
     get_today_date
 )
-from config import TASK_RETRY_CONFIG
+from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, AIRFLOW_BASE_PATH

 # Create logger
 logger = logging.getLogger(__name__)
@@ -183,22 +182,35 @@ def prepare_unified_execution_plan(**kwargs):
     logger.info(f"Resource table tasks: {resource_names}")
     logger.info(f"Model table tasks: {model_names}")

-    # Save the execution plan to XCom, using a custom serializer for date objects
-    try:
-        kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
-        logger.info(f"Prepared execution plan with {len(resource_tasks)} resource table tasks and {len(model_tasks)} model table tasks")
-    except Exception as e:
-        logger.error(f"Error saving execution plan to XCom: {str(e)}")
-
-    # Save the execution plan to a file for later use
+    # No longer pushed to XCom, since the plan is read directly from the file
+    # Here we only verify that the execution plan matches the one in the file
+    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+    ready_path = f"{plan_path}.ready"
+
+    if os.path.exists(plan_path) and os.path.exists(ready_path):
         try:
-            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-            with open(plan_path, 'w') as f:
-                json.dump(execution_plan, f, default=json_serial)
-            logger.info(f"Saved execution plan to file: {plan_path}")
-        except Exception as file_e:
-            logger.error(f"Error saving execution plan to file: {str(file_e)}")
+            with open(plan_path, 'r') as f:
+                existing_plan = json.load(f)
+
+            # Check whether the execution plan has changed
+            existing_resources = sorted([t.get("target_table") for t in existing_plan.get("resource_tasks", [])])
+            current_resources = sorted(resource_names)
+
+            existing_models = sorted([t.get("target_table") for t in existing_plan.get("model_tasks", [])])
+            current_models = sorted(model_names)
+
+            if existing_resources == current_resources and existing_models == current_models:
+                logger.info("Execution plan unchanged; continuing with the existing task structure")
+            else:
+                logger.warning("Execution plan differs from the existing file, but the DAG structure is already fixed; changes take effect on the next DAG parse")
+                logger.warning(f"Existing resource tables: {existing_resources}")
+                logger.warning(f"Current resource tables: {current_resources}")
+                logger.warning(f"Existing model tables: {existing_models}")
+                logger.warning(f"Current model tables: {current_models}")
+        except Exception as e:
+            logger.error(f"Error comparing execution plans: {str(e)}")

+    logger.info(f"Prepared execution plan with {len(resource_tasks)} resource table tasks and {len(model_tasks)} model table tasks")
     return len(resource_tasks) + len(model_tasks)

 def process_resource(target_table, script_name, script_exec_mode, exec_date):
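
For context, the verification above only works if whatever process writes last_execution_plan.json creates the .ready marker only after the JSON has been fully written. That producer is not part of this diff; a minimal sketch of the pattern, with an assumed helper name write_execution_plan and an assumed json_serial serializer, could look like this:

import json
from datetime import datetime, date

def json_serial(obj):
    # Assumed serializer; the real one lives elsewhere in this project
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")

def write_execution_plan(execution_plan, plan_path):
    # Write the plan first, then touch the ready marker, so readers
    # never act on a half-written JSON file
    with open(plan_path, 'w') as f:
        json.dump(execution_plan, f, default=json_serial)
    with open(f"{plan_path}.ready", 'w') as f:
        f.write(datetime.now().isoformat())

Writing the marker last is what lets both the comparison above and the parse-time loader below treat the pair of files as an atomic unit.
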
@@ -256,14 +268,22 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
         # Ensure a result is returned even on error so the DAG is not blocked
         return False

-# Preload data to create tasks
+# Revised logic for preloading data to create tasks
try:
    logger.info("Preloading execution plan data to build the DAG")
    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+    ready_path = f"{plan_path}.ready"
    execution_plan = {"exec_date": get_today_date(), "resource_tasks": [], "model_tasks": [], "dependencies": {}}

-    if os.path.exists(plan_path):
+    # First check that the ready file exists, to ensure the JSON file was fully written
+    if os.path.exists(ready_path) and os.path.exists(plan_path):
        try:
+            # Read the timestamp from the ready file
+            with open(ready_path, 'r') as f:
+                ready_timestamp = f.read().strip()
+                logger.info(f"Execution plan ready-marker timestamp: {ready_timestamp}")
+
+            # Read the execution plan file
            with open(plan_path, 'r') as f:
                execution_plan_json = f.read()
                execution_plan = json.loads(execution_plan_json)
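
The block above runs at DAG parse time, so the task structure only changes when the scheduler re-parses this file and finds an updated plan. If the same load-with-ready-check were needed elsewhere, it could be pulled into a small helper; load_execution_plan below is a sketch under that assumption, not a function that exists in this repository:

import json
import logging
import os

logger = logging.getLogger(__name__)

def load_execution_plan(plan_path, default_plan):
    # Return the plan from disk only when both the JSON file and its
    # .ready marker exist; otherwise fall back to the provided default
    ready_path = f"{plan_path}.ready"
    if not (os.path.exists(ready_path) and os.path.exists(plan_path)):
        logger.warning(f"Execution plan not ready: {plan_path}")
        return default_plan
    try:
        with open(plan_path, 'r') as f:
            return json.load(f)
    except Exception as e:
        logger.warning(f"Error reading execution plan file: {str(e)}")
        return default_plan
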
@@ -271,8 +291,12 @@ try:
        except Exception as e:
            logger.warning(f"Error reading execution plan file: {str(e)}")
    else:
-        logger.warning(f"Execution plan file does not exist: {plan_path}; a basic DAG structure will be created")
-
+        if not os.path.exists(ready_path):
+            logger.warning(f"Execution plan ready-marker file does not exist: {ready_path}")
+        if not os.path.exists(plan_path):
+            logger.warning(f"Execution plan file does not exist: {plan_path}")
+        logger.warning("A basic DAG structure will be created")
+
    # Extract information
    exec_date = execution_plan.get("exec_date", get_today_date())
    resource_tasks = execution_plan.get("resource_tasks", [])
@@ -295,7 +319,8 @@ def handle_dag_failure(context):
 with DAG(
     "dag_dataops_unified_data_scheduler",
     start_date=datetime(2024, 1, 1),
-    schedule_interval="@daily",
+    # Check every 10 minutes so that changes to the execution plan are picked up promptly
+    schedule_interval="*/10 * * * *",
     catchup=False,
     default_args={
         'owner': 'airflow',
@@ -305,23 +330,14 @@ with DAG(
         'retries': 1,
         'retry_delay': timedelta(minutes=5)
     },
-    on_failure_callback=handle_dag_failure  # Set the callback function here
+    on_failure_callback=handle_dag_failure,
+    # Add DAG-level params so tasks run with the correct environment
+    params={
+        "scripts_path": SCRIPTS_BASE_PATH,
+        "airflow_base_path": AIRFLOW_BASE_PATH
+    }
 ) as dag:

-    # Wait for the prepare DAG to finish
-    wait_for_prepare = ExternalTaskSensor(
-        task_id="wait_for_prepare",
-        external_dag_id="dag_dataops_unified_prepare_scheduler",
-        external_task_id="preparation_completed",
-        mode="reschedule",  # Use reschedule mode to reduce resource usage
-        timeout=3600,
-        poke_interval=30,
-        execution_timeout=timedelta(hours=1),
-        soft_fail=True,
-        allowed_states=["success", "skipped"],
-        dag=dag
-    )
-
     # Prepare the execution plan
     prepare_plan = PythonOperator(
         task_id="prepare_execution_plan",
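
The params added to the DAG above are exposed to task callables through the Airflow task context under the "params" key. A minimal sketch of reading them from a PythonOperator callable (the task and callable names here are illustrative, not from this file):

from airflow.operators.python import PythonOperator

def run_script(**context):
    # DAG-level params are available in the task context as "params"
    scripts_path = context["params"]["scripts_path"]
    airflow_base_path = context["params"]["airflow_base_path"]
    print(f"Scripts live in {scripts_path}, Airflow base is {airflow_base_path}")

# Example wiring; in practice this would sit inside the `with DAG(...)` block
example_task = PythonOperator(
    task_id="example_read_params",
    python_callable=run_script,
)
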
@@ -337,12 +353,18 @@ with DAG(
         dag=dag
     )

-    # Set the initial task dependency
-    wait_for_prepare >> prepare_plan
-
     # Task dictionary used to set up dependencies
     task_dict = {}

+    # Add an empty task as the starting point for downstream tasks, so the DAG still runs even when there are no resource or model tables
+    start_processing = EmptyOperator(
+        task_id="start_processing",
+        dag=dag
+    )
+
+    # Set the basic dependency
+    prepare_plan >> start_processing
+
     # 1. Pre-create resource table tasks
     for task_info in resource_tasks:
         table_name = task_info["target_table"]
@@ -370,9 +392,9 @@ with DAG(
         # Add the task to the dictionary
         task_dict[table_name] = resource_task

-        # Set dependency on prepare_plan - a direct dependency with no other conditions
-        prepare_plan >> resource_task
-        logger.info(f"Preset basic dependency: prepare_plan >> {task_id}")
+        # Set dependency on start_processing
+        start_processing >> resource_task
+        logger.info(f"Set basic dependency: start_processing >> {task_id}")

     # Create a directed graph to detect dependencies between model tables
     G = nx.DiGraph()
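
The nx.DiGraph created here is used to detect dependencies between model tables before the Airflow tasks are wired together. As a generic illustration of that technique (the table names and dependency map below are made up, and this is not the file's full logic), edges run from upstream tables to dependent tables, a cycle check guards against bad metadata, and a topological sort yields a valid execution order:

import networkx as nx

# Hypothetical dependency map: model table -> tables it depends on
dependencies = {
    "dm_orders": ["ods_orders", "dm_customers"],
    "dm_customers": ["ods_customers"],
}

G = nx.DiGraph()
for table, upstreams in dependencies.items():
    for upstream in upstreams:
        # Edge direction: the upstream table must run before the dependent table
        G.add_edge(upstream, table)

if not nx.is_directed_acyclic_graph(G):
    raise ValueError(f"Circular dependencies detected: {list(nx.simple_cycles(G))}")

# Topological order gives a valid execution order for the tasks
print(list(nx.topological_sort(G)))
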
@@ -449,11 +471,11 @@ with DAG(
                     has_dependency = True
                     logger.info(f"Preset dependency: {dep_table} >> {table_name}")

-        # If there are no dependencies, depend on the prepare task and all resource table tasks
+        # If there are no dependencies, depend on start_processing and the resource table tasks
         if not has_dependency:
-            # Connect directly from the prepare_plan task
-            prepare_plan >> model_task
-            logger.info(f"Preset basic dependency: prepare_plan >> {task_id}")
+            # Connect directly from the start_processing task
+            start_processing >> model_task
+            logger.info(f"Set basic dependency: start_processing >> {task_id}")

             # Also connect from all resource table tasks - limit each model table to at most 5 resource tables to avoid an overly complex dependency graph
             resource_count = 0
@@ -492,10 +514,10 @@ with DAG(
     if not model_tasks and resource_tasks:
         terminal_tasks = [task["target_table"] for task in resource_tasks]

-    # If there are neither model table tasks nor resource table tasks, connect the prepare task directly to the completion marker
+    # If there are neither model table tasks nor resource table tasks, connect start_processing directly to the completion marker
     if not terminal_tasks:
-        prepare_plan >> processing_completed
-        logger.warning("No tasks found; connecting the prepare task directly to the completion marker")
+        start_processing >> processing_completed
+        logger.warning("No tasks found; connecting start_processing directly to the completion marker")
     else:
         # Connect all terminal tasks to the completion marker
         for table_name in terminal_tasks: