# dag_dataops_unified_data_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
import os
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, AIRFLOW_BASE_PATH

# Create the module-level logger
logger = logging.getLogger(__name__)

def get_latest_date():
    """Return the most recent exec_date that has records in the database."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Found the most recent date with records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with records found; falling back to today's date")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while looking up the most recent date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()

def get_all_tasks(exec_date):
    """Fetch all tasks to execute (both DataResource and DataModel)."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Count how many records exist for this execution date
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
        """, (exec_date,))
        total_count = cursor.fetchone()[0]
        logger.info(f"Execution date {exec_date} has {total_count} records in the airflow_dag_schedule table")

        # Fetch all resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()
        logger.info(f"Query returned {len(resource_results)} DataResource records")

        # Fetch all model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()
        logger.info(f"Query returned {len(model_results)} DataModel records")

        # Normalize resource-table task info
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        # Normalize model-table task info
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        logger.info(f"Collected {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error while fetching task information: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()

def get_table_dependencies(table_names):
    """Fetch the dependency relationships between tables from Neo4j."""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}

    try:
        with driver.session() as session:
            # Fetch the dependencies of all the given model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)

            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])

                if source and target:
                    # Append the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error while fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()

    return dependency_dict
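
# The mapping returned by get_table_dependencies() keys each model table to the
# tables it is DERIVED_FROM. An illustrative (hypothetical) example of its shape:
#   {
#       "model_orders_daily": [
#           {"table_name": "resource_orders_raw", "table_type": "DataResource"},
#           {"table_name": "model_customers", "table_type": "DataModel"},
#       ],
#       "model_customers": [],
#   }
# Only the DataModel / DataResource labels are distinguished; any other Neo4j label
# ends up as table_type None because of the next(...) filter above.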

def json_serial(obj):
    """JSON serializer that renders date/datetime objects as ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

def prepare_unified_execution_plan(**kwargs):
    """Main callable that prepares the unified execution plan."""
    # Determine the execution date
    exec_date = get_latest_date()
    logger.info(f"Using execution date: {exec_date}")

    # Fetch all tasks
    resource_tasks, model_tasks = get_all_tasks(exec_date)

    if not resource_tasks and not model_tasks:
        logger.warning(f"No tasks found for execution date {exec_date}")
        return 0

    # Fetch the dependencies of all model tables
    model_table_names = [task["target_table"] for task in model_tasks]
    dependencies = get_table_dependencies(model_table_names)

    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Log the resource and model task names to ease debugging
    resource_names = [task["target_table"] for task in resource_tasks]
    model_names = [task["target_table"] for task in model_tasks]
    logger.info(f"Resource-table tasks: {resource_names}")
    logger.info(f"Model-table tasks: {model_names}")

    # Pushing to XCom is no longer needed because the plan is read directly from file;
    # the code below only verifies that the freshly built plan matches the file on disk.
    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
    ready_path = f"{plan_path}.ready"

    if os.path.exists(plan_path) and os.path.exists(ready_path):
        try:
            with open(plan_path, 'r') as f:
                existing_plan = json.load(f)

            # Compare the plans to see whether anything changed
            existing_resources = sorted([t.get("target_table") for t in existing_plan.get("resource_tasks", [])])
            current_resources = sorted(resource_names)

            existing_models = sorted([t.get("target_table") for t in existing_plan.get("model_tasks", [])])
            current_models = sorted(model_names)

            if existing_resources == current_resources and existing_models == current_models:
                logger.info("Execution plan unchanged; keeping the existing task structure")
            else:
                logger.warning("Execution plan differs from the existing file, but the DAG structure is already fixed; changes take effect on the next parse")
                logger.warning(f"Existing resource tables: {existing_resources}")
                logger.warning(f"Current resource tables: {current_resources}")
                logger.warning(f"Existing model tables: {existing_models}")
                logger.warning(f"Current model tables: {current_models}")
        except Exception as e:
            logger.error(f"Error while comparing execution plans: {str(e)}")

    logger.info(f"Prepared an execution plan with {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
    return len(resource_tasks) + len(model_tasks)
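
# Layout of last_execution_plan.json as expected by the comparison above and the
# parse-time loader below (field names come from this file; the values shown are
# only illustrative):
#   {
#       "exec_date": "2024-01-01",
#       "resource_tasks": [{"source_table": "...", "target_table": "...",
#                           "target_table_label": "DataResource",
#                           "script_name": "...", "script_exec_mode": "append"}],
#       "model_tasks": [ ...same fields with "DataModel"... ],
#       "dependencies": {"<model table>": [{"table_name": "...", "table_type": "..."}]}
#   }
# A sibling last_execution_plan.json.ready file holds a timestamp and marks the JSON
# as fully written.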

def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table."""
    logger.info(f"Running script {script_name} for resource table {target_table}")
    # exec_date may arrive as a JSON string; unwrap it if so
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")

    try:
        # Delegate to the shared monitored-execution helper
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Finished processing resource table {target_table}, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error while processing resource table {target_table}: {str(e)}")
        # Return a result even on error so the DAG is not blocked
        return False

def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table."""
    logger.info(f"Running script {script_name} for model table {target_table}")
    # exec_date may arrive as a JSON string; unwrap it if so
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")

    try:
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Finished processing model table {target_table}, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error while processing model table {target_table}: {str(e)}")
        # Return a result even on error so the DAG is not blocked
        return False
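
# Note: both callables above catch exceptions and return False instead of raising,
# so a failing script does not fail its Airflow task instance and downstream tasks
# still run. The boolean return value only ends up in the task's XCom (the default
# PythonOperator behaviour). The completion marker defined below additionally uses
# the none_failed_min_one_success trigger rule, which requires only that no upstream
# task failed and at least one succeeded.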

# Pre-load the execution plan at parse time so tasks can be created from it
try:
    logger.info("Pre-loading execution plan data to build the DAG")
    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
    ready_path = f"{plan_path}.ready"
    execution_plan = {"exec_date": get_today_date(), "resource_tasks": [], "model_tasks": [], "dependencies": {}}

    # First check that the ready marker exists, which guarantees the JSON file was fully written
    if os.path.exists(ready_path) and os.path.exists(plan_path):
        try:
            # Read the timestamp stored in the ready marker
            with open(ready_path, 'r') as f:
                ready_timestamp = f.read().strip()
                logger.info(f"Execution plan ready marker timestamp: {ready_timestamp}")

            # Read the execution plan file itself
            with open(plan_path, 'r') as f:
                execution_plan_json = f.read()
                execution_plan = json.loads(execution_plan_json)
                logger.info(f"Loaded execution plan from file: {plan_path}")
        except Exception as e:
            logger.warning(f"Error reading the execution plan file: {str(e)}")
    else:
        if not os.path.exists(ready_path):
            logger.warning(f"Execution plan ready marker file does not exist: {ready_path}")
        if not os.path.exists(plan_path):
            logger.warning(f"Execution plan file does not exist: {plan_path}")
        logger.warning("A basic DAG structure will be created")

    # Extract the pieces needed to build tasks
    exec_date = execution_plan.get("exec_date", get_today_date())
    resource_tasks = execution_plan.get("resource_tasks", [])
    model_tasks = execution_plan.get("model_tasks", [])
    dependencies = execution_plan.get("dependencies", {})

    logger.info(f"Pre-loaded execution plan: exec_date={exec_date}, resource_tasks={len(resource_tasks)}, model_tasks={len(model_tasks)}")
except Exception as e:
    logger.error(f"Error while pre-loading execution plan data: {str(e)}")
    exec_date = get_today_date()
    resource_tasks = []
    model_tasks = []
    dependencies = {}
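
# Everything above runs at DAG parse time, not at task run time: the scheduler
# re-executes this module on its regular parse cycle, and only then does a changed
# last_execution_plan.json translate into added or removed tasks. Between parses the
# task graph stays fixed, which is why prepare_unified_execution_plan() only logs a
# warning when the plan on disk no longer matches the current database contents.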

# Callback invoked when a DAG run fails
def handle_dag_failure(context):
    logger.error(f"DAG run failed: {context.get('exception')}")

# Create the DAG
with DAG(
    "dag_dataops_unified_data_scheduler",
    start_date=datetime(2024, 1, 1),
    # Check every 10 minutes so changes to the execution plan are picked up promptly
    schedule_interval="*/10 * * * *",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    on_failure_callback=handle_dag_failure,
    # DAG-level params so tasks run with the correct environment
    params={
        "scripts_path": SCRIPTS_BASE_PATH,
        "airflow_base_path": AIRFLOW_BASE_PATH
    }
) as dag:

    # Prepare the execution plan
    prepare_plan = PythonOperator(
        task_id="prepare_execution_plan",
        python_callable=prepare_unified_execution_plan,
        dag=dag
    )

    # Completion marker
    processing_completed = EmptyOperator(
        task_id="processing_completed",
        trigger_rule="none_failed_min_one_success",  # complete once at least one task succeeded and none failed
        dag=dag
    )

    # Task dictionary used to wire up dependencies
    task_dict = {}

    # Empty task that acts as the entry point for downstream tasks, so the DAG still
    # runs even when there are no resource or model tables
    start_processing = EmptyOperator(
        task_id="start_processing",
        dag=dag
    )

    # Basic dependency
    prepare_plan >> start_processing

    # 1. Pre-create the resource-table tasks
    for task_info in resource_tasks:
        table_name = task_info["target_table"]
        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task ID
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"resource_{safe_table_name}"

        resource_task = PythonOperator(
            task_id=task_id,
            python_callable=process_resource,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date  # use the pre-parsed exec_date directly instead of XCom
            },
            retries=TASK_RETRY_CONFIG["retries"],
            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
            dag=dag
        )

        # Register the task in the dictionary
        task_dict[table_name] = resource_task

        # Wire the task to start_processing
        start_processing >> resource_task
        logger.info(f"Set basic dependency: start_processing >> {task_id}")
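
    # Task IDs are derived from table names: for example, a (hypothetical) table
    # named "ods.sales-orders" becomes task_id "resource_ods_sales_orders"; the same
    # sanitization is applied to model tables below with a "model_" prefix.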

    # Build a directed graph to detect dependencies between model tables
    G = nx.DiGraph()

    # Add every model table as a node
    for task_info in model_tasks:
        table_name = task_info["target_table"]
        G.add_node(table_name)

    # Add dependency edges between model tables
    for source, deps in dependencies.items():
        for dep in deps:
            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> dependent

    # Detect circular dependencies and break them
    cycles = list(nx.simple_cycles(G))
    if cycles:
        logger.warning(f"Circular dependencies detected: {cycles}")
        for cycle in cycles:
            G.remove_edge(cycle[-1], cycle[0])
            logger.info(f"Broke circular dependency: removed edge {cycle[-1]} -> {cycle[0]}")

    # Topologically sort the graph to determine the execution order
    execution_order = []
    try:
        execution_order = list(nx.topological_sort(G))
        logger.info(f"Execution order computed at parse time: {execution_order}")
    except Exception as e:
        logger.error(f"Topological sort failed: {str(e)}; using the original order")
        execution_order = [task_info["target_table"] for task_info in model_tasks]
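
    # Illustrative example (hypothetical table names): if "model_orders_daily" is
    # DERIVED_FROM "model_customers", the loop above adds the edge
    # model_customers -> model_orders_daily, and nx.topological_sort(G) returns
    # ['model_customers', 'model_orders_daily'], so the upstream model task is
    # created and wired before the model that depends on it.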
    # 2. Pre-create the model-table tasks
    for table_name in execution_order:
        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
        if not task_info:
            continue

        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task ID
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"model_{safe_table_name}"

        model_task = PythonOperator(
            task_id=task_id,
            python_callable=process_model,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date  # use the pre-parsed exec_date directly instead of XCom
            },
            retries=TASK_RETRY_CONFIG["retries"],
            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
            dag=dag
        )

        # Register the task in the dictionary
        task_dict[table_name] = model_task

        # Wire up dependencies
        deps = dependencies.get(table_name, [])
        has_dependency = False

        # Dependencies on other tables already registered in task_dict
        for dep in deps:
            dep_table = dep.get("table_name")
            dep_type = dep.get("table_type")

            if dep_table in task_dict:
                task_dict[dep_table] >> model_task
                has_dependency = True
                logger.info(f"Pre-set dependency: {dep_table} >> {table_name}")

        # If the model has no wired dependency, hang it off start_processing and the resource tasks
        if not has_dependency:
            # Connect directly from start_processing
            start_processing >> model_task
            logger.info(f"Set basic dependency: start_processing >> {task_id}")

            # Also connect from resource-table tasks, capped at 5 per model table to avoid an overly complex graph
            resource_count = 0
            for resource_table in resource_tasks:
                if resource_count >= 5:
                    break

                resource_name = resource_table["target_table"]
                if resource_name in task_dict:
                    task_dict[resource_name] >> model_task
                    logger.info(f"Pre-set resource dependency: {resource_name} >> {table_name}")
                    resource_count += 1
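
    # Wiring summary for each model table: upstream tables already present in
    # task_dict (other models or resource tables) are wired directly; a model with
    # no wired upstream falls back to start_processing plus at most five
    # resource-table tasks, so isolated models still wait for ingestion to start.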
    # Find all terminal tasks (tasks with no downstream dependencies)
    terminal_tasks = []

    # Check every model-table task
    for table_name in execution_order:
        # Does any other model depend on this table?
        has_downstream = False
        for source, deps in dependencies.items():
            if source == table_name:  # skip the table itself
                continue
            for dep in deps:
                if dep.get("table_name") == table_name:
                    has_downstream = True
                    break
            if has_downstream:
                break

        # No downstream consumers: add it to the terminal task list
        if not has_downstream and table_name in task_dict:
            terminal_tasks.append(table_name)

    # If there are no model-table tasks, treat every resource-table task as terminal
    if not model_tasks and resource_tasks:
        terminal_tasks = [task["target_table"] for task in resource_tasks]

    # If there are neither model-table nor resource-table tasks, connect start_processing straight to the completion marker
    if not terminal_tasks:
        start_processing >> processing_completed
        logger.warning("No tasks found; connecting start_processing directly to the completion marker")
    else:
        # Connect every terminal task to the completion marker
        for table_name in terminal_tasks:
            if table_name in task_dict:
                task_dict[table_name] >> processing_completed
                logger.info(f"Set terminal task: {table_name} >> processing_completed")

    logger.info(f"DAG dag_dataops_unified_data_scheduler defined; pre-created {len(task_dict)} tasks")