# dag_dataops_unified_scheduler.py
# Combines the functionality of the former prepare, data and summary DAGs
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
from decimal import Decimal
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG

# Module-level logger
logger = logging.getLogger(__name__)

# Date-aware JSON serializer
def json_serial(obj):
    """JSON serializer that renders date objects as ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

# Custom JSON encoder to work around Decimal serialization errors
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date/datetime values as well
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Let the base class handle everything else
        return super(DecimalEncoder, self).default(obj)
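
# Illustrative only (kept commented out so the Airflow parser never runs it):
# how the two serializers above behave on the value types this DAG pushes
# through XCom. The sample payloads are made up.
#
#   json.dumps({"day": date(2024, 1, 1)}, default=json_serial)
#   # -> '{"day": "2024-01-01"}'
#   json.dumps({"avg_duration": Decimal("1.25"), "day": date(2024, 1, 1)}, cls=DecimalEncoder)
#   # -> '{"avg_duration": 1.25, "day": "2024-01-01"}'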
#############################################
# Phase 1: preparation (Prepare Phase) functions
#############################################
def get_enabled_tables():
    """Fetch every table whose schedule is enabled."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT owner_id, table_name
            FROM schedule_status
            WHERE schedule_is_enabled = TRUE
        """)
        result = cursor.fetchall()
        return [row[1] for row in result]  # return only the table names
    except Exception as e:
        logger.error(f"Failed to fetch enabled tables: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()
def check_table_directly_subscribed(table_name):
    """Check whether a table is directly subscribed in schedule_status."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT schedule_is_enabled
            FROM schedule_status
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        return result and result[0] is True
    except Exception as e:
        logger.error(f"Failed to check subscription status: {str(e)}")
        return False
    finally:
        cursor.close()
        conn.close()
def get_table_info_from_neo4j(table_name):
    """Fetch detailed table metadata from Neo4j."""
    driver = get_neo4j_driver()
    # Determine whether the table is directly subscribed
    is_directly_schedule = check_table_directly_subscribed(table_name)

    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # taken from schedule_status
    }

    try:
        with driver.session() as session:
            # Query the table's labels and status
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()

            if record:
                labels = record.get("labels", [])
                # next(..., None) avoids an IndexError when no label matches
                table_info['target_table_label'] = next(
                    (label for label in labels if label in ["DataResource", "DataModel", "DataSource"]),
                    None
                )
                table_info['target_table_status'] = record.get("status", True)  # defaults to True
                table_info['default_update_frequency'] = record.get("frequency")

                # Query relationship and script info based on the label type
                if "DataResource" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                else:
                    logger.warning(f"Table {table_name} is neither DataResource nor DataModel")
                    return table_info

                result = session.run(query_rel, table_name=table_name)
                record = result.single()

                if record:
                    table_info['source_table'] = record.get("source_table")

                    # Warn when script_name is missing, since later steps need it
                    script_name = record.get("script_name")
                    if not script_name:
                        logger.warning(f"Relationship for table {table_name} has no script_name property; downstream processing may fail")
                    table_info['script_name'] = script_name

                    # Provide defaults so empty properties do not break later steps
                    table_info['script_type'] = record.get("script_type", "python")  # defaults to python
                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # defaults to append
                else:
                    logger.warning(f"No relationship info found for table {table_name}")
            else:
                logger.warning(f"Table {table_name} not found in Neo4j")
    except Exception as e:
        logger.error(f"Error fetching info for table {table_name}: {str(e)}")
    finally:
        driver.close()

    return table_info
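
# Illustrative only: the rough shape of the dict returned above for a healthy
# DataModel table. All field values here are made up.
#
#   {
#       'target_table': 'dm_orders',
#       'is_directly_schedule': True,
#       'target_table_label': 'DataModel',
#       'target_table_status': True,
#       'default_update_frequency': 'daily',
#       'source_table': 'res_orders',
#       'script_name': 'build_dm_orders.py',
#       'script_type': 'python',
#       'script_exec_mode': 'append',
#   }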
def process_dependencies(tables_info):
    """Resolve inter-table dependencies and add passively scheduled tables."""
    # Map of all known tables, keyed by name
    all_tables = {t['target_table']: t for t in tables_info}
    driver = get_neo4j_driver()

    try:
        with driver.session() as session:
            for table_name, table_info in list(all_tables.items()):
                if table_info.get('target_table_label') == 'DataModel':
                    # Query the tables this model depends on
                    query = """
                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
                               dep.status AS dep_status, dep.frequency AS dep_frequency
                    """
                    result = session.run(query, table_name=table_name)

                    for record in result:
                        dep_name = record.get("dep_name")
                        dep_labels = record.get("dep_labels", [])
                        dep_status = record.get("dep_status", True)
                        dep_frequency = record.get("dep_frequency")

                        # Pull in dependencies that are not directly scheduled
                        if dep_name and dep_name not in all_tables:
                            logger.info(f"Found passively scheduled dependency: {dep_name}, labels: {dep_labels}")

                            # Fetch full details for the dependency
                            dep_info = get_table_info_from_neo4j(dep_name)
                            dep_info['is_directly_schedule'] = False

                            # Inherit the scheduling frequency if the dependency has none
                            if not dep_info.get('default_update_frequency'):
                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')

                            all_tables[dep_name] = dep_info
    except Exception as e:
        logger.error(f"Error while resolving dependencies: {str(e)}")
    finally:
        driver.close()

    return list(all_tables.values())
def filter_invalid_tables(tables_info):
    """Filter out invalid tables and their dependents, using a NetworkX dependency graph."""
    # Map table names to their index for quick membership checks
    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}

    # Collect the tables flagged as invalid
    invalid_tables = set()
    for table in tables_info:
        if table.get('target_table_status') is False:
            invalid_tables.add(table['target_table'])
            logger.info(f"Table {table['target_table']} is flagged as invalid")

    # Build the dependency graph
    G = nx.DiGraph()

    # Add every table as a node
    for table in tables_info:
        G.add_node(table['target_table'])

    # Query dependencies and add edges
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table in tables_info:
                if table.get('target_table_label') == 'DataModel':
                    query = """
                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN target.en_name AS target_name
                    """
                    result = session.run(query, table_name=table['target_table'])

                    for record in result:
                        target_name = record.get("target_name")
                        if target_name and target_name in table_dict:
                            # Add an edge from the upstream dependency to the
                            # dependent table, so that nx.descendants() below
                            # walks downstream consumers rather than upstream sources
                            G.add_edge(target_name, table['target_table'])
                            logger.debug(f"Added dependency edge: {target_name} -> {table['target_table']}")
    except Exception as e:
        logger.error(f"Error while building the dependency graph: {str(e)}")
    finally:
        driver.close()

    # Find every table that (transitively) depends on an invalid table
    downstream_invalid = set()
    for invalid_table in invalid_tables:
        # All nodes reachable from the invalid table, i.e. its downstream dependents
        try:
            descendants = nx.descendants(G, invalid_table)
            downstream_invalid.update(descendants)
            logger.info(f"Downstream invalid tables of {invalid_table}: {descendants}")
        except Exception as e:
            logger.error(f"Error while processing dependents of table {invalid_table}: {str(e)}")

    # Union of directly and transitively invalid tables
    all_invalid = invalid_tables.union(downstream_invalid)
    logger.info(f"{len(all_invalid)} tables marked invalid in total: {all_invalid}")

    # Keep only the valid tables
    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")

    return valid_tables
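
# Illustrative sketch of the invalid-table propagation above, on a toy graph.
# Kept commented out so the Airflow parser never executes it; the table names
# are made up.
#
#   g = nx.DiGraph()
#   g.add_edges_from([("raw_orders", "dim_orders"), ("dim_orders", "rpt_sales")])
#   nx.descendants(g, "raw_orders")
#   # -> {'dim_orders', 'rpt_sales'}: if raw_orders is invalid, both of its
#   #    downstream dependents get filtered out as well.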
def write_to_airflow_dag_schedule(exec_date, tables_info):
    """Write table info into the airflow_dag_schedule table."""
    conn = get_pg_conn()
    cursor = conn.cursor()

    try:
        # Clear out the current day's rows to avoid duplicates
        cursor.execute("""
            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        logger.info(f"Cleared existing rows for execution date {exec_date}")

        # Insert the new rows
        inserted_count = 0
        for table in tables_info:
            cursor.execute("""
                INSERT INTO airflow_dag_schedule (
                    exec_date, source_table, target_table, target_table_label,
                    target_table_status, is_directly_schedule, default_update_frequency,
                    script_name, script_type, script_exec_mode
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                exec_date,
                table.get('source_table'),
                table['target_table'],
                table.get('target_table_label'),
                table.get('target_table_status', True),
                table.get('is_directly_schedule', False),
                table.get('default_update_frequency'),
                table.get('script_name'),
                table.get('script_type', 'python'),
                table.get('script_exec_mode', 'append')
            ))
            inserted_count += 1

        conn.commit()
        logger.info(f"Inserted {inserted_count} rows into airflow_dag_schedule")
        return inserted_count
    except Exception as e:
        logger.error(f"Error writing to airflow_dag_schedule: {str(e)}")
        conn.rollback()
        # Re-raise so the failure propagates to the task
        raise
    finally:
        cursor.close()
        conn.close()
def prepare_dag_schedule(**kwargs):
    """Entry point for preparing the DAG schedule."""
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"Preparing the unified schedule for execution date {exec_date}")

    # 1. Fetch the enabled tables
    enabled_tables = get_enabled_tables()
    logger.info(f"Found {len(enabled_tables)} enabled tables in schedule_status")

    if not enabled_tables:
        logger.warning("No enabled tables found; preparation finished")
        return 0

    # 2. Fetch detailed info for each table
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)

    logger.info(f"Fetched details for {len(tables_info)} tables")

    # 3. Resolve dependencies and add passively scheduled tables
    enriched_tables = process_dependencies(tables_info)
    logger.info(f"{len(enriched_tables)} tables in total after dependency resolution")

    # 4. Filter out invalid tables and their dependents
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")

    # 5. Write the schedule into airflow_dag_schedule
    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)

    # 6. Raise if the insert silently wrote nothing despite there being valid tables
    if inserted_count == 0 and valid_tables:
        error_msg = f"Insert failed: no rows written to airflow_dag_schedule although {len(valid_tables)} valid tables needed processing"
        logger.error(error_msg)
        raise Exception(error_msg)

    # 7. Build the execution plan
    resource_tasks = []
    model_tasks = []

    for table in valid_tables:
        if table.get('target_table_label') == 'DataResource':
            resource_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataResource",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })
        elif table.get('target_table_label') == 'DataModel':
            model_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataModel",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })

    # Fetch dependencies for the model tables
    model_table_names = [t['target_table'] for t in model_tasks]
    dependencies = {}

    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name in model_table_names:
                query = """
                    MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                    RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                """
                result = session.run(query, table_name=table_name)

                deps = []
                for record in result:
                    target = record.get("target")
                    target_labels = record.get("target_labels", [])

                    if target:
                        table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                        deps.append({
                            "table_name": target,
                            "table_type": table_type
                        })

                dependencies[table_name] = deps
    finally:
        driver.close()

    # Assemble the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Push the execution plan to XCom
    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
    logger.info(f"Prepared an execution plan with {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")

    return inserted_count
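
# Illustrative only: the shape of the execution_plan pushed to XCom above.
# Table and script names are made up.
#
#   {
#     "exec_date": "2024-01-01",
#     "resource_tasks": [
#       {"source_table": "src_orders", "target_table": "res_orders",
#        "target_table_label": "DataResource", "script_name": "load_orders.py",
#        "script_exec_mode": "append"}
#     ],
#     "model_tasks": [
#       {"source_table": "res_orders", "target_table": "dm_orders",
#        "target_table_label": "DataModel", "script_name": "build_dm_orders.py",
#        "script_exec_mode": "append"}
#     ],
#     "dependencies": {
#       "dm_orders": [{"table_name": "res_orders", "table_type": "DataResource"}]
#     }
#   }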
#############################################
# Phase 2: data processing (Data Processing Phase) functions
#############################################
def get_latest_date():
    """Return the most recent date that has rows in the schedule table."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Most recent date with schedule rows: {latest_date}")
            return latest_date
        else:
            logger.warning("No dated rows found; falling back to today's date")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error looking up the most recent date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()
def get_all_tasks(exec_date):
    """Fetch every task to execute (both DataResource and DataModel)."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()

        # Model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()

        # Collect the resource tasks
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is present
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        # Collect the model tasks
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is present
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        logger.info(f"Fetched {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error fetching task info: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()
def get_table_dependencies_for_data_phase(table_names):
    """Fetch the dependency relationships between the given tables."""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}

    try:
        with driver.session() as session:
            # Dependencies of all the model tables in one query
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)

            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])

                if source and target:
                    # Record the upstream table in the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()

    return dependency_dict
def create_execution_plan(**kwargs):
    """Build the execution plan, reusing the data handed over by the prepare phase."""
    try:
        # Try to pull the execution plan from XCom first
        execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')

        # Fall back to the database when no plan is found
        if not execution_plan:
            # Determine the execution date
            exec_date = get_latest_date()
            logger.info(f"No execution plan in XCom; rebuilding from the database for execution date {exec_date}")

            # Fetch every task
            resource_tasks, model_tasks = get_all_tasks(exec_date)

            if not resource_tasks and not model_tasks:
                logger.warning(f"No tasks found for execution date {exec_date}")
                return 0

            # Fetch dependencies for all model tables
            model_table_names = [task["target_table"] for task in model_tasks]
            dependencies = get_table_dependencies_for_data_phase(model_table_names)

            # Assemble the plan
            new_execution_plan = {
                "exec_date": exec_date,
                "resource_tasks": resource_tasks,
                "model_tasks": model_tasks,
                "dependencies": dependencies
            }

            # Push it to XCom
            kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(new_execution_plan, default=json_serial))
            logger.info(f"Built a new execution plan with {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
            return json.dumps(new_execution_plan, default=json_serial)

        logger.info("Fetched the execution plan from XCom")
        return execution_plan
    except Exception as e:
        logger.error(f"Error creating the execution plan: {str(e)}")

        # Return an empty plan rather than failing outright
        empty_plan = {
            "exec_date": get_today_date(),
            "resource_tasks": [],
            "model_tasks": [],
            "dependencies": {}
        }

        return json.dumps(empty_plan, default=json_serial)
def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table."""
    logger.info(f"Running script {script_name} for resource table {target_table}")

    # exec_date may arrive as a JSON string instead of a plain date
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Extract the actual date from the JSON payload
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")

    return execute_with_monitoring(
        target_table=target_table,
        script_name=script_name,
        script_exec_mode=script_exec_mode,
        exec_date=exec_date
    )

def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table."""
    logger.info(f"Running script {script_name} for model table {target_table}")

    # exec_date may arrive as a JSON string instead of a plain date
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Extract the actual date from the JSON payload
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")

    return execute_with_monitoring(
        target_table=target_table,
        script_name=script_name,
        script_exec_mode=script_exec_mode,
        exec_date=exec_date
    )
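
# Illustrative only: the exec_date normalization both helpers perform. When an
# upstream task hands over a whole JSON plan instead of a date, the helpers
# unwrap it before calling execute_with_monitoring. The table and script names
# here are made up.
#
#   process_resource("res_orders", "load_orders.py", "append",
#                    '{"exec_date": "2024-01-01", "resource_tasks": []}')
#   # -> execute_with_monitoring(..., exec_date="2024-01-01")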
#############################################
# Phase 3: summary (Summary Phase) functions
#############################################
def get_execution_stats(exec_date):
    """Collect execution statistics for the given date."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total task count
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        result = cursor.fetchone()
        total_tasks = result[0] if result else 0

        # Task count per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}

        # Result counts: success, failure, pending
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        result = cursor.fetchone()
        success_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        result = cursor.fetchone()
        fail_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        result = cursor.fetchone()
        pending_count = result[0] if result else 0

        # Duration statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()

        # Guard against NULL duration statistics
        if time_stats and time_stats[0] is not None:
            avg_duration = float(time_stats[0])
            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
        else:
            avg_duration = None
            min_duration = None
            max_duration = None

        # Details of the failed tasks
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = []
        for row in cursor.fetchall():
            task_dict = {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
            }
            if row[3] is not None:
                task_dict["exec_duration"] = float(row[3])
            else:
                task_dict["exec_duration"] = None
            failed_tasks.append(task_dict)

        # Success rate, guarding against division by zero
        success_rate = 0
        if total_tasks > 0:
            success_rate = (success_count / total_tasks) * 100

        # Assemble the statistics
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }

        return stats
    except Exception as e:
        logger.error(f"Error collecting execution statistics: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()
def update_missing_results(exec_date):
    """Fill in missing execution results."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Find all tasks with no recorded result
        cursor.execute("""
            SELECT target_table, script_name
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()

        update_count = 0
        for row in missing_results:
            target_table, script_name = row

            # A start time without an end time is treated as a failure
            cursor.execute("""
                SELECT exec_start_time
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))
            start_time = cursor.fetchone()

            if start_time and start_time[0]:
                # Started but never finished: mark as failed
                now = datetime.now()
                duration = (now - start_time[0]).total_seconds()

                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))

                logger.warning(f"Marked script {script_name} for table {target_table} as failed; started at {start_time[0]}")
                update_count += 1
            else:
                # No start time and no result: assume it never ran
                logger.warning(f"Script {script_name} for table {target_table} never ran")

        conn.commit()
        logger.info(f"Updated {update_count} tasks with missing results")
        return update_count
    except Exception as e:
        logger.error(f"Error updating missing execution results: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()
def generate_unified_execution_report(exec_date, stats):
    """Render the unified execution report."""
    # Assemble the report line by line
    report = []
    report.append("========== Unified DataOps Execution Report ==========")
    report.append(f"Execution date: {exec_date}")
    report.append(f"Total tasks: {stats['total_tasks']}")

    # Task type distribution
    report.append("\n--- Task Type Distribution ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} tasks: {count}")

    # Result statistics
    report.append("\n--- Execution Result Statistics ---")
    report.append(f"Succeeded: {stats.get('success_count', 0)}")
    report.append(f"Failed: {stats.get('fail_count', 0)}")
    report.append(f"Not executed: {stats.get('pending_count', 0)}")
    report.append(f"Success rate: {stats.get('success_rate', 0):.2f}%")

    # Duration statistics
    report.append("\n--- Duration Statistics (seconds) ---")
    avg_duration = stats.get('avg_duration')
    min_duration = stats.get('min_duration')
    max_duration = stats.get('max_duration')

    report.append(f"Average duration: {avg_duration:.2f}" if avg_duration is not None else "Average duration: N/A")
    report.append(f"Shortest duration: {min_duration:.2f}" if min_duration is not None else "Shortest duration: N/A")
    report.append(f"Longest duration: {max_duration:.2f}" if max_duration is not None else "Longest duration: N/A")

    # Failed task details
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- Failed Task Details ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. Table: {task['target_table']}")
            report.append(f"   Script: {task['script_name']}")
            report.append(f"   Type: {task['target_table_label']}")

            exec_duration = task.get('exec_duration')
            if exec_duration is not None:
                report.append(f"   Duration: {exec_duration:.2f} seconds")
            else:
                report.append("   Duration: N/A")

    report.append("\n========== End of Report ==========")

    # Join the report into a single string
    report_str = "\n".join(report)

    # Log the report
    logger.info("\n" + report_str)

    return report_str
def summarize_execution(**kwargs):
    """Entry point for summarizing the day's execution."""
    try:
        exec_date = kwargs.get('ds') or get_today_date()
        logger.info(f"Summarizing unified execution for date {exec_date}")

        # 1. Fill in missing execution results
        try:
            update_count = update_missing_results(exec_date)
            logger.info(f"Updated {update_count} missing execution results")
        except Exception as e:
            logger.error(f"Error updating missing execution results: {str(e)}")
            update_count = 0

        # 2. Collect execution statistics
        try:
            stats = get_execution_stats(exec_date)
            if not stats:
                logger.warning("Could not collect execution statistics; using defaults")
                stats = {
                    "exec_date": exec_date,
                    "total_tasks": 0,
                    "type_counts": {},
                    "success_count": 0,
                    "fail_count": 0,
                    "pending_count": 0,
                    "success_rate": 0,
                    "avg_duration": None,
                    "min_duration": None,
                    "max_duration": None,
                    "failed_tasks": []
                }
        except Exception as e:
            logger.error(f"Error collecting execution statistics: {str(e)}")
            stats = {
                "exec_date": exec_date,
                "total_tasks": 0,
                "type_counts": {},
                "success_count": 0,
                "fail_count": 0,
                "pending_count": 0,
                "success_rate": 0,
                "avg_duration": None,
                "min_duration": None,
                "max_duration": None,
                "failed_tasks": []
            }

        # 3. Render the execution report
        try:
            report = generate_unified_execution_report(exec_date, stats)
        except Exception as e:
            logger.error(f"Error rendering the execution report: {str(e)}")
            report = f"Error rendering the execution report: {str(e)}\nBasic stats: total: {stats.get('total_tasks', 0)}, succeeded: {stats.get('success_count', 0)}, failed: {stats.get('fail_count', 0)}"

        # Hand the report and statistics to downstream tasks
        try:
            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
            kwargs['ti'].xcom_push(key='execution_report', value=report)
        except Exception as e:
            logger.error(f"Error pushing the report to XCom: {str(e)}")
        return report
    except Exception as e:
        logger.error(f"Unhandled error while summarizing execution: {str(e)}")
        # Return a minimal error report so the task itself does not fail
        return f"Error during execution summary: {str(e)}"
# Define the DAG
with DAG(
    "dag_dataops_unified_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:

    #############################################
    # Phase 1: preparation (Prepare Phase)
    #############################################
    with TaskGroup("prepare_phase") as prepare_group:
        # Start-of-phase marker
        start_preparation = EmptyOperator(
            task_id="start_preparation"
        )

        # Prepare the schedule; Airflow 2 injects the task context
        # automatically, so the deprecated provide_context flag is not needed
        prepare_task = PythonOperator(
            task_id="prepare_dag_schedule",
            python_callable=prepare_dag_schedule
        )

        # Build the execution plan; moved here from data_processing_phase
        create_plan = PythonOperator(
            task_id="create_execution_plan",
            python_callable=create_execution_plan
        )

        # End-of-phase marker
        preparation_completed = EmptyOperator(
            task_id="preparation_completed"
        )

        # Wire up the phase, now including create_plan
        start_preparation >> prepare_task >> create_plan >> preparation_completed

    #############################################
    # Phase 2: data processing (Data Processing Phase)
    #############################################
    with TaskGroup("data_processing_phase") as data_group:
        # End-of-phase marker
        processing_completed = EmptyOperator(
            task_id="processing_completed"
        )

    #############################################
    # Phase 3: summary (Summary Phase)
    #############################################
    with TaskGroup("summary_phase") as summary_group:
        # Summarize the execution
        summarize_task = PythonOperator(
            task_id="summarize_execution",
            python_callable=summarize_execution
        )

        # End-of-phase marker
        summary_completed = EmptyOperator(
            task_id="summary_completed"
        )

        # Wire up the phase
        summarize_task >> summary_completed

    # Chain the three phases with plain TaskGroup dependencies
    prepare_group >> data_group >> summary_group
    # Dynamic creation of the actual data-processing work.
    # This code runs when the DAG object is defined, not while the DAG runs.

    # A DynamicTaskMapper used to create tasks on the fly
    class DynamicTaskMapper(PythonOperator):
        """A specialized PythonOperator for dynamic task mapping.
        It runs as an ordinary task but creates and executes the downstream work at run time."""

        def execute(self, context):
            """Create and execute the tasks dynamically at run time."""
            try:
                logger.info("Starting dynamic task mapping...")

                # Pull the execution plan from XCom
                ti = context['ti']
                execution_plan_json = ti.xcom_pull(
                    task_ids='prepare_phase.create_execution_plan'
                )

                if not execution_plan_json:
                    logger.info("Falling back to prepare_phase.prepare_dag_schedule for the execution plan")
                    execution_plan_tmp = ti.xcom_pull(
                        task_ids='prepare_phase.prepare_dag_schedule',
                        key='execution_plan'
                    )
                    if execution_plan_tmp:
                        execution_plan_json = execution_plan_tmp
                    else:
                        logger.error("Could not pull the execution plan from XCom")
                        raise ValueError("Execution plan not found")

                # Parse the plan
                if isinstance(execution_plan_json, str):
                    execution_plan = json.loads(execution_plan_json)
                else:
                    execution_plan = execution_plan_json

                # Unpack resource tasks, model tasks and dependencies
                resource_tasks = execution_plan.get("resource_tasks", [])
                model_tasks = execution_plan.get("model_tasks", [])
                dependencies = execution_plan.get("dependencies", {})

                logger.info(f"Loaded execution plan: {len(resource_tasks)} resource tasks, {len(model_tasks)} model tasks")

                # Run the resource tasks first
                logger.info("Processing resource tasks...")
                for task_info in resource_tasks:
                    target_table = task_info["target_table"]
                    script_name = task_info["script_name"]
                    script_exec_mode = task_info.get("script_exec_mode", "append")

                    logger.info(f"Running resource task: {target_table}, script: {script_name}")
                    try:
                        process_resource(
                            target_table=target_table,
                            script_name=script_name,
                            script_exec_mode=script_exec_mode,
                            exec_date=context.get('ds')
                        )
                    except Exception as e:
                        logger.error(f"Error processing resource table {target_table}: {str(e)}")

                # Build the model dependency graph to determine execution order
                G = nx.DiGraph()

                # One node per model table
                for task_info in model_tasks:
                    G.add_node(task_info["target_table"])

                # Dependency edges point from dependency to dependent
                for source, deps in dependencies.items():
                    for dep in deps:
                        if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                            G.add_edge(dep.get("table_name"), source)

                # Detect and break circular dependencies
                cycles = list(nx.simple_cycles(G))
                if cycles:
                    logger.warning(f"Circular dependencies detected: {cycles}")
                    for cycle in cycles:
                        G.remove_edge(cycle[-1], cycle[0])
                        logger.info(f"Broke the cycle by removing the edge {cycle[-1]} -> {cycle[0]}")

                # Topologically sort to get the execution order
                try:
                    execution_order = list(nx.topological_sort(G))
                    logger.info(f"Computed execution order: {execution_order}")
                except Exception as e:
                    logger.error(f"Topological sort failed: {str(e)}; falling back to the original order")
                    execution_order = [task_info["target_table"] for task_info in model_tasks]

                # Run the model tasks in order
                for table_name in execution_order:
                    task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
                    if not task_info:
                        continue

                    target_table = task_info["target_table"]
                    script_name = task_info["script_name"]
                    script_exec_mode = task_info.get("script_exec_mode", "append")

                    logger.info(f"Running model task: {target_table}, script: {script_name}")
                    try:
                        process_model(
                            target_table=target_table,
                            script_name=script_name,
                            script_exec_mode=script_exec_mode,
                            exec_date=context.get('ds')
                        )
                    except Exception as e:
                        logger.error(f"Error processing model table {target_table}: {str(e)}")

                return f"Executed {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks"
            except Exception as e:
                logger.error(f"Dynamic task mapping failed: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                raise
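
    # Illustrative only (commented out so the parser never runs it): the
    # ordering logic above on a toy graph, with made-up table names.
    #
    #   g = nx.DiGraph()
    #   g.add_edges_from([("res_orders", "dm_orders"), ("dm_orders", "rpt_sales")])
    #   list(nx.topological_sort(g))
    #   # -> ['res_orders', 'dm_orders', 'rpt_sales']: dependencies run first.
    #   g.add_edge("rpt_sales", "dm_orders")   # introduce a cycle
    #   list(nx.simple_cycles(g))
    #   # -> [['dm_orders', 'rpt_sales']]; removing one edge of the cycle
    #   #    restores a valid topological order.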
    # Use the DynamicTaskMapper inside data_processing_phase
    with data_group:
        # python_callable is required by PythonOperator but never used here,
        # because execute() is overridden above
        dynamic_task_mapper = DynamicTaskMapper(
            task_id="dynamic_task_mapper",
            python_callable=lambda **kwargs: "Dynamic task mapping placeholder"
        )

        # Wire it between the phase markers
        preparation_completed >> dynamic_task_mapper >> processing_completed

    # Note: the original per-table task code below is no longer needed, since the
    # DynamicTaskMapper now creates and executes the tasks dynamically. The tail
    # of the original try-except block is kept so the structure stays intact.

    # Query the latest execution plan from the database, for WebUI display only
    try:
        # A one-off query executed when the DAG file is parsed;
        # it is used purely for display and does not affect execution
        conn = get_pg_conn()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT COUNT(*) FROM airflow_dag_schedule
            """)
            count = cursor.fetchone()
            if count and count[0] > 0:
                logger.info(f"{count[0]} task rows available in the database for scheduling")
            else:
                logger.info("No task rows found in the database; the first DAG run will create the initial plan")
        except Exception as e:
            logger.warning(f"Error querying the database: {str(e)}; this does not affect DAG execution")
        finally:
            cursor.close()
            conn.close()
    except Exception as e:
        logger.warning(f"Error during DAG initialization: {str(e)}; this does not affect DAG execution")
        # Even on error there is a clear execution path:
        # the default dependency chain is already in place