123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- # common.py
- import psycopg2
- from neo4j import GraphDatabase
- import logging
- import importlib.util
- from pathlib import Path
- import networkx as nx
- import os
- from datetime import datetime, timedelta
- from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
# Module-wide logger used by the helpers in this file. The "airflow.task" name
# presumably routes records into the Airflow task log — confirm against the
# deployment's logging configuration.
logger: logging.Logger = logging.getLogger(name="airflow.task")
def get_pg_conn():
    """Open and return a new PostgreSQL connection built from ``PG_CONFIG``.

    ``PG_CONFIG`` is unpacked as keyword arguments to ``psycopg2.connect``.
    The caller is responsible for closing the returned connection.
    """
    connect_kwargs = PG_CONFIG
    return psycopg2.connect(**connect_kwargs)
def get_neo4j_driver():
    """Create a Neo4j driver from ``NEO4J_CONFIG``.

    Reads the ``uri``, ``user`` and ``password`` keys of ``NEO4J_CONFIG``.
    The caller is responsible for calling ``close()`` on the returned driver.
    """
    cfg = NEO4J_CONFIG
    return GraphDatabase.driver(cfg['uri'], auth=(cfg['user'], cfg['password']))
def update_task_start_time(exec_date, target_table, script_name, start_time):
    """Record the execution start time of one scheduled task.

    Sets ``exec_start_time`` on the ``airflow_dag_schedule`` row(s) matching
    ``(exec_date, target_table, script_name)``.

    Errors raised while updating (including a failing rollback) are logged
    and swallowed, so a monitoring failure does not abort the task itself.
    A failure to open the connection in ``get_pg_conn()`` still propagates.

    Args:
        exec_date: Execution date used to select the schedule row.
        target_table: Target table name used to select the schedule row.
        script_name: Script name used to select the schedule row.
        start_time: Value written to ``exec_start_time``.
    """
    conn = get_pg_conn()
    try:
        # ``with conn`` commits on success and rolls back on error; the cursor
        # context manager closes the cursor even if ``execute`` raises.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE airflow_dag_schedule
                    SET exec_start_time = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                    """,
                    (start_time, exec_date, target_table, script_name),
                )
                # An UPDATE matching no row is not an SQL error; surface it
                # so a missing schedule record does not go unnoticed.
                if cursor.rowcount == 0:
                    logger.warning(
                        f"未找到需要更新开始时间的调度记录: exec_date={exec_date}, "
                        f"target_table={target_table}, script_name={script_name}"
                    )
    except Exception as e:
        logger.error(f"更新任务开始时间失败: {str(e)}", exc_info=True)
    finally:
        # ``with conn`` ends the transaction but does not close the connection.
        conn.close()
def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
    """Record the result, end time and duration of one scheduled task.

    Sets ``exec_result``, ``exec_end_time`` and ``exec_duration`` on the
    ``airflow_dag_schedule`` row(s) matching ``(exec_date, target_table, script_name)``.

    Errors raised while updating (including a failing rollback) are logged
    and swallowed, so a monitoring failure does not abort the task itself.
    A failure to open the connection in ``get_pg_conn()`` still propagates.

    Args:
        exec_date: Execution date used to select the schedule row.
        target_table: Target table name used to select the schedule row.
        script_name: Script name used to select the schedule row.
        success: Value written to ``exec_result``.
        end_time: Value written to ``exec_end_time``.
        duration: Value written to ``exec_duration`` (callers pass seconds).
    """
    conn = get_pg_conn()
    try:
        # ``with conn`` commits on success and rolls back on error; the cursor
        # context manager closes the cursor even if ``execute`` raises.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE airflow_dag_schedule
                    SET exec_result = %s, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                    """,
                    (success, end_time, duration, exec_date, target_table, script_name),
                )
                # An UPDATE matching no row is not an SQL error; surface it
                # so a missing schedule record does not go unnoticed.
                if cursor.rowcount == 0:
                    logger.warning(
                        f"未找到需要更新完成信息的调度记录: exec_date={exec_date}, "
                        f"target_table={target_table}, script_name={script_name}"
                    )
    except Exception as e:
        logger.error(f"更新任务完成信息失败: {str(e)}", exc_info=True)
    finally:
        # ``with conn`` ends the transaction but does not close the connection.
        conn.close()
