123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- from airflow import DAG
- from airflow.operators.python import PythonOperator
- from datetime import datetime
- from utils import (
- get_enabled_tables,
- get_resource_subscribed_tables,
- get_dependency_resource_tables
- )
- import pendulum
- import logging
- import sys
- # 创建日志记录器
- logger = logging.getLogger(__name__)
- def get_script_name_from_neo4j(table_name):
- from neo4j import GraphDatabase
- from config import NEO4J_CONFIG
-
- # 正确处理Neo4j连接参数
- uri = NEO4J_CONFIG['uri']
- auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
- driver = GraphDatabase.driver(uri, auth=auth)
-
- query = """
- MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
- RETURN rel.script_name AS script_name
- """
- try:
- with driver.session() as session:
- result = session.run(query, table_name=table_name)
- record = result.single()
- if record:
- logger.info(f"找到表 {table_name} 的完整记录: {record}")
- try:
- script_name = record['script_name']
- logger.info(f"找到表 {table_name} 的 script_name: {script_name}")
- return script_name
- except (KeyError, TypeError) as e:
- logger.warning(f"记录中不包含script_name字段: {e}")
- return None
- else:
- logger.warning(f"未找到表 {table_name} 的记录")
- return None
- except Exception as e:
- logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
- return None
- finally:
- driver.close()
- def load_table_data(table_name, execution_mode):
- import os
- import importlib.util
- script_name = get_script_name_from_neo4j(table_name)
- if not script_name:
- logger.warning(f"未找到表 {table_name} 的 script_name,跳过")
- return
- # scripts_base_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "dataops", "scripts")
- # script_path = os.path.join(scripts_base_path, script_name)
- # 使用配置文件中的绝对路径
- from pathlib import Path
- from config import SCRIPTS_BASE_PATH
- script_path = Path(SCRIPTS_BASE_PATH) / script_name
- if not os.path.exists(script_path):
- logger.error(f"脚本文件不存在: {script_path}")
- return
- logger.info(f"执行脚本: {script_path}")
- try:
- spec = importlib.util.spec_from_file_location("dynamic_script", script_path)
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
- if hasattr(module, "run"):
- module.run(table_name=table_name, execution_mode=execution_mode)
- else:
- logger.warning(f"脚本 {script_name} 中未定义 run(...) 方法,跳过")
- except Exception as e:
- logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
- with DAG("dag_data_resource", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
- today = pendulum.today()
- is_monday = today.day_of_week == 0
- is_first_day = today.day == 1
- all_resource_tables = []
- # Daily
- daily_enabled = get_enabled_tables("daily")
- all_resource_tables.extend(get_resource_subscribed_tables(daily_enabled))
- all_resource_tables.extend(get_dependency_resource_tables(daily_enabled))
- # Weekly
- if is_monday:
- weekly_enabled = get_enabled_tables("weekly")
- all_resource_tables.extend(get_resource_subscribed_tables(weekly_enabled))
- all_resource_tables.extend(get_dependency_resource_tables(weekly_enabled))
- # Monthly
- if is_first_day:
- monthly_enabled = get_enabled_tables("monthly")
- all_resource_tables.extend(get_resource_subscribed_tables(monthly_enabled))
- all_resource_tables.extend(get_dependency_resource_tables(monthly_enabled))
- # 去重(按表名)
- unique_resources = {}
- for item in all_resource_tables:
- name = item["table_name"]
- if name not in unique_resources:
- unique_resources[name] = item
- resource_tables = list(unique_resources.values())
- for item in resource_tables:
- PythonOperator(
- task_id=f"load_{item['table_name']}",
- python_callable=load_table_data,
- op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
- )
|