# dag_dataops_unified_data_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
import os
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, AIRFLOW_BASE_PATH
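
# -----------------------------------------------------------------------------
# Overview (added note): this DAG builds its task graph at parse time from a
# JSON execution plan file (last_execution_plan.json next to this DAG file,
# assumed here to be written by a separate planning process). The
# prepare_execution_plan task only re-reads the schedule tables and checks that
# the plan on disk still matches; the PythonOperator tasks themselves are
# created below, during DAG parsing, from the file contents.
# -----------------------------------------------------------------------------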

# Create the module logger
logger = logging.getLogger(__name__)

def get_latest_date():
    """Return the most recent exec_date that has records in the database."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Found the most recent date with records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with records found; falling back to today's date")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while looking up the most recent date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()

def get_all_tasks(exec_date):
    """Get all tasks that need to run (DataResource and DataModel)."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Count the total number of records for this execution date
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
        """, (exec_date,))
        total_count = cursor.fetchone()[0]
        logger.info(f"airflow_dag_schedule has {total_count} records for exec_date {exec_date}")

        # Fetch all resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()
        logger.info(f"Found {len(resource_results)} DataResource records")

        # Fetch all model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()
        logger.info(f"Found {len(model_results)} DataModel records")

        # Build the resource task list
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        # Build the model task list
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        logger.info(f"Collected {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error while fetching task information: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()

def get_table_dependencies(table_names):
    """Get the dependencies between tables from Neo4j."""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}
    try:
        with driver.session() as session:
            # Fetch the dependencies between all model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)
            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])
                if source and target:
                    # Add the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error while fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()
    return dependency_dict

def json_serial(obj):
    """JSON serializer that converts date objects to ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")
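# Note (added): json_serial is not called in this file; it is presumably the
# `default=` hook used by whichever process writes last_execution_plan.json,
# e.g. json.dump(execution_plan, f, default=json_serial).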

def prepare_unified_execution_plan(**kwargs):
    """Main callable that prepares the unified execution plan."""
    # Resolve the execution date
    exec_date = get_latest_date()
    logger.info(f"Using execution date: {exec_date}")

    # Fetch all tasks
    resource_tasks, model_tasks = get_all_tasks(exec_date)
    if not resource_tasks and not model_tasks:
        logger.warning(f"No tasks found for execution date {exec_date}")
        return 0

    # Fetch dependencies for all model tables
    model_table_names = [task["target_table"] for task in model_tasks]
    dependencies = get_table_dependencies(model_table_names)

    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Log the resource and model task names for easier debugging
    resource_names = [task["target_table"] for task in resource_tasks]
    model_names = [task["target_table"] for task in model_tasks]
    logger.info(f"Resource-table tasks: {resource_names}")
    logger.info(f"Model-table tasks: {model_names}")

    # Pushing to XCom is no longer needed because tasks read directly from the file;
    # this block only verifies that the current plan matches the one on disk.
    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
    ready_path = f"{plan_path}.ready"
    if os.path.exists(plan_path) and os.path.exists(ready_path):
        try:
            with open(plan_path, 'r') as f:
                existing_plan = json.load(f)

            # Compare the current plan with the one on disk
            existing_resources = sorted([t.get("target_table") for t in existing_plan.get("resource_tasks", [])])
            current_resources = sorted(resource_names)
            existing_models = sorted([t.get("target_table") for t in existing_plan.get("model_tasks", [])])
            current_models = sorted(model_names)
            if existing_resources == current_resources and existing_models == current_models:
                logger.info("Execution plan unchanged; keeping the existing task structure")
            else:
                logger.warning("Execution plan differs from the file on disk, but the DAG structure is already fixed; changes take effect after the next parse")
                logger.warning(f"Existing resource tables: {existing_resources}")
                logger.warning(f"Current resource tables: {current_resources}")
                logger.warning(f"Existing model tables: {existing_models}")
                logger.warning(f"Current model tables: {current_models}")
        except Exception as e:
            logger.error(f"Error while comparing execution plans: {str(e)}")

    logger.info(f"Prepared an execution plan with {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
    return len(resource_tasks) + len(model_tasks)

def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table."""
    logger.info(f"Running script {script_name} for resource table {target_table}")
    # exec_date may arrive as a JSON string; if so, extract the actual date
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")
    try:
        # Run the script through the monitored execution wrapper
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Resource table {target_table} processed, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error processing resource table {target_table}: {str(e)}")
        # Return a result even on failure so the DAG is not blocked
        return False

def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table."""
    logger.info(f"Running script {script_name} for model table {target_table}")
    # exec_date may arrive as a JSON string; if so, extract the actual date
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")
    try:
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Model table {target_table} processed, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error processing model table {target_table}: {str(e)}")
        # Return a result even on failure so the DAG is not blocked
        return False
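
# Note (added): process_resource and process_model are identical apart from
# their log wording; both simply delegate to execute_with_monitoring.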

# Pre-load the execution plan at DAG-parse time so the tasks can be created below
try:
    logger.info("Pre-loading execution plan data to build the DAG")
    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
    ready_path = f"{plan_path}.ready"
    execution_plan = {"exec_date": get_today_date(), "resource_tasks": [], "model_tasks": [], "dependencies": {}}

    # Check the ready marker first to make sure the JSON file was written completely
    if os.path.exists(ready_path) and os.path.exists(plan_path):
        try:
            # Read the timestamp from the ready file
            with open(ready_path, 'r') as f:
                ready_timestamp = f.read().strip()
                logger.info(f"Execution plan ready marker timestamp: {ready_timestamp}")

            # Read the execution plan file
            with open(plan_path, 'r') as f:
                execution_plan_json = f.read()
                execution_plan = json.loads(execution_plan_json)
                logger.info(f"Loaded execution plan from file: {plan_path}")
        except Exception as e:
            logger.warning(f"Error reading the execution plan file: {str(e)}")
    else:
        if not os.path.exists(ready_path):
            logger.warning(f"Execution plan ready marker does not exist: {ready_path}")
        if not os.path.exists(plan_path):
            logger.warning(f"Execution plan file does not exist: {plan_path}")
        logger.warning("Falling back to a basic DAG structure")

    # Extract the plan contents
    exec_date = execution_plan.get("exec_date", get_today_date())
    resource_tasks = execution_plan.get("resource_tasks", [])
    model_tasks = execution_plan.get("model_tasks", [])
    dependencies = execution_plan.get("dependencies", {})
    logger.info(f"Pre-loaded execution plan: exec_date={exec_date}, resource_tasks={len(resource_tasks)}, model_tasks={len(model_tasks)}")
except Exception as e:
    logger.error(f"Error pre-loading execution plan data: {str(e)}")
    exec_date = get_today_date()
    resource_tasks = []
    model_tasks = []
    dependencies = {}
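
# For reference, the plan file is expected to look roughly like this (an
# illustrative sketch inferred from the fields read above, not a verbatim copy):
# {
#   "exec_date": "2024-01-01",
#   "resource_tasks": [{"source_table": "...", "target_table": "...",
#                       "target_table_label": "DataResource",
#                       "script_name": "...", "script_exec_mode": "append"}],
#   "model_tasks":    [ ...same shape, with "target_table_label": "DataModel"... ],
#   "dependencies":   {"<model_table>": [{"table_name": "...",
#                                         "table_type": "DataModel" or "DataResource"}]}
# }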

# Callback invoked when a DAG run fails
def handle_dag_failure(context):
    logger.error(f"DAG run failed: {context.get('exception')}")

# Create the DAG
with DAG(
    "dag_dataops_unified_data_scheduler",
    start_date=datetime(2024, 1, 1),
    # Check every 10 minutes so changes to the execution plan are picked up promptly
    schedule_interval="*/10 * * * *",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    on_failure_callback=handle_dag_failure,
    # DAG-level params so tasks run with the correct environment
    params={
        "scripts_path": SCRIPTS_BASE_PATH,
        "airflow_base_path": AIRFLOW_BASE_PATH
    }
) as dag:
    # Prepare the execution plan
    prepare_plan = PythonOperator(
        task_id="prepare_execution_plan",
        python_callable=prepare_unified_execution_plan,
        provide_context=True,  # no-op on Airflow 2.x, where the context is always passed
        dag=dag
    )

    # Completion marker: succeeds as long as at least one task succeeded and none failed
    processing_completed = EmptyOperator(
        task_id="processing_completed",
        trigger_rule="none_failed_min_one_success",
        dag=dag
    )

    # Task dictionary used to wire up dependencies
    task_dict = {}

    # Empty task acting as the entry point for downstream tasks, so the DAG still
    # runs even when there are no resource or model tables
    start_processing = EmptyOperator(
        task_id="start_processing",
        dag=dag
    )

    # Basic dependency
    prepare_plan >> start_processing

    # 1. Pre-create the resource-table tasks
    for task_info in resource_tasks:
        table_name = task_info["target_table"]
        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task_id
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"resource_{safe_table_name}"

        resource_task = PythonOperator(
            task_id=task_id,
            python_callable=process_resource,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date  # use the pre-parsed exec_date directly instead of XCom
            },
            retries=TASK_RETRY_CONFIG["retries"],
            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
            dag=dag
        )

        # Register the task so dependencies can be wired later
        task_dict[table_name] = resource_task

        # Wire the dependency on start_processing
        start_processing >> resource_task
        logger.info(f"Set basic dependency: start_processing >> {task_id}")

    # Build a directed graph to detect dependencies between model tables
    G = nx.DiGraph()

    # Add every model table as a node
    for task_info in model_tasks:
        table_name = task_info["target_table"]
        G.add_node(table_name)

    # Add the edges between model tables
    for source, deps in dependencies.items():
        for dep in deps:
            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> dependent

    # Detect and break circular dependencies
    cycles = list(nx.simple_cycles(G))
    if cycles:
        logger.warning(f"Circular dependencies detected: {cycles}")
        for cycle in cycles:
            G.remove_edge(cycle[-1], cycle[0])
            logger.info(f"Broke circular dependency: removed edge {cycle[-1]} -> {cycle[0]}")

    # Generate a topological order to determine the execution sequence
    execution_order = []
    try:
        execution_order = list(nx.topological_sort(G))
        logger.info(f"Pre-computed execution order: {execution_order}")
    except Exception as e:
        logger.error(f"Topological sort failed: {str(e)}; falling back to the original order")
        execution_order = [task_info["target_table"] for task_info in model_tasks]
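
    # Note (added): because edges point dependency -> dependent, topological_sort
    # yields upstream tables first, so by the time each model task is created
    # below, every model table it depends on is already present in task_dict.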

    # 2. Pre-create the model-table tasks
    for table_name in execution_order:
        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
        if not task_info:
            continue

        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")

        # Build a safe task_id
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"model_{safe_table_name}"

        model_task = PythonOperator(
            task_id=task_id,
            python_callable=process_model,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date  # use the pre-parsed exec_date directly instead of XCom
            },
            retries=TASK_RETRY_CONFIG["retries"],
            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
            dag=dag
        )

        # Register the task so dependencies can be wired later
        task_dict[table_name] = model_task

        # Wire up dependencies
        deps = dependencies.get(table_name, [])
        has_dependency = False

        # Dependencies on other model tables
        for dep in deps:
            dep_table = dep.get("table_name")
            dep_type = dep.get("table_type")
            if dep_table in task_dict:
                task_dict[dep_table] >> model_task
                has_dependency = True
                logger.info(f"Pre-set dependency: {dep_table} >> {table_name}")

        # If there are no dependencies, depend on start_processing and the resource-table tasks
        if not has_dependency:
            # Connect directly from start_processing
            start_processing >> model_task
            logger.info(f"Set basic dependency: start_processing >> {task_id}")

            # Also connect from resource-table tasks, capped at 5 per model table
            # to avoid an overly complex dependency graph
            resource_count = 0
            for resource_table in resource_tasks:
                if resource_count >= 5:
                    break
                resource_name = resource_table["target_table"]
                if resource_name in task_dict:
                    task_dict[resource_name] >> model_task
                    logger.info(f"Pre-set resource dependency: {resource_name} >> {table_name}")
                    resource_count += 1

    # Find all terminal tasks (tasks with no downstream dependencies)
    terminal_tasks = []

    # Check every model-table task
    for table_name in execution_order:
        # Check whether any other table depends on this one
        has_downstream = False
        for source, deps in dependencies.items():
            if source == table_name:  # skip the table's own entry
                continue
            for dep in deps:
                if dep.get("table_name") == table_name:
                    has_downstream = True
                    break
            if has_downstream:
                break

        # If nothing depends on it, add it to the terminal task list
        if not has_downstream and table_name in task_dict:
            terminal_tasks.append(table_name)

    # If there are no model-table tasks, treat all resource-table tasks as terminal
    if not model_tasks and resource_tasks:
        terminal_tasks = [task["target_table"] for task in resource_tasks]

    # If there are neither model-table nor resource-table tasks, connect
    # start_processing straight to the completion marker
    if not terminal_tasks:
        start_processing >> processing_completed
        logger.warning("No tasks found; connecting start_processing directly to the completion marker")
    else:
        # Connect every terminal task to the completion marker
        for table_name in terminal_tasks:
            if table_name in task_dict:
                task_dict[table_name] >> processing_completed
                logger.info(f"Set terminal task: {table_name} >> processing_completed")

    logger.info(f"DAG dag_dataops_unified_data_scheduler defined; pre-created {len(task_dict)} tasks")
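
    # Resulting shape (added note): prepare_execution_plan >> start_processing,
    # then resource and model tasks wired according to the plan's dependencies,
    # with every terminal task feeding processing_completed.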