def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
    """Run a table's script and record its start time, end time and result.

    Args:
        target_table: Table the script builds; forwarded to ``execute_script``.
        script_name: Script file name; an empty value is recorded as a failure.
        script_exec_mode: Execution mode forwarded to ``execute_script``.
        exec_date: Execution date used to select the schedule record.
        **kwargs: Accepted and ignored (presumably the Airflow task context —
            TODO confirm against the DAG definition).

    Returns:
        The value returned by ``execute_script``, or ``False`` when
        ``script_name`` is empty.

    Raises:
        Re-raises any exception escaping ``execute_script`` after the task has
        been recorded as failed.
    """
    if not script_name:
        logger.error(f"表 {target_table} 的script_name为空,无法执行")
        # Still write a (failed) completion record so the run is visible.
        update_task_completion(exec_date, target_table, script_name or "", False, datetime.now(), 0)
        return False

    start_time = datetime.now()
    update_task_start_time(exec_date, target_table, script_name, start_time)

    # Defaults to failure so that an interrupted run (any exception, including
    # non-``Exception`` ones such as KeyboardInterrupt) is recorded as failed
    # by the ``finally`` clause below.
    success = False
    try:
        success = execute_script(script_name, target_table, script_exec_mode)
        return success
    except Exception as e:
        logger.error(f"执行任务出错: {str(e)}", exc_info=True)
        raise  # bare raise keeps the original traceback unchanged
    finally:
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
def execute_script(script_name, table_name, execution_mode):
    """Load a script from ``SCRIPTS_BASE_PATH`` and call its ``run()`` entry point.

    The script file is imported as a fresh module, and
    ``run(table_name=..., execution_mode=...)`` is invoked on it.

    Args:
        script_name: File name of the script, relative to ``SCRIPTS_BASE_PATH``.
        table_name: Forwarded to ``run`` as ``table_name``.
        execution_mode: Forwarded to ``run`` as ``execution_mode``.

    Returns:
        Whatever ``run()`` returns. Returns ``False`` in any of these cases:
        the name is empty, the file is missing, the file cannot be imported,
        the module defines no callable ``run``, or any exception is raised
        while loading or running the script. Errors are logged, not raised.
    """
    if not script_name:
        logger.error("未提供脚本名称,无法执行")
        return False

    try:
        # NOTE(review): pathlib joining means an absolute script_name replaces
        # SCRIPTS_BASE_PATH entirely — confirm script names come from a trusted source.
        script_path = Path(SCRIPTS_BASE_PATH) / script_name
        logger.info(f"使用配置的Airflow部署路径: {script_path}")

        # Fail early with a clear message instead of an opaque import error.
        if not script_path.is_file():
            logger.error(f"脚本文件不存在: {script_path}")
            return False

        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
        # spec_from_file_location returns None when no loader matches the file
        # (e.g. an unrecognised suffix).
        if spec is None or spec.loader is None:
            logger.error(f"无法加载脚本 {script_name}: {script_path}")
            return False
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        run = getattr(module, "run", None)
        if not callable(run):
            logger.warning(f"脚本 {script_name} 未定义标准入口函数 run(),无法执行")
            return False

        logger.info(f"执行脚本 {script_name} 的标准入口函数 run()")
        result = run(table_name=table_name, execution_mode=execution_mode)
        logger.info(f"脚本 {script_name} 执行结果: {result}")
        return result
    except Exception as e:
        # logger.exception logs at ERROR level and includes the traceback.
        logger.exception(f"执行脚本 {script_name} 时出错: {str(e)}")
        return False
