# dag_data_resource.py
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator
  3. from datetime import datetime
  4. from utils import (
  5. get_enabled_tables,
  6. get_resource_subscribed_tables,
  7. get_dependency_resource_tables,
  8. check_script_exists
  9. )
  10. import pendulum
  11. import logging
  12. import sys
  13. from airflow.operators.empty import EmptyOperator
  14. from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, RESOURCE_LOADING_PARALLEL_DEGREE
  15. from neo4j import GraphDatabase
# Module-level logger for this DAG file.
logger = logging.getLogger(__name__)
  18. def get_resource_script_name_from_neo4j(table_name):
  19. from neo4j import GraphDatabase
  20. from config import NEO4J_CONFIG
  21. # 正确处理Neo4j连接参数
  22. uri = NEO4J_CONFIG['uri']
  23. auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  24. driver = GraphDatabase.driver(uri, auth=auth)
  25. query = """
  26. MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
  27. RETURN rel.script_name AS script_name
  28. """
  29. try:
  30. with driver.session() as session:
  31. result = session.run(query, table_name=table_name)
  32. record = result.single()
  33. if record:
  34. logger.info(f"找到表 {table_name} 的完整记录: {record}")
  35. try:
  36. script_name = record['script_name']
  37. logger.info(f"找到表 {table_name} 的 script_name: {script_name}")
  38. return script_name
  39. except (KeyError, TypeError) as e:
  40. logger.warning(f"记录中不包含script_name字段: {e}")
  41. return None
  42. else:
  43. logger.warning(f"未找到表 {table_name} 的记录")
  44. return None
  45. except Exception as e:
  46. logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
  47. return None
  48. finally:
  49. driver.close()
  50. def load_table_data(table_name, execution_mode):
  51. import os
  52. import importlib.util
  53. script_name = get_resource_script_name_from_neo4j(table_name)
  54. if not script_name:
  55. logger.warning(f"未找到表 {table_name} 的 script_name,跳过")
  56. return
  57. logger.info(f"从Neo4j获取到表 {table_name} 的脚本名称: {script_name}")
  58. # 检查脚本文件是否存在
  59. exists, script_path = check_script_exists(script_name)
  60. if not exists:
  61. logger.error(f"表 {table_name} 的脚本文件 {script_name} 不存在,跳过处理")
  62. return False
  63. # 执行脚本
  64. logger.info(f"开始执行脚本: {script_path}")
  65. try:
  66. # 动态导入模块
  67. import importlib.util
  68. import sys
  69. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  70. module = importlib.util.module_from_spec(spec)
  71. spec.loader.exec_module(module)
  72. # 检查并调用标准入口函数run
  73. if hasattr(module, "run"):
  74. logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
  75. module.run(table_name=table_name, execution_mode=execution_mode)
  76. logger.info(f"脚本 {script_name} 执行成功")
  77. return True
  78. else:
  79. logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
  80. return False
  81. except Exception as e:
  82. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  83. import traceback
  84. logger.error(traceback.format_exc())
  85. return False
# DAG: load all data-resource tables daily, adding weekly tables on Mondays
# and monthly tables on the 1st of the month.
with DAG("dag_data_resource", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
    # NOTE(review): pendulum.today() runs at DAG *parse* time, not per-run
    # logical date — whether the weekly/monthly branches exist depends on when
    # the scheduler parses this file. Confirm this is intended; normally the
    # run's data interval would be used instead.
    today = pendulum.today()
    is_monday = today.day_of_week == 0
    is_first_day = today.day == 1

    # Collect every resource table that should be loaded this run.
    all_resource_tables = []

    # Daily
    daily_enabled = get_enabled_tables("daily")
    all_resource_tables.extend(get_resource_subscribed_tables(daily_enabled))
    all_resource_tables.extend(get_dependency_resource_tables(daily_enabled))

    # Weekly
    if is_monday:
        weekly_enabled = get_enabled_tables("weekly")
        all_resource_tables.extend(get_resource_subscribed_tables(weekly_enabled))
        all_resource_tables.extend(get_dependency_resource_tables(weekly_enabled))

    # Monthly
    if is_first_day:
        monthly_enabled = get_enabled_tables("monthly")
        all_resource_tables.extend(get_resource_subscribed_tables(monthly_enabled))
        all_resource_tables.extend(get_dependency_resource_tables(monthly_enabled))

    # Deduplicate by table name (first occurrence wins).
    unique_resources = {}
    for item in all_resource_tables:
        name = item["table_name"]
        if name not in unique_resources:
            unique_resources[name] = item

    resource_tables = list(unique_resources.values())

    # Start marker task.
    start_loading = EmptyOperator(task_id="start_resource_loading")
    # End marker task.
    end_loading = EmptyOperator(task_id="finish_resource_loading")

    # Group tables into batches of RESOURCE_LOADING_PARALLEL_DEGREE.
    # NOTE(review): no dependencies are created *between* batches below —
    # every task is wired start_loading >> task >> end_loading, so all tasks
    # are eligible to run in parallel and the batching only affects logging.
    # If the intent is to cap concurrency at batch_size, chain the batches or
    # use a pool/max_active_tis_per_dag — confirm with the author.
    batch_size = RESOURCE_LOADING_PARALLEL_DEGREE
    batched_tables = [resource_tables[i:i+batch_size] for i in range(0, len(resource_tables), batch_size)]
    logger.info(f"将 {len(resource_tables)} 个资源表分为 {len(batched_tables)} 批处理,每批最多 {batch_size} 个表")

    for batch_idx, batch in enumerate(batched_tables):
        batch_tasks = []
        for item in batch:
            # One PythonOperator per table; task id embeds the table name.
            task = PythonOperator(
                task_id=f"load_{item['table_name']}",
                python_callable=load_table_data,
                op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
            )
            batch_tasks.append(task)
            # Upstream dependency.
            start_loading >> task
            # Downstream dependency.
            task >> end_loading
        logger.info(f"批次 {batch_idx+1}: 创建了 {len(batch_tasks)} 个表加载任务")