dag_data_resource.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from utils import (
    get_enabled_tables,
    get_resource_subscribed_tables,
    get_dependency_resource_tables,
    check_script_exists
)
import pendulum
import logging
import sys
from airflow.operators.empty import EmptyOperator
from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH
from neo4j import GraphDatabase

# Create the module-level logger
logger = logging.getLogger(__name__)


def get_resource_script_name_from_neo4j(table_name):
    """Look up the loading script name for a DataResource table in Neo4j."""
    from neo4j import GraphDatabase
    from config import NEO4J_CONFIG

    # Handle the Neo4j connection parameters correctly
    uri = NEO4J_CONFIG['uri']
    auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
    driver = GraphDatabase.driver(uri, auth=auth)

    query = """
        MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
        RETURN rel.script_name AS script_name
    """
    try:
        with driver.session() as session:
            result = session.run(query, table_name=table_name)
            record = result.single()
            if record:
                logger.info(f"Found full record for table {table_name}: {record}")
                try:
                    script_name = record['script_name']
                    logger.info(f"Found script_name for table {table_name}: {script_name}")
                    return script_name
                except (KeyError, TypeError) as e:
                    logger.warning(f"Record does not contain a script_name field: {e}")
                    return None
            else:
                logger.warning(f"No record found for table {table_name}")
                return None
    except Exception as e:
        logger.error(f"Error while querying the script name for table {table_name}: {str(e)}")
        return None
    finally:
        driver.close()


def load_table_data(table_name, execution_mode):
    """Resolve a table's loading script via Neo4j and execute its run() entry point."""
    import importlib.util

    script_name = get_resource_script_name_from_neo4j(table_name)
    if not script_name:
        logger.warning(f"No script_name found for table {table_name}, skipping")
        return False

    logger.info(f"Retrieved script name for table {table_name} from Neo4j: {script_name}")

    # Check whether the script file exists
    exists, script_path = check_script_exists(script_name)
    if not exists:
        logger.error(f"Script file {script_name} for table {table_name} does not exist, skipping")
        return False

    # Execute the script
    logger.info(f"Starting script execution: {script_path}")
    try:
        # Dynamically import the script file as a module
        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Check for and call the standard run() entry point
        if hasattr(module, "run"):
            logger.info(f"Calling the standard run() entry point of script {script_name}")
            module.run(table_name=table_name, execution_mode=execution_mode)
            logger.info(f"Script {script_name} executed successfully")
            return True
        else:
            logger.error(f"Script {script_name} does not define the standard run() entry point; cannot execute")
            return False
    except Exception as e:
        logger.error(f"Error while executing script {script_name}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return False
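
# For reference, a minimal sketch of the contract load_table_data() expects from each
# script resolved via Neo4j (an illustration, not code from this repository): the script
# file must expose a module-level run(table_name, execution_mode) function, roughly:
#
#     def run(table_name, execution_mode):
#         # Load `table_name` from its upstream data source; `execution_mode` is passed
#         # through from the table's configuration (possible values are hypothetical here).
#         ...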


with DAG(
    "dag_data_resource",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    # DAG-level parallelism controls (currently disabled):
    # max_active_tasks=8   # each run of this DAG executes at most 8 tasks concurrently
    # concurrency=5        # regardless of how many runs of this DAG are active (e.g. yesterday's
    #                      # and today's), at most 5 of its tasks run at the same time in total
) as dag:
    today = pendulum.today()
    # Compare against the named constant instead of a hard-coded weekday number
    is_monday = today.day_of_week == pendulum.MONDAY
    is_first_day_of_month = today.day == 1
    is_first_day_of_year = today.month == 1 and today.day == 1

    all_resource_tables = []

    # Handle tables of different frequencies in a loop
    frequency_configs = [
        {"name": "daily", "condition": True},
        {"name": "weekly", "condition": is_monday},
        {"name": "monthly", "condition": is_first_day_of_month},
        {"name": "yearly", "condition": is_first_day_of_year}
    ]

    for config in frequency_configs:
        frequency = config["name"]
        if config["condition"]:
            enabled_tables = get_enabled_tables(frequency)
            all_resource_tables.extend(get_resource_subscribed_tables(enabled_tables))
            all_resource_tables.extend(get_dependency_resource_tables(enabled_tables))
            logger.info(f"Added resource tables with {frequency} frequency")

    # Deduplicate (by table name)
    unique_resources = {}
    for item in all_resource_tables:
        name = item["table_name"]
        if name not in unique_resources:
            unique_resources[name] = item
    resource_tables = list(unique_resources.values())

    # Create the start task
    start_loading = EmptyOperator(task_id="start_resource_loading")
    # Create the end task
    end_loading = EmptyOperator(task_id="finish_resource_loading")

    for item in resource_tables:
        task = PythonOperator(
            task_id=f"load_{item['table_name']}",
            python_callable=load_table_data,
            op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
        )
        # Set the dependencies
        start_loading >> task >> end_loading
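

# Optional local-debugging hook (a sketch assuming Airflow >= 2.5, where DAG.test() is
# available): running this file directly executes one DAG run in-process, without a scheduler.
if __name__ == "__main__":
    dag.test()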