def generate_optimized_execution_order(table_names, dependency_dict):
    """Return an execution order for ``table_names`` that respects dependencies.

    Circular dependencies are broken by removing one edge per detected cycle
    until the dependency graph is acyclic; each removal is logged.

    Args:
        table_names: List of table names to order.
        dependency_dict: ``{table: [tables it depends on, ...]}``. Only
            entries where both the table and its dependency are in
            ``table_names`` are considered; ``None`` dependency lists are
            treated as empty.

    Returns:
        list: Table names in an order where every dependency precedes its
        dependents. Falls back to ``table_names`` if no order can be produced.
    """
    wanted = set(table_names)  # O(1) membership checks

    graph = nx.DiGraph()
    graph.add_nodes_from(table_names)

    for target, sources in dependency_dict.items():
        # Ignore targets outside the requested set; otherwise add_edge would
        # silently add them as new nodes and they would appear in the result.
        if target not in wanted:
            continue
        for source in sources or ():
            if source in wanted:
                # Edge points from the dependency to the dependent table, so
                # the dependency is ordered first.
                graph.add_edge(source, target)

    # Break cycles one at a time. Re-checking after every removal means an
    # edge that several cycles share is removed only once (never removing an
    # edge that is already gone), and it avoids enumerating all simple cycles,
    # which can be exponential in the size of the graph.
    while True:
        try:
            cycle = nx.find_cycle(graph)
        except nx.NetworkXNoCycle:
            break
        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycle}")
        # The last edge of the returned cycle closes the loop.
        u, v = cycle[-1][:2]
        graph.remove_edge(u, v)
        logger.info(f"打破循环依赖: 移除 {u} -> {v} 的依赖")

    try:
        return list(nx.topological_sort(graph))
    except nx.NetworkXUnfeasible as e:
        # Defensive: unreachable once all cycles are removed above.
        logger.error(f"生成执行顺序失败: {str(e)}")
        return table_names
def get_datamodel_dependency_from_neo4j(table_names):
    """Fetch dependencies between the given DataModel tables from Neo4j.

    A relationship ``(a:DataModel)-[:DERIVED_FROM]->(b:DataModel)`` is read as
    "``a`` depends on ``b``". Only relationships whose two ends are both in
    ``table_names`` (matched on ``en_name``) are considered.

    Args:
        table_names: Iterable of table names (``DataModel.en_name`` values).

    Returns:
        dict: ``{table: [tables it depends on, ...]}`` with one key per input
        table (an empty list when it has no dependency) and no duplicate
        entries. If the Neo4j query fails, the error is logged and the dict
        is returned with whatever was collected so far (possibly all empty).
    """
    # The Cypher parameter must be a list; this also accepts sets/tuples.
    names = list(table_names)
    logger.info(f"开始获取 {len(names)} 个表的依赖关系")

    dependency_dict = {name: [] for name in names}
    if not dependency_dict:
        # Nothing to look up; avoid opening a Neo4j connection.
        return dependency_dict

    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            # Fetch every relationship among the requested tables in one query.
            query = """
            MATCH (source:DataModel)-[:DERIVED_FROM]->(target:DataModel)
            WHERE source.en_name IN $table_names AND target.en_name IN $table_names
            RETURN source.en_name AS source, target.en_name AS target
            """
            result = session.run(query, table_names=names)

            for record in result:
                source = record.get("source")
                target = record.get("target")
                if not (source and target):
                    continue
                # source is DERIVED_FROM target, i.e. source depends on target.
                deps = dependency_dict.get(source)
                # Skip repeats, e.g. several DERIVED_FROM edges between the same pair.
                if deps is not None and target not in deps:
                    deps.append(target)
                    logger.debug(f"依赖关系: {source} 依赖于 {target}")
    except Exception as e:
        logger.error(f"从Neo4j获取依赖关系时出错: {str(e)}", exc_info=True)
    finally:
        driver.close()

    for table, deps in dependency_dict.items():
        if deps:
            logger.info(f"表 {table} 依赖于: {deps}")
        else:
            logger.info(f"表 {table} 没有依赖")

    return dependency_dict
def get_today_date():
    """Return the current local date formatted as ``YYYY-MM-DD``."""
    now = datetime.now()
    return f"{now:%Y-%m-%d}"
|