dag_data_resource_scheduler.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. # dag_data_resource_scheduler.py
  2. from airflow import DAG
  3. from airflow.operators.python import PythonOperator
  4. from airflow.operators.empty import EmptyOperator
  5. from datetime import datetime
  6. import pendulum
  7. import logging
  8. import os
  9. from pathlib import Path
  10. from neo4j import GraphDatabase
  11. from utils import (
  12. get_enabled_tables,
  13. get_resource_subscribed_tables,
  14. get_dependency_resource_tables,
  15. check_script_exists
  16. )
  17. from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH
  18. # 创建日志记录器
  19. logger = logging.getLogger(__name__)
  20. def get_resource_script_name_from_neo4j(table_name):
  21. """
  22. 从Neo4j数据库中查询DataResource表对应的脚本名称
  23. 这个函数直接在当前文件中实现,而不是从utils导入
  24. 参数:
  25. table_name (str): 数据资源表名
  26. 返回:
  27. str: 脚本名称,如果未找到则返回None
  28. """
  29. # 使用导入的Neo4j配置
  30. uri = NEO4J_CONFIG['uri']
  31. auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  32. driver = GraphDatabase.driver(uri, auth=auth)
  33. query = """
  34. MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
  35. RETURN rel.script_name AS script_name
  36. """
  37. try:
  38. with driver.session() as session:
  39. result = session.run(query, table_name=table_name)
  40. record = result.single()
  41. if record:
  42. logger.info(f"找到表 {table_name} 的完整记录: {record}")
  43. try:
  44. script_name = record['script_name']
  45. logger.info(f"找到表 {table_name} 的 script_name: {script_name}")
  46. return script_name
  47. except (KeyError, TypeError) as e:
  48. logger.warning(f"记录中不包含script_name字段: {e}")
  49. return None
  50. else:
  51. logger.warning(f"未找到表 {table_name} 的记录")
  52. return None
  53. except Exception as e:
  54. logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
  55. return None
  56. finally:
  57. driver.close()
  58. def load_table_data(table_name, execution_mode):
  59. """执行数据资源表加载脚本的函数"""
  60. script_name = get_resource_script_name_from_neo4j(table_name)
  61. if not script_name:
  62. logger.warning(f"未找到表 {table_name} 的 script_name,跳过")
  63. return False
  64. logger.info(f"从Neo4j获取到表 {table_name} 的脚本名称: {script_name}")
  65. # 检查脚本文件是否存在
  66. exists, script_path = check_script_exists(script_name)
  67. if not exists:
  68. logger.error(f"表 {table_name} 的脚本文件 {script_name} 不存在,跳过处理")
  69. return False
  70. # 执行脚本
  71. logger.info(f"开始执行脚本: {script_path}")
  72. try:
  73. # 动态导入模块
  74. import importlib.util
  75. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  76. module = importlib.util.module_from_spec(spec)
  77. spec.loader.exec_module(module)
  78. # 检查并调用标准入口函数run
  79. if hasattr(module, "run"):
  80. logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
  81. module.run(table_name=table_name, execution_mode=execution_mode)
  82. logger.info(f"脚本 {script_name} 执行成功")
  83. return True
  84. else:
  85. logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
  86. return False
  87. except Exception as e:
  88. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  89. import traceback
  90. logger.error(traceback.format_exc())
  91. return False
  92. with DAG(
  93. "dag_data_resource_scheduler",
  94. start_date=datetime(2024, 1, 1),
  95. schedule_interval="@daily",
  96. catchup=False,
  97. ) as dag:
  98. # 获取当前日期信息
  99. today = pendulum.today()
  100. # 原始代码(注释)
  101. # is_monday = today.day_of_week == 0
  102. # is_first_day_of_month = today.day == 1
  103. # is_first_day_of_year = today.month == 1 and today.day == 1
  104. # 测试用:所有条件设为True
  105. is_monday = True
  106. is_first_day_of_month = True
  107. is_first_day_of_year = True
  108. all_resource_tables = []
  109. # 使用循环处理不同频率的表
  110. frequency_configs = [
  111. {"name": "daily", "condition": True},
  112. {"name": "weekly", "condition": is_monday},
  113. {"name": "monthly", "condition": is_first_day_of_month},
  114. {"name": "yearly", "condition": is_first_day_of_year}
  115. ]
  116. # 记录日期信息
  117. logger.info(f"今日日期: {today}, 是否周一: {is_monday}, 是否月初: {is_first_day_of_month}, 是否年初: {is_first_day_of_year}")
  118. logger.info(f"脚本基础路径: {SCRIPTS_BASE_PATH}")
  119. # 收集所有需要处理的资源表
  120. for config in frequency_configs:
  121. frequency = config["name"]
  122. if config["condition"]:
  123. logger.info(f"今天需要处理 {frequency} 频率的资源表")
  124. enabled_tables = get_enabled_tables(frequency)
  125. resource_tables = get_resource_subscribed_tables(enabled_tables)
  126. dependency_tables = get_dependency_resource_tables(enabled_tables)
  127. all_resource_tables.extend(resource_tables)
  128. all_resource_tables.extend(dependency_tables)
  129. logger.info(f"已添加 {frequency} 频率的资源表,共 {len(resource_tables) + len(dependency_tables)} 个")
  130. else:
  131. logger.info(f"今天不需要处理 {frequency} 频率的资源表")
  132. # 去重(按表名)
  133. unique_resources = {}
  134. for item in all_resource_tables:
  135. name = item["table_name"]
  136. if name not in unique_resources:
  137. unique_resources[name] = item
  138. resource_tables = list(unique_resources.values())
  139. logger.info(f"去重后,共需处理 {len(resource_tables)} 个资源表")
  140. # 创建开始任务
  141. start_loading = EmptyOperator(task_id="start_resource_loading")
  142. # 创建结束任务
  143. end_loading = EmptyOperator(task_id="resource_loading_completed")
  144. if resource_tables:
  145. for item in resource_tables:
  146. table_name = item['table_name']
  147. execution_mode = item['execution_mode']
  148. logger.info(f"为资源表 {table_name} 创建加载任务")
  149. task = PythonOperator(
  150. task_id=f"load_{table_name}",
  151. python_callable=load_table_data,
  152. op_kwargs={"table_name": table_name, "execution_mode": execution_mode},
  153. )
  154. # 设置依赖
  155. start_loading >> task >> end_loading
  156. else:
  157. logger.info("没有资源表需要处理,直接连接开始和结束任务")
  158. # 如果没有任务,确保开始和结束任务相连
  159. start_loading >> end_loading