# dag_dataops_unified_data_scheduler.py
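"""
Unified data-processing scheduler DAG.

Flow: wait for the prepare DAG -> build an execution plan (pushed to XCom as
JSON) -> run one task per DataResource table, then one task per DataModel
table in topological order of their Neo4j dependencies -> processing_completed.
"""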
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG

# Create logger
logger = logging.getLogger(__name__)

def get_latest_date():
    """Return the most recent date that has records in the database."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Found latest date with records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with records found; falling back to today")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while looking up the latest date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()

def get_all_tasks(exec_date):
    """Fetch all tasks to execute (DataResource and DataModel)."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Query all resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()

        # Query all model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()

        # Collect resource-table info
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        # Collect model-table info
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        logger.info(f"Fetched {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error while fetching task info: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()
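
# For reference, the columns this DAG assumes on airflow_dag_schedule, inferred
# from the queries above (the real DDL may differ):
#   exec_date, source_table, target_table,
#   target_table_label ('DataResource' | 'DataModel'),
#   script_name, script_exec_mode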

def get_table_dependencies(table_names):
    """Fetch the dependency relationships between tables from Neo4j."""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}
    try:
        with driver.session() as session:
            # Fetch all dependencies between model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)
            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])
                if source and target:
                    # Add the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error while fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()
    return dependency_dict
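
# Illustrative return shape (hypothetical table names):
#   {"model_orders": [{"table_name": "res_raw_orders", "table_type": "DataResource"},
#                     {"table_name": "model_customers", "table_type": "DataModel"}],
#    "model_customers": []}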

def json_serial(obj):
    """JSON serializer that renders date objects as ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")
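
# Example:
#   json.dumps({"exec_date": date(2025, 4, 12)}, default=json_serial)
#   -> '{"exec_date": "2025-04-12"}'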

def prepare_unified_execution_plan(**kwargs):
    """Entry point that prepares the unified execution plan."""
    # Determine the execution date
    exec_date = get_latest_date()
    logger.info(f"Using execution date: {exec_date}")

    # Fetch all tasks
    resource_tasks, model_tasks = get_all_tasks(exec_date)
    if not resource_tasks and not model_tasks:
        logger.warning(f"No tasks found for execution date {exec_date}")
        return 0

    # Fetch dependencies for all model tables
    model_table_names = [task["target_table"] for task in model_tasks]
    dependencies = get_table_dependencies(model_table_names)

    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Push the plan to XCom, using the custom serializer for date objects
    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
    logger.info(f"Prepared execution plan with {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
    return len(resource_tasks) + len(model_tasks)
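
# Note: downstream tasks receive this entire plan as a JSON string through the
# templated exec_date op_kwarg (op_kwargs is a templated field of
# PythonOperator), which is why process_resource/process_model below parse it
# back out before running the script.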

def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table."""
    logger.info(f"Running script {script_name} for resource table {target_table}")
    # exec_date may arrive as the full execution-plan JSON string (see the
    # templated op_kwargs below), so extract the actual date from it
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")
    return execute_with_monitoring(
        target_table=target_table,
        script_name=script_name,
        script_exec_mode=script_exec_mode,
        exec_date=exec_date
    )


def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table (same JSON handling as process_resource)."""
    logger.info(f"Running script {script_name} for model table {target_table}")
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")
    return execute_with_monitoring(
        target_table=target_table,
        script_name=script_name,
        script_exec_mode=script_exec_mode,
        exec_date=exec_date
    )

# Create the DAG
with DAG(
    "dag_dataops_unified_data_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Wait for the prepare DAG to finish
    wait_for_prepare = ExternalTaskSensor(
        task_id="wait_for_prepare",
        external_dag_id="dag_dataops_unified_prepare_scheduler",
        external_task_id="preparation_completed",
        mode="poke",
        timeout=3600,
        poke_interval=30,
        dag=dag
    )
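
    # Note: by default ExternalTaskSensor waits for the external task with the
    # same logical date, so dag_dataops_unified_prepare_scheduler is assumed to
    # run on the same @daily schedule as this DAG.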

    # Prepare the execution plan
    prepare_plan = PythonOperator(
        task_id="prepare_execution_plan",
        python_callable=prepare_unified_execution_plan,
        # provide_context is deprecated and unnecessary in Airflow 2.x; the
        # callable receives the context through **kwargs automatically
        dag=dag
    )

    # Marker for processing completion
    processing_completed = EmptyOperator(
        task_id="processing_completed",
        dag=dag
    )

    # Set the initial task dependency
    wait_for_prepare >> prepare_plan

    # Default execution plan JSON, used if no saved plan is found
    execution_plan_json = '''{"exec_date": "2025-04-12", "resource_tasks": [], "model_tasks": [], "dependencies": {}}'''
    try:
        # Load the last saved execution plan from disk as the default
        import os
        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
        if os.path.exists(plan_path):
            with open(plan_path, 'r') as f:
                execution_plan_json = f.read()
    except Exception as e:
        logger.warning(f"Error reading the default execution plan: {str(e)}")
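
    # Caveat: this file is read at DAG *parse* time, so the task graph built
    # below reflects the last saved plan, not the plan that the current run's
    # prepare_execution_plan task will push to XCom.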

    # Parse the execution plan to get task info
    try:
        execution_plan = json.loads(execution_plan_json)
        exec_date = execution_plan.get("exec_date", get_today_date())
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        dependencies = execution_plan.get("dependencies", {})

        # Task dictionary, used for wiring up dependencies
        task_dict = {}

        # 1. Create the resource-table tasks
        for task_info in resource_tasks:
            table_name = task_info["target_table"]
            script_name = task_info["script_name"]
            exec_mode = task_info.get("script_exec_mode", "append")

            # Build a safe task ID
            safe_table_name = table_name.replace(".", "_").replace("-", "_")
            task_id = f"resource_{safe_table_name}"
            resource_task = PythonOperator(
                task_id=task_id,
                python_callable=process_resource,
                op_kwargs={
                    "target_table": table_name,
                    "script_name": script_name,
                    "script_exec_mode": exec_mode,
                    "exec_date": """{{ ti.xcom_pull(task_ids='prepare_execution_plan', key='execution_plan') }}"""
                },
                retries=TASK_RETRY_CONFIG["retries"],
                retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
                dag=dag
            )

            # Add the task to the dictionary
            task_dict[table_name] = resource_task
            # Make it depend on prepare_plan
            prepare_plan >> resource_task

        # Build a directed graph to analyze dependencies between model tables
        G = nx.DiGraph()
        # Add every model table as a node
        for task_info in model_tasks:
            table_name = task_info["target_table"]
            G.add_node(table_name)
        # Add the dependency edges between model tables
        for source, deps in dependencies.items():
            for dep in deps:
                if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                    G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> dependent
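
        # Example: if model_b is DERIVED_FROM model_a, the edge model_a -> model_b
        # makes model_a come before model_b in the topological sort below
        # (table names here are hypothetical).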

        # Detect circular dependencies and break them
        cycles = list(nx.simple_cycles(G))
        if cycles:
            logger.warning(f"Circular dependencies detected: {cycles}")
            for cycle in cycles:
                # Guard against edges already removed while breaking earlier cycles
                if G.has_edge(cycle[-1], cycle[0]):
                    G.remove_edge(cycle[-1], cycle[0])
                    logger.info(f"Broke circular dependency: removed edge {cycle[-1]} -> {cycle[0]}")

        # Compute a topological sort to determine the execution order
        try:
            execution_order = list(nx.topological_sort(G))
            logger.info(f"Computed execution order: {execution_order}")
        except Exception as e:
            logger.error(f"Topological sort failed: {str(e)}; falling back to the original order")
            execution_order = [task_info["target_table"] for task_info in model_tasks]

        # 2. Create the model-table tasks in topological order
        for table_name in execution_order:
            task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
            if not task_info:
                continue

            script_name = task_info["script_name"]
            exec_mode = task_info.get("script_exec_mode", "append")

            # Build a safe task ID
            safe_table_name = table_name.replace(".", "_").replace("-", "_")
            task_id = f"model_{safe_table_name}"
            model_task = PythonOperator(
                task_id=task_id,
                python_callable=process_model,
                op_kwargs={
                    "target_table": table_name,
                    "script_name": script_name,
                    "script_exec_mode": exec_mode,
                    "exec_date": """{{ ti.xcom_pull(task_ids='prepare_execution_plan', key='execution_plan') }}"""
                },
                retries=TASK_RETRY_CONFIG["retries"],
                retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
                dag=dag
            )

            # Add the task to the dictionary
            task_dict[table_name] = model_task

            # Wire up dependencies
            deps = dependencies.get(table_name, [])
            has_dependency = False
            # Handle dependencies between model tables
            for dep in deps:
                dep_table = dep.get("table_name")
                dep_type = dep.get("table_type")
                if dep_table in task_dict:
                    task_dict[dep_table] >> model_task
                    has_dependency = True
                    logger.info(f"Set dependency: {dep_table} >> {table_name}")

            # If there are no upstream model tasks, depend on the prepare task
            # and on all resource-table tasks
            if not has_dependency:
                # Connect directly from prepare_plan
                prepare_plan >> model_task
                # And from every resource-table task
                for resource_table in resource_tasks:
                    resource_name = resource_table["target_table"]
                    if resource_name in task_dict:
                        task_dict[resource_name] >> model_task
                        logger.info(f"Set resource dependency: {resource_name} >> {table_name}")

        # Find all terminal tasks (tasks with no downstream dependents)
        terminal_tasks = []

        # Check every model-table task
        for table_name in execution_order:
            # Does any other table depend on this one?
            has_downstream = False
            for source, deps in dependencies.items():
                if source == table_name:  # skip the table itself
                    continue
                for dep in deps:
                    if dep.get("table_name") == table_name:
                        has_downstream = True
                        break
                if has_downstream:
                    break
            # If nothing depends on it, add it to the terminal task list
            if not has_downstream and table_name in task_dict:
                terminal_tasks.append(table_name)

        # If there are no model-table tasks, treat all resource-table tasks as terminal
        if not model_tasks and resource_tasks:
            terminal_tasks = [task["target_table"] for task in resource_tasks]

        # If there are neither model nor resource tasks, connect the prepare task
        # straight to the completion marker
        if not terminal_tasks:
            prepare_plan >> processing_completed
            logger.warning("No tasks found; connecting the prepare task straight to the completion marker")
        else:
            # Connect every terminal task to the completion marker
            for table_name in terminal_tasks:
                if table_name in task_dict:
                    task_dict[table_name] >> processing_completed
                    logger.info(f"Set terminal task: {table_name} >> processing_completed")
    except Exception as e:
        logger.error(f"Error while building the task DAG: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Make sure the DAG can still complete even on error
        prepare_plan >> processing_completed