# dag_dataops_resource_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import logging
from common import (
    get_pg_conn, execute_with_monitoring, get_today_date
)
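
# The helpers imported from `common` are defined elsewhere in this project. Based on
# how they are called below (an assumption, not a confirmed interface):
#   get_pg_conn()      -> DB-API connection to the PostgreSQL metadata database
#   get_today_date()   -> today's date, in the same format as airflow_dag_schedule.exec_date
#   execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date)
#                      -> runs the load script for one target table and records the outcome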

# Module-level logger
logger = logging.getLogger(__name__)


# Helper: look up the most recent usable date
def get_latest_date_with_resources():
    """
    Return the most recent exec_date that has DataResource records.

    Used to find the latest date in the database so that the task query
    below can actually return data.
    """
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            WHERE target_table_label = 'DataResource'
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Latest date with DataResource records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with DataResource records found; falling back to today's date")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while looking up the latest date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()


def get_dataresource_tasks(exec_date):
    """Fetch DataResource tasks from the airflow_dag_schedule table."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Build the SQL query (parameterized)
        sql = """
            SELECT source_table, target_table, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource'
        """
        # Log the query date
        logger.info(f"Querying resource tasks for date: {exec_date}")

        # Run the query
        cursor.execute(sql, (exec_date,))
        results = cursor.fetchall()
        logger.info(f"Found {len(results)} DataResource tasks for date {exec_date}")

        # Deduplicate, keyed by target table name
        unique_tasks = {}
        for row in results:
            source_table, target_table, script_name, script_exec_mode = row
            if target_table not in unique_tasks:
                unique_tasks[target_table] = {
                    "source_table": source_table,
                    "target_table": target_table,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"  # default mode
                }

        logger.info(f"Fetched {len(results)} DataResource tasks, {len(unique_tasks)} remain after deduplication")
        return list(unique_tasks.values())
    except Exception as e:
        logger.error(f"Error while fetching DataResource tasks: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()
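
# Shape of the list returned by get_dataresource_tasks(), one entry per target table
# (values below are illustrative placeholders, not taken from real schedule data):
# [
#     {
#         "source_table": "ods_example_source",       # hypothetical name
#         "target_table": "resource_example",         # hypothetical name
#         "script_name": "load_resource_example.py",  # hypothetical name
#         "script_exec_mode": "append",
#     },
# ]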


def process_resource(target_table, script_name, script_exec_mode, **kwargs):
    """Process a single resource table."""
    # 'ds' is the run's logical date (YYYY-MM-DD) from the Airflow context
    exec_date = kwargs.get('ds')
    logger.info(f"Start processing resource table: {target_table}, script: {script_name}")

    try:
        # Delegate execution to the monitored runner
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Resource table {target_table} processed successfully")
        return result
    except Exception as e:
        logger.error(f"Error while processing resource table {target_table}: {str(e)}")
        raise


def generate_no_task_message(**kwargs):
    """Executed when there are no resource tasks to run."""
    logger.info("No resources to process")
    return "No resources to process"


# Create the DAG
with DAG(
    "dag_dataops_resource_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:

    # Wait for the prepare DAG to finish
    wait_for_prepare = ExternalTaskSensor(
        task_id="wait_for_prepare",
        external_dag_id="dag_dataops_prepare_scheduler",
        external_task_id="preparation_completed",
        mode="poke",
        timeout=3600,
        poke_interval=30
    )

    # Marker task: all resource processing finished
    resource_processing_completed = EmptyOperator(
        task_id="resource_processing_completed"
    )
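
    # Note: with no execution_delta or execution_date_fn set, the sensor above waits for
    # the "preparation_completed" task in the dag_dataops_prepare_scheduler run that shares
    # this run's logical date, so both DAGs are expected to be on the same @daily schedule.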

    # The lookups below run at DAG parse time (every time the scheduler parses this
    # file), not when the tasks execute, so the task list reflects the latest parse.
    latest_date = get_latest_date_with_resources()
    logger.info(f"Using latest date {latest_date} to query resource tasks")

    # Fetch the resource tasks
    resource_tasks = get_dataresource_tasks(latest_date)

    if resource_tasks:
        for task_info in resource_tasks:
            target_table = task_info["target_table"]
            script_name = task_info["script_name"]
            script_exec_mode = task_info["script_exec_mode"]

            if not script_name:
                logger.warning(f"Resource table {target_table} has no associated script, skipping")
                continue

            # Create a separate processing task for each resource table
            task_id = f"process_resource_{target_table.replace('.', '_')}"
            process_task = PythonOperator(
                task_id=task_id,
                python_callable=process_resource,
                op_kwargs={
                    "target_table": target_table,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode
                }
            )

            # Wire dependencies: wait_for_prepare feeds each processing task directly
            wait_for_prepare >> process_task >> resource_processing_completed
    else:
        # If there are no tasks, add a placeholder task
        empty_task = PythonOperator(
            task_id="no_resources_to_process",
            python_callable=generate_no_task_message
        )
        wait_for_prepare >> empty_task >> resource_processing_completed