dag_dataops_pipeline_data_scheduler.py

"""
Unified data-ops scheduler DAG

Features:
1. Combines data processing and summary statistics into a single DAG
2. Keeps each processing script as its own task, so runs are easy to inspect in the Web UI
3. Supports dynamic parsing and execution of the execution plan file
4. Automatically generates a summary report after execution completes
"""
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
import os
from decimal import Decimal
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
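
# NOTE: config.py is not shown in this file. Based on how the values are used further
# down (TASK_RETRY_CONFIG["retries"], TASK_RETRY_CONFIG["retry_delay_minutes"], and the
# scripts path passed into the DAG params), its relevant entries are assumed to look
# roughly like the sketch below; the concrete values are illustrative only:
#   TASK_RETRY_CONFIG = {"retries": 1, "retry_delay_minutes": 5}
#   SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"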

# Module-level logger
logger = logging.getLogger(__name__)


#############################################
# Shared utility functions
#############################################
def json_serial(obj):
    """JSON serializer that renders datetime/date objects as ISO-format strings"""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"类型 {type(obj)} 不能被序列化为JSON")


# Custom JSON encoder that handles Decimal serialization
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date/datetime values
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Let the parent class handle everything else
        return super(DecimalEncoder, self).default(obj)
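
# Usage sketch (illustrative comments only, nothing is executed at import time): both
# serializers above are used further down in this module, e.g.
#   json.dumps(execution_plan, default=json_serial)   # date/datetime -> ISO string
#   json.dumps(stats, cls=DecimalEncoder)             # Decimal -> float, dates -> ISO string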


#############################################
# Phase 1: Prepare Phase functions
#############################################
def get_enabled_tables():
    """Return all tables whose scheduling is enabled"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT owner_id, table_name
            FROM schedule_status
            WHERE schedule_is_enabled = TRUE
        """)
        result = cursor.fetchall()
        return [row[1] for row in result]  # Only return the table names
    except Exception as e:
        logger.error(f"获取启用表失败: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()


def check_table_directly_subscribed(table_name):
    """Check whether the table is directly subscribed in the schedule_status table"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT schedule_is_enabled
            FROM schedule_status
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        return result and result[0] is True
    except Exception as e:
        logger.error(f"检查表订阅状态失败: {str(e)}")
        return False
    finally:
        cursor.close()
        conn.close()


def get_table_info_from_neo4j(table_name):
    """Fetch a table's detailed information from Neo4j"""
    driver = get_neo4j_driver()
    # Check whether the table is directly subscribed
    is_directly_schedule = check_table_directly_subscribed(table_name)
    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # Taken from the schedule_status table
    }
    try:
        with driver.session() as session:
            # Query the table's labels and status
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()
            if record:
                labels = record.get("labels", [])
                table_info['target_table_label'] = next(
                    (label for label in labels if label in ["DataResource", "DataModel", "DataSource"]), None
                )
                table_info['target_table_status'] = record.get("status", True)  # Defaults to True
                table_info['default_update_frequency'] = record.get("frequency")
                # Query relationship and script information according to the label type
                if "DataResource" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                else:
                    logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
                    return table_info
                result = session.run(query_rel, table_name=table_name)
                record = result.single()
                if record:
                    table_info['source_table'] = record.get("source_table")
                    # Warn when script_name is missing, since later steps rely on it
                    script_name = record.get("script_name")
                    if not script_name:
                        logger.warning(f"表 {table_name} 的关系中没有script_name属性,可能导致后续处理出错")
                    table_info['script_name'] = script_name
                    # Set defaults so these keys exist even when the properties are missing
                    table_info['script_type'] = record.get("script_type", "python")  # Defaults to python
                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # Defaults to append
                else:
                    logger.warning(f"未找到表 {table_name} 的关系信息")
            else:
                logger.warning(f"在Neo4j中找不到表 {table_name} 的信息")
    except Exception as e:
        logger.error(f"获取表 {table_name} 的信息时出错: {str(e)}")
    finally:
        driver.close()
    return table_info


def process_dependencies(tables_info):
    """Process inter-table dependencies and add passively scheduled tables"""
    # Dict holding the info for every table
    all_tables = {t['target_table']: t for t in tables_info}
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name, table_info in list(all_tables.items()):
                if table_info.get('target_table_label') == 'DataModel':
                    # Query its dependency tables
                    query = """
                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
                               dep.status AS dep_status, dep.frequency AS dep_frequency
                    """
                    result = session.run(query, table_name=table_name)
                    for record in result:
                        dep_name = record.get("dep_name")
                        dep_labels = record.get("dep_labels", [])
                        dep_status = record.get("dep_status", True)
                        dep_frequency = record.get("dep_frequency")
                        # Handle dependency tables that are not directly scheduled
                        if dep_name and dep_name not in all_tables:
                            logger.info(f"发现被动依赖表: {dep_name}, 标签: {dep_labels}")
                            # Fetch full info for the dependency table
                            dep_info = get_table_info_from_neo4j(dep_name)
                            dep_info['is_directly_schedule'] = False
                            # Inherit the scheduling frequency when the dependency has none
                            if not dep_info.get('default_update_frequency'):
                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
                            all_tables[dep_name] = dep_info
    except Exception as e:
        logger.error(f"处理依赖关系时出错: {str(e)}")
    finally:
        driver.close()
    return list(all_tables.values())


def filter_invalid_tables(tables_info):
    """Filter out invalid tables and their downstream dependents, using a NetworkX dependency graph"""
    # Map table names to their index
    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}
    # Find the invalid tables
    invalid_tables = set()
    for table in tables_info:
        if table.get('target_table_status') is False:
            invalid_tables.add(table['target_table'])
            logger.info(f"表 {table['target_table']} 的状态为无效")
    # Build the dependency graph
    G = nx.DiGraph()
    # Add all nodes
    for table in tables_info:
        G.add_node(table['target_table'])
    # Query and add the dependency edges
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table in tables_info:
                if table.get('target_table_label') == 'DataModel':
                    query = """
                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN target.en_name AS target_name
                    """
                    result = session.run(query, table_name=table['target_table'])
                    for record in result:
                        target_name = record.get("target_name")
                        if target_name and target_name in table_dict:
                            # Add an edge from the upstream table to the table derived from it,
                            # so that nx.descendants() below walks the downstream dependents
                            G.add_edge(target_name, table['target_table'])
                            logger.debug(f"添加依赖边: {target_name} -> {table['target_table']}")
    except Exception as e:
        logger.error(f"构建依赖图时出错: {str(e)}")
    finally:
        driver.close()
    # Find every table that depends on an invalid table
    downstream_invalid = set()
    for invalid_table in invalid_tables:
        # All nodes reachable downstream from the invalid table
        try:
            descendants = nx.descendants(G, invalid_table)
            downstream_invalid.update(descendants)
            logger.info(f"表 {invalid_table} 的下游无效表: {descendants}")
        except Exception as e:
            logger.error(f"处理表 {invalid_table} 的下游依赖时出错: {str(e)}")
    # Merge all invalid tables
    all_invalid = invalid_tables.union(downstream_invalid)
    logger.info(f"总共 {len(all_invalid)} 个表被标记为无效: {all_invalid}")
    # Keep only the valid tables
    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
    logger.info(f"过滤后保留 {len(valid_tables)} 个有效表")
    return valid_tables
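
# Illustrative example (not part of the pipeline): with edges added upstream -> downstream,
# nx.descendants() returns the transitive set of dependents, which is what the filter needs:
#   g = nx.DiGraph(); g.add_edges_from([("a", "b"), ("b", "c")])   # b derives from a, c from b
#   nx.descendants(g, "a")  # -> {"b", "c"}: everything that directly or indirectly depends on "a"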


def write_to_airflow_dag_schedule(exec_date, tables_info):
    """Write the table information into the airflow_dag_schedule table"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Clear the current day's data to avoid duplicates
        cursor.execute("""
            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        logger.info(f"已清理执行日期 {exec_date} 的现有数据")
        # Insert the new rows
        inserted_count = 0
        for table in tables_info:
            cursor.execute("""
                INSERT INTO airflow_dag_schedule (
                    exec_date, source_table, target_table, target_table_label,
                    target_table_status, is_directly_schedule, default_update_frequency,
                    script_name, script_type, script_exec_mode
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                exec_date,
                table.get('source_table'),
                table['target_table'],
                table.get('target_table_label'),
                table.get('target_table_status', True),
                table.get('is_directly_schedule', False),
                table.get('default_update_frequency'),
                table.get('script_name'),
                table.get('script_type', 'python'),
                table.get('script_exec_mode', 'append')
            ))
            inserted_count += 1
        conn.commit()
        logger.info(f"成功插入 {inserted_count} 条记录到 airflow_dag_schedule 表")
        return inserted_count
    except Exception as e:
        logger.error(f"写入 airflow_dag_schedule 表时出错: {str(e)}")
        conn.rollback()
        # Re-raise instead of returning 0 so the error propagates properly
        raise
    finally:
        cursor.close()
        conn.close()


def prepare_dag_schedule(**kwargs):
    """Main function that prepares the DAG scheduling data"""
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
    # 1. Get the enabled tables
    enabled_tables = get_enabled_tables()
    logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
    if not enabled_tables:
        logger.warning("没有找到启用的表,准备工作结束")
        return 0
    # 2. Fetch detailed info for each table
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)
    logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
    # 3. Process dependencies and add passively scheduled tables
    enriched_tables = process_dependencies(tables_info)
    logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
    # 4. Filter out invalid tables and their dependents
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
    # 5. Write to the airflow_dag_schedule table
    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
    # 6. Raise when the insert failed despite having valid tables
    if inserted_count == 0 and valid_tables:
        error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
        logger.error(error_msg)
        raise Exception(error_msg)
    # 7. Build the execution plan
    resource_tasks = []
    model_tasks = []
    for table in valid_tables:
        if table.get('target_table_label') == 'DataResource':
            resource_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataResource",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })
        elif table.get('target_table_label') == 'DataModel':
            model_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataModel",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })
    # Fetch the dependencies for the model tables
    model_table_names = [t['target_table'] for t in model_tasks]
    dependencies = {}
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name in model_table_names:
                query = """
                    MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                    RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                """
                result = session.run(query, table_name=table_name)
                deps = []
                for record in result:
                    target = record.get("target")
                    target_labels = record.get("target_labels", [])
                    if target:
                        table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                        deps.append({
                            "table_name": target,
                            "table_type": table_type
                        })
                dependencies[table_name] = deps
    finally:
        driver.close()
    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }
    # Push the execution plan to XCom
    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
    logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
    # Save the execution plan to a file
    try:
        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
        with open(plan_path, 'w') as f:
            json.dump(execution_plan, f, default=json_serial, indent=2)
        logger.info(f"将执行计划保存到文件: {plan_path}")
        # Add the ready marker file
        ready_path = f"{plan_path}.ready"
        with open(ready_path, 'w') as f:
            f.write(datetime.now().isoformat())
        logger.info(f"已创建ready标记文件: {ready_path}")
    except Exception as file_e:
        logger.error(f"保存执行计划到文件时出错: {str(file_e)}")
    return inserted_count


def check_execution_plan_file(**kwargs):
    """
    Check whether the execution plan file exists and is valid.
    Returning False short-circuits all downstream tasks.
    """
    logger.info("检查执行计划文件是否存在且有效")
    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
    ready_path = f"{plan_path}.ready"
    # Check that the plan file exists
    if not os.path.exists(plan_path):
        logger.error(f"执行计划文件不存在: {plan_path}")
        return False
    # Check that the ready marker exists
    if not os.path.exists(ready_path):
        logger.error(f"执行计划ready标记文件不存在: {ready_path}")
        return False
    # Check that the file is readable and its content is valid
    try:
        with open(plan_path, 'r') as f:
            data = json.load(f)
        # Check the required fields
        if "exec_date" not in data:
            logger.error("执行计划缺少exec_date字段")
            return False
        if not isinstance(data.get("resource_tasks", []), list):
            logger.error("执行计划的resource_tasks字段无效")
            return False
        if not isinstance(data.get("model_tasks", []), list):
            logger.error("执行计划的model_tasks字段无效")
            return False
        # Check whether there is any task data
        resource_tasks = data.get("resource_tasks", [])
        model_tasks = data.get("model_tasks", [])
        if not resource_tasks and not model_tasks:
            logger.warning("执行计划不包含任何任务,但文件格式有效")
            # Note: the flow is allowed to continue even when there are no tasks
        logger.info(f"执行计划文件验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
        return True
    except json.JSONDecodeError as je:
        logger.error(f"执行计划文件不是有效的JSON: {str(je)}")
        return False
    except Exception as e:
        logger.error(f"检查执行计划文件时出错: {str(e)}")
        return False
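
# For reference, last_execution_plan.json written by prepare_dag_schedule() has this shape
# (field names taken from the code above; the values here are illustrative only):
# {
#   "exec_date": "2024-01-01",
#   "resource_tasks": [{"source_table": "...", "target_table": "...", "target_table_label": "DataResource",
#                       "script_name": "...", "script_exec_mode": "append"}],
#   "model_tasks":    [{"source_table": "...", "target_table": "...", "target_table_label": "DataModel",
#                       "script_name": "...", "script_exec_mode": "append"}],
#   "dependencies":   {"<model_table>": [{"table_name": "...", "table_type": "DataModel"}]}
# }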


#############################################
# Phase 2: Data Processing Phase functions
#############################################
def get_latest_date():
    """Return the most recent date that has records in the database"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"找到最近的包含记录的日期: {latest_date}")
            return latest_date
        else:
            logger.warning("未找到包含记录的日期,将使用当前日期")
            return get_today_date()
    except Exception as e:
        logger.error(f"查找最近日期时出错: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()


def get_all_tasks(exec_date):
    """Return all tasks to execute (DataResource and DataModel)"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Count the records for the execution date
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
        """, (exec_date,))
        total_count = cursor.fetchone()[0]
        logger.info(f"执行日期 {exec_date} 在airflow_dag_schedule表中共有 {total_count} 条记录")
        # Query all resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()
        logger.info(f"查询到 {len(resource_results)} 条DataResource记录")
        # Query all model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()
        logger.info(f"查询到 {len(model_results)} 条DataModel记录")
        # Collect the resource-table tasks
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # Skip rows without a script name
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })
        # Collect the model-table tasks
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # Skip rows without a script name
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })
        logger.info(f"获取到 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"获取任务信息时出错: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()


def get_table_dependencies(table_names):
    """Return the dependencies between tables"""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}
    try:
        with driver.session() as session:
            # Get the dependencies between all model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)
            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])
                if source and target:
                    # Append the upstream table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"依赖关系: {source} 依赖于 {target}")
    except Exception as e:
        logger.error(f"从Neo4j获取依赖关系时出错: {str(e)}")
    finally:
        driver.close()
    return dependency_dict


def create_execution_plan(**kwargs):
    """Prepare the execution plan, reusing the data pushed by the prepare phase"""
    try:
        # Pull the execution plan from XCom
        execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')
        # If no plan was found, rebuild it from the database
        if not execution_plan:
            # Determine the execution date
            exec_date = get_latest_date()
            logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
            # Get all tasks
            resource_tasks, model_tasks = get_all_tasks(exec_date)
            if not resource_tasks and not model_tasks:
                logger.warning(f"执行日期 {exec_date} 没有找到任务")
                return 0
            # Get the dependencies for all model tables
            model_table_names = [task["target_table"] for task in model_tasks]
            dependencies = get_table_dependencies(model_table_names)
            # Build the execution plan
            new_execution_plan = {
                "exec_date": exec_date,
                "resource_tasks": resource_tasks,
                "model_tasks": model_tasks,
                "dependencies": dependencies
            }
            # Save the execution plan to XCom
            kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(new_execution_plan, default=json_serial))
            logger.info(f"创建新的执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
            # Save the execution plan to a file
            try:
                plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
                with open(plan_path, 'w') as f:
                    json.dump(new_execution_plan, f, default=json_serial, indent=2)
                logger.info(f"将执行计划保存到文件: {plan_path}")
                # Create the ready marker file
                ready_path = f"{plan_path}.ready"
                with open(ready_path, 'w') as f:
                    f.write(datetime.now().isoformat())
                logger.info(f"已创建ready标记文件: {ready_path}")
            except Exception as file_e:
                logger.error(f"保存执行计划到文件时出错: {str(file_e)}")
            return json.dumps(new_execution_plan, default=json_serial)
        # If a plan was pulled from XCom, also persist it to the file
        try:
            plan_json = json.loads(execution_plan) if isinstance(execution_plan, str) else execution_plan
            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
            with open(plan_path, 'w') as f:
                json.dump(plan_json, f, default=json_serial, indent=2)
            logger.info(f"将从XCom获取的执行计划保存到文件: {plan_path}")
            # Create the ready marker file
            ready_path = f"{plan_path}.ready"
            with open(ready_path, 'w') as f:
                f.write(datetime.now().isoformat())
            logger.info(f"已创建ready标记文件: {ready_path}")
        except Exception as file_e:
            logger.error(f"保存从XCom获取的执行计划到文件时出错: {str(file_e)}")
        logger.info("成功获取执行计划")
        return execution_plan
    except Exception as e:
        logger.error(f"创建执行计划时出错: {str(e)}")
        # Fall back to an empty execution plan
        empty_plan = {
            "exec_date": get_today_date(),
            "resource_tasks": [],
            "model_tasks": [],
            "dependencies": {}
        }
        # Try to save the empty plan to the file as well
        try:
            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
            with open(plan_path, 'w') as f:
                json.dump(empty_plan, f, default=json_serial, indent=2)
            logger.info(f"将空执行计划保存到文件: {plan_path}")
            # Create the ready marker file
            ready_path = f"{plan_path}.ready"
            with open(ready_path, 'w') as f:
                f.write(datetime.now().isoformat())
            logger.info(f"已创建ready标记文件: {ready_path}")
        except Exception as file_e:
            logger.error(f"保存空执行计划到文件时出错: {str(file_e)}")
        return json.dumps(empty_plan, default=json_serial)


def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table"""
    logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
    # exec_date may arrive as a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"从JSON中提取执行日期: {exec_date}")
        except Exception as e:
            logger.error(f"解析exec_date JSON时出错: {str(e)}")
    try:
        # Call the monitored execution helper
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
        return result
    except Exception as e:
        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
        # Return a result even on error so the DAG is not blocked
        return False


def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table"""
    logger.info(f"执行模型表 {target_table} 的脚本 {script_name}")
    # exec_date may arrive as a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"从JSON中提取执行日期: {exec_date}")
        except Exception as e:
            logger.error(f"解析exec_date JSON时出错: {str(e)}")
    try:
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"模型表 {target_table} 处理完成,结果: {result}")
        return result
    except Exception as e:
        logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
        # Return a result even on error so the DAG is not blocked
        return False


#############################################
# Phase 3: Summary Phase functions
#############################################
def get_execution_stats(exec_date):
    """Return execution statistics for the given date"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total number of tasks
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        result = cursor.fetchone()
        total_tasks = result[0] if result else 0
        # Task counts per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}
        # Execution result statistics
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        result = cursor.fetchone()
        success_count = result[0] if result else 0
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        result = cursor.fetchone()
        fail_count = result[0] if result else 0
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        result = cursor.fetchone()
        pending_count = result[0] if result else 0
        # Execution duration statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()
        # Guard against missing duration statistics
        if time_stats and time_stats[0] is not None:
            avg_duration = float(time_stats[0])
            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
        else:
            avg_duration = None
            min_duration = None
            max_duration = None
        # Details of the failed tasks
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = []
        for row in cursor.fetchall():
            task_dict = {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
            }
            if row[3] is not None:
                task_dict["exec_duration"] = float(row[3])
            else:
                task_dict["exec_duration"] = None
            failed_tasks.append(task_dict)
        # Compute the success rate, avoiding division by zero
        success_rate = 0
        if total_tasks > 0:
            success_rate = (success_count / total_tasks) * 100
        # Aggregate the statistics
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }
        return stats
    except Exception as e:
        logger.error(f"获取执行统计信息时出错: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()


def update_missing_results(exec_date):
    """Fill in missing execution results"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Find all tasks missing an execution result
        cursor.execute("""
            SELECT target_table, script_name
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()
        update_count = 0
        for row in missing_results:
            target_table, script_name = row
            # If the task has a start time but no end time, assume it failed
            cursor.execute("""
                SELECT exec_start_time
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))
            start_time = cursor.fetchone()
            if start_time and start_time[0]:
                # Has a start time but no result: mark it as failed
                now = datetime.now()
                duration = (now - start_time[0]).total_seconds()
                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))
                logger.warning(f"任务 {target_table} 的脚本 {script_name} 标记为失败,开始时间: {start_time[0]}")
                update_count += 1
            else:
                # No start time and no result: assume it never ran
                logger.warning(f"任务 {target_table} 的脚本 {script_name} 未执行")
        conn.commit()
        logger.info(f"更新了 {update_count} 个缺失结果的任务")
        return update_count
    except Exception as e:
        logger.error(f"更新缺失执行结果时出错: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()


def generate_execution_report(exec_date, stats):
    """Generate the execution report"""
    # Build the report line by line
    report = []
    report.append("========== 数据运维系统执行报告 ==========")
    report.append(f"执行日期: {exec_date}")
    report.append(f"总任务数: {stats['total_tasks']}")
    # Task type distribution
    report.append("\n--- 任务类型分布 ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} 任务: {count} 个")
    # Execution result statistics
    report.append("\n--- 执行结果统计 ---")
    report.append(f"成功任务: {stats.get('success_count', 0)} 个")
    report.append(f"失败任务: {stats.get('fail_count', 0)} 个")
    report.append(f"未执行任务: {stats.get('pending_count', 0)} 个")
    report.append(f"成功率: {stats.get('success_rate', 0):.2f}%")
    # Execution duration statistics
    report.append("\n--- 执行时间统计 (秒) ---")
    avg_duration = stats.get('avg_duration')
    min_duration = stats.get('min_duration')
    max_duration = stats.get('max_duration')
    report.append(f"平均执行时间: {avg_duration:.2f}" if avg_duration is not None else "平均执行时间: N/A")
    report.append(f"最短执行时间: {min_duration:.2f}" if min_duration is not None else "最短执行时间: N/A")
    report.append(f"最长执行时间: {max_duration:.2f}" if max_duration is not None else "最长执行时间: N/A")
    # Failed task details
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- 失败任务详情 ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. 表名: {task['target_table']}")
            report.append(f" 脚本: {task['script_name']}")
            report.append(f" 类型: {task['target_table_label']}")
            exec_duration = task.get('exec_duration')
            if exec_duration is not None:
                report.append(f" 执行时间: {exec_duration:.2f} 秒")
            else:
                report.append(" 执行时间: N/A")
    report.append("\n========== 报告结束 ==========")
    # Join the report into one string
    report_str = "\n".join(report)
    # Write it to the log
    logger.info("\n" + report_str)
    return report_str


def summarize_execution(**kwargs):
    """Main function that summarizes the execution"""
    try:
        exec_date = kwargs.get('ds') or get_today_date()
        logger.info(f"开始汇总执行日期 {exec_date} 的统一执行情况")
        # 1. Fill in missing execution results
        try:
            update_count = update_missing_results(exec_date)
            logger.info(f"更新了 {update_count} 个缺失的执行结果")
        except Exception as e:
            logger.error(f"更新缺失执行结果时出错: {str(e)}")
            update_count = 0
        # 2. Get the execution statistics
        try:
            stats = get_execution_stats(exec_date)
            if not stats:
                logger.warning("未能获取执行统计信息,将使用默认值")
                stats = {
                    "exec_date": exec_date,
                    "total_tasks": 0,
                    "type_counts": {},
                    "success_count": 0,
                    "fail_count": 0,
                    "pending_count": 0,
                    "success_rate": 0,
                    "avg_duration": None,
                    "min_duration": None,
                    "max_duration": None,
                    "failed_tasks": []
                }
        except Exception as e:
            logger.error(f"获取执行统计信息时出错: {str(e)}")
            stats = {
                "exec_date": exec_date,
                "total_tasks": 0,
                "type_counts": {},
                "success_count": 0,
                "fail_count": 0,
                "pending_count": 0,
                "success_rate": 0,
                "avg_duration": None,
                "min_duration": None,
                "max_duration": None,
                "failed_tasks": []
            }
        # 3. Generate the execution report
        try:
            report = generate_execution_report(exec_date, stats)
        except Exception as e:
            logger.error(f"生成执行报告时出错: {str(e)}")
            report = f"生成执行报告时出错: {str(e)}\n基础统计: 总任务数: {stats.get('total_tasks', 0)}, 成功: {stats.get('success_count', 0)}, 失败: {stats.get('fail_count', 0)}"
        # Pass the report and statistics to the next task
        try:
            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
            kwargs['ti'].xcom_push(key='execution_report', value=report)
        except Exception as e:
            logger.error(f"保存报告到XCom时出错: {str(e)}")
        return report
    except Exception as e:
        logger.error(f"汇总执行情况时出现未处理的错误: {str(e)}")
        # Return a simple error report so the task itself does not fail
        return f"执行汇总时出现错误: {str(e)}"


# Create the DAG
with DAG(
    "dag_dataops_pipeline_data_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    # DAG-level params so tasks run with the correct environment
    params={
        "scripts_path": SCRIPTS_BASE_PATH,
        "airflow_base_path": os.path.dirname(os.path.dirname(__file__))
    }
) as dag:

    #############################################
    # Stage 1: Prepare Phase
    #############################################
    with TaskGroup("prepare_phase") as prepare_group:
        # Start marker
        start_preparation = EmptyOperator(
            task_id="start_preparation"
        )
        # Prepare the scheduling data
        prepare_task = PythonOperator(
            task_id="prepare_dag_schedule",
            python_callable=prepare_dag_schedule,
            provide_context=True
        )
        # Validate the execution plan file
        check_plan = ShortCircuitOperator(
            task_id="check_execution_plan_file",
            python_callable=check_execution_plan_file,
            provide_context=True
        )
        # Create the execution plan
        create_plan = PythonOperator(
            task_id="create_execution_plan",
            python_callable=create_execution_plan,
            provide_context=True
        )
        # Preparation-complete marker
        preparation_completed = EmptyOperator(
            task_id="preparation_completed"
        )
        # Task dependencies
        start_preparation >> prepare_task >> check_plan >> create_plan >> preparation_completed

    #############################################
    # Stage 2: Data Processing Phase
    #############################################
    with TaskGroup("data_processing_phase") as data_group:
        # Processing start marker
        start_processing = EmptyOperator(
            task_id="start_processing"
        )
        # Processing-complete marker
        processing_completed = EmptyOperator(
            task_id="processing_completed",
            trigger_rule="none_failed_min_one_success"  # Complete once at least one task succeeded and none failed
        )
        # Default dependency chain
        start_processing >> processing_completed

    #############################################
    # Stage 3: Summary Phase
    #############################################
    with TaskGroup("summary_phase") as summary_group:
        # Summarize the execution
        summarize_task = PythonOperator(
            task_id="summarize_execution",
            python_callable=summarize_execution,
            provide_context=True
        )
        # Summary-complete marker
        summary_completed = EmptyOperator(
            task_id="summary_completed"
        )
        # Task dependencies
        summarize_task >> summary_completed

    # Dependencies between the three stages
    prepare_group >> data_group >> summary_group

    # Try to read the execution plan file - this part runs at DAG parse time
    try:
        # Load the latest execution plan from the file to build the DAG graph
        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
        ready_path = f"{plan_path}.ready"
        if os.path.exists(plan_path) and os.path.exists(ready_path):
            try:
                # Read the timestamp from the ready file
                with open(ready_path, 'r') as f:
                    ready_timestamp = f.read().strip()
                    logger.info(f"执行计划ready标记时间: {ready_timestamp}")
                # Read the execution plan file
                with open(plan_path, 'r') as f:
                    execution_plan_json = f.read()
                    execution_plan = json.loads(execution_plan_json)
                    logger.info(f"从文件加载执行计划: {plan_path}")
                # Extract the plan contents
                exec_date = execution_plan.get("exec_date", get_today_date())
                resource_tasks = execution_plan.get("resource_tasks", [])
                model_tasks = execution_plan.get("model_tasks", [])
                dependencies = execution_plan.get("dependencies", {})
                logger.info(f"执行计划: exec_date={exec_date}, resource_tasks数量={len(resource_tasks)}, model_tasks数量={len(model_tasks)}")
                # Dynamically create the processing tasks
                task_dict = {}
                # 1. Create the resource-table tasks
                for task_info in resource_tasks:
                    table_name = task_info["target_table"]
                    script_name = task_info["script_name"]
                    exec_mode = task_info.get("script_exec_mode", "append")
                    # Build a safe task id
                    safe_table_name = table_name.replace(".", "_").replace("-", "_")
                    # Make sure every task belongs to data_processing_phase
                    with data_group:
                        resource_task = PythonOperator(
                            task_id=f"resource_{safe_table_name}",
                            python_callable=process_resource,
                            op_kwargs={
                                "target_table": table_name,
                                "script_name": script_name,
                                "script_exec_mode": exec_mode,
                                "exec_date": exec_date
                            },
                            retries=TASK_RETRY_CONFIG["retries"],
                            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                        )
                    # Register the task
                    task_dict[table_name] = resource_task
                    # Depend on start_processing
                    start_processing >> resource_task
                # Build a directed graph to track dependencies between the model tables
                G = nx.DiGraph()
                # Add every model table as a node
                for task_info in model_tasks:
                    table_name = task_info["target_table"]
                    G.add_node(table_name)
                # Add the dependency edges between model tables
                for source, deps in dependencies.items():
                    for dep in deps:
                        if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                            G.add_edge(dep.get("table_name"), source)  # Edge direction: dependency -> dependent
                # Detect and break cyclic dependencies
                try:
                    cycles = list(nx.simple_cycles(G))
                    if cycles:
                        logger.warning(f"检测到循环依赖: {cycles}")
                        for cycle in cycles:
                            G.remove_edge(cycle[-1], cycle[0])
                            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
                except Exception as e:
                    logger.error(f"检测循环依赖时出错: {str(e)}")
                # Topologically sort the graph to determine the execution order
                execution_order = []
                try:
                    execution_order = list(nx.topological_sort(G))
                except Exception as e:
                    logger.error(f"生成拓扑排序失败: {str(e)}")
                    execution_order = [task_info["target_table"] for task_info in model_tasks]
                # 2. Create the model-table tasks in topological order
                for table_name in execution_order:
                    task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
                    if not task_info:
                        continue
                    script_name = task_info["script_name"]
                    exec_mode = task_info.get("script_exec_mode", "append")
                    # Build a safe task id
                    safe_table_name = table_name.replace(".", "_").replace("-", "_")
                    # Make sure every task belongs to data_processing_phase
                    with data_group:
                        model_task = PythonOperator(
                            task_id=f"model_{safe_table_name}",
                            python_callable=process_model,
                            op_kwargs={
                                "target_table": table_name,
                                "script_name": script_name,
                                "script_exec_mode": exec_mode,
                                "exec_date": exec_date
                            },
                            retries=TASK_RETRY_CONFIG["retries"],
                            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                        )
                    # Register the task
                    task_dict[table_name] = model_task
                    # Wire up the dependencies
                    deps = dependencies.get(table_name, [])
                    has_dependency = False
                    # Dependencies on other tables that already have tasks
                    for dep in deps:
                        dep_table = dep.get("table_name")
                        dep_type = dep.get("table_type")
                        if dep_table in task_dict:
                            task_dict[dep_table] >> model_task
                            has_dependency = True
                            logger.info(f"设置依赖: {dep_table} >> {table_name}")
                    # Without dependencies, connect from start_processing and the resource-table tasks
                    if not has_dependency:
                        # Connect directly from start_processing
                        start_processing >> model_task
                        # Also connect from the resource-table tasks
                        resource_count = 0
                        for resource_table in resource_tasks:
                            if resource_count >= 5:  # Cap at 5 upstream resource tasks
                                break
                            resource_name = resource_table["target_table"]
                            if resource_name in task_dict:
                                task_dict[resource_name] >> model_task
                                resource_count += 1
                # Find all terminal tasks (tasks with no downstream dependents)
                terminal_tasks = []
                # Check every model-table task
                for table_name in execution_order:
                    # Does any other table depend on it?
                    has_downstream = False
                    for source, deps in dependencies.items():
                        if source == table_name:  # Skip the table's own dependency list
                            continue
                        for dep in deps:
                            if dep.get("table_name") == table_name:
                                has_downstream = True
                                break
                        if has_downstream:
                            break
                    # No downstream dependents: it is a terminal task
                    if not has_downstream and table_name in task_dict:
                        terminal_tasks.append(table_name)
                # With no model-table tasks, treat every resource-table task as terminal
                if not model_tasks and resource_tasks:
                    terminal_tasks = [task["target_table"] for task in resource_tasks]
                    logger.info(f"没有模型表任务,将所有资源表任务视为终端任务: {terminal_tasks}")
                # With neither model nor resource tasks, the default dependency chain already exists
                if not terminal_tasks:
                    logger.warning("未找到任何任务,使用默认依赖链")
                else:
                    # Connect every terminal task to the completion marker
                    for table_name in terminal_tasks:
                        if table_name in task_dict:
                            task_dict[table_name] >> processing_completed
                            logger.info(f"设置终端任务: {table_name} >> processing_completed")
            except Exception as plan_e:
                logger.error(f"解析执行计划文件时出错: {str(plan_e)}")
                import traceback
                logger.error(traceback.format_exc())
        else:
            if not os.path.exists(plan_path):
                logger.warning(f"执行计划文件不存在: {plan_path}")
            if not os.path.exists(ready_path):
                logger.warning(f"执行计划ready标记文件不存在: {ready_path}")
            logger.warning("将使用默认DAG结构")
    except Exception as e:
        logger.error(f"加载执行计划文件时出错: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

logger.info("DAG dag_dataops_pipeline_data_scheduler 定义完成")