# dag_manual_dependency_trigger.py
"""
手动触发数据表依赖链执行DAG

功能:
- 根据指定的表名,构建并执行其上游依赖链
- 支持三种依赖级别:
  - 'self':只执行当前表,不处理上游依赖
  - 'resource':查找依赖到Resource层,但只执行DataModel层
  - 'source':查找并执行完整依赖链到Source层

参数:
- TABLE_NAME:目标表名
- DEPENDENCY_LEVEL/UPPER_LEVEL_STOP:依赖级别

使用示例:
```
{
    "conf": {
        "TABLE_NAME": "book_sale_amt_2yearly",
        "DEPENDENCY_LEVEL": "resource"
    }
}
```
"""
import importlib.util
import logging
import os
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path

import networkx as nx
import psycopg2
from airflow import DAG
from airflow.operators.python import PythonOperator
from neo4j import GraphDatabase

from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
  34. # 设置logger
  35. logger = logging.getLogger(__name__)
  36. # DAG参数
  37. default_args = {
  38. 'owner': 'airflow',
  39. 'depends_on_past': False,
  40. 'start_date': datetime(2024, 1, 1),
  41. 'email_on_failure': False,
  42. 'email_on_retry': False,
  43. 'retries': 1,
  44. 'retry_delay': timedelta(minutes=5),
  45. }
  46. def get_pg_conn():
  47. """获取PostgreSQL连接"""
  48. return psycopg2.connect(**PG_CONFIG)
  49. def get_execution_mode(table_name):
  50. """
  51. 从PostgreSQL获取表的执行模式
  52. 参数:
  53. table_name (str): 表名
  54. 注意:
  55. "AND is_enabled = TRUE" 这个条件在这里不适用,因为这是强制执行的。
  56. 即使订阅表中没有这个表名,也会强制执行。
  57. 返回:
  58. str: 执行模式,如果未找到则返回"append"作为默认值
  59. """
  60. try:
  61. conn = get_pg_conn()
  62. cursor = conn.cursor()
  63. cursor.execute("""
  64. SELECT execution_mode
  65. FROM table_schedule
  66. WHERE table_name = %s
  67. """, (table_name,))
  68. result = cursor.fetchone()
  69. cursor.close()
  70. conn.close()
  71. if result:
  72. return result[0]
  73. else:
  74. logger.warning(f"未找到表 {table_name} 的执行模式,使用默认值 'append'")
  75. return "append"
  76. except Exception as e:
  77. logger.error(f"获取表 {table_name} 的执行模式时出错: {str(e)}")
  78. return "append"
  79. def get_dag_params(**context):
  80. """获取DAG运行参数"""
  81. params = context.get('params', {})
  82. table_name = params.get('TABLE_NAME')
  83. # 记录原始参数信息
  84. logger.info(f"接收到的原始参数: {params}")
  85. # 同时检查DEPENDENCY_LEVEL和UPPER_LEVEL_STOP参数,兼容两种参数名
  86. dependency_level = params.get('DEPENDENCY_LEVEL')
  87. logger.info(f"从DEPENDENCY_LEVEL获取的值: {dependency_level}")
  88. if dependency_level is None:
  89. dependency_level = params.get('UPPER_LEVEL_STOP', 'resource') # 兼容旧参数名
  90. logger.info(f"从UPPER_LEVEL_STOP获取的值: {dependency_level}")
  91. if not table_name:
  92. raise ValueError("必须提供TABLE_NAME参数")
  93. # 验证dependency_level参数
  94. if dependency_level not in ['self', 'resource', 'source']:
  95. logger.warning(f"无效的依赖级别参数: {dependency_level},使用默认值'resource'")
  96. dependency_level = 'resource'
  97. logger.info(f"最终使用的参数 - 表名: {table_name}, 依赖级别: {dependency_level}")
  98. return table_name, dependency_level
  99. def is_data_model_table(table_name):
  100. """判断表是否为DataModel类型"""
  101. driver = GraphDatabase.driver(
  102. NEO4J_CONFIG['uri'],
  103. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  104. )
  105. query = """
  106. MATCH (n:DataModel {en_name: $table_name}) RETURN count(n) > 0 AS exists
  107. """
  108. try:
  109. with driver.session() as session:
  110. result = session.run(query, table_name=table_name)
  111. record = result.single()
  112. return record and record["exists"]
  113. finally:
  114. driver.close()
  115. def is_data_resource_table(table_name):
  116. """判断表是否为DataResource类型"""
  117. driver = GraphDatabase.driver(
  118. NEO4J_CONFIG['uri'],
  119. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  120. )
  121. query = """
  122. MATCH (n:DataResource {en_name: $table_name}) RETURN count(n) > 0 AS exists
  123. """
  124. try:
  125. with driver.session() as session:
  126. result = session.run(query, table_name=table_name)
  127. record = result.single()
  128. return record and record["exists"]
  129. finally:
  130. driver.close()
  131. def get_upstream_models(table_name):
  132. """获取表的上游DataModel依赖"""
  133. driver = GraphDatabase.driver(
  134. NEO4J_CONFIG['uri'],
  135. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  136. )
  137. query = """
  138. MATCH (target:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(up:DataModel)
  139. RETURN up.en_name AS upstream
  140. """
  141. try:
  142. with driver.session() as session:
  143. result = session.run(query, table_name=table_name)
  144. upstream_list = [record["upstream"] for record in result]
  145. logger.info(f"表 {table_name} 的上游DataModel依赖: {upstream_list}")
  146. return upstream_list
  147. finally:
  148. driver.close()
  149. def get_upstream_resources(table_name):
  150. """获取表的上游DataResource依赖"""
  151. driver = GraphDatabase.driver(
  152. NEO4J_CONFIG['uri'],
  153. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  154. )
  155. query = """
  156. MATCH (target:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(up:DataResource)
  157. RETURN up.en_name AS upstream
  158. """
  159. try:
  160. with driver.session() as session:
  161. result = session.run(query, table_name=table_name)
  162. upstream_list = [record["upstream"] for record in result]
  163. logger.info(f"表 {table_name} 的上游DataResource依赖: {upstream_list}")
  164. return upstream_list
  165. finally:
  166. driver.close()
  167. def get_data_sources(resource_table_name):
  168. """获取DataResource表的上游DataSource"""
  169. driver = GraphDatabase.driver(
  170. NEO4J_CONFIG['uri'],
  171. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  172. )
  173. query = """
  174. MATCH (dr:DataResource {en_name: $table_name})-[:ORIGINATES_FROM]->(ds:DataSource)
  175. RETURN ds.en_name AS source_name
  176. """
  177. try:
  178. with driver.session() as session:
  179. result = session.run(query, table_name=resource_table_name)
  180. return [record["source_name"] for record in result]
  181. finally:
  182. driver.close()
  183. def get_script_name_for_model(table_name):
  184. """获取DataModel表对应的脚本名称"""
  185. driver = GraphDatabase.driver(
  186. NEO4J_CONFIG['uri'],
  187. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  188. )
  189. query = """
  190. MATCH (target:DataModel {en_name: $table_name})-[r:DERIVED_FROM]->(n)
  191. WHERE n:DataModel OR n:DataResource
  192. RETURN r.script_name AS script_name
  193. """
  194. try:
  195. with driver.session() as session:
  196. result = session.run(query, table_name=table_name)
  197. record = result.single()
  198. if record:
  199. return record["script_name"]
  200. else:
  201. logger.warning(f"未找到DataModel表 {table_name} 的脚本名称")
  202. return None
  203. except Exception as e:
  204. logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
  205. return None
  206. finally:
  207. driver.close()
  208. def get_script_name_for_resource(table_name):
  209. """获取DataResource表对应的脚本名称"""
  210. driver = GraphDatabase.driver(
  211. NEO4J_CONFIG['uri'],
  212. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  213. )
  214. query = """
  215. MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
  216. RETURN rel.script_name AS script_name
  217. """
  218. try:
  219. with driver.session() as session:
  220. result = session.run(query, table_name=table_name)
  221. record = result.single()
  222. if record:
  223. return record["script_name"]
  224. else:
  225. logger.warning(f"未找到DataResource表 {table_name} 的脚本名称")
  226. return None
  227. except Exception as e:
  228. logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
  229. return None
  230. finally:
  231. driver.close()
  232. def build_dependency_chain_nx(start_table, dependency_level='resource'):
  233. """
  234. 使用networkx构建依赖链
  235. 参数:
  236. start_table (str): 起始表名
  237. dependency_level (str): 依赖级别
  238. - 'self': 只执行自己
  239. - 'resource': 到Resource层 (默认)
  240. - 'source': 到Source层
  241. 返回:
  242. list: 依赖链列表,按执行顺序排序(从上游到下游)
  243. """
  244. # 记录依赖级别
  245. logger.info(f"构建依赖链 - 起始表: {start_table}, 依赖级别: {dependency_level}")
  246. # 创建有向图
  247. G = nx.DiGraph()
  248. # 设置起始节点属性
  249. if is_data_model_table(start_table):
  250. G.add_node(start_table, type='DataModel')
  251. table_type = 'DataModel'
  252. elif is_data_resource_table(start_table):
  253. G.add_node(start_table, type='DataResource')
  254. table_type = 'DataResource'
  255. else:
  256. logger.warning(f"表 {start_table} 不是DataModel或DataResource类型")
  257. return []
  258. # 如果只执行自己,直接返回
  259. if dependency_level == 'self':
  260. logger.info(f"依赖级别为'self',只包含起始表: {start_table}")
  261. script_name = get_script_name_for_model(start_table) if table_type == 'DataModel' else get_script_name_for_resource(start_table)
  262. execution_mode = get_execution_mode(start_table)
  263. return [{
  264. 'table_name': start_table,
  265. 'script_name': script_name,
  266. 'table_type': table_type,
  267. 'execution_mode': execution_mode
  268. }]
  269. # 判断resource级别还是source级别
  270. need_source = (dependency_level == 'source')
  271. logger.info(f"是否需要查找到Source层: {need_source}")
  272. # BFS构建依赖图
  273. visited = set([start_table])
  274. queue = [start_table]
  275. while queue:
  276. current = queue.pop(0)
  277. current_type = G.nodes[current].get('type')
  278. logger.info(f"处理节点: {current}, 类型: {current_type}")
  279. # 处理当前节点的上游依赖
  280. if current_type == 'DataModel':
  281. # 获取DataModel的上游依赖
  282. upstream_models = get_upstream_models(current)
  283. for upstream in upstream_models:
  284. if upstream not in visited:
  285. G.add_node(upstream, type='DataModel')
  286. visited.add(upstream)
  287. queue.append(upstream)
  288. G.add_edge(current, upstream, type='model_to_model')
  289. # 获取上游DataResource - 对于resource和source级别都需要查找DataResource
  290. upstream_resources = get_upstream_resources(current)
  291. for upstream in upstream_resources:
  292. if upstream not in visited:
  293. G.add_node(upstream, type='DataResource')
  294. visited.add(upstream)
  295. # 只有在source级别时才继续向上查找DataSource
  296. if need_source:
  297. queue.append(upstream)
  298. G.add_edge(current, upstream, type='model_to_resource')
  299. # 如果当前节点是DataResource,只有在source级别才查找上游DataSource
  300. elif current_type == 'DataResource' and need_source:
  301. data_sources = get_data_sources(current)
  302. for source in data_sources:
  303. if source not in visited:
  304. G.add_node(source, type='DataSource')
  305. visited.add(source)
  306. G.add_edge(current, source, type='resource_to_source')
  307. # 记录依赖图节点和边信息
  308. logger.info(f"依赖图节点数: {len(G.nodes)}, 边数: {len(G.edges)}")
  309. # 在resource级别,确保不处理DataSource节点的脚本
  310. if dependency_level == 'resource':
  311. # 查找所有DataSource节点
  312. source_nodes = [node for node, attrs in G.nodes(data=True) if attrs.get('type') == 'DataSource']
  313. logger.info(f"依赖级别为'resource',将移除 {len(source_nodes)} 个DataSource节点")
  314. # 移除所有DataSource节点
  315. for node in source_nodes:
  316. G.remove_node(node)
  317. # 重新记录依赖图信息
  318. logger.info(f"清理后依赖图节点数: {len(G.nodes)}, 边数: {len(G.edges)}")
  319. logger.info(f"依赖图节点: {list(G.nodes)}")
  320. # 检测循环依赖
  321. cycles = list(nx.simple_cycles(G))
  322. if cycles:
  323. logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
  324. # 打破循环依赖(简单策略:移除每个循环中的一条边)
  325. for cycle in cycles:
  326. G.remove_edge(cycle[-1], cycle[0])
  327. logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
  328. # 生成拓扑排序(从上游到下游的顺序)
  329. try:
  330. # 注意:拓扑排序给出的是从上游到下游的顺序
  331. # 我们需要的是执行顺序,所以要反转图然后进行拓扑排序
  332. reverse_G = G.reverse()
  333. execution_order = list(nx.topological_sort(reverse_G))
  334. logger.info(f"计算出的执行顺序: {execution_order}")
  335. # 构建最终依赖链
  336. dependency_chain = []
  337. for table_name in execution_order:
  338. node_type = G.nodes[table_name].get('type')
  339. # 跳过DataSource节点,它们没有脚本需要执行
  340. if node_type == 'DataSource':
  341. logger.info(f"跳过DataSource节点: {table_name}")
  342. continue
  343. # 获取脚本和执行模式
  344. if node_type == 'DataModel':
  345. script_name = get_script_name_for_model(table_name)
  346. else: # DataResource
  347. script_name = get_script_name_for_resource(table_name)
  348. execution_mode = get_execution_mode(table_name)
  349. dependency_chain.append({
  350. 'table_name': table_name,
  351. 'script_name': script_name,
  352. 'table_type': node_type,
  353. 'execution_mode': execution_mode
  354. })
  355. logger.info(f"添加到依赖链: {table_name}, 类型: {node_type}")
  356. logger.info(f"最终依赖链长度: {len(dependency_chain)}")
  357. return dependency_chain
  358. except Exception as e:
  359. logger.error(f"生成拓扑排序时出错: {str(e)}")
  360. return []
  361. def execute_scripts(scripts_list):
  362. """
  363. 执行指定的脚本列表
  364. 参数:
  365. scripts_list (list): 要执行的脚本信息列表,每项包含table_name, script_name, execution_mode
  366. 返回:
  367. bool: 全部执行成功返回True,任一失败返回False
  368. """
  369. if not scripts_list:
  370. logger.info("没有脚本需要执行")
  371. return True
  372. success = True
  373. for item in scripts_list:
  374. script_name = item['script_name']
  375. table_name = item['table_name']
  376. execution_mode = item['execution_mode']
  377. if not script_name:
  378. logger.warning(f"表 {table_name} 没有对应的脚本,跳过执行")
  379. continue
  380. logger.info(f"执行脚本: {script_name}, 表: {table_name}, 模式: {execution_mode}")
  381. try:
  382. script_path = Path(SCRIPTS_BASE_PATH) / script_name
  383. if not os.path.exists(script_path):
  384. logger.error(f"脚本文件不存在: {script_path}")
  385. success = False
  386. break
  387. # 动态导入模块
  388. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  389. module = importlib.util.module_from_spec(spec)
  390. spec.loader.exec_module(module)
  391. # 使用标准入口函数run
  392. if hasattr(module, "run"):
  393. logger.info(f"执行脚本 {script_name} 的标准入口函数 run()")
  394. result = module.run(table_name=table_name, execution_mode=execution_mode)
  395. if result:
  396. logger.info(f"脚本 {script_name} 执行成功")
  397. else:
  398. logger.error(f"脚本 {script_name} 执行失败")
  399. success = False
  400. break
  401. else:
  402. logger.warning(f"脚本 {script_name} 未定义标准入口函数 run(),无法执行")
  403. success = False
  404. break
  405. except Exception as e:
  406. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  407. success = False
  408. break
  409. return success
  410. def prepare_dependency_chain(**context):
  411. """
  412. 准备依赖链并保存到XCom
  413. 不同依赖级别的行为:
  414. - self: 只执行当前表,不查找上游依赖
  415. - resource: 仅查找数据模型依赖到Resource层,但不执行Resource层的脚本
  416. - source: 完整查找所有依赖到Source层,并执行所有相关脚本
  417. """
  418. # 获取参数
  419. table_name, dependency_level = get_dag_params(**context)
  420. # 记录依赖级别信息
  421. logger.info(f"依赖级别说明:")
  422. logger.info(f"- self: 只执行当前表,不查找上游依赖")
  423. logger.info(f"- resource: 仅查找数据模型依赖到Resource层,但不执行Resource层的脚本")
  424. logger.info(f"- source: 完整查找所有依赖到Source层,并执行所有相关脚本")
  425. logger.info(f"当前依赖级别: {dependency_level}")
  426. # 获取依赖链
  427. dependency_chain = build_dependency_chain_nx(table_name, dependency_level)
  428. if not dependency_chain:
  429. logger.warning(f"没有找到表 {table_name} 的依赖链")
  430. return False
  431. # 记录完整依赖链
  432. logger.info(f"依赖链完整列表: {[item['table_name'] for item in dependency_chain]}")
  433. # 保存依赖链到XCom以便后续任务使用
  434. ti = context['ti']
  435. ti.xcom_push(key='dependency_chain', value=dependency_chain)
  436. # 保存依赖级别,便于后续任务使用
  437. ti.xcom_push(key='dependency_level', value=dependency_level)
  438. # 检查是否有各类型的脚本需要执行
  439. resource_tables = [item for item in dependency_chain if item['table_type'] == 'DataResource']
  440. model_tables = [item for item in dependency_chain if item['table_type'] == 'DataModel']
  441. has_resource = len(resource_tables) > 0
  442. has_model = len(model_tables) > 0
  443. # 处理特殊情况:如果是self级别,且起始表是DataResource
  444. if dependency_level == 'self' and not has_model and has_resource:
  445. # 确保只有一个DataResource表,而且是起始表
  446. is_start_resource = any(item['table_name'] == table_name for item in resource_tables)
  447. logger.info(f"依赖级别为'self',起始表是DataResource: {is_start_resource}")
  448. # 额外保存标志,标记这是特殊情况
  449. ti.xcom_push(key='is_start_resource_only', value=is_start_resource)
  450. logger.info(f"是否有DataResource脚本: {has_resource}({len(resource_tables)}个), 是否有DataModel脚本: {has_model}({len(model_tables)}个)")
  451. return True
  452. def process_resources(**context):
  453. """
  454. 处理所有DataResource层的脚本
  455. 依赖级别处理策略:
  456. - self: 只有当起始表是DataResource类型时才执行
  457. - resource: 不执行任何DataResource脚本
  458. - source: 执行所有依赖链中的DataResource脚本
  459. """
  460. # 获取任务间共享变量
  461. ti = context['ti']
  462. dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
  463. # 直接从XCom获取依赖级别,避免重复解析
  464. dependency_level = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_level')
  465. # 记录当前任务的依赖级别
  466. logger.info(f"process_resources任务 - 当前依赖级别: {dependency_level}")
  467. # 检查特殊标志
  468. is_start_resource_only = ti.xcom_pull(task_ids='prepare_dependency_chain', key='is_start_resource_only', default=False)
  469. # 依赖级别处理逻辑
  470. if dependency_level == 'self' and not is_start_resource_only:
  471. logger.info("依赖级别为'self'且起始表不是DataResource,跳过process_resources任务")
  472. return True
  473. elif dependency_level == 'resource':
  474. logger.info("依赖级别为'resource',根据设计不执行DataResource表脚本")
  475. return True
  476. # 获取表名(仅在self级别需要)
  477. table_name = None
  478. if dependency_level == 'self':
  479. params = context.get('params', {})
  480. table_name = params.get('TABLE_NAME') or params.get('table_name')
  481. logger.info(f"依赖级别为'self',目标表: {table_name}")
  482. # 根据依赖级别过滤要执行的脚本
  483. if dependency_level == 'self' and is_start_resource_only:
  484. # 特殊情况:只处理与起始表名匹配的Resource表
  485. resource_scripts = [item for item in dependency_chain if item['table_type'] == 'DataResource' and item['table_name'] == table_name]
  486. logger.info(f"依赖级别为'self'且起始表是DataResource,只处理表: {table_name}")
  487. elif dependency_level == 'source':
  488. # source级别:处理所有Resource表
  489. resource_scripts = [item for item in dependency_chain if item['table_type'] == 'DataResource']
  490. logger.info(f"依赖级别为'source',处理所有DataResource表")
  491. else:
  492. # 其他情况,返回空列表
  493. resource_scripts = []
  494. if not resource_scripts:
  495. logger.info("没有找到DataResource类型的表需要处理")
  496. return True
  497. # 详细记录要执行的脚本信息
  498. logger.info(f"要执行的DataResource脚本数量: {len(resource_scripts)}")
  499. for idx, item in enumerate(resource_scripts, 1):
  500. logger.info(f"Resource脚本[{idx}]: 表={item['table_name']}, 脚本={item['script_name']}, 模式={item['execution_mode']}")
  501. # 执行所有DataResource脚本
  502. return execute_scripts(resource_scripts)
  503. def process_models(**context):
  504. """
  505. 处理所有DataModel层的脚本
  506. 依赖级别处理策略:
  507. - self: 只执行起始表(如果是DataModel类型)
  508. - resource/source: 执行所有依赖链中的DataModel脚本
  509. """
  510. # 获取任务间共享变量
  511. ti = context['ti']
  512. dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
  513. # 直接从XCom获取依赖级别,避免重复解析
  514. dependency_level = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_level')
  515. # 记录当前任务的依赖级别
  516. logger.info(f"process_models任务 - 当前依赖级别: {dependency_level}")
  517. # 获取表名(在所有级别都需要)
  518. params = context.get('params', {})
  519. table_name = params.get('TABLE_NAME') or params.get('table_name')
  520. logger.info(f"目标表: {table_name}")
  521. # 如果依赖级别是'self',只处理起始表
  522. if dependency_level == 'self':
  523. logger.info(f"依赖级别为'self',只处理起始表: {table_name}")
  524. model_scripts = [item for item in dependency_chain if item['table_name'] == table_name and item['table_type'] == 'DataModel']
  525. else:
  526. # 否则处理所有DataModel表
  527. logger.info(f"依赖级别为'{dependency_level}',处理所有DataModel表")
  528. model_scripts = [item for item in dependency_chain if item['table_type'] == 'DataModel']
  529. if not model_scripts:
  530. logger.info("没有找到DataModel类型的表需要处理")
  531. return True
  532. # 详细记录要执行的脚本信息
  533. logger.info(f"要执行的DataModel脚本数量: {len(model_scripts)}")
  534. for idx, item in enumerate(model_scripts, 1):
  535. logger.info(f"Model脚本[{idx}]: 表={item['table_name']}, 脚本={item['script_name']}, 模式={item['execution_mode']}")
  536. # 执行所有DataModel脚本
  537. return execute_scripts(model_scripts)
  538. # 创建DAG
  539. with DAG(
  540. 'dag_manual_dependency_trigger',
  541. default_args=default_args,
  542. description='手动触发指定表的依赖链执行,支持三种依赖级别:self(仅本表)、resource(到Resource层但不执行Resource)、source(完整依赖到Source层)',
  543. schedule_interval=None, # 设置为None表示只能手动触发
  544. catchup=False,
  545. is_paused_upon_creation=False, # 添加这一行,使DAG创建时不处于暂停状态
  546. params={
  547. 'TABLE_NAME': '',
  548. 'DEPENDENCY_LEVEL': {
  549. 'type': 'string',
  550. 'enum': ['self', 'resource', 'source'],
  551. 'default': 'resource',
  552. 'description': '依赖级别: self-仅本表, resource-到Resource层(不执行Resource脚本), source-到Source层'
  553. },
  554. # 添加旧参数名,保持兼容性
  555. 'UPPER_LEVEL_STOP': {
  556. 'type': 'string',
  557. 'enum': ['self', 'resource', 'source'],
  558. 'default': 'resource',
  559. 'description': '依赖级别(旧参数名): self-仅本表, resource-到Resource层(不执行Resource脚本), source-到Source层'
  560. }
  561. },
  562. ) as dag:
  563. # 第一个任务:准备依赖链
  564. prepare_task = PythonOperator(
  565. task_id='prepare_dependency_chain',
  566. python_callable=prepare_dependency_chain,
  567. provide_context=True,
  568. )
  569. # 第二个任务:执行DataResource脚本
  570. resource_task = PythonOperator(
  571. task_id='process_resources',
  572. python_callable=process_resources,
  573. provide_context=True,
  574. )
  575. # 第三个任务:执行DataModel脚本
  576. model_task = PythonOperator(
  577. task_id='process_models',
  578. python_callable=process_models,
  579. provide_context=True,
  580. )
  581. # 设置任务依赖关系
  582. prepare_task >> resource_task >> model_task