dag_dataops_pipeline_data_scheduler.py 58 KB

  1. """
  2. 统一数据运维调度器 DAG
  3. 功能:
  4. 1. 将数据处理与统计汇总整合到一个DAG中
  5. 2. 保留原有的每个处理脚本单独运行的特性,方便通过Web UI查看
  6. 3. 支持执行计划文件的动态解析和执行
  7. 4. 执行完成后自动生成汇总报告
  8. """
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
import os
import pendulum
from decimal import Decimal
from common import (
    get_pg_conn,
    get_neo4j_driver,
    get_today_date
)
from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
import pytz
# Create the module logger
logger = logging.getLogger(__name__)

# Enable verbose diagnostic logging
ENABLE_DEBUG_LOGGING = True

def log_debug(message):
    """Log a debug message, but only when debug mode is enabled."""
    if ENABLE_DEBUG_LOGGING:
        logger.info(f"[DEBUG] {message}")

# Emit diagnostic information when the DAG file is parsed
log_debug("======== Diagnostics ========")
log_debug(f"Current working directory: {os.getcwd()}")
log_debug(f"SCRIPTS_BASE_PATH: {SCRIPTS_BASE_PATH}")
log_debug(f"Module providing get_pg_conn: {get_pg_conn.__module__}")
# Verify the database connection
def validate_database_connection():
    """Validate that the PostgreSQL connection works."""
    try:
        conn = get_pg_conn()
        cursor = conn.cursor()
        cursor.execute("SELECT version()")
        version = cursor.fetchone()
        log_debug(f"Database connection OK, PostgreSQL version: {version[0]}")

        # Check whether the airflow_exec_plans table exists
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'airflow_exec_plans'
            )
        """)
        table_exists = cursor.fetchone()[0]

        if table_exists:
            # Inspect the table structure
            cursor.execute("""
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_name = 'airflow_exec_plans'
            """)
            columns = cursor.fetchall()
            log_debug("airflow_exec_plans table exists; columns:")
            for col in columns:
                log_debug(f"  - {col[0]}: {col[1]}")

            # Count the stored records
            cursor.execute("SELECT COUNT(*) FROM airflow_exec_plans")
            count = cursor.fetchone()[0]
            log_debug(f"airflow_exec_plans contains {count} records")

            # Look at the most recent execution records
            cursor.execute("""
                SELECT exec_date, COUNT(*) as record_count
                FROM airflow_exec_plans
                GROUP BY exec_date
                ORDER BY exec_date DESC
                LIMIT 3
            """)
            recent_dates = cursor.fetchall()
            log_debug("Most recent execution dates and record counts:")
            for date_info in recent_dates:
                log_debug(f"  - {date_info[0]}: {date_info[1]} records")
        else:
            log_debug("airflow_exec_plans table does not exist!")

        cursor.close()
        conn.close()
        return True
    except Exception as e:
        log_debug(f"Database connection validation failed: {str(e)}")
        import traceback
        log_debug(f"Stack trace: {traceback.format_exc()}")
        return False

# Run the connection validation at parse time
try:
    validate_database_connection()
except Exception as e:
    log_debug(f"Error while validating the database connection: {str(e)}")

log_debug("======== End of diagnostics ========")
#############################################
# Shared utility functions
#############################################

def json_serial(obj):
    """JSON serializer that renders date objects as ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

# Custom JSON encoder that also handles Decimal values
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date types
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Let the parent class handle everything else
        return super(DecimalEncoder, self).default(obj)
#############################################
# Newer utility functions
#############################################

def execute_python_script(target_table, script_name, script_exec_mode, exec_date, **kwargs):
    """
    Execute a Python script and return its result.

    Args:
        target_table: target table name
        script_name: script file name
        script_exec_mode: script execution mode
        exec_date: execution date
        source_tables: (optional) list of source tables

    Returns:
        bool: whether the script succeeded
    """
    # Verbose logging of the incoming arguments
    logger.info("===== Starting script execution =====")
    logger.info(f"target_table: {target_table}, type: {type(target_table)}")
    logger.info(f"script_name: {script_name}, type: {type(script_name)}")
    logger.info(f"script_exec_mode: {script_exec_mode}, type: {type(script_exec_mode)}")
    logger.info(f"exec_date: {exec_date}, type: {type(exec_date)}")

    # Log any extra keyword arguments
    for key, value in kwargs.items():
        logger.info(f"Extra argument - {key}: {value}, type: {type(value)}")

    # Refuse to run when script_name is empty
    if not script_name:
        logger.error(f"script_name for table {target_table} is empty; nothing to execute")
        return False

    # Record the start time
    start_time = datetime.now()
    try:
        # Import and execute the script module
        import importlib.util
        import sys

        # SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"
        script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
        if not os.path.exists(script_path):
            logger.error(f"Script file does not exist: {script_path}")
            return False

        # Import the module dynamically
        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Look for, and call, the standard entry point run()
        if hasattr(module, "run"):
            logger.info(f"Calling the standard entry point run() of script {script_name}")
            # Build the full parameter dict
            run_params = {
                "table_name": target_table,
                "execution_mode": script_exec_mode,
                "exec_date": exec_date
            }
            # Forward optional extra parameters when present
            for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
                if key in kwargs and kwargs[key] is not None:
                    run_params[key] = kwargs[key]

            # Invoke the script's run function
            logger.info(f"Calling run() with parameters: {run_params}")
            result = module.run(**run_params)
            logger.info(f"Script finished, raw return value: {result}, type: {type(result)}")

            # Coerce the result to a boolean
            if result is None:
                logger.warning("Script returned None; treating it as False")
                result = False
            elif not isinstance(result, bool):
                original_result = result
                result = bool(result)
                logger.warning(f"Script returned non-boolean {original_result}; coerced to: {result}")

            # Record the end time and the outcome
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            logger.info(f"Script {script_name} finished, result: {result}, duration: {duration:.2f}s")
            return result
        else:
            logger.error(f"Script {script_name} does not define the standard entry point run(); cannot execute")
            return False
    except Exception as e:
        # Handle any exception raised by the script
        logger.error(f"Error while executing the task: {str(e)}")
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.error(f"Script {script_name} failed, duration: {duration:.2f}s")
        logger.info("===== Script execution ended with an exception =====")
        import traceback
        logger.error(traceback.format_exc())
        # Never block the DAG on a script failure
        return False
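
# Sketch of the contract a target script has to satisfy (hypothetical example file;
# the parameter names mirror the run_params dict built above):
#
#   # $SCRIPTS_BASE_PATH/example_table_script.py
#   def run(table_name, execution_mode, exec_date, source_tables=None, **kwargs):
#       """Process table_name for exec_date; return True on success."""
#       ...
#       return True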
#############################################
# Phase 1: Prepare Phase functions
#############################################

def get_enabled_tables():
    """Fetch all tables whose scheduling is enabled."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT owner_id, table_name
            FROM schedule_status
            WHERE schedule_is_enabled = TRUE
        """)
        result = cursor.fetchall()
        return [row[1] for row in result]  # Return only the table names
    except Exception as e:
        logger.error(f"Failed to fetch enabled tables: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()
def check_table_directly_subscribed(table_name):
    """Check whether a table is directly subscribed in schedule_status."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT schedule_is_enabled
            FROM schedule_status
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        # bool() ensures the caller never receives None when no row exists
        return bool(result and result[0] is True)
    except Exception as e:
        logger.error(f"Failed to check subscription status for table: {str(e)}")
        return False
    finally:
        cursor.close()
        conn.close()
def get_table_info_from_neo4j(table_name):
    """Fetch a table's details from Neo4j."""
    driver = get_neo4j_driver()
    # Check whether the table is directly subscribed
    is_directly_schedule = check_table_directly_subscribed(table_name)

    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # Taken from the schedule_status table
    }
    try:
        with driver.session() as session:
            # Query the table's labels and status
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency,
                       t.type AS type, t.storage_location AS storage_location
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()

            if record:
                labels = record.get("labels", [])
                # next() avoids an IndexError when none of the known labels is present
                table_info['target_table_label'] = next(
                    (label for label in labels if label in ["DataResource", "DataModel", "DataSource"]), None)
                table_info['target_table_status'] = record.get("status", True)  # Defaults to True
                table_info['default_update_frequency'] = record.get("frequency")
                table_info['frequency'] = record.get("frequency")
                table_info['target_type'] = record.get("type")  # type property
                table_info['storage_location'] = record.get("storage_location")  # storage_location property

                # Query relationship and script information according to the label type
                if "DataResource" in labels:
                    # Check whether this is a structure-type resource
                    if table_info.get('target_type') == "structure":
                        # structure types get defaults; no relationship query needed
                        table_info['source_tables'] = []  # An empty array means no source tables
                        table_info['script_name'] = "load_file.py"
                        table_info['script_type'] = "python"
                        # A csv-style DataResource has no upstream; default to append mode
                        table_info['script_exec_mode'] = "append"
                        logger.info(f"Table {table_name} is of type structure; using default execution mode: append")
                        return table_info
                    else:
                        query_rel = """
                            MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                            WITH source, rel,
                                 CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
                                 CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
                            RETURN source.en_name AS source_table, script_name AS script_name,
                                   script_type AS script_type, 'append' AS script_exec_mode
                        """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        WITH source, rel,
                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
                        RETURN source.en_name AS source_table, script_name AS script_name,
                               script_type AS script_type, 'append' AS script_exec_mode
                    """
                else:
                    logger.warning(f"Table {table_name} is neither DataResource nor DataModel")
                    return table_info

                # Collect every relationship record
                result = session.run(query_rel, table_name=table_name)
                # Some driver versions expose collect(); otherwise fall back to list() or data()
                try:
                    if hasattr(result, 'collect'):
                        records = result.collect()  # Fetch all records via collect()
                    else:
                        # Fall back to other ways of materializing the records
                        logger.info(f"Query result for table {table_name} does not support collect(); trying alternatives")
                        try:
                            records = list(result)  # Convert directly to a list
                        except Exception as e1:
                            logger.warning(f"List conversion failed: {str(e1)}; trying data()")
                            try:
                                records = result.data()  # Use the data() method
                            except Exception as e2:
                                logger.warning(f"All methods failed; using an empty list: {str(e2)}")
                                records = []
                except Exception as e:
                    logger.warning(f"Error while fetching query results: {str(e)}; using an empty list")
                    records = []

                # Log the raw records that were found
                logger.info(f"Table {table_name} has {len(records)} relationship records")
                for idx, rec in enumerate(records):
                    logger.info(f"Relationship record[{idx}]: source_table={rec.get('source_table')}, script_name={rec.get('script_name')}, "
                                f"script_type={rec.get('script_type')}, script_exec_mode={rec.get('script_exec_mode')}")

                if records:
                    # Group source tables by script name
                    scripts_info = {}
                    for record in records:
                        script_name = record.get("script_name")
                        source_table = record.get("source_table")
                        script_type = record.get("script_type", "python")
                        script_exec_mode = record.get("script_exec_mode", "append")

                        logger.info(f"Processing record: source_table={source_table}, script_name={script_name}")

                        if not script_name:
                            script_name = f"{table_name}_process.py"
                            logger.warning(f"Relationship for table {table_name} has no script_name; using default: {script_name}")

                        if script_name not in scripts_info:
                            scripts_info[script_name] = {
                                "sources": [],
                                "script_type": script_type,
                                "script_exec_mode": script_exec_mode
                            }

                        # Only add source_table to sources when it has a non-None value
                        if source_table and source_table not in scripts_info[script_name]["sources"]:
                            scripts_info[script_name]["sources"].append(source_table)
                            logger.debug(f"Added source table {source_table} for script {script_name} of table {table_name}")

                    # Process the grouped information
                    if scripts_info:
                        # Store the full script info
                        table_info['scripts_info'] = scripts_info

                        # With a single script, use it directly
                        if len(scripts_info) == 1:
                            script_name = list(scripts_info.keys())[0]
                            script_info = scripts_info[script_name]

                            table_info['source_tables'] = script_info["sources"]  # Stored as an array
                            table_info['script_name'] = script_name
                            table_info['script_type'] = script_info["script_type"]
                            table_info['script_exec_mode'] = script_info["script_exec_mode"]
                            logger.info(f"Table {table_name} has a single script {script_name}, sources: {script_info['sources']}")
                        else:
                            # With several distinct scripts, log them all
                            logger.info(f"Table {table_name} has multiple scripts: {list(scripts_info.keys())}")
                            # For now, fall back to the first script's info as the default
                            first_script = list(scripts_info.keys())[0]
                            table_info['source_tables'] = scripts_info[first_script]["sources"]
                            table_info['script_name'] = first_script
                            table_info['script_type'] = scripts_info[first_script]["script_type"]
                            table_info['script_exec_mode'] = scripts_info[first_script]["script_exec_mode"]
                    else:
                        logger.warning(f"No valid script info found for table {table_name}")
                        table_info['source_tables'] = []  # Empty array
                        # Backwards compatibility
                        table_info['source_table'] = None
                else:
                    logger.warning(f"No relationship info found for table {table_name}")
                    table_info['source_tables'] = []  # Empty array
                    # Backwards compatibility
                    table_info['source_table'] = None
            else:
                logger.warning(f"Table {table_name} not found in Neo4j")
    except Exception as e:
        logger.error(f"Error while fetching info for table {table_name}: {str(e)}")
    finally:
        driver.close()

    return table_info
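
# Shape of the dict returned above for a typical DataModel table (illustrative values):
#
#   {
#       "target_table": "dm_orders",
#       "is_directly_schedule": True,
#       "target_table_label": "DataModel",
#       "target_table_status": True,
#       "frequency": "daily",
#       "source_tables": ["ods_orders"],
#       "script_name": "dm_orders_script.py",
#       "script_type": "python",
#       "script_exec_mode": "append",
#   }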
def process_dependencies(tables_info):
    """Process inter-table dependencies and add passively scheduled tables."""
    # Dict holding the info of every table seen so far
    all_tables = {t['target_table']: t for t in tables_info}
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name, table_info in list(all_tables.items()):
                if table_info.get('target_table_label') == 'DataModel':
                    # Query the tables it depends on
                    query = """
                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
                               dep.status AS dep_status, dep.frequency AS dep_frequency
                    """
                    result = session.run(query, table_name=table_name)
                    for record in result:
                        dep_name = record.get("dep_name")
                        dep_labels = record.get("dep_labels", [])
                        dep_status = record.get("dep_status", True)
                        dep_frequency = record.get("dep_frequency")

                        # Handle dependencies that are not directly scheduled
                        if dep_name and dep_name not in all_tables:
                            logger.info(f"Found passively scheduled table: {dep_name}, labels: {dep_labels}")
                            # Fetch the dependency's full details
                            dep_info = get_table_info_from_neo4j(dep_name)
                            dep_info['is_directly_schedule'] = False
                            # Inherit the scheduling frequency when missing
                            if not dep_info.get('frequency'):
                                dep_info['frequency'] = table_info.get('frequency')
                            # Keep the backwards-compatible field in sync
                            if not dep_info.get('default_update_frequency'):
                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
                            all_tables[dep_name] = dep_info
    except Exception as e:
        logger.error(f"Error while processing dependencies: {str(e)}")
    finally:
        driver.close()
    return list(all_tables.values())
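
# Example of the inheritance rule above (hypothetical tables): if dm_b (daily) is
# DERIVED_FROM dm_a and dm_a carries no frequency of its own, dm_a is pulled in as
# a passively scheduled table with is_directly_schedule=False and frequency="daily".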
def filter_invalid_tables(tables_info):
    """Filter out invalid tables and their dependents, using a NetworkX dependency graph."""
    # Map table names to their index
    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}

    # Identify the invalid tables
    invalid_tables = set()
    for table in tables_info:
        if table.get('target_table_status') is False:
            invalid_tables.add(table['target_table'])
            logger.info(f"Table {table['target_table']} is marked invalid")

    # Build the dependency graph
    G = nx.DiGraph()
    # Add every table as a node
    for table in tables_info:
        G.add_node(table['target_table'])

    # Query and add the dependency edges
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table in tables_info:
                if table.get('target_table_label') == 'DataModel':
                    query = """
                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN target.en_name AS target_name
                    """
                    result = session.run(query, table_name=table['target_table'])
                    for record in result:
                        target_name = record.get("target_name")
                        if target_name and target_name in table_dict:
                            # Edge points upstream -> dependent, so nx.descendants()
                            # below yields the tables that depend on an invalid table
                            G.add_edge(target_name, table['target_table'])
                            logger.debug(f"Added dependency edge: {target_name} -> {table['target_table']}")
    except Exception as e:
        logger.error(f"Error while building the dependency graph: {str(e)}")
    finally:
        driver.close()

    # Find every table that depends on an invalid table
    downstream_invalid = set()
    for invalid_table in invalid_tables:
        # All nodes reachable from the invalid table
        try:
            descendants = nx.descendants(G, invalid_table)
            downstream_invalid.update(descendants)
            logger.info(f"Downstream invalid tables of {invalid_table}: {descendants}")
        except Exception as e:
            logger.error(f"Error while processing downstream dependents of {invalid_table}: {str(e)}")

    # Merge all invalid tables
    all_invalid = invalid_tables.union(downstream_invalid)
    logger.info(f"A total of {len(all_invalid)} tables are marked invalid: {all_invalid}")

    # Keep only the valid tables
    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")

    return valid_tables
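
# Invalidity propagates along the edge direction used above (upstream -> dependent).
# Sketch: with edges a -> b -> c and table a marked invalid, nx.descendants(G, "a")
# returns {"b", "c"}, so all three tables are filtered out of the plan.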
def prepare_dag_schedule(**kwargs):
    """Main entry point for preparing the DAG schedule."""
    dag_run = kwargs.get('dag_run')
    logical_date = dag_run.logical_date
    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
    exec_date = local_logical_date.strftime('%Y-%m-%d')

    # Detect a manual trigger
    is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
    if is_manual_trigger:
        logger.info(f"[manual trigger] This DAG run was triggered manually; using the provided logical_date: {logical_date}")

    # Log the important time parameters
    logger.info(f"[time parameters] prepare_dag_schedule: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
    logger.info(f"Preparing the unified schedule for execution date {exec_date}")

    # 1. Fetch the enabled tables
    enabled_tables = get_enabled_tables()
    logger.info(f"Fetched {len(enabled_tables)} enabled tables from schedule_status")
    if not enabled_tables:
        logger.warning("No enabled tables found; preparation ends here")
        return 0

    # 2. Fetch each table's details
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)
    logger.info(f"Fetched details for {len(tables_info)} tables")

    # 3. Process dependencies and add passively scheduled tables
    enriched_tables = process_dependencies(tables_info)
    logger.info(f"After dependency processing there are {len(enriched_tables)} tables in total")

    # 4. Filter out invalid tables and their dependents
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"After filtering, {len(valid_tables)} valid tables remain")

    # The write to the airflow_dag_schedule table has been removed;
    # we only log how many tables were prepared
    logger.info(f"Processed {len(valid_tables)} valid tables")

    # 5. Build the execution-plan data
    resource_tasks = []
    model_tasks = []

    for table in valid_tables:
        if table.get('target_table_label') == 'DataResource':
            task_info = {
                "source_tables": [table.get('source_table')] if table.get('source_table') else [],
                "target_table": table['target_table'],
                "target_table_label": "DataResource",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append'),
                "frequency": table.get('frequency')
            }
            # Add the extra attributes for structure-type resources
            if table.get('target_type') == "structure":
                task_info["target_type"] = "structure"
                task_info["storage_location"] = table.get('storage_location')
            resource_tasks.append(task_info)
        elif table.get('target_table_label') == 'DataModel':
            model_tasks.append({
                "source_tables": [table.get('source_table')] if table.get('source_table') else [],
                "target_table": table['target_table'],
                "target_table_label": "DataModel",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append'),
                "frequency": table.get('frequency')
            })

    # Fetch the dependency relationships
    model_table_names = [t['target_table'] for t in model_tasks]
    dependencies = {}

    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name in model_table_names:
                query = """
                    MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                    RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                """
                result = session.run(query, table_name=table_name)

                deps = []
                for record in result:
                    target = record.get("target")
                    target_labels = record.get("target_labels", [])
                    if target:
                        table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                        deps.append({
                            "table_name": target,
                            "table_type": table_type
                        })
                dependencies[table_name] = deps
    finally:
        driver.close()

    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "logical_date": logical_date,
        "local_logical_date": local_logical_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Save the execution plan to XCom
    kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
    logger.info(f"Prepared an execution plan with {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")

    return len(valid_tables)
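
# Shape of the plan pushed to XCom above (illustrative values):
#
#   {
#       "exec_date": "2024-01-01",
#       "logical_date": ...,            # datetime from the DAG run
#       "local_logical_date": ...,      # pendulum datetime in Asia/Shanghai
#       "resource_tasks": [{"target_table": "ods_orders", "script_name": "load_file.py", ...}],
#       "model_tasks": [{"target_table": "dm_orders", "script_name": "dm_orders_script.py", ...}],
#       "dependencies": {"dm_orders": [{"table_name": "ods_orders", "table_type": "DataResource"}]}
#   }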
def check_execution_plan(**kwargs):
    """
    Check that the execution plan exists and is valid.
    Returning False blocks all downstream tasks.
    """
    dag_run = kwargs.get('dag_run')
    logical_date = dag_run.logical_date
    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
    exec_date = local_logical_date.strftime('%Y-%m-%d')

    # Detect a manual trigger
    is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
    if is_manual_trigger:
        logger.info(f"[manual trigger] This DAG run was triggered manually; using the provided logical_date: {logical_date}")

    # Log the important time parameters
    logger.info(f"[time parameters] check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
    logger.info("Checking that the execution plan in the database exists and is valid")

    # Fetch the execution plan from the database
    execution_plan = get_execution_plan_from_db(exec_date)

    # Verify that a plan was found
    if not execution_plan:
        logger.error(f"No execution plan found for execution date {exec_date}")
        return False

    # Verify that the plan carries the required fields
    if "exec_date" not in execution_plan:
        logger.error("Execution plan is missing the exec_date field")
        return False

    if not isinstance(execution_plan.get("resource_tasks", []), list):
        logger.error("The resource_tasks field of the execution plan is invalid")
        return False

    if not isinstance(execution_plan.get("model_tasks", []), list):
        logger.error("The model_tasks field of the execution plan is invalid")
        return False

    # Verify that the plan actually contains tasks
    resource_tasks = execution_plan.get("resource_tasks", [])
    model_tasks = execution_plan.get("model_tasks", [])
    if not resource_tasks and not model_tasks:
        logger.warning("The execution plan contains no tasks")
        # With no tasks, block the downstream tasks
        return False

    logger.info(f"Execution plan validated: {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
    return True
#############################################
# Phase 2: Data Processing Phase functions
#############################################

def get_all_tasks(exec_date):
    """
    Fetch all tasks to execute (DataResource and DataModel),
    reading them straight from the execution plan instead of the task tables.
    """
    # Fetch the execution plan from the database
    execution_plan = get_execution_plan_from_db(exec_date)
    if not execution_plan:
        logger.warning(f"No execution plan found for execution date {exec_date}")
        return [], []

    # Extract the resource and model tasks
    resource_tasks = execution_plan.get("resource_tasks", [])
    model_tasks = execution_plan.get("model_tasks", [])

    logger.info(f"Fetched {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
    return resource_tasks, model_tasks
def get_table_dependencies(table_names):
    """Fetch the dependency relationships between tables."""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}

    try:
        with driver.session() as session:
            # Fetch all dependencies between the model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)

            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])

                if source and target:
                    # Append the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error while fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()

    return dependency_dict
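
# Example return value (hypothetical tables): for table_names=["dm_orders", "dm_users"],
#
#   {
#       "dm_orders": [{"table_name": "ods_orders", "table_type": "DataResource"}],
#       "dm_users": []   # no DERIVED_FROM relationships found
#   }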
def create_execution_plan(**kwargs):
    """Prepare the execution plan, reusing the data passed on from the prepare phase."""
    try:
        dag_run = kwargs.get('dag_run')
        logical_date = dag_run.logical_date
        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
        exec_date = local_logical_date.strftime('%Y-%m-%d')

        # Detect a manual trigger
        is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
        if is_manual_trigger:
            logger.info(f"[manual trigger] This DAG run was triggered manually; using the provided logical_date: {logical_date}")

        # Log the important time parameters
        logger.info(f"[time parameters] create_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")

        # Pull the execution plan from XCom
        execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')

        # Fall back to the database when no plan was found in XCom
        if not execution_plan:
            logger.info(f"No execution plan in XCom; reading from the database with execution date: {exec_date}")

            # Fetch all tasks
            resource_tasks, model_tasks = get_all_tasks(exec_date)
            if not resource_tasks and not model_tasks:
                logger.warning(f"No tasks found for execution date {exec_date}")
                return 0

            # Fetch dependencies for every model table
            model_table_names = [task["target_table"] for task in model_tasks]
            dependencies = get_table_dependencies(model_table_names)

            # Build the execution plan
            new_execution_plan = {
                "exec_date": exec_date,
                "resource_tasks": resource_tasks,
                "model_tasks": model_tasks,
                "dependencies": dependencies
            }

            # Save the plan to XCom
            kwargs['ti'].xcom_push(key='execution_plan', value=new_execution_plan)
            logger.info(f"Created a new execution plan with {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
            return new_execution_plan

        logger.info("Successfully fetched the execution plan")
        return execution_plan
    except Exception as e:
        logger.error(f"Error while creating the execution plan: {str(e)}")
        # Return an empty plan
        empty_plan = {
            "exec_date": get_today_date(),
            "resource_tasks": [],
            "model_tasks": [],
            "dependencies": {}
        }
        return empty_plan
def process_resource(target_table, script_name, script_exec_mode, exec_date, **kwargs):
    """Process a single resource table."""
    task_id = f"resource_{target_table}"
    logger.info(f"===== Starting {task_id} =====")
    logger.info(f"Running script {script_name} for resource table {target_table}")

    # Make sure exec_date is a string
    if not isinstance(exec_date, str):
        exec_date = str(exec_date)
        logger.info(f"Converted exec_date to string: {exec_date}")

    # Pull the extra parameters
    target_type = kwargs.get('target_type')
    storage_location = kwargs.get('storage_location')
    frequency = kwargs.get('frequency')
    source_tables = kwargs.get('source_tables', [])

    # Log the source tables, if any
    if source_tables and len(source_tables) > 0:
        logger.info(f"Resource table {target_table} has {len(source_tables)} source tables: {source_tables}")

    try:
        # Delegate to execute_python_script with the relevant parameters
        logger.info(f"Calling execute_python_script: target_table={target_table}, script_name={script_name}")

        # Build the parameter dict
        script_params = {
            "target_table": target_table,
            "script_name": script_name,
            "script_exec_mode": script_exec_mode,
            "exec_date": exec_date,
            "frequency": frequency,
            "source_tables": source_tables
        }

        # Add the special parameters, if present
        if target_type == "structure":
            logger.info(f"Processing a structure-type resource table, file path: {storage_location}")
            script_params["target_type"] = target_type
            script_params["storage_location"] = storage_location

        # Log the full script_params at debug level
        logger.debug(f"script_params: {script_params}")

        # Run the script
        result = execute_python_script(**script_params)
        logger.info(f"Resource table {target_table} processed, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error while processing resource table {target_table}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"===== Finished {task_id} (failed) =====")
        return False
    finally:
        logger.info(f"===== Finished {task_id} =====")
def process_model(target_table, script_name, script_exec_mode, exec_date, source_tables=None):
    """Process a single model table; supports multiple source tables."""
    task_id = f"model_{target_table}"
    logger.info(f"===== Starting {task_id} =====")
    logger.info(f"Running script {script_name} for model table {target_table}")

    # Make sure exec_date is a string
    if not isinstance(exec_date, str):
        exec_date = str(exec_date)
        logger.info(f"Converted exec_date to string: {exec_date}")

    # Log the source tables
    if source_tables and len(source_tables) > 0:
        logger.info(f"Model table {target_table} has {len(source_tables)} source tables: {source_tables}")

    try:
        # Delegate to execute_python_script; no database access needed here
        logger.info(f"Calling execute_python_script: target_table={target_table}, script_name={script_name}")
        result = execute_python_script(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date,
            source_tables=source_tables  # Forward the list of source tables
        )
        logger.info(f"Model table {target_table} processed, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error while processing model table {target_table}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"===== Finished {task_id} (failed) =====")
        return False
    finally:
        logger.info(f"===== Finished {task_id} =====")
# Helper for reading the execution plan from the database
def get_execution_plan_from_db(ds):
    """
    Fetch the execution plan from the airflow_exec_plans table.

    Args:
        ds (str): execution date in 'YYYY-MM-DD' format

    Returns:
        dict: the execution plan, or None when none is found
    """
    # Log the incoming argument in detail
    if isinstance(ds, datetime):
        if ds.tzinfo:
            logger.debug(f"[execution date] get_execution_plan_from_db received a datetime: {ds}, timezone: {ds.tzinfo}")
        else:
            logger.debug(f"[execution date] get_execution_plan_from_db received a naive datetime: {ds}")
    else:
        logger.debug(f"[execution date] get_execution_plan_from_db received: {ds}, type: {type(ds)}")

    logger.info(f"Trying to fetch the execution plan for execution date {ds}")
    conn = get_pg_conn()
    cursor = conn.cursor()
    execution_plan = None

    try:
        # Query (a): records whose exec_date equals the current date; with multiple
        # matches, take the one with the latest insert_time
        cursor.execute("""
            SELECT plan, run_id, insert_time
            FROM airflow_exec_plans
            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND exec_date = %s
            ORDER BY insert_time DESC
            LIMIT 1
        """, (ds,))
        result = cursor.fetchone()

        if result:
            # Unpack plan, run_id and insert_time
            plan_json, run_id, insert_time = result
            logger.info(f"Found an execution plan for exec_date={ds}, run_id: {run_id}, insert_time: {insert_time}")

            # plan_json may already be a dict
            if isinstance(plan_json, dict):
                execution_plan = plan_json
            else:
                execution_plan = json.loads(plan_json)

            return execution_plan

        # Query (b): no record for the current date, so look for the latest
        # record with exec_date earlier than ds
        logger.info(f"No execution plan for exec_date={ds}; looking for historical records")
        cursor.execute("""
            SELECT plan, run_id, insert_time, exec_date
            FROM airflow_exec_plans
            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND exec_date < %s
            ORDER BY exec_date DESC, insert_time DESC
            LIMIT 1
        """, (ds,))
        result = cursor.fetchone()

        if result:
            # Unpack plan, run_id, insert_time and exec_date
            plan_json, run_id, insert_time, plan_ds = result
            logger.info(f"Found a historical execution plan, exec_date: {plan_ds}, run_id: {run_id}, insert_time: {insert_time}")

            # plan_json may already be a dict
            if isinstance(plan_json, dict):
                execution_plan = plan_json
            else:
                execution_plan = json.loads(plan_json)

            return execution_plan

        # No execution plan record at all
        logger.error(f"No execution plan records found in the database, current DAG exec_date={ds}")
        return None
    except Exception as e:
        logger.error(f"Error while fetching the execution plan from the database: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return None
    finally:
        cursor.close()
        conn.close()
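
# Note on the isinstance(plan_json, dict) branches above: when the plan column is
# JSONB, psycopg2 deserializes it to a dict automatically, while a TEXT column
# returns the raw JSON string that still needs json.loads(). The column type is
# not pinned down here, so both cases are handled.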
# Create the DAG
with DAG(
    "dag_dataops_pipeline_data_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    params={
        'MANUAL_TRIGGER': False,
    },
) as dag:
    # Log key information at DAG instantiation time
    now = datetime.now()
    # localize() attaches the pytz timezone correctly; datetime.replace(tzinfo=...)
    # with a pytz zone would pick up the historical LMT offset instead
    now_with_tz = pytz.timezone('Asia/Shanghai').localize(now)
    default_exec_date = get_today_date()
    logger.info(f"[DAG init] current time: {now} / {now_with_tz}, default execution date: {default_exec_date}")
    #############################################
    # Phase 1: Prepare Phase
    #############################################
    with TaskGroup("prepare_phase") as prepare_group:
        # Marker for the start of preparation
        start_preparation = EmptyOperator(
            task_id="start_preparation"
        )

        # Prepare the schedule
        prepare_task = PythonOperator(
            task_id="prepare_dag_schedule",
            python_callable=prepare_dag_schedule,
            provide_context=True
        )

        # Validate the execution plan
        check_plan = ShortCircuitOperator(
            task_id="check_execution_plan",
            python_callable=check_execution_plan,
            provide_context=True
        )

        # Create the execution plan
        create_plan = PythonOperator(
            task_id="create_execution_plan",
            python_callable=create_execution_plan,
            provide_context=True
        )

        # Marker for the end of preparation
        preparation_completed = EmptyOperator(
            task_id="preparation_completed"
        )

        # Wire up the task dependencies
        start_preparation >> prepare_task >> check_plan >> create_plan >> preparation_completed
    #############################################
    # Phase 2: Data Processing Phase
    #############################################
    with TaskGroup("data_processing_phase") as data_group:
        # Marker for the start of data processing
        start_processing = EmptyOperator(
            task_id="start_processing"
        )

        # Marker for the end of data processing
        processing_completed = EmptyOperator(
            task_id="processing_completed",
            trigger_rule="none_failed_min_one_success"  # Done once at least one task succeeded and none failed
        )

        # Default dependency chain
        start_processing >> processing_completed

    # Dependencies between the two phases
    prepare_group >> data_group
    # Try to fetch the execution plan from the database
    try:
        # Use today's date as the default execution date at parse time
        exec_date = get_today_date()
        logger.info(f"Current DAG execution date ds={exec_date}; fetching the execution plan from the database")

        # Log the timezone information and original format of the date actually used
        if isinstance(exec_date, datetime):
            logger.info(f"[execution date details] type: datetime, timezone: {exec_date.tzinfo}, value: {exec_date}")
        else:
            logger.info(f"[execution date details] type: {type(exec_date)}, value: {exec_date}")

        # Fetch the execution plan from the database
        execution_plan = get_execution_plan_from_db(exec_date)

        # Verify that a plan was found
        if execution_plan is None:
            error_msg = f"Could not fetch a valid execution plan from the database, current DAG exec_date={exec_date}"
            logger.error(error_msg)
            # Raise so that DAG parsing fails loudly
            raise ValueError(error_msg)

        # A plan was found; process it
        logger.info("Successfully fetched the execution plan from the database")

        # Extract its contents
        exec_date = execution_plan.get("exec_date", exec_date)
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        dependencies = execution_plan.get("dependencies", {})

        logger.info(f"Execution plan: exec_date={exec_date}, {len(resource_tasks)} resource_tasks, {len(model_tasks)} model_tasks")

        # An empty plan (no tasks at all) should also fail
        if not resource_tasks and not model_tasks:
            error_msg = f"The execution plan contains no tasks, current DAG exec_date={exec_date}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Dynamically create the processing tasks
        task_dict = {}

        # 1. Create the resource-table tasks
        for task_info in resource_tasks:
            table_name = task_info["target_table"]
            script_name = task_info["script_name"]
            exec_mode = task_info.get("script_exec_mode", "append")
            source_tables = task_info.get("source_tables", [])  # Source-table array

            # Build a safe task ID
            safe_table_name = table_name.replace(".", "_").replace("-", "_")

            # Build the op_kwargs
            op_kwargs = {
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": str(exec_date),
                "source_tables": source_tables  # Include the source-table array
            }

            # Add the special parameters, if present
            if "target_type" in task_info and task_info["target_type"] == "structure":
                op_kwargs["target_type"] = task_info["target_type"]
                op_kwargs["storage_location"] = task_info.get("storage_location")

            # Add the frequency parameter, if present
            if "frequency" in task_info:
                op_kwargs["frequency"] = task_info["frequency"]

            # Make sure every task belongs to data_processing_phase
            with data_group:
                resource_task = PythonOperator(
                    task_id=f"resource_{safe_table_name}",
                    python_callable=process_resource,
                    op_kwargs=op_kwargs,
                    retries=TASK_RETRY_CONFIG["retries"],
                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                )

            # Register the task
            task_dict[table_name] = resource_task

            # Depend on start_processing
            start_processing >> resource_task

            # Wire up the resource table's own source-table dependencies, if any
            if source_tables and isinstance(source_tables, list):
                for source_table in source_tables:
                    if source_table and source_table in task_dict:
                        task_dict[source_table] >> resource_task
                        logger.info(f"Resource dependency set: {source_table} >> {table_name}")
        # Build a directed graph to detect dependencies between model tables
        G = nx.DiGraph()

        # Add every model table as a node
        for task_info in model_tasks:
            table_name = task_info["target_table"]
            G.add_node(table_name)

        # Add the dependency edges between model tables
        for source, deps in dependencies.items():
            for dep in deps:
                if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                    G.add_edge(dep.get("table_name"), source)  # Direction: dependency -> dependent table

        # Detect and break dependency cycles
        try:
            cycles = list(nx.simple_cycles(G))
            if cycles:
                logger.warning(f"Dependency cycles detected: {cycles}")
                for cycle in cycles:
                    G.remove_edge(cycle[-1], cycle[0])
                    logger.info(f"Broke dependency cycle: removed edge {cycle[-1]} -> {cycle[0]}")
        except Exception as e:
            logger.error(f"Error while detecting dependency cycles: {str(e)}")

        # Produce a topological order that determines execution order
        execution_order = []
        try:
            execution_order = list(nx.topological_sort(G))
        except Exception as e:
            logger.error(f"Topological sort failed: {str(e)}")
            execution_order = [task_info["target_table"] for task_info in model_tasks]
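
        # Worked example (hypothetical tables): with dependencies
        # {"dm_b": [{"table_name": "dm_a", ...}], "dm_c": [{"table_name": "dm_b", ...}]}
        # the edges are dm_a -> dm_b -> dm_c, and topological_sort yields
        # ["dm_a", "dm_b", "dm_c"], so upstream model tasks are created first.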
        # 2. Create the model-table tasks in topological order
        for table_name in execution_order:
            task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
            if not task_info:
                continue

            script_name = task_info["script_name"]
            exec_mode = task_info.get("script_exec_mode", "append")
            source_tables = task_info.get("source_tables", [])  # Source-table array

            # Build a safe task ID
            safe_table_name = table_name.replace(".", "_").replace("-", "_")

            # Make sure every task belongs to data_processing_phase
            with data_group:
                model_task = PythonOperator(
                    task_id=f"model_{safe_table_name}",
                    python_callable=process_model,
                    op_kwargs={
                        "target_table": table_name,
                        "script_name": script_name,
                        "script_exec_mode": exec_mode,
                        # Always pass the execution date as a string
                        "exec_date": str(exec_date),
                        "source_tables": source_tables  # Forward the source-table array
                    },
                    retries=TASK_RETRY_CONFIG["retries"],
                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                )

            # Register the task
            task_dict[table_name] = model_task

            # Wire up dependencies from both source_tables and dependencies
            has_dependency = False

            # First, set direct dependencies from source_tables
            if isinstance(source_tables, list):
                for source_table in source_tables:
                    if source_table and source_table in task_dict:
                        task_dict[source_table] >> model_task
                        has_dependency = True
                        logger.info(f"Dependency from source_tables: {source_table} >> {table_name}")

            # Then handle the entries in dependencies
            deps = dependencies.get(table_name, [])
            for dep in deps:
                dep_table = dep.get("table_name")
                dep_type = dep.get("table_type")
                if dep_table in task_dict:
                    # Avoid setting the same dependency twice
                    if dep_table not in source_tables:
                        task_dict[dep_table] >> model_task
                        has_dependency = True
                        logger.info(f"Dependency from dependencies: {dep_table} >> {table_name}")

            # Without any dependency, hang off start_processing and the resource tasks
            if not has_dependency:
                # Connect directly from start_processing
                start_processing >> model_task

                # Also connect from the resource-table tasks
                resource_count = 0
                for resource_table in resource_tasks:
                    if resource_count >= 5:  # Cap at five extra dependencies
                        break
                    resource_name = resource_table["target_table"]
                    if resource_name in task_dict:
                        task_dict[resource_name] >> model_task
                        resource_count += 1
        # Find every terminal task (tasks with no downstream dependents)
        terminal_tasks = []

        # Inspect each model-table task
        for table_name in execution_order:
            # Does anything depend on this table?
            has_downstream = False
            for source, deps in dependencies.items():
                if source == table_name:  # Skip the table itself
                    continue
                for dep in deps:
                    if dep.get("table_name") == table_name:
                        has_downstream = True
                        break
                if has_downstream:
                    break

            # Without a downstream dependent, add it to the terminal list
            if not has_downstream and table_name in task_dict:
                terminal_tasks.append(table_name)

        # Without model tasks, treat every resource task as terminal
        if not model_tasks and resource_tasks:
            terminal_tasks = [task["target_table"] for task in resource_tasks]
            logger.info(f"No model tasks; treating all resource tasks as terminal: {terminal_tasks}")

        # Without any tasks at all, the default dependency chain already exists
        if not terminal_tasks:
            logger.warning("No tasks found; keeping the default dependency chain")
        else:
            # Connect every terminal task to the completion marker
            for table_name in terminal_tasks:
                if table_name in task_dict:
                    task_dict[table_name] >> processing_completed
                    logger.info(f"Terminal task set: {table_name} >> processing_completed")
    except Exception as e:
        logger.error(f"Error while loading the execution plan: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

logger.info("DAG dag_dataops_pipeline_data_scheduler definition complete")
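
# Manual-trigger sketch (standard Airflow CLI; the MANUAL_TRIGGER flag is read by
# prepare_dag_schedule / check_execution_plan / create_execution_plan above):
#
#   airflow dags trigger dag_dataops_pipeline_data_scheduler --conf '{"MANUAL_TRIGGER": true}'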