dataops_productline_execute_dag.py 46 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134
  1. """
  2. 统一数据产品线执行器 DAG
  3. 功能:
  4. 1. 面向脚本的作业编排,不再是面向表
  5. 2. 基于dataops_productline_prepare_dag生成的执行计划执行脚本
  6. 3. 支持对脚本执行顺序的优化
  7. 4. 提供详细的执行日志和错误处理
  8. """
  9. from airflow import DAG
  10. from airflow.operators.python import PythonOperator, ShortCircuitOperator
  11. from airflow.operators.empty import EmptyOperator
  12. from airflow.utils.task_group import TaskGroup
  13. from datetime import datetime, timedelta, date
  14. import logging
  15. import networkx as nx
  16. import json
  17. import os
  18. import pendulum
  19. from decimal import Decimal
  20. from common import (
  21. get_pg_conn,
  22. get_neo4j_driver,
  23. get_today_date
  24. )
  25. from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
  26. import pytz
  27. import pandas as pd
  28. import sys
  29. # 创建日志记录器
  30. logger = logging.getLogger(__name__)
  31. # 开启详细诊断日志记录
  32. ENABLE_DEBUG_LOGGING = True
  33. def log_debug(message):
  34. """记录调试日志,但只在启用调试模式时"""
  35. if ENABLE_DEBUG_LOGGING:
  36. logger.info(f"[DEBUG] {message}")
  37. # 在DAG启动时输出诊断信息
  38. log_debug("======== 诊断信息 ========")
  39. log_debug(f"当前工作目录: {os.getcwd()}")
  40. log_debug(f"SCRIPTS_BASE_PATH: {SCRIPTS_BASE_PATH}")
  41. log_debug(f"导入的common模块路径: {get_pg_conn.__module__}")
  42. #############################################
  43. # 通用工具函数
  44. #############################################
  45. def json_serial(obj):
  46. """将日期对象序列化为ISO格式字符串的JSON序列化器"""
  47. if isinstance(obj, (datetime, date)):
  48. return obj.isoformat()
  49. raise TypeError(f"类型 {type(obj)} 不能被序列化为JSON")
  50. # 添加自定义JSON编码器解决Decimal序列化问题
  51. class DecimalEncoder(json.JSONEncoder):
  52. def default(self, obj):
  53. if isinstance(obj, Decimal):
  54. return float(obj)
  55. # 处理日期类型
  56. elif isinstance(obj, (datetime, date)):
  57. return obj.isoformat()
  58. # 让父类处理其他类型
  59. return super(DecimalEncoder, self).default(obj)
  60. def get_cn_exec_date(logical_date):
  61. """
  62. 获取逻辑执行日期
  63. 参数:
  64. logical_date: 逻辑执行日期,UTC时间
  65. 返回:
  66. logical_exec_date: 逻辑执行日期,北京时间
  67. """
  68. # 获取逻辑执行日期
  69. local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
  70. exec_date = local_logical_date.strftime('%Y-%m-%d')
  71. return exec_date
  72. #############################################
  73. # 脚本执行函数
  74. #############################################
  75. def execute_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
  76. """
  77. 执行单个脚本并返回执行结果
  78. 参数:
  79. script_id: 脚本ID
  80. script_name: 脚本文件名
  81. target_table: 目标表名
  82. exec_date: 执行日期
  83. script_exec_mode: 执行模式
  84. **kwargs: 其他参数,如source_tables、target_type等
  85. 返回:
  86. bool: 脚本执行结果
  87. """
  88. # 添加详细日志
  89. logger.info(f"===== 开始执行脚本 {script_id} =====")
  90. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  91. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  92. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  93. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  94. logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
  95. # 记录额外参数
  96. for key, value in kwargs.items():
  97. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  98. # 检查script_name是否为空
  99. if not script_name:
  100. logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
  101. return False
  102. # 记录执行开始时间
  103. start_time = datetime.now()
  104. try:
  105. # 导入和执行脚本模块
  106. import importlib.util
  107. import sys
  108. script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
  109. # 获取脚本类型
  110. script_type = kwargs.get('script_type', 'python_script')
  111. # 只有当脚本类型为 sql_script 或 python_script 时才检查文件是否存在
  112. if script_type in ['sql_script', 'python_script']:
  113. if not os.path.exists(script_path):
  114. logger.error(f"脚本文件不存在: {script_path}")
  115. return False
  116. # 动态导入模块
  117. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  118. module = importlib.util.module_from_spec(spec)
  119. spec.loader.exec_module(module)
  120. else:
  121. # 对于其他类型,例如默认python类型,我们将使用其他执行方式
  122. logger.info(f"脚本类型为 {script_type},不检查脚本文件是否存在")
  123. # 这里我们将直接退出,因为对于python类型应使用execute_python_script函数
  124. logger.error(f"脚本类型为 {script_type},应使用专用函数执行,而不是execute_script")
  125. return False
  126. # 检查并调用标准入口函数run
  127. if hasattr(module, "run"):
  128. logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
  129. # 构建完整的参数字典
  130. run_params = {
  131. "table_name": target_table,
  132. "execution_mode": script_exec_mode,
  133. "exec_date": exec_date
  134. }
  135. ## 添加可能的额外参数
  136. for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
  137. if key in kwargs and kwargs[key] is not None:
  138. run_params[key] = kwargs[key]
  139. # 调用脚本的run函数
  140. logger.info(f"调用run函数并传递参数: {run_params}")
  141. result = module.run(**run_params)
  142. logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  143. # 确保result是布尔值
  144. if result is None:
  145. logger.warning(f"脚本返回值为None,转换为False")
  146. result = False
  147. elif not isinstance(result, bool):
  148. original_result = result
  149. result = bool(result)
  150. logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  151. # 记录结束时间和结果
  152. end_time = datetime.now()
  153. duration = (end_time - start_time).total_seconds()
  154. logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  155. return result
  156. else:
  157. logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
  158. return False
  159. except Exception as e:
  160. # 处理异常
  161. logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
  162. end_time = datetime.now()
  163. duration = (end_time - start_time).total_seconds()
  164. logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  165. logger.info(f"===== 脚本执行异常结束 =====")
  166. import traceback
  167. logger.error(traceback.format_exc())
  168. # 确保不会阻塞DAG
  169. return False
  170. def execute_sql_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
  171. """
  172. 执行SQL脚本并返回执行结果
  173. 参数:
  174. script_id: 脚本ID
  175. script_name: 脚本名称
  176. target_table: 目标表名
  177. exec_date: 执行日期
  178. script_exec_mode: 执行模式
  179. **kwargs: 其他参数
  180. 返回:
  181. bool: 脚本执行结果
  182. """
  183. # 添加详细日志
  184. logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
  185. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  186. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  187. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  188. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  189. logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
  190. # 记录额外参数
  191. for key, value in kwargs.items():
  192. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  193. # 记录执行开始时间
  194. start_time = datetime.now()
  195. try:
  196. # 导入和执行execution_sql模块
  197. import importlib.util
  198. import sys
  199. exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
  200. # 对于SQL类型的脚本,我们不检查它是否作为文件存在
  201. # 但是我们需要检查execution_sql.py是否存在
  202. if not os.path.exists(exec_sql_path):
  203. logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
  204. return False
  205. # 动态导入execution_sql模块
  206. try:
  207. spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
  208. exec_sql_module = importlib.util.module_from_spec(spec)
  209. spec.loader.exec_module(exec_sql_module)
  210. logger.info(f"成功导入 execution_sql 模块")
  211. except Exception as import_err:
  212. logger.error(f"导入 execution_sql 模块时出错: {str(import_err)}")
  213. import traceback
  214. logger.error(traceback.format_exc())
  215. return False
  216. # 检查并调用标准入口函数run
  217. if hasattr(exec_sql_module, "run"):
  218. logger.info(f"调用执行SQL脚本的标准入口函数 run()")
  219. # 获取频率参数
  220. frequency = kwargs.get('frequency', 'daily') # 默认为daily
  221. # 构建完整的参数字典
  222. run_params = {
  223. "script_type": "sql",
  224. "target_table": target_table,
  225. "script_name": script_name,
  226. "exec_date": exec_date,
  227. "frequency": frequency,
  228. "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
  229. "execution_mode": script_exec_mode # 传递执行模式参数
  230. }
  231. # 添加可能的额外参数
  232. for key in ['target_type', 'storage_location', 'source_tables']:
  233. if key in kwargs and kwargs[key] is not None:
  234. run_params[key] = kwargs[key]
  235. # 调用execution_sql.py的run函数
  236. logger.info(f"调用SQL执行脚本的run函数并传递参数: {run_params}")
  237. result = exec_sql_module.run(**run_params)
  238. logger.info(f"SQL脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  239. # 确保result是布尔值
  240. if result is None:
  241. logger.warning(f"SQL脚本返回值为None,转换为False")
  242. result = False
  243. elif not isinstance(result, bool):
  244. original_result = result
  245. result = bool(result)
  246. logger.warning(f"SQL脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  247. # 记录结束时间和结果
  248. end_time = datetime.now()
  249. duration = (end_time - start_time).total_seconds()
  250. logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  251. return result
  252. else:
  253. logger.error(f"执行SQL脚本 execution_sql.py 中未定义标准入口函数 run(),无法执行")
  254. return False
  255. except Exception as e:
  256. # 处理异常
  257. logger.error(f"执行SQL脚本 {script_id} 出错: {str(e)}")
  258. end_time = datetime.now()
  259. duration = (end_time - start_time).total_seconds()
  260. logger.error(f"SQL脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  261. logger.info(f"===== SQL脚本执行异常结束 =====")
  262. import traceback
  263. logger.error(traceback.format_exc())
  264. # 确保不会阻塞DAG
  265. return False
  266. # 重命名此函数为execute_python_script
  267. def execute_python_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
  268. """
  269. 执行Python脚本文件并返回执行结果
  270. 参数:
  271. script_id: 脚本ID
  272. script_name: 脚本文件名(.py文件)
  273. target_table: 目标表名
  274. exec_date: 执行日期
  275. script_exec_mode: 执行模式
  276. **kwargs: 其他参数,如source_tables、target_type等
  277. 返回:
  278. bool: 脚本执行结果
  279. """
  280. # 添加详细日志
  281. logger.info(f"===== 开始执行Python脚本文件 {script_id} =====")
  282. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  283. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  284. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  285. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  286. logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
  287. # 记录额外参数
  288. for key, value in kwargs.items():
  289. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  290. # 检查script_name是否为空
  291. if not script_name:
  292. logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
  293. return False
  294. # 记录执行开始时间
  295. start_time = datetime.now()
  296. try:
  297. # 导入和执行脚本模块
  298. import importlib.util
  299. import sys
  300. script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
  301. if not os.path.exists(script_path):
  302. logger.error(f"脚本文件不存在: {script_path}")
  303. return False
  304. # 动态导入模块
  305. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  306. module = importlib.util.module_from_spec(spec)
  307. spec.loader.exec_module(module)
  308. # 检查并调用标准入口函数run
  309. if hasattr(module, "run"):
  310. logger.info(f"调用脚本文件 {script_name} 的标准入口函数 run()")
  311. # 构建完整的参数字典
  312. run_params = {
  313. "table_name": target_table,
  314. "execution_mode": script_exec_mode,
  315. "exec_date": exec_date
  316. }
  317. ## 添加可能的额外参数
  318. for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
  319. if key in kwargs and kwargs[key] is not None:
  320. run_params[key] = kwargs[key]
  321. # 调用脚本的run函数
  322. logger.info(f"调用run函数并传递参数: {run_params}")
  323. result = module.run(**run_params)
  324. logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  325. # 确保result是布尔值
  326. if result is None:
  327. logger.warning(f"脚本返回值为None,转换为False")
  328. result = False
  329. elif not isinstance(result, bool):
  330. original_result = result
  331. result = bool(result)
  332. logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  333. # 记录结束时间和结果
  334. end_time = datetime.now()
  335. duration = (end_time - start_time).total_seconds()
  336. logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  337. return result
  338. else:
  339. logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
  340. return False
  341. except Exception as e:
  342. # 处理异常
  343. logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
  344. end_time = datetime.now()
  345. duration = (end_time - start_time).total_seconds()
  346. logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  347. logger.info(f"===== 脚本执行异常结束 =====")
  348. import traceback
  349. logger.error(traceback.format_exc())
  350. # 确保不会阻塞DAG
  351. return False
  352. # 使用execute_sql函数代替之前的execute_sql_script
  353. def execute_sql(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
  354. """
  355. 执行SQL脚本并返回执行结果
  356. 参数:
  357. script_id: 脚本ID
  358. script_name: 脚本名称(数据库中的名称)
  359. target_table: 目标表名
  360. exec_date: 执行日期
  361. script_exec_mode: 执行模式
  362. **kwargs: 其他参数
  363. 返回:
  364. bool: 脚本执行结果
  365. """
  366. # 添加详细日志
  367. logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
  368. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  369. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  370. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  371. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  372. logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
  373. # 记录额外参数
  374. for key, value in kwargs.items():
  375. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  376. # 记录执行开始时间
  377. start_time = datetime.now()
  378. try:
  379. # 导入和执行execution_sql模块
  380. import importlib.util
  381. import sys
  382. exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
  383. # 对于SQL类型的脚本,我们不检查它是否作为文件存在
  384. # 但是我们需要检查execution_sql.py是否存在
  385. if not os.path.exists(exec_sql_path):
  386. logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
  387. return False
  388. # 动态导入execution_sql模块
  389. try:
  390. spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
  391. exec_sql_module = importlib.util.module_from_spec(spec)
  392. spec.loader.exec_module(exec_sql_module)
  393. logger.info(f"成功导入 execution_sql 模块")
  394. except Exception as import_err:
  395. logger.error(f"导入 execution_sql 模块时出错: {str(import_err)}")
  396. import traceback
  397. logger.error(traceback.format_exc())
  398. return False
  399. # 检查并调用标准入口函数run
  400. if hasattr(exec_sql_module, "run"):
  401. logger.info(f"调用执行SQL脚本的标准入口函数 run()")
  402. # 获取频率参数
  403. frequency = kwargs.get('frequency', 'daily') # 默认为daily
  404. # 构建完整的参数字典
  405. run_params = {
  406. "script_type": "sql",
  407. "target_table": target_table,
  408. "script_name": script_name,
  409. "exec_date": exec_date,
  410. "frequency": frequency,
  411. "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
  412. "execution_mode": script_exec_mode # 传递执行模式参数
  413. }
  414. # 添加可能的额外参数
  415. for key in ['target_type', 'storage_location', 'source_tables']:
  416. if key in kwargs and kwargs[key] is not None:
  417. run_params[key] = kwargs[key]
  418. # 调用execution_sql.py的run函数
  419. logger.info(f"调用SQL执行脚本的run函数并传递参数: {run_params}")
  420. result = exec_sql_module.run(**run_params)
  421. logger.info(f"SQL脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  422. # 确保result是布尔值
  423. if result is None:
  424. logger.warning(f"SQL脚本返回值为None,转换为False")
  425. result = False
  426. elif not isinstance(result, bool):
  427. original_result = result
  428. result = bool(result)
  429. logger.warning(f"SQL脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  430. # 记录结束时间和结果
  431. end_time = datetime.now()
  432. duration = (end_time - start_time).total_seconds()
  433. logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  434. return result
  435. else:
  436. logger.error(f"执行SQL脚本 execution_sql.py 中未定义标准入口函数 run(),无法执行")
  437. return False
  438. except Exception as e:
  439. # 处理异常
  440. logger.error(f"执行SQL脚本 {script_id} 出错: {str(e)}")
  441. end_time = datetime.now()
  442. duration = (end_time - start_time).total_seconds()
  443. logger.error(f"SQL脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  444. logger.info(f"===== SQL脚本执行异常结束 =====")
  445. import traceback
  446. logger.error(traceback.format_exc())
  447. # 确保不会阻塞DAG
  448. return False
  449. # 使用execute_python函数代替之前的execute_python_script
  450. def execute_python(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
  451. """
  452. 执行Python脚本并返回执行结果
  453. 参数:
  454. script_id: 脚本ID
  455. script_name: 脚本名称(数据库中的名称)
  456. target_table: 目标表名
  457. exec_date: 执行日期
  458. script_exec_mode: 执行模式
  459. **kwargs: 其他参数
  460. 返回:
  461. bool: 脚本执行结果
  462. """
  463. # 添加详细日志
  464. logger.info(f"===== 开始执行Python脚本 {script_id} =====")
  465. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  466. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  467. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  468. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  469. logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
  470. # 记录额外参数
  471. for key, value in kwargs.items():
  472. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  473. # 记录执行开始时间
  474. start_time = datetime.now()
  475. try:
  476. # 导入和执行execution_python模块
  477. import importlib.util
  478. import sys
  479. exec_python_path = os.path.join(SCRIPTS_BASE_PATH, "execution_python.py")
  480. # 对于Python类型的脚本,我们不检查它是否作为文件存在
  481. # 但是我们需要检查execution_python.py是否存在
  482. if not os.path.exists(exec_python_path):
  483. logger.error(f"Python执行脚本文件不存在: {exec_python_path}")
  484. return False
  485. # 动态导入execution_python模块
  486. try:
  487. spec = importlib.util.spec_from_file_location("execution_python", exec_python_path)
  488. exec_python_module = importlib.util.module_from_spec(spec)
  489. spec.loader.exec_module(exec_python_module)
  490. logger.info(f"成功导入 execution_python 模块")
  491. except Exception as import_err:
  492. logger.error(f"导入 execution_python 模块时出错: {str(import_err)}")
  493. import traceback
  494. logger.error(traceback.format_exc())
  495. return False
  496. # 检查并调用标准入口函数run
  497. if hasattr(exec_python_module, "run"):
  498. logger.info(f"调用执行Python脚本的标准入口函数 run()")
  499. # 获取频率参数
  500. frequency = kwargs.get('frequency', 'daily') # 默认为daily
  501. # 构建完整的参数字典
  502. run_params = {
  503. "script_type": "python",
  504. "target_table": target_table,
  505. "script_name": script_name,
  506. "exec_date": exec_date,
  507. "frequency": frequency,
  508. "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签
  509. "execution_mode": script_exec_mode # 传递执行模式参数
  510. }
  511. # 添加可能的额外参数
  512. for key in ['target_type', 'storage_location', 'source_tables']:
  513. if key in kwargs and kwargs[key] is not None:
  514. run_params[key] = kwargs[key]
  515. # 调用execution_python.py的run函数
  516. logger.info(f"调用Python执行脚本的run函数并传递参数: {run_params}")
  517. result = exec_python_module.run(**run_params)
  518. logger.info(f"Python脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  519. # 确保result是布尔值
  520. if result is None:
  521. logger.warning(f"Python脚本返回值为None,转换为False")
  522. result = False
  523. elif not isinstance(result, bool):
  524. original_result = result
  525. result = bool(result)
  526. logger.warning(f"Python脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  527. # 记录结束时间和结果
  528. end_time = datetime.now()
  529. duration = (end_time - start_time).total_seconds()
  530. logger.info(f"Python脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  531. return result
  532. else:
  533. logger.error(f"执行Python脚本 execution_python.py 中未定义标准入口函数 run(),无法执行")
  534. return False
  535. except Exception as e:
  536. # 处理异常
  537. logger.error(f"执行Python脚本 {script_id} 出错: {str(e)}")
  538. end_time = datetime.now()
  539. duration = (end_time - start_time).total_seconds()
  540. logger.error(f"Python脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  541. logger.info(f"===== Python脚本执行异常结束 =====")
  542. import traceback
  543. logger.error(traceback.format_exc())
  544. # 确保不会阻塞DAG
  545. return False
  546. #############################################
  547. # 执行计划获取和处理函数
  548. #############################################
  549. def get_execution_plan_from_db(ds):
  550. """
  551. 从数据库获取产品线执行计划
  552. 参数:
  553. ds (str): 执行日期,格式为'YYYY-MM-DD'
  554. 返回:
  555. dict: 执行计划字典,如果找不到则返回None
  556. """
  557. # 记录输入参数详细信息
  558. if isinstance(ds, datetime):
  559. if ds.tzinfo:
  560. logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 带时区: {ds.tzinfo}")
  561. else:
  562. logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 无时区")
  563. else:
  564. logger.debug(f"【执行日期】get_execution_plan_from_db接收到: {ds}, 类型: {type(ds)}")
  565. logger.info(f"尝试从数据库获取执行日期 {ds} 的产品线执行计划")
  566. conn = get_pg_conn()
  567. cursor = conn.cursor()
  568. execution_plan = None
  569. try:
  570. # 查询条件a: 当前日期=表的exec_date,如果有多条记录,取logical_date最大的一条
  571. cursor.execute("""
  572. SELECT plan
  573. FROM airflow_exec_plans
  574. WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date = %s
  575. ORDER BY logical_date DESC
  576. LIMIT 1
  577. """, (ds,))
  578. result = cursor.fetchone()
  579. if result:
  580. # 获取计划
  581. plan_json = result[0]
  582. # 处理plan_json可能已经是dict的情况
  583. if isinstance(plan_json, dict):
  584. execution_plan = plan_json
  585. else:
  586. execution_plan = json.loads(plan_json)
  587. logger.info(f"找到当前日期 exec_date={ds} 的执行计划记录")
  588. return execution_plan
  589. # 查询条件b: 找不到当前日期的记录,查找exec_date<当前ds的最新记录
  590. logger.info(f"未找到当前日期 exec_date={ds} 的执行计划记录,尝试查找历史记录")
  591. cursor.execute("""
  592. SELECT plan, exec_date
  593. FROM airflow_exec_plans
  594. WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date < %s
  595. ORDER BY exec_date DESC, logical_date DESC
  596. LIMIT 1
  597. """, (ds,))
  598. result = cursor.fetchone()
  599. if result:
  600. # 获取计划和exec_date
  601. plan_json, plan_ds = result
  602. # 处理plan_json可能已经是dict的情况
  603. if isinstance(plan_json, dict):
  604. execution_plan = plan_json
  605. else:
  606. execution_plan = json.loads(plan_json)
  607. logger.info(f"找到历史执行计划记录,exec_date: {plan_ds}")
  608. return execution_plan
  609. # 找不到任何执行计划记录
  610. logger.error(f"在数据库中未找到任何执行计划记录,当前DAG exec_date={ds}")
  611. return None
  612. except Exception as e:
  613. logger.error(f"从数据库获取执行计划时出错: {str(e)}")
  614. import traceback
  615. logger.error(traceback.format_exc())
  616. return None
  617. finally:
  618. cursor.close()
  619. conn.close()
  620. def check_execution_plan(**kwargs):
  621. """
  622. 检查执行计划是否存在且有效
  623. 返回False将阻止所有下游任务执行
  624. """
  625. dag_run = kwargs.get('dag_run')
  626. logical_date = dag_run.logical_date
  627. local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
  628. exec_date = local_logical_date.strftime('%Y-%m-%d')
  629. # 检查是否是手动触发
  630. is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
  631. if is_manual_trigger:
  632. logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
  633. # 记录重要的时间参数
  634. logger.info(f"【时间参数】check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  635. logger.info("检查数据库中的执行计划是否存在且有效")
  636. # 从数据库获取执行计划
  637. execution_plan = get_execution_plan_from_db(exec_date)
  638. # 检查是否成功获取到执行计划
  639. if not execution_plan:
  640. logger.error(f"未找到执行日期 {exec_date} 的执行计划")
  641. return False
  642. # 检查执行计划是否包含必要字段
  643. if "exec_date" not in execution_plan:
  644. logger.error("执行计划缺少exec_date字段")
  645. return False
  646. if not isinstance(execution_plan.get("scripts", []), list):
  647. logger.error("执行计划的scripts字段无效")
  648. return False
  649. if not isinstance(execution_plan.get("script_dependencies", {}), dict):
  650. logger.error("执行计划的script_dependencies字段无效")
  651. return False
  652. # 检查是否有脚本数据
  653. scripts = execution_plan.get("scripts", [])
  654. if not scripts:
  655. logger.warning("执行计划不包含任何脚本")
  656. # 如果没有脚本,则阻止下游任务执行
  657. return False
  658. logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本")
  659. # 保存执行计划到XCom以便下游任务使用
  660. kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
  661. return True
  662. def optimize_execution_order(scripts, script_dependencies):
  663. """
  664. 使用NetworkX优化脚本执行顺序
  665. 参数:
  666. scripts (list): 脚本信息列表
  667. script_dependencies (dict): 脚本依赖关系字典
  668. 返回:
  669. list: 优化后的脚本执行顺序(脚本ID列表)
  670. """
  671. logger.info("开始使用NetworkX优化脚本执行顺序")
  672. # 构建依赖图
  673. G = nx.DiGraph()
  674. # 添加所有脚本作为节点
  675. for script in scripts:
  676. script_id = script['script_id']
  677. G.add_node(script_id)
  678. # 添加依赖边
  679. for script_id, dependencies in script_dependencies.items():
  680. for dep_id in dependencies:
  681. # 添加从script_id到dep_id的边,表示script_id依赖于dep_id
  682. G.add_edge(script_id, dep_id)
  683. logger.debug(f"添加依赖边: {script_id} -> {dep_id}")
  684. # 检查是否有循环依赖
  685. try:
  686. cycles = list(nx.simple_cycles(G))
  687. if cycles:
  688. logger.warning(f"检测到循环依赖: {cycles}")
  689. # 处理循环依赖,可以通过删除一些边来打破循环
  690. for cycle in cycles:
  691. # 选择一条边删除,这里简单地选择第一条边
  692. if len(cycle) > 1:
  693. G.remove_edge(cycle[0], cycle[1])
  694. logger.warning(f"删除边 {cycle[0]} -> {cycle[1]} 以打破循环")
  695. except Exception as e:
  696. logger.error(f"检测循环依赖时出错: {str(e)}")
  697. # 使用拓扑排序获取执行顺序
  698. try:
  699. # 反转图,因为我们的边表示"依赖于"关系,而拓扑排序需要"优先于"关系
  700. reverse_G = G.reverse()
  701. execution_order = list(nx.topological_sort(reverse_G))
  702. # 反转结果,使上游任务先执行
  703. execution_order.reverse()
  704. logger.info(f"NetworkX优化后的脚本执行顺序: {execution_order}")
  705. return execution_order
  706. except Exception as e:
  707. logger.error(f"生成脚本执行顺序时出错: {str(e)}")
  708. # 出错时返回原始脚本ID列表,不进行优化
  709. return [script['script_id'] for script in scripts]
  710. def create_execution_plan(**kwargs):
  711. """
  712. 创建或获取执行计划
  713. """
  714. try:
  715. dag_run = kwargs.get('dag_run')
  716. logical_date = dag_run.logical_date
  717. exec_date = get_cn_exec_date(logical_date)
  718. # 检查是否是手动触发
  719. is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
  720. if is_manual_trigger:
  721. logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
  722. # 记录重要的时间参数
  723. logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  724. # 从XCom获取执行计划
  725. execution_plan = kwargs['ti'].xcom_pull(task_ids='check_execution_plan', key='execution_plan')
  726. # 如果找不到执行计划,则从数据库获取
  727. if not execution_plan:
  728. logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
  729. execution_plan = get_execution_plan_from_db(exec_date)
  730. if not execution_plan:
  731. logger.error(f"执行日期 {exec_date} 没有找到执行计划")
  732. return None
  733. # 验证执行计划结构
  734. scripts = execution_plan.get("scripts", [])
  735. script_dependencies = execution_plan.get("script_dependencies", {})
  736. execution_order = execution_plan.get("execution_order", [])
  737. # 如果执行计划中没有execution_order或为空,使用NetworkX优化
  738. if not execution_order:
  739. logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
  740. execution_order = optimize_execution_order(scripts, script_dependencies)
  741. execution_plan["execution_order"] = execution_order
  742. # 保存完整的执行计划到XCom
  743. kwargs['ti'].xcom_push(key='full_execution_plan', value=execution_plan)
  744. logger.info(f"成功处理执行计划,包含 {len(scripts)} 个脚本")
  745. return execution_plan
  746. except Exception as e:
  747. logger.error(f"创建执行计划时出错: {str(e)}")
  748. import traceback
  749. logger.error(traceback.format_exc())
  750. return None
  751. # 创建DAG
  752. with DAG(
  753. "dataops_productline_execute_dag",
  754. start_date=datetime(2024, 1, 1),
  755. schedule_interval="@daily", # 设置为每日调度
  756. catchup=False,
  757. default_args={
  758. 'owner': 'airflow',
  759. 'depends_on_past': False,
  760. 'email_on_failure': False,
  761. 'email_on_retry': False,
  762. 'retries': 1,
  763. 'retry_delay': timedelta(minutes=5)
  764. },
  765. params={
  766. 'MANUAL_TRIGGER': False,
  767. }
  768. ) as dag:
  769. # 记录DAG实例化时的重要信息
  770. now = datetime.now()
  771. now_with_tz = now.replace(tzinfo=pytz.timezone('Asia/Shanghai'))
  772. default_exec_date = get_today_date()
  773. logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期(用于初始化,非实际执行日期): {default_exec_date}")
  774. #############################################
  775. # 准备阶段: 检查并创建执行计划
  776. #############################################
  777. with TaskGroup("prepare_phase") as prepare_group:
  778. # 检查执行计划是否存在
  779. check_plan = ShortCircuitOperator(
  780. task_id="check_execution_plan",
  781. python_callable=check_execution_plan,
  782. provide_context=True
  783. )
  784. # 创建执行计划
  785. create_plan = PythonOperator(
  786. task_id="create_execution_plan",
  787. python_callable=create_execution_plan,
  788. provide_context=True
  789. )
  790. # 设置任务依赖
  791. check_plan >> create_plan
  792. #############################################
  793. # 执行阶段: 按依赖关系执行脚本
  794. #############################################
  795. with TaskGroup("execution_phase") as execution_group:
  796. try:
  797. # 获取当前DAG的执行日期
  798. exec_date = get_today_date() # 使用当天日期作为默认值
  799. logger.info(f"当前DAG执行日期 ds={exec_date},尝试从数据库获取执行计划")
  800. # 从数据库获取执行计划
  801. execution_plan = get_execution_plan_from_db(exec_date)
  802. # 检查是否成功获取到执行计划
  803. if execution_plan is None:
  804. error_msg = f"无法从数据库获取有效的执行计划,当前DAG exec_date={exec_date}"
  805. logger.error(error_msg)
  806. # 使用全局变量而不是异常来强制DAG失败
  807. raise ValueError(error_msg)
  808. # 提取信息
  809. exec_date = execution_plan.get("exec_date", exec_date)
  810. scripts = execution_plan.get("scripts", [])
  811. script_dependencies = execution_plan.get("script_dependencies", {})
  812. execution_order = execution_plan.get("execution_order", [])
  813. # 如果执行计划中没有execution_order或为空,使用NetworkX优化
  814. if not execution_order:
  815. logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
  816. execution_order = optimize_execution_order(scripts, script_dependencies)
  817. logger.info(f"执行计划: exec_date={exec_date}, scripts数量={len(scripts)}")
  818. # 如果执行计划为空(没有脚本),也应该失败
  819. if not scripts:
  820. error_msg = f"执行计划中没有任何脚本,当前DAG exec_date={exec_date}"
  821. logger.error(error_msg)
  822. raise ValueError(error_msg)
  823. # 1. 创建开始和结束任务
  824. start_execution = EmptyOperator(
  825. task_id="start_execution"
  826. )
  827. execution_completed = EmptyOperator(
  828. task_id="execution_completed",
  829. trigger_rule="none_failed_min_one_success" # 只要有一个任务成功且没有失败的任务就标记为完成
  830. )
  831. # 创建脚本任务字典,用于管理任务依赖
  832. task_dict = {}
  833. # 2. 先创建所有脚本任务,不设置依赖关系
  834. for script in scripts:
  835. script_id = script['script_id']
  836. script_name = script.get("script_name")
  837. target_table = script.get("target_table")
  838. script_type = script.get("script_type", "python")
  839. script_exec_mode = script.get("script_exec_mode", "append")
  840. source_tables = script.get("source_tables", [])
  841. target_table_label = script.get("target_table_label", "")
  842. # 使用描述性的任务ID,包含脚本名称和目标表
  843. # 提取文件名
  844. if "/" in script_name:
  845. script_file = script_name.split("/")[-1] # 获取文件名部分
  846. else:
  847. script_file = script_name
  848. # 确保任务ID不包含不允许的特殊字符
  849. safe_script_name = script_file.replace(" ", "_")
  850. safe_target_table = target_table.replace("-", "_").replace(" ", "_")
  851. # 按照指定格式创建任务ID
  852. task_id = f"{safe_script_name}-TO-{safe_target_table}"
  853. # 构建op_kwargs参数
  854. op_kwargs = {
  855. "script_id": script_id,
  856. "script_name": script_name,
  857. "target_table": target_table,
  858. "exec_date": str(exec_date),
  859. "script_exec_mode": script_exec_mode,
  860. "source_tables": source_tables
  861. }
  862. # 添加特殊参数(如果有)
  863. for key in ['target_type', 'storage_location', 'frequency']:
  864. if key in script and script[key] is not None:
  865. op_kwargs[key] = script[key]
  866. # 根据脚本类型和目标表标签选择执行函数
  867. if script_type.lower() == 'sql' and target_table_label == 'DataModel':
  868. # 使用SQL脚本执行函数
  869. logger.info(f"脚本 {script_id} 是SQL类型且目标表标签为DataModel,使用execute_sql函数执行")
  870. python_callable = execute_sql
  871. elif script_type.lower() == 'python' and target_table_label == 'DataModel':
  872. # 使用Python脚本执行函数
  873. logger.info(f"脚本 {script_id} 是Python类型且目标表标签为DataModel,使用execute_python函数执行")
  874. python_callable = execute_python
  875. elif script_type.lower() == 'python_script':
  876. # 使用Python脚本文件执行函数
  877. logger.info(f"脚本 {script_id} 是python_script类型,使用execute_python_script函数执行")
  878. python_callable = execute_python_script
  879. else:
  880. # 默认使用Python脚本文件执行函数
  881. logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
  882. python_callable = execute_python_script
  883. # 确保target_table_label被传递
  884. op_kwargs["target_table_label"] = target_table_label
  885. # 创建任务
  886. script_task = PythonOperator(
  887. task_id=task_id,
  888. python_callable=python_callable,
  889. op_kwargs=op_kwargs,
  890. retries=TASK_RETRY_CONFIG["retries"],
  891. retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
  892. )
  893. # 将任务添加到字典
  894. task_dict[script_id] = script_task
  895. # 3. 设置开始任务与所有无依赖的脚本任务的关系
  896. no_dep_scripts = []
  897. for script_id, dependencies in script_dependencies.items():
  898. if not dependencies: # 如果没有依赖
  899. if script_id in task_dict:
  900. no_dep_scripts.append(script_id)
  901. start_execution >> task_dict[script_id]
  902. logger.info(f"设置无依赖脚本: start_execution >> {script_id}")
  903. # 4. 设置脚本间的依赖关系
  904. for script_id, dependencies in script_dependencies.items():
  905. for dep_id in dependencies:
  906. if script_id in task_dict and dep_id in task_dict:
  907. # 正确的依赖关系:依赖任务 >> 当前任务
  908. task_dict[dep_id] >> task_dict[script_id]
  909. logger.info(f"设置脚本依赖: {dep_id} >> {script_id}")
  910. # 5. 找出所有叶子节点(没有下游任务的节点)并连接到execution_completed
  911. # 首先,构建一个下游节点集合
  912. has_downstream = set()
  913. for script_id, dependencies in script_dependencies.items():
  914. for dep_id in dependencies:
  915. has_downstream.add(dep_id)
  916. # 然后,找出没有下游节点的任务
  917. leaf_nodes = []
  918. for script_id in task_dict:
  919. if script_id not in has_downstream:
  920. leaf_nodes.append(script_id)
  921. task_dict[script_id] >> execution_completed
  922. logger.info(f"将叶子节点连接到completion: {script_id} >> execution_completed")
  923. # 如果没有找到叶子节点,则将所有任务都连接到completion
  924. if not leaf_nodes:
  925. logger.warning("未找到叶子节点,将所有任务连接到completion")
  926. for script_id, task in task_dict.items():
  927. task >> execution_completed
  928. # 设置TaskGroup与prepare_phase的依赖关系
  929. prepare_group >> start_execution
  930. logger.info(f"成功创建 {len(task_dict)} 个脚本执行任务")
  931. except Exception as e:
  932. logger.error(f"加载执行计划或创建任务时出错: {str(e)}")
  933. import traceback
  934. logger.error(traceback.format_exc())
  935. # 添加触发finalize DAG的任务
  936. from airflow.operators.trigger_dagrun import TriggerDagRunOperator
  937. trigger_finalize_dag = TriggerDagRunOperator(
  938. task_id="trigger_finalize_dag",
  939. trigger_dag_id="dataops_productline_finalize_dag",
  940. conf={"execution_date": "{{ ds }}", "parent_execution_date": "{{ execution_date }}", "parent_run_id": "{{ run_id }}"},
  941. reset_dag_run=True,
  942. wait_for_completion=False,
  943. poke_interval=60,
  944. )
  945. # 设置依赖关系,确保执行阶段完成后触发finalize DAG
  946. execution_group >> trigger_finalize_dag
  947. logger.info(f"DAG dataops_productline_execute_dag 定义完成")