
# dataops_productline_manual_trigger_dag.py
"""
Manually triggered DAG for running data product line scripts.

Features:
- Builds and executes the upstream dependency chain for a given script name and target table.
- Supports three dependency levels:
  - 'self': execute only the current script, without processing upstream dependencies
  - 'dependency': build the execution chain from the direct dependencies between scripts
  - 'full': build the complete dependency chain, including all indirect dependencies

Parameters:
- script_name: name of the target script
- target_table: name of the target table
- dependency_level: dependency level

Usage example:
{
    "conf": {
        "script_name": "book_sale_amt_monthly_process.py",
        "target_table": "book_sale_amt_monthly",
        "dependency_level": "dependency"
    }
}
"""
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import logging
import importlib.util
import os
from pathlib import Path
from neo4j import GraphDatabase
import psycopg2
import networkx as nx
import json
from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
import traceback
# Module-level logger
logger = logging.getLogger(__name__)

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}


def get_pg_conn():
    """Get a PostgreSQL connection."""
    return psycopg2.connect(**PG_CONFIG)


def get_execution_mode(table_name):
    """
    Fetch a table's execution mode from PostgreSQL.

    Args:
        table_name (str): table name

    Returns:
        str: execution mode; "append" is returned as the default if none is found
    """
    try:
        conn = get_pg_conn()
        cursor = conn.cursor()
        cursor.execute("""
            SELECT execution_mode
            FROM table_schedule
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        cursor.close()
        conn.close()
        if result:
            return result[0]
        else:
            logger.warning(f"No execution mode found for table {table_name}, using default 'append'")
            return "append"
    except Exception as e:
        logger.error(f"Error fetching execution mode for table {table_name}: {str(e)}")
        return "append"


def get_dag_params(**context):
    """Get the DAG run parameters."""
    params = context.get('params', {})
    script_name = params.get('script_name')
    target_table = params.get('target_table')

    # Log the raw parameters
    logger.info(f"Received raw parameters: {params}")

    # Get the dependency level parameter
    dependency_level = params.get('dependency_level')
    logger.info(f"Dependency level received: {dependency_level}")

    if not script_name:
        raise ValueError("The script_name parameter is required")
    if not target_table:
        raise ValueError("The target_table parameter is required")

    # Validate the dependency_level parameter
    if dependency_level not in ['self', 'dependency', 'full']:
        logger.warning(f"Invalid dependency level: {dependency_level}, falling back to the default 'dependency'")
        dependency_level = 'dependency'

    logger.info(f"Final parameters - script: {script_name}, target table: {target_table}, dependency level: {dependency_level}")
    return script_name, target_table, dependency_level
def get_table_label(table_name):
    """Determine a table's label type (DataModel, DataResource or DataSource)."""
    driver = GraphDatabase.driver(
        NEO4J_CONFIG['uri'],
        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
    )
    query = """
        MATCH (n {en_name: $table_name})
        RETURN labels(n) AS labels
    """
    try:
        with driver.session() as session:
            result = session.run(query, table_name=table_name)
            record = result.single()
            if record and record.get("labels"):
                labels = record.get("labels")
                if "DataModel" in labels:
                    return "DataModel"
                elif "DataResource" in labels:
                    return "DataResource"
                elif "DataSource" in labels:
                    return "DataSource"
            return None
    except Exception as e:
        logger.error(f"Error fetching labels for table {table_name}: {str(e)}")
        return None
    finally:
        driver.close()


def get_script_info_from_neo4j(script_name, target_table):
    """
    Fetch detailed script and table information from Neo4j.

    Args:
        script_name: script name
        target_table: target table name

    Returns:
        dict: detailed information about the script and its target table
    """
    driver = GraphDatabase.driver(
        NEO4J_CONFIG['uri'],
        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
    )
    script_info = {
        'script_name': script_name,
        'target_table': target_table,
        'script_id': f"{script_name.replace('.', '_')}_{target_table}",
        'target_table_label': get_table_label(target_table),
        'source_tables': [],
        'script_type': 'python'  # default type
    }

    # Query script information and dependencies according to the table label
    try:
        with driver.session() as session:
            if script_info['target_table_label'] == 'DataModel':
                # Query the upstream dependencies of a DataModel
                query = """
                    MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                    RETURN source.en_name AS source_table, labels(source) AS source_labels,
                           rel.script_name AS script_name, rel.script_type AS script_type
                """
                result = session.run(query, table_name=target_table)
                for record in result:
                    source_table = record.get("source_table")
                    source_labels = record.get("source_labels", [])
                    db_script_name = record.get("script_name")
                    script_type = record.get("script_type", "python")

                    # Only keep records whose script name matches
                    if db_script_name and db_script_name == script_name:
                        if source_table and source_table not in script_info['source_tables']:
                            script_info['source_tables'].append(source_table)
                        script_info['script_type'] = script_type
            elif script_info['target_table_label'] == 'DataResource':
                # Query the upstream dependencies of a DataResource
                query = """
                    MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                    RETURN source.en_name AS source_table, labels(source) AS source_labels,
                           rel.script_name AS script_name, rel.script_type AS script_type
                """
                result = session.run(query, table_name=target_table)
                for record in result:
                    source_table = record.get("source_table")
                    source_labels = record.get("source_labels", [])
                    db_script_name = record.get("script_name")
                    script_type = record.get("script_type", "python")

                    # Only keep records whose script name matches
                    if db_script_name and db_script_name == script_name:
                        if source_table and source_table not in script_info['source_tables']:
                            script_info['source_tables'].append(source_table)
                        script_info['script_type'] = script_type

            # Warn if no dependencies were found
            if not script_info['source_tables']:
                logger.warning(f"No dependencies found for script {script_name} and table {target_table}")

            # Fetch special attributes (used for structure-type resources)
            if script_info['target_table_label'] == 'DataResource':
                query = """
                    MATCH (n:DataResource {en_name: $table_name})
                    RETURN n.type AS target_type, n.storage_location AS storage_location
                """
                result = session.run(query, table_name=target_table)
                record = result.single()
                if record:
                    target_type = record.get("target_type")
                    storage_location = record.get("storage_location")
                    if target_type:
                        script_info['target_type'] = target_type
                    if storage_location:
                        script_info['storage_location'] = storage_location
    except Exception as e:
        logger.error(f"Error fetching info for script {script_name} and table {target_table} from Neo4j: {str(e)}")
        logger.error(traceback.format_exc())
    finally:
        driver.close()

    # Fetch the table's execution mode
    script_info['execution_mode'] = get_execution_mode(target_table)
    logger.info(f"Collected script info: {script_info}")
    return script_info
def get_upstream_script_dependencies(script_info, dependency_level='dependency'):
    """
    Build the upstream dependency chain for a script.

    Args:
        script_info: script information
        dependency_level: dependency level
            - self: only the current script
            - dependency: direct dependencies only
            - full: all indirect dependencies

    Returns:
        list: scripts in the dependency chain, in execution order
    """
    # For the 'self' level, return only the current script
    if dependency_level == 'self':
        logger.info(f"Dependency level is 'self'; only including the current script: {script_info['script_name']}")
        return [script_info]

    # Build the dependency graph
    G = nx.DiGraph()
    # Track every processed script
    processed_scripts = {}
    script_id = script_info['script_id']

    # Add the starting node
    G.add_node(script_id, **script_info)
    processed_scripts[script_id] = script_info

    # Traverse the dependency tree with BFS
    queue = [script_info]
    visited = set([script_id])
    while queue:
        current = queue.pop(0)
        current_id = current['script_id']

        # Skip scripts without source-table dependencies
        if not current.get('source_tables'):
            logger.info(f"Script {current['script_name']} has no source-table dependencies")
            continue

        # Look up the script behind each source table
        for source_table in current.get('source_tables', []):
            # Fetch the script that produces the source table (assuming one primary script per table)
            driver = GraphDatabase.driver(
                NEO4J_CONFIG['uri'],
                auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
            )
            try:
                with driver.session() as session:
                    # Query a different relationship depending on the table label
                    table_label = get_table_label(source_table)
                    if table_label == 'DataModel':
                        query = """
                            MATCH (target:DataModel {en_name: $table_name})<-[rel:DERIVED_FROM]-(source)
                            RETURN source.en_name AS target_table, rel.script_name AS script_name
                            LIMIT 1
                        """
                    elif table_label == 'DataResource':
                        query = """
                            MATCH (target:DataResource {en_name: $table_name})<-[rel:ORIGINATES_FROM]-(source)
                            RETURN source.en_name AS target_table, rel.script_name AS script_name
                            LIMIT 1
                        """
                    else:
                        logger.warning(f"Table {source_table} has an unknown or unsupported label: {table_label}")
                        continue

                    result = session.run(query, table_name=source_table)
                    record = result.single()
                    if record and record.get("script_name"):
                        upstream_script_name = record.get("script_name")
                        upstream_target_table = source_table
                        # Build the upstream script ID
                        upstream_id = f"{upstream_script_name.replace('.', '_')}_{upstream_target_table}"

                        # Process if the level is 'full', or 'dependency' with an unvisited direct dependency
                        if dependency_level == 'full' or (dependency_level == 'dependency' and upstream_id not in visited):
                            # Fetch the full upstream script info
                            upstream_info = get_script_info_from_neo4j(upstream_script_name, upstream_target_table)
                            # Add it to the graph
                            G.add_node(upstream_id, **upstream_info)
                            G.add_edge(current_id, upstream_id)
                            # Record the processed script
                            processed_scripts[upstream_id] = upstream_info
                            # Only the 'full' level keeps recursing upstream
                            if dependency_level == 'full' and upstream_id not in visited:
                                visited.add(upstream_id)
                                queue.append(upstream_info)
                    else:
                        logger.warning(f"No script found for table {source_table}")
            except Exception as e:
                logger.error(f"Error looking up the script for table {source_table}: {str(e)}")
                logger.error(traceback.format_exc())
            finally:
                driver.close()

    # Detect circular dependencies
    cycles = list(nx.simple_cycles(G))
    if cycles:
        logger.warning(f"Circular dependencies detected; attempting to break them: {cycles}")
        for cycle in cycles:
            if len(cycle) > 1:
                G.remove_edge(cycle[0], cycle[1])
                logger.info(f"Broke circular dependency: removed the edge {cycle[0]} -> {cycle[1]}")

    # Produce the topological order
    try:
        # Reverse the edges: the graph stores "depends on" relationships, while the
        # topological sort needs "runs before" relationships.
        reverse_G = G.reverse()
        execution_order = list(nx.topological_sort(reverse_G))

        # Build the final dependency chain (loop variable named exec_id so it does
        # not shadow script_info, which the except branch below still needs)
        dependency_chain = []
        for exec_id in execution_order:
            dependency_chain.append(processed_scripts[exec_id])
        logger.info(f"Final dependency chain length: {len(dependency_chain)}")
        logger.info(f"Final dependency chain: {[info['script_name'] for info in dependency_chain]}")
        return dependency_chain
    except Exception as e:
        logger.error(f"Error during topological sort: {str(e)}")
        logger.error(traceback.format_exc())
        # On error, return at least the current script
        return [script_info]
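
# Purely illustrative example: for a hypothetical chain where
# book_sale_amt_daily_process.py builds book_sale_amt_daily, which
# book_sale_amt_monthly_process.py then aggregates into book_sale_amt_monthly,
# the list returned above would put the upstream script first, e.g.
#
#   [
#       {"script_name": "book_sale_amt_daily_process.py", "target_table": "book_sale_amt_daily", ...},
#       {"script_name": "book_sale_amt_monthly_process.py", "target_table": "book_sale_amt_monthly", ...},
#   ]
#
# with each dict carrying the keys assembled by get_script_info_from_neo4j()
# (script_id, target_table_label, source_tables, script_type, execution_mode).
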
def check_script_exists(script_name):
    """Check whether the script file exists."""
    script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
    if os.path.exists(script_path):
        logger.info(f"Script file exists: {script_path}")
        return True, script_path
    else:
        logger.error(f"Script file does not exist: {script_path}")
        return False, script_path


def execute_python_script(script_info):
    """
    Execute a Python script file.

    Args:
        script_info: script information dict

    Returns:
        bool: True on success, False on failure
    """
    script_name = script_info.get('script_name')
    target_table = script_info.get('target_table')
    execution_mode = script_info.get('execution_mode', 'append')
    target_table_label = script_info.get('target_table_label')
    source_tables = script_info.get('source_tables', [])

    # Log the start of execution
    logger.info(f"===== Starting script: {script_name} =====")
    logger.info(f"Target table: {target_table}")
    logger.info(f"Execution mode: {execution_mode}")
    logger.info(f"Table label: {target_table_label}")
    logger.info(f"Source tables: {source_tables}")

    # Make sure the script file exists
    exists, script_path = check_script_exists(script_name)
    if not exists:
        return False

    try:
        # Record the start time
        start_time = datetime.now()

        # Dynamically import the module
        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Check for and call the standard entry point run()
        if hasattr(module, "run"):
            logger.info(f"Calling the standard entry point run() of script {script_name}")
            # Build the keyword arguments
            run_kwargs = {
                "table_name": target_table,
                "execution_mode": execution_mode
            }
            # Add special parameters for structure-type resources
            if target_table_label == 'DataResource' and script_info.get('target_type') == 'structure':
                run_kwargs["target_type"] = script_info.get('target_type')
                run_kwargs["storage_location"] = script_info.get('storage_location')
                run_kwargs["frequency"] = script_info.get('frequency', 'daily')

            # Run the script
            result = module.run(**run_kwargs)

            # Record the end time
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            # Coerce the result to a boolean
            if not isinstance(result, bool):
                result = bool(result)

            logger.info(f"Script {script_name} finished, result: {result}, duration: {duration:.2f}s")
            return result
        else:
            logger.error(f"Script {script_name} does not define the standard entry point run()")
            return False
    except Exception as e:
        logger.error(f"Error executing script {script_name}: {str(e)}")
        logger.error(traceback.format_exc())
        return False
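
# For reference, a minimal sketch of the run() entry point each target script is
# expected to expose (hypothetical signature, matching the keyword arguments
# assembled in execute_python_script() above):
#
#   def run(table_name: str, execution_mode: str = "append", **kwargs) -> bool:
#       """Load the source data, transform it, and write it to `table_name`."""
#       ...
#       return True  # a truthy return value marks the script as successful
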
def prepare_dependency_chain(**context):
    """
    Build the dependency chain and save it to XCom.
    """
    # Get the script and table parameters
    script_name, target_table, dependency_level = get_dag_params(**context)

    # Log the meaning of the dependency levels
    logger.info("Dependency levels:")
    logger.info("- self: execute only the current script, without processing upstream dependencies")
    logger.info("- dependency: process direct dependencies")
    logger.info("- full: process all indirect dependencies")
    logger.info(f"Current dependency level: {dependency_level}")

    # Fetch the script info
    script_info = get_script_info_from_neo4j(script_name, target_table)

    # Validate the script info
    if not script_info.get('target_table_label'):
        logger.warning(f"Could not determine the type of table {target_table}")
        return False

    # Build the script dependency chain
    dependency_chain = get_upstream_script_dependencies(script_info, dependency_level)
    if not dependency_chain:
        logger.warning(f"No dependency chain found for script {script_name}")
        return False

    # Save the dependency chain to XCom
    ti = context['ti']
    ti.xcom_push(key='dependency_chain', value=dependency_chain)
    return True


def execute_script_chain(**context):
    """
    Execute every script in the dependency chain.
    """
    # Pull the dependency chain from XCom
    ti = context['ti']
    dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
    if not dependency_chain:
        logger.error("No dependency chain found; nothing to execute")
        return False

    # Log the chain
    logger.info(f"About to execute {len(dependency_chain)} scripts in the dependency chain")
    for idx, script_info in enumerate(dependency_chain, 1):
        logger.info(f"Script [{idx}]: {script_info['script_name']} -> {script_info['target_table']}")

    # Execute the scripts one by one
    all_success = True
    results = []
    for idx, script_info in enumerate(dependency_chain, 1):
        script_name = script_info['script_name']
        target_table = script_info['target_table']
        logger.info(f"===== Executing script {idx}/{len(dependency_chain)}: {script_name} -> {target_table} =====")

        # Run the script
        success = execute_python_script(script_info)

        # Record the result
        result = {
            "script_name": script_name,
            "target_table": target_table,
            "success": success
        }
        results.append(result)

        # If any script fails, mark the whole chain as failed and stop
        if not success:
            all_success = False
            logger.error(f"Script {script_name} failed; aborting the execution chain")
            break

    # Save the execution results
    ti.xcom_push(key='execution_results', value=results)
    return all_success


def generate_execution_report(**context):
    """
    Generate the execution report.
    """
    # Pull the execution results
    ti = context['ti']
    results = ti.xcom_pull(task_ids='execute_script_chain', key='execution_results')
    dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')

    if not results:
        report = "No execution results found; cannot generate a report"
        ti.xcom_push(key='execution_report', value=report)
        return report

    # Compute execution statistics
    total = len(results)
    success_count = sum(1 for r in results if r['success'])
    fail_count = total - success_count

    # Build the report
    report = []
    report.append("\n========== Script execution report ==========")
    report.append(f"Execution date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report.append(f"Total scripts: {total}")
    report.append(f"Succeeded: {success_count}")
    report.append(f"Failed: {fail_count}")
    report.append(f"Success rate: {success_count / total * 100:.2f}%")
    report.append("\n--- Details ---")
    for idx, result in enumerate(results, 1):
        script_name = result['script_name']
        target_table = result['target_table']
        success = result['success']
        status = "✓ success" if success else "✗ failed"
        report.append(f"{idx}. {script_name} -> {target_table}: {status}")
    report.append("\n========== End of report ==========")

    # Join into a single string
    report_str = "\n".join(report)

    # Save the report to XCom
    ti.xcom_push(key='execution_report', value=report_str)
    # Also write it to the log
    logger.info(report_str)
    return report_str
# Create the DAG
with DAG(
    'dataops_productline_manual_trigger_dag',
    default_args=default_args,
    description='Manually trigger a given script, supporting three dependency levels: self (current script only), dependency (direct dependencies), full (all indirect dependencies)',
    schedule_interval=None,  # None means the DAG can only be triggered manually
    catchup=False,
    is_paused_upon_creation=False,
    params={
        'script_name': '',
        'target_table': '',
        'dependency_level': {
            'type': 'string',
            'enum': ['self', 'dependency', 'full'],
            'default': 'dependency',
            'description': 'Dependency level: self - current script only, dependency - direct dependencies, full - all indirect dependencies'
        }
    },
) as dag:
    # Task 1: prepare the dependency chain
    prepare_task = ShortCircuitOperator(
        task_id='prepare_dependency_chain',
        python_callable=prepare_dependency_chain,
        provide_context=True,
    )

    # Task 2: execute the script chain
    execute_task = PythonOperator(
        task_id='execute_script_chain',
        python_callable=execute_script_chain,
        provide_context=True,
    )

    # Task 3: generate the execution report
    report_task = PythonOperator(
        task_id='generate_execution_report',
        python_callable=generate_execution_report,
        provide_context=True,
        trigger_rule='all_done'  # generate the report whether the upstream tasks succeed or fail
    )

    # Task 4: completion marker
    completed_task = EmptyOperator(
        task_id='execution_completed',
        trigger_rule='all_done'  # mark completion whether the upstream tasks succeed or fail
    )

    # Set the task dependencies
    prepare_task >> execute_task >> report_task >> completed_task
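

# A minimal local-debugging sketch; assumes Airflow 2.5+ where DAG.test() is
# available. It only runs when this file is executed directly and is never
# executed when the scheduler parses the DAG.
if __name__ == "__main__":
    dag.test(
        run_conf={
            "script_name": "book_sale_amt_monthly_process.py",
            "target_table": "book_sale_amt_monthly",
            "dependency_level": "self",
        }
    )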