123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- # common.py
- import psycopg2
- from neo4j import GraphDatabase
- import logging
- import importlib.util
- from pathlib import Path
- import networkx as nx
- import os
- from datetime import datetime, timedelta
- from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
- import functools
- import time
# Shared module-level logger used by every helper in this module. The name
# "airflow.task" presumably routes output into Airflow's per-task log —
# confirm against the deployment's logging configuration.
logger = logging.getLogger(name="airflow.task")
def get_pg_conn():
    """Open and return a new PostgreSQL connection built from ``PG_CONFIG``.

    ``PG_CONFIG`` is expanded as keyword arguments to ``psycopg2.connect``.
    Each call opens a fresh connection that the caller must close.
    """
    connection = psycopg2.connect(**PG_CONFIG)
    return connection
def get_neo4j_driver():
    """Create a Neo4j driver from the ``uri``/``user``/``password`` keys of ``NEO4J_CONFIG``.

    The caller is responsible for calling ``close()`` on the returned driver.
    """
    target_uri = NEO4J_CONFIG['uri']
    credentials = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
    return GraphDatabase.driver(target_uri, auth=credentials)
def update_task_start_time(exec_date, target_table, script_name, start_time):
    """Write ``start_time`` into ``exec_start_time`` of one airflow_dag_schedule row.

    The row is matched on ``(exec_date, target_table, script_name)``. If no row
    matches, up to five sample rows are logged to help diagnose key mismatches
    (for example an ``exec_date`` type difference). The UPDATE is still issued;
    when it affects zero rows a warning is logged.

    Args:
        exec_date: Execution date used in the WHERE clause.
        target_table: Target table name used in the WHERE clause.
        script_name: Script name used in the WHERE clause.
        start_time: Value stored in ``exec_start_time``.

    Raises:
        Exception: Any database error is logged with its traceback, the
            transaction is rolled back and the error is re-raised.
    """
    logger.info("===== 更新任务开始时间 =====")
    logger.info(f"参数: exec_date={exec_date} ({type(exec_date).__name__}), target_table={target_table}, script_name={script_name}")

    conn = get_pg_conn()
    try:
        # The cursor context manager closes the cursor on every exit path, and
        # creating it inside this try guarantees the connection is closed too.
        with conn.cursor() as cursor:
            # Diagnostic pre-check: only used to explain a zero-row update.
            cursor.execute("""
                SELECT COUNT(*)
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))
            count = cursor.fetchone()[0]
            logger.info(f"查询到符合条件的记录数: {count}")

            if count == 0:
                logger.warning(f"未找到匹配的记录: exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
                logger.info("尝试记录在airflow_dag_schedule表中找到的记录:")
                cursor.execute("""
                    SELECT exec_date, target_table, script_name
                    FROM airflow_dag_schedule
                    LIMIT 5
                """)
                for record in cursor.fetchall():
                    logger.info(f"样本记录: exec_date={record[0]} ({type(record[0]).__name__}), target_table={record[1]}, script_name={record[2]}")

            sql = """
                UPDATE airflow_dag_schedule
                SET exec_start_time = %s
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """
            logger.info(f"执行SQL: {sql}")
            logger.info(f"参数: start_time={start_time}, exec_date={exec_date}, target_table={target_table}, script_name={script_name}")

            cursor.execute(sql, (start_time, exec_date, target_table, script_name))
            affected_rows = cursor.rowcount
            logger.info(f"更新影响的行数: {affected_rows}")
            if affected_rows == 0:
                # Same warning update_task_completion emits for a missed row.
                logger.warning("更新操作没有影响任何行,可能是因为条件不匹配")

        conn.commit()
        logger.info("事务已提交")
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"更新任务开始时间失败: {str(e)}")
        conn.rollback()
        logger.info("事务已回滚")
        raise
    finally:
        conn.close()
        logger.info("数据库连接已关闭")
    logger.info("===== 更新任务开始时间完成 =====")
def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
    """Write the outcome of one run into its airflow_dag_schedule row.

    Sets ``exec_result``, ``exec_end_time`` and ``exec_duration`` on the row
    matched by ``(exec_date, target_table, script_name)``. A non-bool
    ``success`` is coerced with ``bool()``. If no row matches, up to five sample
    rows are logged. If the UPDATE affects no row and ``exec_date`` is a
    ``YYYY-MM-DD`` string, the update is retried once with the parsed ``date``
    value when that value matches a row.

    Args:
        exec_date: Execution date used in the WHERE clause.
        target_table: Target table name used in the WHERE clause.
        script_name: Script name used in the WHERE clause.
        success: Run result, stored in ``exec_result``.
        end_time: Value stored in ``exec_end_time``.
        duration: Value stored in ``exec_duration``.

    Raises:
        Exception: Any database error, including one raised during the
            parsed-date retry, is logged with its traceback. The transaction is
            then rolled back and the error re-raised.
    """
    logger.info("===== 更新任务完成信息 =====")
    logger.info(f"参数: exec_date={exec_date} ({type(exec_date).__name__}), target_table={target_table}, script_name={script_name}")
    logger.info(f"参数: success={success} ({type(success).__name__}), end_time={end_time}, duration={duration}")

    # Shared by the primary attempt and the parsed-date retry.
    count_sql = """
        SELECT COUNT(*)
        FROM airflow_dag_schedule
        WHERE exec_date = %s AND target_table = %s AND script_name = %s
    """
    update_sql = """
        UPDATE airflow_dag_schedule
        SET exec_result = %s, exec_end_time = %s, exec_duration = %s
        WHERE exec_date = %s AND target_table = %s AND script_name = %s
    """

    conn = get_pg_conn()
    try:
        # The cursor context manager closes the cursor on every exit path, and
        # creating it inside this try guarantees the connection is closed too.
        with conn.cursor() as cursor:
            # Diagnostic pre-check: only used to explain a zero-row update.
            cursor.execute(count_sql, (exec_date, target_table, script_name))
            count = cursor.fetchone()[0]
            logger.info(f"查询到符合条件的记录数: {count}")

            if count == 0:
                logger.warning(f"未找到匹配的记录: exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
                cursor.execute("""
                    SELECT exec_date, target_table, script_name
                    FROM airflow_dag_schedule
                    LIMIT 5
                """)
                sample_records = cursor.fetchall()
                logger.info("airflow_dag_schedule表中的样本记录:")
                for record in sample_records:
                    logger.info(f"样本记录: exec_date={record[0]} ({type(record[0]).__name__}), target_table={record[1]}, script_name={record[2]}")

            if not isinstance(success, bool):
                original_success = success
                success = bool(success)
                logger.warning(f"success参数不是布尔类型,原始值: {original_success},转换为: {success}")

            logger.info(f"执行SQL: {update_sql}")
            logger.info(f"参数: success={success}, end_time={end_time}, duration={duration}, exec_date={exec_date}, target_table={target_table}, script_name={script_name}")

            cursor.execute(update_sql, (success, end_time, duration, exec_date, target_table, script_name))
            affected_rows = cursor.rowcount
            logger.info(f"更新影响的行数: {affected_rows}")

            if affected_rows == 0:
                logger.warning("更新操作没有影响任何行,可能是因为条件不匹配")
                if isinstance(exec_date, str):
                    # Only the date parsing is best-effort. A database error in
                    # the retry is left to the outer handler: PostgreSQL aborts
                    # the transaction after a failed statement and then answers
                    # COMMIT with a rollback. Swallowing the error here would
                    # make the commit below a silent no-op logged as success.
                    try:
                        parsed_date = datetime.strptime(exec_date, "%Y-%m-%d").date()
                    except ValueError as parse_e:
                        logger.error(f"尝试解析日期格式时出错: {str(parse_e)}")
                    else:
                        logger.info(f"尝试使用解析后的日期格式: {parsed_date}")
                        cursor.execute(count_sql, (parsed_date, target_table, script_name))
                        parsed_count = cursor.fetchone()[0]
                        logger.info(f"使用解析日期后查询到的记录数: {parsed_count}")

                        if parsed_count > 0:
                            cursor.execute(update_sql, (success, end_time, duration, parsed_date, target_table, script_name))
                            logger.info(f"使用解析日期后更新影响的行数: {cursor.rowcount}")

        conn.commit()
        logger.info("事务已提交")
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"更新任务完成信息失败: {str(e)}")
        conn.rollback()
        logger.info("事务已回滚")
        raise
    finally:
        conn.close()
        logger.info("数据库连接已关闭")
    logger.info("===== 更新任务完成信息完成 =====")
def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
    """Run a table's script and record its start/end status in airflow_dag_schedule.

    Status bookkeeping (``update_task_start_time`` / ``update_task_completion``)
    is best-effort: failures there are logged and never change the result.

    Args:
        target_table: Table the script produces; forwarded to the script as
            ``table_name``.
        script_name: Script name; ``execute_script`` resolves it to a file under
            ``SCRIPTS_BASE_PATH``.
        script_exec_mode: Execution mode forwarded to the script.
        exec_date: Execution date identifying the schedule row.
        **kwargs: Accepted but not used here (presumably the Airflow task
            context — confirm at the call site).

    Returns:
        bool: The script's result coerced to ``bool``; False when
        ``script_name`` is empty.

    Raises:
        Exception: Any unexpected error while running the script is re-raised
            after the failure has been recorded.
    """
    logger.info("===== 开始监控执行 =====")
    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")

    # Nothing can run without a script name: record the failure and stop.
    if not script_name:
        logger.error(f"表 {target_table} 的script_name为空,无法执行")
        now = datetime.now()
        # Best-effort like the other status updates below, so a database
        # problem cannot turn this clean False into an exception.
        try:
            update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
        except Exception as e:
            logger.error(f"更新任务失败状态失败: {str(e)}")
        return False

    start_time = datetime.now()
    try:
        update_task_start_time(exec_date, target_table, script_name, start_time)
        logger.info(f"成功更新任务开始时间: {start_time}")
    except Exception as e:
        logger.error(f"更新任务开始时间失败: {str(e)}")

    try:
        logger.info(f"开始执行脚本: {script_name}")
        # Must be called by keyword. execute_script's first positional
        # parameter is script_path, so a positional call would select its
        # explicit-path convention: script_name would be treated as a
        # (non-existent) path and target_table as the script name.
        result = execute_script(
            script_name=script_name,
            table_name=target_table,
            execution_mode=script_exec_mode,
        )
        logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")

        # Normalise the result so the status table always receives a bool.
        if result is None:
            logger.warning("脚本返回值为None,转换为False")
            result = False
        elif not isinstance(result, bool):
            original_result = result
            result = bool(result)
            logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        try:
            logger.info(f"尝试更新完成状态: result={result}, end_time={end_time}, duration={duration}")
            update_task_completion(exec_date, target_table, script_name, result, end_time, duration)
            logger.info(f"成功更新任务完成状态,结果: {result}")
        except Exception as e:
            logger.error(f"更新任务完成状态失败: {str(e)}")

        logger.info("===== 监控执行完成 =====")
        return result
    except Exception as e:
        logger.error(f"执行任务出错: {str(e)}")
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        try:
            logger.info(f"尝试更新失败状态: end_time={end_time}, duration={duration}")
            update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
            logger.info("成功更新任务失败状态")
        except Exception as update_e:
            logger.error(f"更新任务失败状态失败: {str(update_e)}")

        logger.info("===== 监控执行异常结束 =====")
        # Bare raise re-raises the original error with its traceback intact.
        raise
def ensure_boolean_result(func):
    """Decorate ``func`` so that calling it always yields a ``bool``.

    A ``None`` result becomes False. Any other non-bool result is passed
    through ``bool()``, falling back to False if that conversion raises. If
    ``func`` itself raises, the error is logged and False is returned instead
    of propagating.
    """
    @functools.wraps(func)
    def _coercing_call(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
            logger.debug(f"脚本原始返回值: {outcome} (类型: {type(outcome).__name__})")

            # Genuine booleans pass straight through.
            if isinstance(outcome, bool):
                return outcome

            if outcome is None:
                logger.warning(f"脚本函数 {func.__name__} 返回了None,默认设置为False")
                return False

            # Truthiness conversion can itself fail (e.g. a custom __bool__).
            try:
                coerced = bool(outcome)
                logger.warning(f"脚本函数 {func.__name__} 返回非布尔值 {outcome},已转换为布尔值 {coerced}")
                return coerced
            except Exception as conversion_error:
                logger.error(f"无法将脚本返回值 {outcome} 转换为布尔值: {str(conversion_error)}")
                return False
        except Exception as call_error:
            logger.error(f"脚本函数 {func.__name__} 执行出错: {str(call_error)}")
            return False

    return _coercing_call
def execute_script(script_path=None, script_name=None, script_exec_mode=None, table_name=None, execution_mode=None, args=None):
    """
    Load a Python script file and call its ``run`` function.

    Two calling conventions are supported. Pass the arguments BY KEYWORD:
    ``script_path`` is the first positional parameter, so a positional call
    written in the style of convention 2 would be interpreted as convention 1.

    1. ``execute_script(script_path=..., script_name=..., script_exec_mode=..., args={...})``
       loads the file at ``script_path`` (``args`` defaults to ``{}``).
    2. ``execute_script(script_name=..., table_name=..., execution_mode=...)``
       loads ``<SCRIPTS_BASE_PATH>/<script_name>.py``. A ``script_name`` that
       already ends in ``.py`` is used as-is.

    When ``table_name`` is given, the script is called as
    ``run(table_name=table_name, execution_mode=script_exec_mode)``; otherwise
    it is called as ``run(script_exec_mode, args)``. ``run`` is wrapped with
    ``ensure_boolean_result``, so its result is coerced to ``bool`` and
    exceptions raised inside it turn into False.

    Returns:
        The wrapped ``run`` result, or False in any of these cases: the
        arguments match neither convention, the script file is missing or
        cannot be loaded, the module has no ``run`` function, or loading raises.
    """
    if script_path and script_name and script_exec_mode is not None:
        # Convention 1: explicit path.
        if args is None:
            args = {}
    elif script_name and table_name and execution_mode is not None:
        # Convention 2: resolve the file under SCRIPTS_BASE_PATH without
        # doubling an existing ".py" suffix.
        file_name = str(script_name)
        if not file_name.endswith(".py"):
            file_name = f"{file_name}.py"
        script_path = os.path.join(SCRIPTS_BASE_PATH, file_name)
        script_exec_mode = execution_mode
        args = {"table_name": table_name}
    else:
        logger.error("参数不正确,无法执行脚本")
        return False

    try:
        if not os.path.exists(script_path):
            logger.error(f"脚本路径 {script_path} 不存在")
            return False

        spec = importlib.util.spec_from_file_location("script_module", script_path)
        # spec_from_file_location returns None when no loader handles the file
        # (e.g. an unrecognised suffix); report that instead of an AttributeError.
        if spec is None or spec.loader is None:
            logger.error(f"无法加载脚本 {script_path}")
            return False
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # The function listing is only useful when DEBUG output is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            module_functions = [f for f in dir(module) if callable(getattr(module, f)) and not f.startswith('_')]
            logger.debug(f"模块 {script_name} 中的可用函数: {module_functions}")

        if not hasattr(module, "run"):
            logger.error(f"脚本 {script_name} 没有run函数")
            return False
        module.run = ensure_boolean_result(module.run)

        logger.info(f"开始执行脚本 {script_name},执行模式: {script_exec_mode}, 参数: {args}")
        # Monotonic clock: immune to wall-clock adjustments during the run.
        start_time = time.monotonic()

        if table_name is not None:
            exec_result = module.run(table_name=table_name, execution_mode=script_exec_mode)
        else:
            exec_result = module.run(script_exec_mode, args)

        duration = time.monotonic() - start_time
        logger.info(f"脚本 {script_name} 执行完成,结果: {exec_result}, 耗时: {duration:.2f}秒")
        return exec_result
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"执行脚本 {script_name} 时出错: {str(e)}")
        return False
def generate_optimized_execution_order(table_names, dependency_dict):
    """
    Build an execution order in which every table runs after its dependencies.

    Args:
        table_names: Tables to order.
        dependency_dict: ``{table: [tables it depends on, ...]}``. Only
            dependencies whose both ends are in ``table_names`` are used.
            ``None`` is treated as no dependencies.

    Returns:
        list: A topological order of ``table_names``. Dependency cycles are
        broken by removing one edge per detected cycle until the graph is
        acyclic. If sorting still fails, ``table_names`` is returned unchanged.
    """
    table_set = set(table_names)  # O(1) membership tests below

    graph = nx.DiGraph()
    graph.add_nodes_from(table_names)

    for target, sources in (dependency_dict or {}).items():
        # add_edge would silently add unknown targets as new nodes, leaking
        # them into the returned order; keep only the requested tables.
        if target not in table_set:
            continue
        for source in sources:
            if source in table_set:
                # Edge dependency -> dependent: the dependency must run first.
                graph.add_edge(source, target)

    # Break cycles one at a time. The previous approach listed every simple
    # cycle up front, which can be exponential. It then removed each cycle's
    # closing edge; two cycles sharing that edge made the second removal raise.
    while True:
        try:
            cycle = nx.find_cycle(graph)
        except nx.NetworkXNoCycle:
            break
        tail, head = cycle[-1][:2]
        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycle}")
        graph.remove_edge(tail, head)
        logger.info(f"打破循环依赖: 移除 {tail} -> {head} 的依赖")

    try:
        return list(nx.topological_sort(graph))
    except Exception as e:
        logger.error(f"生成执行顺序失败: {str(e)}")
        # Fall back to the caller's original ordering.
        return table_names
def get_datamodel_dependency_from_neo4j(table_names):
    """
    Fetch DERIVED_FROM dependencies among the given DataModel tables from Neo4j.

    Only relationships whose both endpoints are in ``table_names`` are read.
    ``(a:DataModel)-[:DERIVED_FROM]->(b:DataModel)`` means ``a`` depends on
    ``b``, so ``b`` is appended to ``a``'s list.

    Args:
        table_names: Iterable of DataModel ``en_name`` values.

    Returns:
        dict: ``{table: [tables it depends on, ...]}`` with one key per input
        table; each list has no duplicates. On a Neo4j error, the error is
        logged and whatever was collected so far is returned.
    """
    # Materialise once so any iterable works: len() needs a sized object and a
    # list is the safe type to send as the Cypher list parameter.
    table_names = list(table_names)
    logger.info(f"开始获取 {len(table_names)} 个表的依赖关系")

    dependency_dict = {name: [] for name in table_names}
    if not dependency_dict:
        # Nothing to look up; avoid opening a Neo4j connection at all.
        return dependency_dict

    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            # A single query covers every pair. DISTINCT collapses parallel
            # relationships that would otherwise produce duplicate entries.
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target:DataModel)
                WHERE source.en_name IN $table_names AND target.en_name IN $table_names
                RETURN DISTINCT source.en_name AS source, target.en_name AS target
            """
            result = session.run(query, table_names=table_names)

            for record in result:
                source = record.get("source")
                target = record.get("target")
                # source DERIVED_FROM target: source depends on target.
                if source and target and source in dependency_dict:
                    dependency_dict[source].append(target)
                    logger.debug(f"依赖关系: {source} 依赖于 {target}")
    except Exception as e:
        # Deliberately best-effort: log (with traceback) and return the
        # dependencies collected so far.
        logger.exception(f"从Neo4j获取依赖关系时出错: {str(e)}")
    finally:
        driver.close()

    for table, deps in dependency_dict.items():
        if deps:
            logger.info(f"表 {table} 依赖于: {deps}")
        else:
            logger.info(f"表 {table} 没有依赖")

    return dependency_dict
def get_today_date():
    """Return the current local date as a ``YYYY-MM-DD`` string."""
    # A datetime format spec is handed to strftime, so this matches
    # datetime.now().strftime("%Y-%m-%d") exactly.
    return f"{datetime.now():%Y-%m-%d}"
|