# dag_data_resource.py
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator
  3. from datetime import datetime
  4. from utils import (
  5. get_enabled_tables,
  6. get_resource_subscribed_tables,
  7. get_dependency_resource_tables
  8. )
  9. import pendulum
  10. import logging
  11. import sys
  12. # 创建日志记录器
  13. logger = logging.getLogger(__name__)
  14. def get_script_name_from_neo4j(table_name):
  15. from neo4j import GraphDatabase
  16. from config import NEO4J_CONFIG
  17. # 正确处理Neo4j连接参数
  18. uri = NEO4J_CONFIG['uri']
  19. auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  20. driver = GraphDatabase.driver(uri, auth=auth)
  21. query = """
  22. MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
  23. RETURN rel.script_name AS script_name
  24. """
  25. try:
  26. with driver.session() as session:
  27. result = session.run(query, table_name=table_name)
  28. record = result.single()
  29. if record:
  30. logger.info(f"找到表 {table_name} 的完整记录: {record}")
  31. try:
  32. script_name = record['script_name']
  33. logger.info(f"找到表 {table_name} 的 script_name: {script_name}")
  34. return script_name
  35. except (KeyError, TypeError) as e:
  36. logger.warning(f"记录中不包含script_name字段: {e}")
  37. return None
  38. else:
  39. logger.warning(f"未找到表 {table_name} 的记录")
  40. return None
  41. except Exception as e:
  42. logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
  43. return None
  44. finally:
  45. driver.close()
  46. def load_table_data(table_name, execution_mode):
  47. import os
  48. import importlib.util
  49. script_name = get_script_name_from_neo4j(table_name)
  50. if not script_name:
  51. logger.warning(f"未找到表 {table_name} 的 script_name,跳过")
  52. return
  53. # scripts_base_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "dataops", "scripts")
  54. # script_path = os.path.join(scripts_base_path, script_name)
  55. # 使用配置文件中的绝对路径
  56. from pathlib import Path
  57. from config import SCRIPTS_BASE_PATH
  58. script_path = Path(SCRIPTS_BASE_PATH) / script_name
  59. if not os.path.exists(script_path):
  60. logger.error(f"脚本文件不存在: {script_path}")
  61. return
  62. logger.info(f"执行脚本: {script_path}")
  63. try:
  64. spec = importlib.util.spec_from_file_location("dynamic_script", script_path)
  65. module = importlib.util.module_from_spec(spec)
  66. spec.loader.exec_module(module)
  67. if hasattr(module, "run"):
  68. module.run(table_name=table_name, execution_mode=execution_mode)
  69. else:
  70. logger.warning(f"脚本 {script_name} 中未定义 run(...) 方法,跳过")
  71. except Exception as e:
  72. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  73. with DAG("dag_data_resource", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  74. today = pendulum.today()
  75. is_monday = today.day_of_week == 0
  76. is_first_day = today.day == 1
  77. all_resource_tables = []
  78. # Daily
  79. daily_enabled = get_enabled_tables("daily")
  80. all_resource_tables.extend(get_resource_subscribed_tables(daily_enabled))
  81. all_resource_tables.extend(get_dependency_resource_tables(daily_enabled))
  82. # Weekly
  83. if is_monday:
  84. weekly_enabled = get_enabled_tables("weekly")
  85. all_resource_tables.extend(get_resource_subscribed_tables(weekly_enabled))
  86. all_resource_tables.extend(get_dependency_resource_tables(weekly_enabled))
  87. # Monthly
  88. if is_first_day:
  89. monthly_enabled = get_enabled_tables("monthly")
  90. all_resource_tables.extend(get_resource_subscribed_tables(monthly_enabled))
  91. all_resource_tables.extend(get_dependency_resource_tables(monthly_enabled))
  92. # 去重(按表名)
  93. unique_resources = {}
  94. for item in all_resource_tables:
  95. name = item["table_name"]
  96. if name not in unique_resources:
  97. unique_resources[name] = item
  98. resource_tables = list(unique_resources.values())
  99. for item in resource_tables:
  100. PythonOperator(
  101. task_id=f"load_{item['table_name']}",
  102. python_callable=load_table_data,
  103. op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
  104. )