# dag_dataops_unified_scheduler.py
# Combines the prepare, data, and summary DAGs into a single DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
import os
from decimal import Decimal
from common import (
    get_pg_conn,
    get_neo4j_driver,
    get_today_date
)
# Note: execute_with_monitoring is intentionally not imported from common,
# because this module defines its own version below; importing it as well
# would only be shadowed by the local definition.
from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG, AIRFLOW_BASE_PATH, SCRIPTS_BASE_PATH

# Create the logger
logger = logging.getLogger(__name__)

# JSON serializer for date objects
def json_serial(obj):
    """JSON serializer that renders date/datetime objects as ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

# Custom JSON encoder that also handles Decimal values
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date types
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Let the parent class handle everything else
        return super().default(obj)
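
# Illustrative only (kept as a comment so nothing extra runs at DAG parse time):
# both helpers plug into the standard json API, e.g.
#   json.dumps({"exec_date": date(2024, 1, 1)}, default=json_serial)
#   -> '{"exec_date": "2024-01-01"}'
#   json.dumps({"amount": Decimal("1.25")}, cls=DecimalEncoder)
#   -> '{"amount": 1.25}'
# The payload values are made up; any date/Decimal content behaves the same way.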

#############################################
# Phase 1: functions for the prepare phase
#############################################
def get_enabled_tables():
    """Fetch all enabled tables."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT owner_id, table_name 
            FROM schedule_status 
            WHERE schedule_is_enabled = TRUE
        """)
        result = cursor.fetchall()
        return [row[1] for row in result]  # return table names only
    except Exception as e:
        logger.error(f"Failed to fetch enabled tables: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()

def check_table_directly_subscribed(table_name):
    """Check whether a table is directly subscribed in the schedule_status table."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT schedule_is_enabled
            FROM schedule_status 
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        return result and result[0] is True
    except Exception as e:
        logger.error(f"Failed to check the table's subscription status: {str(e)}")
        return False
    finally:
        cursor.close()
        conn.close()
def get_table_info_from_neo4j(table_name):
    """Fetch detailed table information from Neo4j."""
    driver = get_neo4j_driver()
    # Check whether the table is directly subscribed
    is_directly_schedule = check_table_directly_subscribed(table_name)

    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # read from the schedule_status table
    }

    try:
        with driver.session() as session:
            # Query the table's labels and status
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()

            if record:
                labels = record.get("labels", [])
                # Pick the first recognized label, if any; next() with a default
                # avoids an IndexError when none of the expected labels is present
                table_info['target_table_label'] = next(
                    (label for label in labels if label in ["DataResource", "DataModel", "DataSource"]),
                    None
                )
                table_info['target_table_status'] = record.get("status", True)  # defaults to True
                table_info['default_update_frequency'] = record.get("frequency")

                # Query relationship and script information based on the label type
                if "DataResource" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                else:
                    logger.warning(f"Table {table_name} is neither DataResource nor DataModel")
                    return table_info

                result = session.run(query_rel, table_name=table_name)
                record = result.single()

                if record:
                    table_info['source_table'] = record.get("source_table")

                    # Check whether script_name is missing
                    script_name = record.get("script_name")
                    if not script_name:
                        logger.warning(f"The relationship for table {table_name} has no script_name property; downstream processing may fail")
                    table_info['script_name'] = script_name

                    # Set defaults so the fields are populated even when the properties are empty
                    table_info['script_type'] = record.get("script_type", "python")  # defaults to python
                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # defaults to append
                else:
                    logger.warning(f"No relationship information found for table {table_name}")
            else:
                logger.warning(f"Table {table_name} not found in Neo4j")
    except Exception as e:
        logger.error(f"Error while fetching info for table {table_name}: {str(e)}")
    finally:
        driver.close()

    return table_info
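
# Illustrative only: for a DataModel node the returned dict looks roughly like
# the following (the table and script names here are invented):
# {
#     'target_table': 'dm_sales', 'is_directly_schedule': True,
#     'target_table_label': 'DataModel', 'target_table_status': True,
#     'default_update_frequency': 'daily', 'source_table': 'dr_orders',
#     'script_name': 'dm_sales.py', 'script_type': 'python', 'script_exec_mode': 'append',
# }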

def process_dependencies(tables_info):
    """Process inter-table dependencies and add passively scheduled tables."""
    # Dictionary holding every table's info
    all_tables = {t['target_table']: t for t in tables_info}
    driver = get_neo4j_driver()

    try:
        with driver.session() as session:
            for table_name, table_info in list(all_tables.items()):
                if table_info.get('target_table_label') == 'DataModel':
                    # Query the table's dependencies
                    query = """
                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
                               dep.status AS dep_status, dep.frequency AS dep_frequency
                    """
                    result = session.run(query, table_name=table_name)

                    for record in result:
                        dep_name = record.get("dep_name")
                        dep_labels = record.get("dep_labels", [])
                        dep_status = record.get("dep_status", True)
                        dep_frequency = record.get("dep_frequency")

                        # Handle dependencies that are not directly scheduled
                        if dep_name and dep_name not in all_tables:
                            logger.info(f"Found passively scheduled dependency: {dep_name}, labels: {dep_labels}")

                            # Fetch the dependency's details
                            dep_info = get_table_info_from_neo4j(dep_name)
                            dep_info['is_directly_schedule'] = False

                            # Inherit the scheduling frequency if the dependency has none
                            if not dep_info.get('default_update_frequency'):
                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')

                            all_tables[dep_name] = dep_info
    except Exception as e:
        logger.error(f"Error while processing dependencies: {str(e)}")
    finally:
        driver.close()

    return list(all_tables.values())

def filter_invalid_tables(tables_info):
    """Filter out invalid tables and their dependents, using a NetworkX dependency graph."""
    # Map table names to indices
    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}

    # Find invalid tables
    invalid_tables = set()
    for table in tables_info:
        if table.get('target_table_status') is False:
            invalid_tables.add(table['target_table'])
            logger.info(f"Table {table['target_table']} is marked invalid")

    # Build the dependency graph
    G = nx.DiGraph()

    # Add all nodes
    for table in tables_info:
        G.add_node(table['target_table'])

    # Query and add dependency edges
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table in tables_info:
                if table.get('target_table_label') == 'DataModel':
                    query = """
                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN target.en_name AS target_name
                    """
                    result = session.run(query, table_name=table['target_table'])

                    for record in result:
                        target_name = record.get("target_name")
                        if target_name and target_name in table_dict:
                            # Edge from the dependency to the dependent model, so
                            # nx.descendants() below really yields the downstream
                            # tables of an invalid node
                            G.add_edge(target_name, table['target_table'])
                            logger.debug(f"Added dependency edge: {target_name} -> {table['target_table']}")
    except Exception as e:
        logger.error(f"Error while building the dependency graph: {str(e)}")
    finally:
        driver.close()

    # Find every table that depends on an invalid table
    downstream_invalid = set()
    for invalid_table in invalid_tables:
        # All nodes reachable from the invalid table
        try:
            descendants = nx.descendants(G, invalid_table)
            downstream_invalid.update(descendants)
            logger.info(f"Downstream invalid tables of {invalid_table}: {descendants}")
        except Exception as e:
            logger.error(f"Error while processing downstream dependents of table {invalid_table}: {str(e)}")

    # Merge all invalid tables
    all_invalid = invalid_tables.union(downstream_invalid)
    logger.info(f"{len(all_invalid)} tables marked invalid in total: {all_invalid}")

    # Keep only valid tables
    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")

    return valid_tables
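
# Illustrative only (invented table names): with edges dr_orders -> dm_sales -> dm_report,
# nx.descendants(G, "dr_orders") == {"dm_sales", "dm_report"}, so marking dr_orders
# invalid also filters out both models derived from it.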

def write_to_airflow_dag_schedule(exec_date, tables_info):
    """Write table info into the airflow_dag_schedule table."""
    conn = get_pg_conn()
    cursor = conn.cursor()

    try:
        # Clear out the current day's rows to avoid duplicates
        cursor.execute("""
            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        logger.info(f"Cleared existing rows for execution date {exec_date}")

        # Insert the new rows
        inserted_count = 0
        for table in tables_info:
            cursor.execute("""
                INSERT INTO airflow_dag_schedule (
                    exec_date, source_table, target_table, target_table_label,
                    target_table_status, is_directly_schedule, default_update_frequency,
                    script_name, script_type, script_exec_mode
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                exec_date,
                table.get('source_table'),
                table['target_table'],
                table.get('target_table_label'),
                table.get('target_table_status', True),
                table.get('is_directly_schedule', False),
                table.get('default_update_frequency'),
                table.get('script_name'),
                table.get('script_type', 'python'),
                table.get('script_exec_mode', 'append')
            ))
            inserted_count += 1

        conn.commit()
        logger.info(f"Inserted {inserted_count} rows into airflow_dag_schedule")
        return inserted_count
    except Exception as e:
        logger.error(f"Error while writing to airflow_dag_schedule: {str(e)}")
        conn.rollback()
        # Re-raise so the error propagates properly
        raise
    finally:
        cursor.close()
        conn.close()

def prepare_dag_schedule(**kwargs):
    """Main function that prepares the scheduling plan."""
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"Preparing the unified schedule for execution date {exec_date}")

    # 1. Fetch the enabled tables
    enabled_tables = get_enabled_tables()
    logger.info(f"Fetched {len(enabled_tables)} enabled tables from schedule_status")

    if not enabled_tables:
        logger.warning("No enabled tables found; preparation finished")
        return 0

    # 2. Fetch detailed table info
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)

    logger.info(f"Fetched details for {len(tables_info)} tables")

    # 3. Resolve dependencies and add passively scheduled tables
    enriched_tables = process_dependencies(tables_info)
    logger.info(f"{len(enriched_tables)} tables in total after dependency resolution")

    # 4. Filter out invalid tables and their dependents
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")

    # 5. Write into airflow_dag_schedule
    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)

    # 6. Fail loudly if the insert silently wrote nothing
    if inserted_count == 0 and valid_tables:
        error_msg = f"Insert failed: no rows written to airflow_dag_schedule although {len(valid_tables)} valid tables needed processing"
        logger.error(error_msg)
        raise Exception(error_msg)

    # 7. Build the execution plan
    resource_tasks = []
    model_tasks = []

    for table in valid_tables:
        if table.get('target_table_label') == 'DataResource':
            resource_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataResource",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })
        elif table.get('target_table_label') == 'DataModel':
            model_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataModel",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })

    # Fetch the dependencies
    model_table_names = [t['target_table'] for t in model_tasks]
    dependencies = {}

    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name in model_table_names:
                query = """
                    MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                    RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                """
                result = session.run(query, table_name=table_name)

                deps = []
                for record in result:
                    target = record.get("target")
                    target_labels = record.get("target_labels", [])

                    if target:
                        table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                        deps.append({
                            "table_name": target,
                            "table_type": table_type
                        })

                dependencies[table_name] = deps
    finally:
        driver.close()

    # Assemble the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Save the execution plan to XCom
    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
    logger.info(f"Prepared an execution plan with {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")

    return inserted_count
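
# Illustrative only: the serialized plan has the following shape (names invented):
# {
#   "exec_date": "2024-04-01",
#   "resource_tasks": [{"source_table": "ods_orders", "target_table": "dr_orders",
#                       "target_table_label": "DataResource",
#                       "script_name": "dr_orders.py", "script_exec_mode": "append"}],
#   "model_tasks":    [{"source_table": "dr_orders", "target_table": "dm_sales",
#                       "target_table_label": "DataModel",
#                       "script_name": "dm_sales.py", "script_exec_mode": "append"}],
#   "dependencies":   {"dm_sales": [{"table_name": "dr_orders", "table_type": "DataResource"}]}
# }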

def create_execution_plan(**kwargs):
    """Build the execution plan from the prepare phase's data and persist it as a JSON file."""
    try:
        # Fetch the execution plan produced by prepare_dag_schedule
        execution_plan_json = kwargs['ti'].xcom_pull(task_ids='prepare_dag_schedule', key='execution_plan')

        if not execution_plan_json:
            # Nothing in XCom (the push may have failed), so rebuild from the database
            exec_date = kwargs.get('ds') or get_today_date()
            logger.info(f"No execution plan in XCom; rebuilding from the database for execution date {exec_date}")

            # Fetch all tasks
            resource_tasks, model_tasks = get_all_tasks(exec_date)

            if not resource_tasks and not model_tasks:
                logger.warning(f"No tasks found for execution date {exec_date}")
                # Create an empty plan
                execution_plan = {
                    "exec_date": exec_date,
                    "resource_tasks": [],
                    "model_tasks": [],
                    "dependencies": {}
                }
            else:
                # Fetch the dependencies for all model tables
                model_table_names = [task["target_table"] for task in model_tasks]
                dependencies = get_table_dependencies_for_data_phase(model_table_names)

                # Assemble the plan
                execution_plan = {
                    "exec_date": exec_date,
                    "resource_tasks": resource_tasks,
                    "model_tasks": model_tasks,
                    "dependencies": dependencies
                }

            # Serialize to JSON
            execution_plan_json = json.dumps(execution_plan, default=json_serial)
        else:
            # If it is a string, parse it to make sure the format is valid
            if isinstance(execution_plan_json, str):
                execution_plan = json.loads(execution_plan_json)
            else:
                execution_plan = execution_plan_json
                execution_plan_json = json.dumps(execution_plan, default=json_serial)

        # Persist the plan as a JSON file, going through a temp file so a
        # partially written file is never picked up
        try:
            import time

            # File paths
            plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
            plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
            temp_plan_path = os.path.join(plan_dir, f'temp_last_execution_plan_{int(time.time())}.json')
            ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')

            logger.info(f"=== Creating the execution plan file - timestamp: {datetime.now().isoformat()} ===")
            logger.info(f"Plan directory: {plan_dir}")
            logger.info(f"Final file path: {plan_path}")
            logger.info(f"Temp file path: {temp_plan_path}")
            logger.info(f"Ready-flag file path: {ready_flag_path}")

            # List the directory's existing files
            existing_files = os.listdir(plan_dir)
            plan_related_files = [f for f in existing_files if 'execution_plan' in f or f.endswith('.ready')]
            logger.info(f"Related files in the directory before creation: {len(plan_related_files)}")
            for f in plan_related_files:
                file_path = os.path.join(plan_dir, f)
                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
                logger.info(f"Existing file: {f} (size: {file_size} bytes, modified: {file_time})")

            # Write to the temp file first
            with open(temp_plan_path, 'w') as f:
                if isinstance(execution_plan_json, str):
                    f.write(execution_plan_json)
                else:
                    json.dump(execution_plan_json, f, indent=2, default=json_serial)
                f.flush()
                os.fsync(f.fileno())  # force the write to disk

            # Verify the temp file
            temp_size = os.path.getsize(temp_plan_path)
            temp_time = datetime.fromtimestamp(os.path.getmtime(temp_plan_path)).isoformat()
            logger.info(f"Created temp file: {temp_plan_path} (size: {temp_size} bytes, modified: {temp_time})")

            with open(temp_plan_path, 'r') as f:
                test_content = json.load(f)  # check that it parses back
                logger.info("Temp file verified: content parses as valid JSON")

            # Rename to the final file
            if os.path.exists(plan_path):
                old_size = os.path.getsize(plan_path)
                old_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
                logger.info(f"Removing existing file: {plan_path} (size: {old_size} bytes, modified: {old_time})")
                os.remove(plan_path)  # remove the old file first

            logger.info(f"Renaming temp file: {temp_plan_path} -> {plan_path}")
            os.rename(temp_plan_path, plan_path)

            # Confirm the final file
            if os.path.exists(plan_path):
                final_size = os.path.getsize(plan_path)
                final_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
                logger.info(f"Final file created: {plan_path} (size: {final_size} bytes, modified: {final_time})")
            else:
                logger.error(f"Final file was not created: {plan_path}")

            # Write the ready-flag file
            with open(ready_flag_path, 'w') as f:
                flag_content = f"Generated at {datetime.now().isoformat()}"
                f.write(flag_content)
                f.flush()
                os.fsync(f.fileno())  # force the write to disk

            # Confirm the ready-flag file
            if os.path.exists(ready_flag_path):
                flag_size = os.path.getsize(ready_flag_path)
                flag_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()
                logger.info(f"Ready-flag file created: {ready_flag_path} (size: {flag_size} bytes, modified: {flag_time}, content: {flag_content})")
            else:
                logger.error(f"Ready-flag file was not created: {ready_flag_path}")

            # Check the directory once more
            final_files = os.listdir(plan_dir)
            final_plan_files = [f for f in final_files if 'execution_plan' in f or f.endswith('.ready')]
            logger.info(f"Related files in the directory after creation: {len(final_plan_files)}")
            for f in final_plan_files:
                file_path = os.path.join(plan_dir, f)
                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
                logger.info(f"Final file: {f} (size: {file_size} bytes, modified: {file_time})")

            logger.info(f"=== Execution plan file created - timestamp: {datetime.now().isoformat()} ===")
        except Exception as e:
            logger.error(f"Error while saving the execution plan to file: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            raise  # re-raise so the task fails

        return execution_plan_json
    except Exception as e:
        logger.error(f"Error while creating the execution plan: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        raise  # re-raise so the task fails

def bridge_prepare_to_data_func(**kwargs):
    """Bridge the prepare and data phases, making sure the execution plan file is ready."""
    import time

    logger.info(f"=== Validating the execution plan file - timestamp: {datetime.now().isoformat()} ===")

    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')

    logger.info(f"Plan directory: {plan_dir}")
    logger.info(f"Plan file path: {plan_path}")
    logger.info(f"Ready-flag file path: {ready_flag_path}")

    # List the directory's files
    all_files = os.listdir(plan_dir)
    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
    logger.info(f"Related files in the directory: {len(related_files)}")
    for idx, file in enumerate(related_files, 1):
        file_path = os.path.join(plan_dir, file)
        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
        logger.info(f"Related file {idx}: {file} (size: {file_size} bytes, modified: {file_time})")

    # Wait for the ready-flag file to appear
    logger.info(f"Waiting for the ready-flag file: {ready_flag_path}")
    waiting_start = datetime.now()
    max_attempts = 30  # wait at most 5 minutes (30 attempts x 10 seconds)
    for attempt in range(max_attempts):
        if os.path.exists(ready_flag_path):
            wait_duration = (datetime.now() - waiting_start).total_seconds()
            file_size = os.path.getsize(ready_flag_path)
            file_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()

            # Read the ready-flag file's content
            try:
                with open(ready_flag_path, 'r') as f:
                    ready_content = f.read()
            except Exception as e:
                ready_content = f"[read error: {str(e)}]"

            logger.info(f"Found the ready flag: {ready_flag_path} (attempts: {attempt+1}, waited: {wait_duration:.2f}s, size: {file_size} bytes, modified: {file_time}, content: {ready_content})")
            break

        logger.info(f"Waiting for the execution plan to be ready (attempt: {attempt+1}/{max_attempts}, waited: {(datetime.now() - waiting_start).total_seconds():.2f}s)...")
        time.sleep(10)  # wait 10 seconds

    if not os.path.exists(ready_flag_path):
        error_msg = f"Ready-flag file does not exist: {ready_flag_path}; wait timed out (waited: {(datetime.now() - waiting_start).total_seconds():.2f}s)"
        logger.error(error_msg)
        raise Exception(error_msg)

    # Validate the execution plan file
    logger.info(f"Validating the execution plan file: {plan_path}")
    if not os.path.exists(plan_path):
        error_msg = f"Execution plan file does not exist: {plan_path}"
        logger.error(error_msg)
        raise Exception(error_msg)

    try:
        file_size = os.path.getsize(plan_path)
        file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
        logger.info(f"Reading the execution plan file: {plan_path} (size: {file_size} bytes, modified: {file_time})")

        with open(plan_path, 'r') as f:
            execution_plan = json.load(f)
        logger.info("Execution plan file read and parsed as JSON")

        # Validate the basic structure
        if not isinstance(execution_plan, dict):
            logger.error(f"Malformed execution plan: expected a dict, got {type(execution_plan)}")
            raise ValueError("Execution plan is not a valid dict")
        else:
            logger.info("Basic structure check passed: the plan is a valid dict")

        # Validate the required fields
        required_fields = ["exec_date", "resource_tasks", "model_tasks"]
        missing_fields = [field for field in required_fields if field not in execution_plan]

        if missing_fields:
            error_msg = f"Execution plan is missing required fields: {missing_fields}"
            logger.error(error_msg)
            raise ValueError(error_msg)
        else:
            logger.info(f"Required-field check passed: all of {required_fields} present")

        # Log a summary of the plan
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        exec_date = execution_plan.get("exec_date", "unknown")

        logger.info(f"Plan summary: date={exec_date}, resource tasks={len(resource_tasks)}, model tasks={len(model_tasks)}")

        # With only a few tasks, log them individually
        if len(resource_tasks) + len(model_tasks) < 10:
            for idx, task in enumerate(resource_tasks, 1):
                logger.info(f"Resource task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")

            for idx, task in enumerate(model_tasks, 1):
                logger.info(f"Model task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")

        # Warn if the plan carries no tasks at all
        if not resource_tasks and not model_tasks:
            logger.warning("The execution plan contains no tasks; the data phase may have nothing to do")

        logger.info(f"=== Execution plan file validated - timestamp: {datetime.now().isoformat()} ===")
        return True
    except Exception as e:
        error_msg = f"Error while validating the execution plan file: {str(e)}"
        logger.error(error_msg)
        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"=== Execution plan file validation failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)

def init_data_processing_phase(**kwargs):
    """Initialize the data processing phase by reloading the execution plan file."""
    logger.info(f"=== Initializing the data processing phase - timestamp: {datetime.now().isoformat()} ===")

    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')

    logger.info(f"Plan directory: {plan_dir}")
    logger.info(f"Plan file path: {plan_path}")
    logger.info(f"Ready-flag file path: {ready_flag_path}")

    # Inspect the directory's files
    all_files = os.listdir(plan_dir)
    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
    logger.info(f"Related files in the directory: {len(related_files)}")
    for idx, file in enumerate(related_files, 1):
        file_path = os.path.join(plan_dir, file)
        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
        logger.info(f"Related file {idx}: {file} (size: {file_size} bytes, modified: {file_time})")

    # Verify that the plan file exists
    if not os.path.exists(plan_path):
        error_msg = f"Execution plan file does not exist: {plan_path}"
        logger.error(error_msg)
        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)

    file_size = os.path.getsize(plan_path)
    file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
    logger.info(f"Reading the execution plan file: {plan_path} (size: {file_size} bytes, modified: {file_time})")

    try:
        # Record when reading starts
        read_start = datetime.now()

        with open(plan_path, 'r') as f:
            file_content = f.read()
        logger.info(f"Read the file content, {len(file_content)} bytes")

        # Parse the JSON
        parse_start = datetime.now()
        execution_plan = json.loads(file_content)
        parse_duration = (datetime.now() - parse_start).total_seconds()
        logger.info(f"Parsed the JSON content in {parse_duration:.4f}s")

        read_duration = (datetime.now() - read_start).total_seconds()
        logger.info(f"Total read-and-parse time: {read_duration:.4f}s")

        # Validate the plan's basic structure
        if not isinstance(execution_plan, dict):
            error_msg = f"Execution plan is not a valid dict; actual type: {type(execution_plan)}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Store it in XCom for downstream tasks
        push_start = datetime.now()

        # Serialize to a JSON string first
        execution_plan_json = json.dumps(execution_plan, default=json_serial)
        logger.info(f"Serialized the plan to a JSON string of {len(execution_plan_json)} bytes")

        # Push to XCom
        kwargs['ti'].xcom_push(key='data_phase_execution_plan', value=execution_plan_json)
        push_duration = (datetime.now() - push_start).total_seconds()
        logger.info(f"Pushed the plan to XCom in {push_duration:.4f}s")

        # Log a summary of the plan
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        exec_date = execution_plan.get("exec_date", "unknown")

        logger.info(f"Plan summary: date={exec_date}, resource tasks={len(resource_tasks)}, model tasks={len(model_tasks)}")

        # With only a few tasks, log the details
        if len(resource_tasks) + len(model_tasks) < 10:
            for idx, task in enumerate(resource_tasks, 1):
                logger.info(f"Resource task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")

            for idx, task in enumerate(model_tasks, 1):
                logger.info(f"Model task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")

        result = {
            "exec_date": exec_date,
            "resource_count": len(resource_tasks),
            "model_count": len(model_tasks)
        }

        logger.info(f"=== Data processing phase initialized - timestamp: {datetime.now().isoformat()} ===")
        return result
    except json.JSONDecodeError as e:
        error_msg = f"Failed to parse the execution plan file as JSON: {str(e)}"
        logger.error(error_msg)

        # Log a content excerpt to help with debugging
        try:
            with open(plan_path, 'r') as f:
                content = f.read(1000)  # read only the first 1000 characters
            logger.error(f"First 1000 characters of the file: {content}...")
        except Exception as read_error:
            logger.error(f"Error while reading the file content: {str(read_error)}")

        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)
    except Exception as e:
        error_msg = f"Data processing phase initialization failed: {str(e)}"
        logger.error(error_msg)
        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)

#############################################
# Phase 2: the data processing phase
#############################################
def get_latest_date():
    """Fetch the most recent date that has records in the database."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule 
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Found the most recent date with records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with records found; falling back to today")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while finding the most recent date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()

def get_all_tasks(exec_date):
    """Fetch every task that needs to run (DataResource and DataModel)."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Query all resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()

        # Query all model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()

        # Collect the resource-table info
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        # Collect the model-table info
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        logger.info(f"Fetched {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error while fetching task info: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()

def get_table_dependencies_for_data_phase(table_names):
    """Fetch the dependencies between tables."""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}

    try:
        with driver.session() as session:
            # Fetch all dependencies between model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)

            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])

                if source and target:
                    # Append the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error while fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()

    return dependency_dict
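
# Illustrative only (invented names): for table_names == ["dm_sales", "dm_report"]
# the function returns something like
# {"dm_sales":  [{"table_name": "dr_orders", "table_type": "DataResource"}],
#  "dm_report": [{"table_name": "dm_sales",  "table_type": "DataModel"}]}
# A model with no recorded dependencies keeps its empty list.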

def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table."""
    logger.info(f"Running script {script_name} for resource table {target_table}")

    # exec_date may arrive as a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted the execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error while parsing the exec_date JSON: {str(e)}")

    try:
        # Call the monitored executor directly to make sure the script runs
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Resource table {target_table} processed, result: {result}")
        return f"Processed resource table {target_table}, result: {result}"
    except Exception as e:
        logger.error(f"Error while processing resource table {target_table}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Return the error instead of raising, so the DAG can keep running
        return f"Failed to process resource table {target_table}: {str(e)}"

def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
    """Execute a script and record its execution status."""
    from pathlib import Path
    import importlib.util
    import sys

    logger.info(f"=== Starting task for table {target_table}, script {script_name} - timestamp: {datetime.now().isoformat()} ===")

    # Guard against an empty script_name
    if not script_name:
        logger.error(f"script_name for table {target_table} is empty; nothing to execute")
        # Record the failure in the database
        now = datetime.now()
        update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
        return False

    # Record the start time
    start_time = datetime.now()
    update_task_start_time(exec_date, target_table, script_name, start_time)
    logger.info(f"Task start time: {start_time.isoformat()}")

    try:
        # Resolve the script path
        script_path = Path(SCRIPTS_BASE_PATH) / script_name
        logger.info(f"Full script path: {script_path}")

        if not script_path.exists():
            logger.error(f"Script file does not exist: {script_path}")
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
            return False

        try:
            # Import the module dynamically
            module_name = f"dynamic_module_{abs(hash(script_name))}"
            spec = importlib.util.spec_from_file_location(module_name, script_path)
            module = importlib.util.module_from_spec(spec)
            sys.modules[module_name] = module
            spec.loader.exec_module(module)

            # Use the standard entry point run()
            if hasattr(module, "run"):
                logger.info(f"Calling the standard entry point run() of script {script_name}")
                result = module.run(table_name=target_table, execution_mode=script_exec_mode)
                logger.info(f"Script {script_name} returned: {result}")
                success = bool(result)
            else:
                logger.warning(f"Script {script_name} defines no standard entry point run(); falling back to main()")
                if hasattr(module, "main"):
                    logger.info(f"Calling main() of script {script_name}")
                    result = module.main(table_name=target_table, execution_mode=script_exec_mode)
                    logger.info(f"Script {script_name} returned: {result}")
                    success = bool(result)
                else:
                    logger.error(f"Script {script_name} defines neither run() nor main()")
                    success = False
        except Exception as script_e:
            logger.error(f"Error while executing script {script_name}: {str(script_e)}")
            import traceback
            logger.error(traceback.format_exc())
            success = False

        # Record the end time and the result
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)

        logger.info(f"Task end time: {end_time.isoformat()}, duration: {duration:.2f}s, result: {success}")
        logger.info(f"=== Finished task for table {target_table}, script {script_name} - timestamp: {datetime.now().isoformat()} ===")

        return success
    except Exception as e:
        # Handle unexpected errors
        logger.error(f"Error while executing the task: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        update_task_completion(exec_date, target_table, script_name, False, end_time, duration)

        logger.info(f"=== Task for table {target_table}, script {script_name} failed - timestamp: {datetime.now().isoformat()} ===")
        return False
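
# Illustrative only: a script under SCRIPTS_BASE_PATH is expected to expose the
# interface used above. A minimal conforming script (file name invented) could be:
#
#   # dm_sales.py
#   def run(table_name, execution_mode):
#       """Load or rebuild `table_name`; execution_mode is e.g. 'append'."""
#       ...  # do the actual work here
#       return True  # any truthy return value is recorded as success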

def update_task_start_time(exec_date, target_table, script_name, start_time):
    """Record a task's start time."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            UPDATE airflow_dag_schedule 
            SET exec_start_time = %s
            WHERE exec_date = %s AND target_table = %s AND script_name = %s
        """, (start_time, exec_date, target_table, script_name))
        conn.commit()
        logger.info(f"Recorded start time {start_time} for table {target_table}, script {script_name}")
    except Exception as e:
        logger.error(f"Failed to record the task start time: {str(e)}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
    """Record a task's completion info."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            UPDATE airflow_dag_schedule 
            SET exec_result = %s, exec_end_time = %s, exec_duration = %s
            WHERE exec_date = %s AND target_table = %s AND script_name = %s
        """, (success, end_time, duration, exec_date, target_table, script_name))
        conn.commit()
        logger.info(f"Recorded completion for table {target_table}, script {script_name}: result={success}, end time={end_time}, duration={duration}s")
    except Exception as e:
        logger.error(f"Failed to record the task completion info: {str(e)}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table."""
    logger.info(f"Running script {script_name} for model table {target_table}")

    # exec_date may arrive as a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted the execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error while parsing the exec_date JSON: {str(e)}")

    try:
        # Call the monitored executor directly to make sure the script runs
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Model table {target_table} processed, result: {result}")
        return f"Processed model table {target_table}, result: {result}"
    except Exception as e:
        logger.error(f"Error while processing model table {target_table}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Return the error instead of raising, so the DAG can keep running
        return f"Failed to process model table {target_table}: {str(e)}"

#############################################
# Phase 3: functions for the summary phase
#############################################
def get_execution_stats(exec_date):
    """Fetch the day's execution statistics."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total number of tasks
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        result = cursor.fetchone()
        total_tasks = result[0] if result else 0

        # Task count per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*) 
            FROM airflow_dag_schedule 
            WHERE exec_date = %s 
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}

        # Execution result counts
        cursor.execute("""
            SELECT COUNT(*) 
            FROM airflow_dag_schedule 
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        result = cursor.fetchone()
        success_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*) 
            FROM airflow_dag_schedule 
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        result = cursor.fetchone()
        fail_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*) 
            FROM airflow_dag_schedule 
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        result = cursor.fetchone()
        pending_count = result[0] if result else 0

        # Execution time statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()

        # Guard against NULL time statistics
        if time_stats and time_stats[0] is not None:
            avg_duration = float(time_stats[0])
            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
        else:
            avg_duration = None
            min_duration = None
            max_duration = None

        # Details of failed tasks
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = []
        for row in cursor.fetchall():
            task_dict = {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
            }
            if row[3] is not None:
                task_dict["exec_duration"] = float(row[3])
            else:
                task_dict["exec_duration"] = None
            failed_tasks.append(task_dict)

        # Success rate, guarding against division by zero
        success_rate = 0
        if total_tasks > 0:
            success_rate = (success_count / total_tasks) * 100

        # Aggregate the statistics
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }

        return stats
    except Exception as e:
        logger.error(f"Error while fetching execution statistics: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()
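
# Illustrative only (all numbers invented): a typical stats dict looks like
# {"exec_date": "2024-04-01", "total_tasks": 5,
#  "type_counts": {"DataResource": 2, "DataModel": 3},
#  "success_count": 4, "fail_count": 1, "pending_count": 0, "success_rate": 80.0,
#  "avg_duration": 12.3, "min_duration": 2.1, "max_duration": 30.5,
#  "failed_tasks": [{"target_table": "dm_sales", "script_name": "dm_sales.py",
#                    "target_table_label": "DataModel", "exec_duration": 30.5}]}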

def update_missing_results(exec_date):
    """Fill in missing execution results."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Find all tasks with no recorded execution result
        cursor.execute("""
            SELECT target_table, script_name
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()

        update_count = 0
        for row in missing_results:
            target_table, script_name = row

            # A start time without an end time is assumed to be a failure
            cursor.execute("""
                SELECT exec_start_time
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))

            start_time = cursor.fetchone()

            if start_time and start_time[0]:
                # Started but no result: mark as failed
                now = datetime.now()
                duration = (now - start_time[0]).total_seconds()

                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))

                logger.warning(f"Marked table {target_table}, script {script_name} as failed; start time: {start_time[0]}")
                update_count += 1
            else:
                # No start time and no result: assume it never ran
                logger.warning(f"Table {target_table}, script {script_name} never ran")

        conn.commit()
        logger.info(f"Updated {update_count} tasks with missing results")
        return update_count
    except Exception as e:
        logger.error(f"Error while updating missing execution results: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()

def generate_unified_execution_report(exec_date, stats):
    """Generate the unified execution report."""
    # Build the report
    report = []
    report.append("========== Unified DataOps Execution Report ==========")
    report.append(f"Execution date: {exec_date}")
    report.append(f"Total tasks: {stats['total_tasks']}")

    # Task type distribution
    report.append("\n--- Task type distribution ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} tasks: {count}")

    # Execution result summary
    report.append("\n--- Execution results ---")
    report.append(f"Successful tasks: {stats.get('success_count', 0)}")
    report.append(f"Failed tasks: {stats.get('fail_count', 0)}")
    report.append(f"Tasks not executed: {stats.get('pending_count', 0)}")
    report.append(f"Success rate: {stats.get('success_rate', 0):.2f}%")

    # Execution time statistics
    report.append("\n--- Execution times (seconds) ---")
    avg_duration = stats.get('avg_duration')
    min_duration = stats.get('min_duration')
    max_duration = stats.get('max_duration')

    report.append(f"Average execution time: {avg_duration:.2f}" if avg_duration is not None else "Average execution time: N/A")
    report.append(f"Shortest execution time: {min_duration:.2f}" if min_duration is not None else "Shortest execution time: N/A")
    report.append(f"Longest execution time: {max_duration:.2f}" if max_duration is not None else "Longest execution time: N/A")

    # Failed task details
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- Failed task details ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. Table: {task['target_table']}")
            report.append(f"   Script: {task['script_name']}")
            report.append(f"   Type: {task['target_table_label']}")
            exec_duration = task.get('exec_duration')
            if exec_duration is not None:
                report.append(f"   Execution time: {exec_duration:.2f} seconds")
            else:
                report.append("   Execution time: N/A")

    report.append("\n========== End of report ==========")

    # Join the report into a single string
    report_str = "\n".join(report)

    # Log it
    logger.info("\n" + report_str)

    return report_str

def summarize_execution(**context):
    """
    Summarize how the execution plan ran and generate a report.
    """
    logger.info(f"=== Summarizing the execution - timestamp: {datetime.now().isoformat()} ===")
    try:
        # Determine the execution date
        execution_date = context.get('execution_date', datetime.now())
        exec_date = execution_date.strftime('%Y-%m-%d')

        # Load the execution plan from the local file
        plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
        plan_path = os.path.join(plan_dir, 'last_execution_plan.json')

        if not os.path.exists(plan_path):
            logger.warning(f"Execution plan file does not exist: {plan_path}")
            return "Execution plan file does not exist; cannot generate the summary report"

        with open(plan_path, 'r') as f:
            execution_plan = json.loads(f.read())

        # Collect the task lists
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        all_tasks = resource_tasks + model_tasks

        # Connect to the database to fetch task execution states
        conn = get_pg_conn()
        cursor = conn.cursor()

        # Classify tasks by execution state
        successful_tasks = []
        failed_tasks = []
        skipped_tasks = []

        for task in all_tasks:
            table_name = task["target_table"]
            table_type = "resource table" if task in resource_tasks else "model table"

            # Query the task's execution state
            cursor.execute("""
                SELECT status FROM airflow_task_execution 
                WHERE table_name = %s AND exec_date = %s
                ORDER BY execution_time DESC LIMIT 1
            """, (table_name, exec_date))

            result = cursor.fetchone()
            # The tracking table stores status values in Chinese:
            # "成功" = success, "失败" = failure; anything else counts as skipped
            status = result[0] if result else "未执行"

            task_info = {
                "table_name": table_name,
                "table_type": table_type,
                "script_name": task["script_name"],
                "status": status
            }

            if status == "成功":
                successful_tasks.append(task_info)
            elif status == "失败":
                failed_tasks.append(task_info)
            else:
                skipped_tasks.append(task_info)

        # Build the summary report
        total_tasks = len(all_tasks)
        success_count = len(successful_tasks)
        fail_count = len(failed_tasks)
        skip_count = len(skipped_tasks)

        summary = f"""
Execution date: {exec_date}
Total tasks: {total_tasks}
Successful tasks: {success_count}
Failed tasks: {fail_count}
Skipped tasks: {skip_count}

=== Successful tasks ===
"""

        for task in successful_tasks:
            summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"

        if failed_tasks:
            summary += "\n=== Failed tasks ===\n"
            for task in failed_tasks:
                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"

        if skipped_tasks:
            summary += "\n=== Skipped tasks ===\n"
            for task in skipped_tasks:
                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"

        # Upsert into the summary table
        cursor.execute("""
            INSERT INTO airflow_execution_summary
            (exec_date, total_tasks, success_count, fail_count, skip_count, summary_text, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (exec_date) 
            DO UPDATE SET
                total_tasks = EXCLUDED.total_tasks,
                success_count = EXCLUDED.success_count,
                fail_count = EXCLUDED.fail_count,
                skip_count = EXCLUDED.skip_count,
                summary_text = EXCLUDED.summary_text,
                updated_at = CURRENT_TIMESTAMP
        """, (
            exec_date, total_tasks, success_count, fail_count, skip_count,
            summary, datetime.now()
        ))

        conn.commit()
        cursor.close()
        conn.close()

        logger.info(f"=== Execution summary finished - timestamp: {datetime.now().isoformat()} ===")
        return summary
    except Exception as e:
        logger.error(f"Unhandled error while summarizing the execution: {str(e)}")
        # Return a short error report so the task itself does not fail
        return f"Error while summarizing the execution: {str(e)}"

# Define the DAG
with DAG(
    "dag_dataops_unified_scheduler", 
    start_date=datetime(2024, 1, 1), 
    schedule_interval="@daily", 
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:

    # Initialize globals so DAG parsing never hits an undefined name
    globals()['_resource_tasks'] = []
    globals()['_task_dict'] = {}

    # DAG start task
    dag_start = EmptyOperator(task_id="dag_start")

    # DAG end task
    dag_end = EmptyOperator(
        task_id="dag_end",
        trigger_rule="all_done"  # let the DAG finish whether or not upstream tasks succeed
    )

    # Prepare-phase task
    prepare_task = PythonOperator(
        task_id="prepare_dag_schedule",
        python_callable=prepare_dag_schedule,
        provide_context=True
    )

    # Execution summary task
    summarize_task = PythonOperator(
        task_id='summarize_execution',
        python_callable=summarize_execution,
        provide_context=True,
        trigger_rule='all_done',  # run whether upstream tasks succeed or fail
        retries=2,  # extra retries
        retry_delay=timedelta(minutes=1)  # retry delay
    )

    # Data processing phase
    # Fetch every task to execute (real tasks, not TaskGroup wrappers).
    # Note: this runs at DAG parse time, so the generated task list reflects
    # the database state when the file is parsed.
    exec_date = get_latest_date()
    resource_tasks, model_tasks = get_all_tasks(exec_date)

    # Task dictionary used for wiring dependencies
    task_dict = {}

    # Create the resource-table tasks
    for task_info in resource_tasks:
        table_name = task_info["target_table"]
        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task ID
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"resource_{safe_table_name}"

        # Use execute_with_monitoring directly so the script really runs
        resource_task = PythonOperator(
            task_id=task_id,
            python_callable=execute_with_monitoring,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date
            },
            retries=2,
            retry_delay=timedelta(minutes=1),
            trigger_rule="all_done"  # run whether upstream tasks succeed or fail
        )

        # Register the task
        task_dict[table_name] = resource_task

        # Wire the dependency: prepare_task -> resource_task
        prepare_task >> resource_task

    # Fetch the dependencies for all model tables
    model_table_names = [task["target_table"] for task in model_tasks]
    dependencies = get_table_dependencies_for_data_phase(model_table_names)

    # Build a directed graph to determine the execution order
    G = nx.DiGraph()

    # Add every model table as a node
    for task_info in model_tasks:
        G.add_node(task_info["target_table"])

    # Add dependency edges (dependency -> dependent)
    for source, deps in dependencies.items():
        for dep in deps:
            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                G.add_edge(dep.get("table_name"), source)

    # Break dependency cycles
    cycles = list(nx.simple_cycles(G))
    if cycles:
        for cycle in cycles:
            G.remove_edge(cycle[-1], cycle[0])

    # Determine the execution order
    try:
        execution_order = list(nx.topological_sort(G))
    except Exception as e:
        execution_order = [task["target_table"] for task in model_tasks]
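
    # Illustrative only (invented names): if dm_report depends on dm_sales,
    # the graph holds the edge dm_sales -> dm_report and topological_sort
    # yields ["dm_sales", "dm_report"]. For a cycle like ["a", "b"]
    # (a -> b -> a), removing the closing edge (cycle[-1] -> cycle[0]),
    # i.e. b -> a, is what makes the sort possible again.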

    # Create the model-table tasks
    for table_name in execution_order:
        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
        if not task_info:
            continue

        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task ID
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"model_{safe_table_name}"

        # Use execute_with_monitoring directly to run the script
        model_task = PythonOperator(
            task_id=task_id,
            python_callable=execute_with_monitoring,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date
            },
            retries=2,
            retry_delay=timedelta(minutes=1),
            trigger_rule="all_done"  # run whether upstream tasks succeed or fail
        )

        # Register the task
        task_dict[table_name] = model_task

        # Wire the dependencies
        deps = dependencies.get(table_name, [])
        has_dependency = False

        # Dependencies between model tables
        for dep in deps:
            dep_table = dep.get("table_name")
            if dep_table in task_dict:
                task_dict[dep_table] >> model_task
                has_dependency = True

        # With no model dependencies, depend on every resource-table task
        if not has_dependency and resource_tasks:
            for resource_task_info in resource_tasks:
                resource_name = resource_task_info["target_table"]
                if resource_name in task_dict:
                    task_dict[resource_name] >> model_task

        # With no dependencies and no resource tables, depend on prepare_task directly
        if not has_dependency and not resource_tasks:
            prepare_task >> model_task

    # Every processing task is upstream of summarize_task
    for task in task_dict.values():
        task >> summarize_task

    # Wire the main flow
    dag_start >> prepare_task

    # Task that writes the execution plan file
    create_plan_task = PythonOperator(
        task_id="create_execution_plan",
        python_callable=create_execution_plan,
        provide_context=True
    )

    # Wire the dependencies
    prepare_task >> create_plan_task >> summarize_task >> dag_end

    logger.info(f"DAG dag_dataops_unified_scheduler defined with {len(task_dict)} script-execution tasks")

    # Try to read the latest execution plan from the database for the web UI
    try:
        # A simple query that runs once at DAG parse time to fetch table info;
        # it is only for UI display and does not affect actual execution
        conn = get_pg_conn()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT COUNT(*) FROM airflow_dag_schedule
            """)
            count = cursor.fetchone()
            if count and count[0] > 0:
                logger.info(f"The database holds {count[0]} task records available for scheduling")
            else:
                logger.info("No task records found in the database; the DAG's first run will create the initial plan")
        except Exception as e:
            logger.warning(f"Error while querying the database: {str(e)}; this does not affect DAG execution")
        finally:
            cursor.close()
            conn.close()
    except Exception as e:
        logger.warning(f"Error during DAG initialization: {str(e)}; this does not affect DAG execution")
        # Even on error there is a clear execution path:
        # the default dependency chain above already covers it