# dag_dataops_unified_data_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
import os
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG

# Create the logger
logger = logging.getLogger(__name__)
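
# Overview: this DAG waits for dag_dataops_unified_prepare_scheduler to finish, builds a
# unified execution plan from airflow_dag_schedule (PostgreSQL) plus the table dependencies
# recorded in Neo4j, and runs one PythonOperator per DataResource / DataModel table.
# The task set itself is constructed at DAG parse time from last_execution_plan.json
# (see the module-level pre-load section further down).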

def get_latest_date():
    """Return the most recent date that has records in the database."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Found the latest date with records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with records found; falling back to today's date")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while looking up the latest date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()

def get_all_tasks(exec_date):
    """Return all tasks (DataResource and DataModel) that need to run."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Count the total number of records for this execution date
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
        """, (exec_date,))
        total_count = cursor.fetchone()[0]
        logger.info(f"Execution date {exec_date} has {total_count} records in airflow_dag_schedule")

        # Fetch all resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()
        logger.info(f"Found {len(resource_results)} DataResource records")

        # Fetch all model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()
        logger.info(f"Found {len(model_results)} DataModel records")

        # Collect resource-table task info
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        # Collect model-table task info
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        logger.info(f"Collected {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error while fetching task information: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()

def get_table_dependencies(table_names):
    """Return the dependency relationships between tables."""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}
    try:
        with driver.session() as session:
            # Fetch the dependencies between all model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)
            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])
                if source and target:
                    # Add the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error while fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()
    return dependency_dict
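
# The dictionary returned above maps each model table to the tables it is DERIVED_FROM,
# for example (illustrative table names only):
#   {"model_a": [{"table_name": "resource_x", "table_type": "DataResource"},
#                {"table_name": "model_b", "table_type": "DataModel"}]}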

def json_serial(obj):
    """JSON serializer that renders date objects as ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

def prepare_unified_execution_plan(**kwargs):
    """Main function that prepares the unified execution plan."""
    # Determine the execution date
    exec_date = get_latest_date()
    logger.info(f"Using execution date: {exec_date}")

    # Fetch all tasks
    resource_tasks, model_tasks = get_all_tasks(exec_date)
    if not resource_tasks and not model_tasks:
        logger.warning(f"No tasks found for execution date {exec_date}")
        return 0

    # Fetch the dependencies of all model tables
    model_table_names = [task["target_table"] for task in model_tasks]
    dependencies = get_table_dependencies(model_table_names)

    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Log the resource- and model-task names to ease debugging
    resource_names = [task["target_table"] for task in resource_tasks]
    model_names = [task["target_table"] for task in model_tasks]
    logger.info(f"Resource-table tasks: {resource_names}")
    logger.info(f"Model-table tasks: {model_names}")

    # Save the execution plan to XCom, using the custom serializer for date objects
    try:
        kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
        logger.info(f"Prepared execution plan with {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
    except Exception as e:
        logger.error(f"Error while saving the execution plan to XCom: {str(e)}")

    # Save the execution plan to a file for later use
    try:
        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
        with open(plan_path, 'w') as f:
            json.dump(execution_plan, f, default=json_serial)
        logger.info(f"Saved the execution plan to file: {plan_path}")
    except Exception as file_e:
        logger.error(f"Error while saving the execution plan to file: {str(file_e)}")

    return len(resource_tasks) + len(model_tasks)
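
# Shape of the execution plan written above (illustrative values only):
#   {
#     "exec_date": "2024-01-01",
#     "resource_tasks": [{"source_table": "...", "target_table": "...",
#                         "target_table_label": "DataResource",
#                         "script_name": "...", "script_exec_mode": "append"}],
#     "model_tasks": [...same fields with target_table_label = "DataModel"...],
#     "dependencies": {"<model table>": [{"table_name": "...", "table_type": "..."}]}
#   }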

def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table."""
    logger.info(f"Running script {script_name} for resource table {target_table}")
    # Check whether exec_date is a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error while parsing exec_date JSON: {str(e)}")
    try:
        # Invoke the monitored execution helper as usual
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Finished processing resource table {target_table}, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error while processing resource table {target_table}: {str(e)}")
        # Return a result even on error so the DAG is not blocked
        return False

def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table."""
    logger.info(f"Running script {script_name} for model table {target_table}")
    # Check whether exec_date is a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error while parsing exec_date JSON: {str(e)}")
    try:
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Finished processing model table {target_table}, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error while processing model table {target_table}: {str(e)}")
        # Return a result even on error so the DAG is not blocked
        return False
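
# process_resource and process_model mirror each other: both delegate to
# execute_with_monitoring and return False instead of raising, so a failing script
# does not fail the Airflow task or block downstream tasks.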

# Pre-load the execution plan data so the tasks can be created
try:
    logger.info("Pre-loading execution plan data to build the DAG")
    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
    execution_plan = {"exec_date": get_today_date(), "resource_tasks": [], "model_tasks": [], "dependencies": {}}
    if os.path.exists(plan_path):
        try:
            with open(plan_path, 'r') as f:
                execution_plan_json = f.read()
                execution_plan = json.loads(execution_plan_json)
            logger.info(f"Loaded execution plan from file: {plan_path}")
        except Exception as e:
            logger.warning(f"Error while reading the execution plan file: {str(e)}")
    else:
        logger.warning(f"Execution plan file does not exist: {plan_path}; a bare DAG structure will be created")

    # Extract the plan contents
    exec_date = execution_plan.get("exec_date", get_today_date())
    resource_tasks = execution_plan.get("resource_tasks", [])
    model_tasks = execution_plan.get("model_tasks", [])
    dependencies = execution_plan.get("dependencies", {})
    logger.info(f"Pre-loaded execution plan: exec_date={exec_date}, resource_tasks={len(resource_tasks)}, model_tasks={len(model_tasks)}")
except Exception as e:
    logger.error(f"Error while pre-loading execution plan data: {str(e)}")
    exec_date = get_today_date()
    resource_tasks = []
    model_tasks = []
    dependencies = {}
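
# Note: the block above runs whenever the scheduler parses this file, so the tasks defined
# below reflect the most recently saved last_execution_plan.json rather than the plan
# produced by the current run's prepare_execution_plan task; the current run's plan only
# takes effect the next time the DAG is re-parsed.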

# Callback that handles DAG-level failures
def handle_dag_failure(context):
    logger.error(f"DAG run failed: {context.get('exception')}")

# Create the DAG
with DAG(
    "dag_dataops_unified_data_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    on_failure_callback=handle_dag_failure  # register the failure callback here
) as dag:
    # Wait for the preparation DAG to finish
    wait_for_prepare = ExternalTaskSensor(
        task_id="wait_for_prepare",
        external_dag_id="dag_dataops_unified_prepare_scheduler",
        external_task_id="preparation_completed",
        mode="reschedule",  # reschedule mode frees the worker slot between pokes
        timeout=3600,
        poke_interval=30,
        execution_timeout=timedelta(hours=1),
        soft_fail=True,
        allowed_states=["success", "skipped"],
        dag=dag
    )

    # Prepare the execution plan
    prepare_plan = PythonOperator(
        task_id="prepare_execution_plan",
        python_callable=prepare_unified_execution_plan,
        dag=dag
    )

    # Completion marker
    processing_completed = EmptyOperator(
        task_id="processing_completed",
        trigger_rule="none_failed_min_one_success",  # complete once at least one task succeeds and none fail
        dag=dag
    )

    # Set the initial task dependency
    wait_for_prepare >> prepare_plan

    # Task dictionary used for wiring up dependencies
    task_dict = {}
    # 1. Pre-create the resource-table tasks
    for task_info in resource_tasks:
        table_name = task_info["target_table"]
        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task ID
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"resource_{safe_table_name}"

        resource_task = PythonOperator(
            task_id=task_id,
            python_callable=process_resource,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date  # use the pre-parsed exec_date directly instead of XCom
            },
            retries=TASK_RETRY_CONFIG["retries"],
            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
            dag=dag
        )

        # Register the task in the dictionary
        task_dict[table_name] = resource_task

        # Depend directly on prepare_plan; no other conditions are needed
        prepare_plan >> resource_task
        logger.info(f"Pre-set basic dependency: prepare_plan >> {task_id}")
    # Build a directed graph to detect dependencies between model tables
    G = nx.DiGraph()

    # Add every model table as a node
    for task_info in model_tasks:
        table_name = task_info["target_table"]
        G.add_node(table_name)

    # Add the dependency edges between model tables
    for source, deps in dependencies.items():
        for dep in deps:
            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> dependent

    # Detect and break circular dependencies
    cycles = list(nx.simple_cycles(G))
    if cycles:
        logger.warning(f"Detected circular dependencies: {cycles}")
        for cycle in cycles:
            if G.has_edge(cycle[-1], cycle[0]):  # the edge may already have been removed for an earlier cycle
                G.remove_edge(cycle[-1], cycle[0])
                logger.info(f"Broke circular dependency: removed the edge {cycle[-1]} -> {cycle[0]}")

    # Derive the execution order with a topological sort
    execution_order = []
    try:
        execution_order = list(nx.topological_sort(G))
        logger.info(f"Pre-computed execution order: {execution_order}")
    except Exception as e:
        logger.error(f"Topological sort failed: {str(e)}; falling back to the original task order")
        execution_order = [task_info["target_table"] for task_info in model_tasks]
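
    # Edges point from a dependency to the table derived from it, so the topological order
    # lists upstream tables first and can be used directly as the task-creation order below.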

    # 2. Pre-create the model-table tasks
    for table_name in execution_order:
        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
        if not task_info:
            continue

        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task ID
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"model_{safe_table_name}"

        model_task = PythonOperator(
            task_id=task_id,
            python_callable=process_model,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date  # use the pre-parsed exec_date directly instead of XCom
            },
            retries=TASK_RETRY_CONFIG["retries"],
            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
            dag=dag
        )

        # Register the task in the dictionary
        task_dict[table_name] = model_task

        # Wire up the dependencies
        deps = dependencies.get(table_name, [])
        has_dependency = False

        # Dependencies between model tables
        for dep in deps:
            dep_table = dep.get("table_name")
            dep_type = dep.get("table_type")
            if dep_table in task_dict:
                task_dict[dep_table] >> model_task
                has_dependency = True
                logger.info(f"Pre-set dependency: {dep_table} >> {table_name}")

        # If the table has no dependencies, make it depend on the prepare task and the resource-table tasks
        if not has_dependency:
            # Connect directly from the prepare_plan task
            prepare_plan >> model_task
            logger.info(f"Pre-set basic dependency: prepare_plan >> {task_id}")

            # Also connect from the resource-table tasks - cap each model table at 5 resource
            # dependencies to avoid an overly complex dependency graph
            resource_count = 0
            for resource_table in resource_tasks:
                if resource_count >= 5:
                    break
                resource_name = resource_table["target_table"]
                if resource_name in task_dict:
                    task_dict[resource_name] >> model_task
                    logger.info(f"Pre-set resource dependency: {resource_name} >> {table_name}")
                    resource_count += 1
    # Find all terminal tasks (tasks with no downstream dependents)
    terminal_tasks = []

    # Check every model-table task
    for table_name in execution_order:
        # Check whether the table has any downstream task
        has_downstream = False
        for source, deps in dependencies.items():
            if source == table_name:  # skip the table itself
                continue
            for dep in deps:
                if dep.get("table_name") == table_name:
                    has_downstream = True
                    break
            if has_downstream:
                break
        # If there is no downstream task, add the table to the terminal-task list
        if not has_downstream and table_name in task_dict:
            terminal_tasks.append(table_name)

    # If there are no model-table tasks, treat all resource-table tasks as terminal tasks
    if not model_tasks and resource_tasks:
        terminal_tasks = [task["target_table"] for task in resource_tasks]

    # If there are neither model-table nor resource-table tasks, connect the prepare task straight to the completion marker
    if not terminal_tasks:
        prepare_plan >> processing_completed
        logger.warning("No tasks found; connecting the prepare task directly to the completion marker")
    else:
        # Connect every terminal task to the completion marker
        for table_name in terminal_tasks:
            if table_name in task_dict:
                task_dict[table_name] >> processing_completed
                logger.info(f"Set terminal task: {table_name} >> processing_completed")

    logger.info(f"DAG dag_dataops_unified_data_scheduler definition complete; pre-created {len(task_dict)} tasks")