# dag_dataops_pipeline_prepare_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import logging
import networkx as nx
import json
import os
import re
import glob
from pathlib import Path
import hashlib
from common import (
    get_pg_conn,
    get_neo4j_driver,
    get_today_date
)
from config import PG_CONFIG, NEO4J_CONFIG, EXECUTION_PLAN_KEEP_COUNT

# Create the logger
logger = logging.getLogger(__name__)
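
# Overview: this DAG runs hourly, reads the enabled tables from schedule_status,
# enriches them with metadata and dependency information from Neo4j, writes an
# exec_plan_<ds>.json / .ready pair next to this file, records the plan in the
# airflow_exec_plans table, and finally touches dag_dataops_pipeline_data_scheduler.py
# so Airflow re-parses the data scheduler DAG.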

def get_enabled_tables():
    """Get all tables whose scheduling is enabled."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT owner_id, table_name
            FROM schedule_status
            WHERE schedule_is_enabled = TRUE
        """)
        result = cursor.fetchall()
        return [row[1] for row in result]  # return only the table names
    except Exception as e:
        logger.error(f"Failed to get enabled tables: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()

def check_table_directly_subscribed(table_name):
    """Check whether the table is directly subscribed in schedule_status."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT schedule_is_enabled
            FROM schedule_status
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        return bool(result and result[0] is True)
    except Exception as e:
        logger.error(f"Failed to check table subscription status: {str(e)}")
        return False
    finally:
        cursor.close()
        conn.close()

def get_table_info_from_neo4j(table_name):
    """Get detailed information about a table from Neo4j."""
    driver = get_neo4j_driver()
    # Check whether the table is directly subscribed
    is_directly_schedule = check_table_directly_subscribed(table_name)

    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # read from schedule_status
    }

    try:
        with driver.session() as session:
            # Query the table's labels and status
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()

            if record:
                labels = record.get("labels", [])
                table_info['target_table_label'] = next(
                    (label for label in labels if label in ["DataResource", "DataModel", "DataSource"]),
                    None
                )
                table_info['target_table_status'] = record.get("status", True)  # defaults to True
                table_info['default_update_frequency'] = record.get("frequency")

                # Query relationship and script information depending on the label
                if "DataResource" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                else:
                    logger.warning(f"Table {table_name} is neither DataResource nor DataModel")
                    return table_info

                result = session.run(query_rel, table_name=table_name)
                record = result.single()

                if record:
                    table_info['source_table'] = record.get("source_table")

                    # Check whether script_name is empty
                    script_name = record.get("script_name")
                    if not script_name:
                        logger.warning(f"The relationship of table {table_name} has no script_name property, which may break later processing")
                    table_info['script_name'] = script_name

                    # Set defaults so the values exist even when the properties are missing
                    table_info['script_type'] = record.get("script_type", "python")  # defaults to python
                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # defaults to append
                else:
                    logger.warning(f"No relationship information found for table {table_name}")
            else:
                logger.warning(f"Table {table_name} not found in Neo4j")
    except Exception as e:
        logger.error(f"Error getting information for table {table_name}: {str(e)}")
    finally:
        driver.close()

    return table_info
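
# Illustrative shape of the dict returned above (values are examples, not real data):
# {
#     'target_table': 'some_table',
#     'is_directly_schedule': True,
#     'target_table_label': 'DataModel',
#     'target_table_status': True,
#     'default_update_frequency': 'daily',
#     'source_table': 'upstream_table',
#     'script_name': 'build_some_table.py',
#     'script_type': 'python',
#     'script_exec_mode': 'append'
# }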

def process_dependencies(tables_info):
    """Process dependencies between tables and add passively scheduled tables."""
    # Dictionary holding the information of all tables
    all_tables = {t['target_table']: t for t in tables_info}
    driver = get_neo4j_driver()

    try:
        with driver.session() as session:
            for table_name, table_info in list(all_tables.items()):
                if table_info.get('target_table_label') == 'DataModel':
                    # Query its dependency tables
                    query = """
                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
                               dep.status AS dep_status, dep.frequency AS dep_frequency
                    """
                    result = session.run(query, table_name=table_name)

                    for record in result:
                        dep_name = record.get("dep_name")
                        dep_labels = record.get("dep_labels", [])
                        dep_status = record.get("dep_status", True)
                        dep_frequency = record.get("dep_frequency")

                        # Handle dependency tables that are not directly scheduled
                        if dep_name and dep_name not in all_tables:
                            logger.info(f"Found passively scheduled dependency table: {dep_name}, labels: {dep_labels}")

                            # Get detailed information about the dependency table
                            dep_info = get_table_info_from_neo4j(dep_name)
                            dep_info['is_directly_schedule'] = False

                            # Inherit the scheduling frequency if the dependency has none
                            if not dep_info.get('default_update_frequency'):
                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')

                            all_tables[dep_name] = dep_info
    except Exception as e:
        logger.error(f"Error processing dependencies: {str(e)}")
    finally:
        driver.close()

    return list(all_tables.values())

def filter_invalid_tables(tables_info):
    """Filter out invalid tables and their dependencies, building a dependency graph with NetworkX."""
    # Map table names to indices
    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}

    # Find invalid tables
    invalid_tables = set()
    for table in tables_info:
        if table.get('target_table_status') is False:
            invalid_tables.add(table['target_table'])
            logger.info(f"Table {table['target_table']} has an invalid status")

    # Build the dependency graph
    G = nx.DiGraph()

    # Add all nodes
    for table in tables_info:
        G.add_node(table['target_table'])

    # Query and add dependency edges
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table in tables_info:
                if table.get('target_table_label') == 'DataModel':
                    query = """
                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN target.en_name AS target_name
                    """
                    result = session.run(query, table_name=table['target_table'])

                    for record in result:
                        target_name = record.get("target_name")
                        if target_name and target_name in table_dict:
                            # Add an edge from the derived table to its source,
                            # meaning the derived table depends on the source
                            G.add_edge(table['target_table'], target_name)
                            logger.debug(f"Added dependency edge: {table['target_table']} -> {target_name}")
    except Exception as e:
        logger.error(f"Error building dependency graph: {str(e)}")
    finally:
        driver.close()

    # Find all tables that depend on the invalid tables
    downstream_invalid = set()
    for invalid_table in invalid_tables:
        # Get every node reachable from the invalid table
        try:
            descendants = nx.descendants(G, invalid_table)
            downstream_invalid.update(descendants)
            logger.info(f"Downstream invalid tables of {invalid_table}: {descendants}")
        except Exception as e:
            logger.error(f"Error processing downstream dependencies of table {invalid_table}: {str(e)}")

    # Merge all invalid tables
    all_invalid = invalid_tables.union(downstream_invalid)
    logger.info(f"A total of {len(all_invalid)} tables were marked invalid: {all_invalid}")

    # Keep only the valid tables
    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")

    return valid_tables

def touch_data_scheduler_file():
    """
    Update the modification time of the data scheduler DAG file to trigger re-parsing.

    Returns:
        bool: whether the update succeeded
    """
    data_scheduler_path = os.path.join(os.path.dirname(__file__), 'dag_dataops_pipeline_data_scheduler.py')
    success = False
    try:
        if os.path.exists(data_scheduler_path):
            # Update the file's modification time so Airflow re-parses it
            os.utime(data_scheduler_path, None)
            logger.info(f"Triggered re-parsing of the data scheduler DAG: {data_scheduler_path}")
            success = True
        else:
            logger.warning(f"Data scheduler DAG file does not exist: {data_scheduler_path}")
        return success
    except Exception as e:
        logger.error(f"Error triggering DAG re-parsing: {str(e)}")
        return False

def get_subscription_state_hash():
    """Get a hash of the subscription table state."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT table_name, schedule_is_enabled
            FROM schedule_status
            ORDER BY table_name
        """)
        rows = cursor.fetchall()
        # Join all rows into one string and hash it
        data_str = '|'.join(f"{row[0]}:{row[1]}" for row in rows)
        return hashlib.md5(data_str.encode()).hexdigest()
    except Exception as e:
        logger.error(f"Error computing subscription state hash: {str(e)}")
        return None
    finally:
        cursor.close()
        conn.close()
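
# Example of the string that gets hashed above (table names are illustrative):
#     "table_a:True|table_b:False|table_c:True"
# Any change to a table name or its schedule_is_enabled flag changes the MD5 digest,
# which is how prepare_pipeline_dag_schedule detects changes in schedule_status.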

def has_any_execution_plans():
    """
    Check whether any execution plan file exists in the current directory.

    Returns:
        bool: True if any execution plan file exists, otherwise False
    """
    dag_dir = os.path.dirname(__file__)
    for file in os.listdir(dag_dir):
        if file.startswith('exec_plan_') and file.endswith('.json'):
            logger.info(f"Found existing execution plan file: {file}")
            return True

    logger.info("No execution plan file found")
    return False

def get_execution_plan_files():
    """
    Get all execution plan files, sorted by date.

    Returns:
        list: sorted list of execution plan files as (date, json path, ready path) tuples
    """
    dag_dir = os.path.dirname(__file__)
    plan_files = []

    # Find all execution plan files
    for file in os.listdir(dag_dir):
        match = re.match(r'exec_plan_(\d{4}-\d{2}-\d{2})\.json', file)
        if match:
            date_str = match.group(1)
            json_path = os.path.join(dag_dir, file)
            ready_path = os.path.join(dag_dir, f"exec_plan_{date_str}.ready")

            if os.path.exists(ready_path):
                plan_files.append((date_str, json_path, ready_path))

    # Sort by date (oldest to newest)
    plan_files.sort(key=lambda x: x[0])
    return plan_files
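
# Example return value (paths depend on where this DAG file lives):
#     [("2024-01-01", ".../exec_plan_2024-01-01.json", ".../exec_plan_2024-01-01.ready"),
#      ("2024-01-02", ".../exec_plan_2024-01-02.json", ".../exec_plan_2024-01-02.ready")]
# Only plans that already have a matching .ready marker are returned.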

def cleanup_old_execution_plans(keep_count=None):
    """
    Clean up expired execution plan files, keeping only the newest ones.

    Args:
        keep_count (int): number of plans to keep; if None, the configured value is used

    Returns:
        int: number of files deleted
    """
    if keep_count is None:
        keep_count = EXECUTION_PLAN_KEEP_COUNT

    # Get all execution plan files
    plan_files = get_execution_plan_files()
    logger.info(f"Found {len(plan_files)} execution plan files, keeping the newest {keep_count}")

    # If the number of files does not exceed the keep count, nothing needs to be deleted
    if len(plan_files) <= keep_count:
        logger.info(f"The number of execution plan files ({len(plan_files)}) does not exceed the keep count ({keep_count}), no cleanup needed")
        return 0

    # Delete the oldest files
    files_to_delete = plan_files[:-keep_count]
    deleted_count = 0

    for _, json_path, ready_path in files_to_delete:
        try:
            # Delete the JSON file
            if os.path.exists(json_path):
                os.remove(json_path)
                deleted_count += 1
                logger.info(f"Deleted expired execution plan file: {json_path}")

            # Delete the ready file
            if os.path.exists(ready_path):
                os.remove(ready_path)
                deleted_count += 1
                logger.info(f"Deleted expired ready file: {ready_path}")
        except Exception as e:
            logger.error(f"Error deleting file: {str(e)}")

    return deleted_count

def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
    """
    Save the execution plan to the airflow_exec_plans table.

    Args:
        execution_plan (dict): execution plan dictionary
        dag_id (str): ID of the DAG
        run_id (str): ID of the DAG run
        logical_date (datetime): logical date
        ds (str): date string in YYYY-MM-DD format

    Returns:
        bool: whether the operation succeeded
    """
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Serialize the execution plan to a JSON string
        plan_json = json.dumps(execution_plan)

        # Insert the record
        cursor.execute("""
            INSERT INTO airflow_exec_plans
            (dag_id, run_id, logical_date, ds, plan)
            VALUES (%s, %s, %s, %s, %s)
        """, (dag_id, run_id, logical_date, ds, plan_json))
        conn.commit()

        logger.info(f"Saved execution plan to airflow_exec_plans, dag_id={dag_id}, run_id={run_id}, ds={ds}")
        return True
    except Exception as e:
        logger.error(f"Error saving execution plan to the database: {str(e)}")
        conn.rollback()
        return False
    finally:
        cursor.close()
        conn.close()
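
# The airflow_exec_plans table is assumed to already exist. A schema like the following
# would satisfy the INSERT above (column types are assumptions, not taken from this codebase):
#
#     CREATE TABLE IF NOT EXISTS airflow_exec_plans (
#         id           SERIAL PRIMARY KEY,
#         dag_id       VARCHAR(255) NOT NULL,
#         run_id       VARCHAR(255) NOT NULL,
#         logical_date TIMESTAMP,
#         ds           VARCHAR(10),
#         plan         JSONB,
#         created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP
#     );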

def prepare_pipeline_dag_schedule(**kwargs):
    """Main function that prepares the Pipeline DAG scheduling tasks."""
    # Check whether this is a manually triggered, force-refresh run
    is_force_refresh = False
    params = kwargs.get('params', {})
    if params and 'FORCE_REFRESH' in params:
        is_force_refresh = params.get('FORCE_REFRESH', False)
        logger.info(f"Received force refresh parameter: FORCE_REFRESH={is_force_refresh}")

    # Get the execution date
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"Start preparing Pipeline scheduling tasks for execution date {exec_date}")

    # Execution plan file paths, using the date-based naming scheme
    plan_base_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}')
    plan_path = f"{plan_base_path}.json"
    ready_path = f"{plan_base_path}.ready"

    # Decide whether a new execution plan file needs to be created
    need_create_plan = False

    # Condition 1: there is no execution plan file in the current directory
    has_any_plans = has_any_execution_plans()
    if not has_any_plans:
        logger.info("No execution plan file exists in the current directory, a new one will be created")
        need_create_plan = True

    # Condition 2: the data in schedule_status has changed
    if not need_create_plan:
        # Compute the current hash
        current_hash = get_subscription_state_hash()
        # Read the previously recorded hash
        hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
        last_hash = None
        if os.path.exists(hash_file):
            try:
                with open(hash_file, 'r') as f:
                    last_hash = f.read().strip()
            except Exception as e:
                logger.warning(f"Failed to read the previous subscription state hash: {str(e)}")

        # Different hashes mean the data has changed
        if current_hash != last_hash:
            logger.info(f"Detected a change in schedule_status. Old hash: {last_hash}, new hash: {current_hash}")
            need_create_plan = True

    # Force refresh overrides the checks above
    if is_force_refresh:
        logger.info("Force refresh mode, a new execution plan will be created")
        need_create_plan = True

    # If no new execution plan is needed, return early
    if not need_create_plan:
        logger.info("No new execution plan file needs to be created")
        return 0

    # Continue and create a new execution plan
    # 1. Get the enabled tables
    enabled_tables = get_enabled_tables()
    logger.info(f"Got {len(enabled_tables)} enabled tables from schedule_status")

    if not enabled_tables:
        logger.warning("No enabled tables found, preparation finished")
        return 0

    # 2. Get detailed information about the tables
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)

    logger.info(f"Got detailed information for {len(tables_info)} tables")

    # 3. Process dependencies and add passively scheduled tables
    enriched_tables = process_dependencies(tables_info)
    logger.info(f"After processing dependencies there are {len(enriched_tables)} tables in total")

    # 4. Filter out invalid tables and their dependencies
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"After filtering invalid tables, {len(valid_tables)} valid tables remain")

    # Save the latest execution plan for the data scheduler DAG to read
    try:
        # Build the execution plan
        resource_tasks = []
        model_tasks = []

        for table in valid_tables:
            if table.get('target_table_label') == 'DataResource':
                resource_tasks.append({
                    "source_table": table.get('source_table'),
                    "target_table": table['target_table'],
                    "target_table_label": "DataResource",
                    "script_name": table.get('script_name'),
                    "script_exec_mode": table.get('script_exec_mode', 'append')
                })
            elif table.get('target_table_label') == 'DataModel':
                model_tasks.append({
                    "source_table": table.get('source_table'),
                    "target_table": table['target_table'],
                    "target_table_label": "DataModel",
                    "script_name": table.get('script_name'),
                    "script_exec_mode": table.get('script_exec_mode', 'append')
                })

        # Get the dependencies
        model_table_names = [t['target_table'] for t in model_tasks]
        dependencies = {}

        driver = get_neo4j_driver()
        try:
            with driver.session() as session:
                for table_name in model_table_names:
                    query = """
                        MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                    """
                    result = session.run(query, table_name=table_name)

                    deps = []
                    for record in result:
                        target = record.get("target")
                        target_labels = record.get("target_labels", [])

                        if target:
                            table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                            deps.append({
                                "table_name": target,
                                "table_type": table_type
                            })

                    dependencies[table_name] = deps
        finally:
            driver.close()

        # Create the execution plan
        execution_plan = {
            "exec_date": exec_date,
            "resource_tasks": resource_tasks,
            "model_tasks": model_tasks,
            "dependencies": dependencies
        }

        # Create a temporary file
        temp_plan_path = f"{plan_path}.temp"

        try:
            # Write to the temporary file
            with open(temp_plan_path, 'w') as f:
                json.dump(execution_plan, f, indent=2)
            logger.info(f"Saved execution plan to temporary file: {temp_plan_path}")

            # Atomically replace the real file
            os.replace(temp_plan_path, plan_path)
            logger.info(f"Replaced execution plan file: {plan_path}")

            # Create the ready file marking the plan as ready, with timestamp details
            now = datetime.now()
            timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
            with open(ready_path, 'w') as f:
                f.write(f"Created at: {timestamp}\nFor date: {exec_date}")
            logger.info(f"Created ready marker file: {ready_path}")

            # Update the subscription state hash
            current_hash = get_subscription_state_hash()
            hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
            with open(hash_file, 'w') as f:
                f.write(current_hash)
            logger.info(f"Updated subscription state hash: {current_hash}")

            # Clean up expired execution plan files
            deleted_count = cleanup_old_execution_plans()
            logger.info(f"Cleaned up {deleted_count} expired execution plan files")

            # Update the modification time of dag_dataops_pipeline_data_scheduler.py
            touch_data_scheduler_file()

            # Save the execution plan to the database table
            try:
                # Get the DAG run information
                dag_run = kwargs.get('dag_run')
                if dag_run:
                    dag_id = dag_run.dag_id
                    run_id = dag_run.run_id
                    logical_date = dag_run.logical_date
                else:
                    # Fall back to defaults if dag_run is unavailable
                    dag_id = kwargs.get('dag').dag_id if 'dag' in kwargs else "dag_dataops_pipeline_prepare_scheduler"
                    run_id = f"manual_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                    logical_date = datetime.now()

                # Save to the database
                save_result = save_execution_plan_to_db(
                    execution_plan=execution_plan,
                    dag_id=dag_id,
                    run_id=run_id,
                    logical_date=logical_date,
                    ds=exec_date
                )

                if save_result:
                    logger.info("Execution plan saved to the database successfully")
                else:
                    logger.warning("Saving the execution plan to the database failed, but the file was saved successfully")
            except Exception as db_e:
                # Catch database errors without failing the main flow
                logger.error(f"Error saving execution plan to the database: {str(db_e)}")
                logger.info("Continuing because the file was saved successfully")
        except Exception as e:
            logger.error(f"Error saving the execution plan file or triggering DAG re-parsing: {str(e)}")
            # Clean up the temporary file on error
            if os.path.exists(temp_plan_path):
                try:
                    os.remove(temp_plan_path)
                    logger.info(f"Cleaned up temporary file: {temp_plan_path}")
                except Exception as rm_e:
                    logger.error(f"Error cleaning up temporary file: {str(rm_e)}")
            raise  # re-raise so the task fails
    except Exception as e:
        error_msg = f"Error saving or validating the execution plan file: {str(e)}"
        logger.error(error_msg)
        # Raise so the task fails and downstream DAGs are blocked
        raise Exception(error_msg)

    return len(valid_tables)  # return the number of valid tables instead of an inserted-record count
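
# For reference, the execution plan written above has roughly this shape
# (table and script names are illustrative):
#
# {
#   "exec_date": "2024-01-01",
#   "resource_tasks": [
#     {"source_table": "src_orders", "target_table": "res_orders",
#      "target_table_label": "DataResource", "script_name": "load_res_orders.py",
#      "script_exec_mode": "append"}
#   ],
#   "model_tasks": [
#     {"source_table": "res_orders", "target_table": "dm_orders",
#      "target_table_label": "DataModel", "script_name": "build_dm_orders.py",
#      "script_exec_mode": "append"}
#   ],
#   "dependencies": {
#     "dm_orders": [{"table_name": "res_orders", "table_type": "DataResource"}]
#   }
# }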

def check_execution_plan_file(**kwargs):
    """
    Check whether the execution plan file for the current day exists and is valid.
    Returning False blocks all downstream tasks.
    """
    # Get the execution date
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"Checking whether the execution plan file for {exec_date} exists and is valid")

    # Execution plan file paths
    plan_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}.json')
    ready_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}.ready')

    # Check that the plan file exists
    if not os.path.exists(plan_path):
        logger.error(f"Execution plan file does not exist: {plan_path}")
        return False

    # Check that the ready marker exists
    if not os.path.exists(ready_path):
        logger.error(f"Execution plan ready marker file does not exist: {ready_path}")
        return False

    # Check that the file is readable and its content is valid
    try:
        with open(plan_path, 'r') as f:
            data = json.load(f)

        # Check required fields
        if "exec_date" not in data:
            logger.error("Execution plan is missing the exec_date field")
            return False

        if not isinstance(data.get("resource_tasks", []), list):
            logger.error("The resource_tasks field of the execution plan is invalid")
            return False

        if not isinstance(data.get("model_tasks", []), list):
            logger.error("The model_tasks field of the execution plan is invalid")
            return False

        # Check whether there is any task data
        resource_tasks = data.get("resource_tasks", [])
        model_tasks = data.get("model_tasks", [])
        if not resource_tasks and not model_tasks:
            logger.warning("The execution plan contains no tasks, but the file format is valid")
            # Note: even with no tasks, the flow is allowed to continue

        logger.info(f"Execution plan file validated successfully: {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
        return True
    except json.JSONDecodeError as je:
        logger.error(f"Execution plan file is not valid JSON: {str(je)}")
        return False
    except Exception as e:
        logger.error(f"Error checking the execution plan file: {str(e)}")
        return False

# Create the DAG
with DAG(
    "dag_dataops_pipeline_prepare_scheduler",
    start_date=datetime(2024, 1, 1),
    # Run every hour
    schedule_interval="0 * * * *",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    params={
        'FORCE_REFRESH': False,  # force refresh parameter, defaults to False
    },
) as dag:

    # Marker for the start of preparation
    start_preparation = EmptyOperator(
        task_id="start_preparation",
        dag=dag
    )

    # Prepare the scheduling tasks
    prepare_task = PythonOperator(
        task_id="prepare_pipeline_dag_schedule",
        python_callable=prepare_pipeline_dag_schedule,
        provide_context=True,
        dag=dag
    )

    # Check the execution plan file
    check_plan_file = ShortCircuitOperator(
        task_id="check_execution_plan_file",
        python_callable=check_execution_plan_file,
        provide_context=True,
        dag=dag
    )

    # Marker for preparation completed
    preparation_completed = EmptyOperator(
        task_id="preparation_completed",
        dag=dag
    )

    # Task dependencies
    start_preparation >> prepare_task >> check_plan_file >> preparation_completed
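
# To rebuild the plan outside the hourly schedule, the DAG can be triggered manually
# with the FORCE_REFRESH param, e.g. via the standard Airflow CLI (assuming the default
# dag_run_conf_overrides_params setting, so the conf value reaches params):
#     airflow dags trigger dag_dataops_pipeline_prepare_scheduler --conf '{"FORCE_REFRESH": true}'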