# NOTE: page-extraction artifacts (file size and concatenated line numbers) removed here.
  1. # dag_dataops_pipeline_prepare_scheduler.py
  2. from airflow import DAG
  3. from airflow.operators.python import PythonOperator, ShortCircuitOperator
  4. from airflow.operators.empty import EmptyOperator
  5. from datetime import datetime, timedelta
  6. import logging
  7. import networkx as nx
  8. import json
  9. import os
  10. import re
  11. import glob
  12. from pathlib import Path
  13. import hashlib
  14. import pendulum
  15. from common import (
  16. get_pg_conn,
  17. get_neo4j_driver,
  18. get_today_date
  19. )
  20. from config import PG_CONFIG, NEO4J_CONFIG
# Module-level logger for this DAG file.
logger = logging.getLogger(__name__)
  23. def get_enabled_tables():
  24. """获取所有启用的表"""
  25. conn = get_pg_conn()
  26. cursor = conn.cursor()
  27. try:
  28. cursor.execute("""
  29. SELECT owner_id, table_name
  30. FROM schedule_status
  31. WHERE schedule_is_enabled = TRUE
  32. """)
  33. result = cursor.fetchall()
  34. return [row[1] for row in result] # 只返回表名
  35. except Exception as e:
  36. logger.error(f"获取启用表失败: {str(e)}")
  37. return []
  38. finally:
  39. cursor.close()
  40. conn.close()
  41. def check_table_directly_subscribed(table_name):
  42. """检查表是否在schedule_status表中直接调度"""
  43. conn = get_pg_conn()
  44. cursor = conn.cursor()
  45. try:
  46. cursor.execute("""
  47. SELECT schedule_is_enabled
  48. FROM schedule_status
  49. WHERE table_name = %s
  50. """, (table_name,))
  51. result = cursor.fetchone()
  52. return result and result[0] is True
  53. except Exception as e:
  54. logger.error(f"检查表订阅状态失败: {str(e)}")
  55. return False
  56. finally:
  57. cursor.close()
  58. conn.close()
  59. def should_execute_today(table_name, frequency, exec_date):
  60. """
  61. 判断指定频率的表在给定执行日期是否应该执行
  62. 参数:
  63. table_name (str): 表名,用于日志记录
  64. frequency (str): 调度频率,如'daily'、'weekly'、'monthly'、'yearly',为None时默认为'daily'
  65. exec_date (str): 执行日期,格式为'YYYY-MM-DD'
  66. 返回:
  67. bool: 如果该表应该在执行日期执行,则返回True,否则返回False
  68. """
  69. # 将执行日期字符串转换为pendulum日期对象
  70. try:
  71. exec_date_obj = pendulum.parse(exec_date)
  72. except Exception as e:
  73. logger.error(f"解析执行日期 {exec_date} 出错: {str(e)},使用当前日期")
  74. exec_date_obj = pendulum.today()
  75. # 计算下一个日期,用于判断是否是月初、周初等
  76. next_date = exec_date_obj.add(days=1)
  77. # 如果频率为None或空字符串,默认为daily
  78. if not frequency:
  79. logger.info(f"表 {table_name} 未指定调度频率,默认为daily")
  80. return True
  81. frequency = frequency.lower() if isinstance(frequency, str) else 'daily'
  82. if frequency == 'daily':
  83. # 日任务每天都执行
  84. return True
  85. elif frequency == 'weekly':
  86. # 周任务只在周日执行(因为exec_date+1是周一时才执行)
  87. is_sunday = next_date.day_of_week == 1 # 1表示周一
  88. logger.info(f"表 {table_name} 是weekly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否周日: {is_sunday}")
  89. return is_sunday
  90. elif frequency == 'monthly':
  91. # 月任务只在每月最后一天执行(因为exec_date+1是月初时才执行)
  92. is_month_end = next_date.day == 1
  93. logger.info(f"表 {table_name} 是monthly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否月末: {is_month_end}")
  94. return is_month_end
  95. elif frequency == 'quarterly':
  96. # 季度任务只在每季度最后一天执行(因为exec_date+1是季度初时才执行)
  97. is_quarter_end = next_date.day == 1 and next_date.month in [1, 4, 7, 10]
  98. logger.info(f"表 {table_name} 是quarterly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否季末: {is_quarter_end}")
  99. return is_quarter_end
  100. elif frequency == 'yearly':
  101. # 年任务只在每年最后一天执行(因为exec_date+1是年初时才执行)
  102. is_year_end = next_date.day == 1 and next_date.month == 1
  103. logger.info(f"表 {table_name} 是yearly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否年末: {is_year_end}")
  104. return is_year_end
  105. else:
  106. # 未知频率,默认执行
  107. logger.warning(f"表 {table_name} 使用未知的调度频率: {frequency},默认执行")
  108. return True
def get_table_info_from_neo4j(table_name):
    """
    Fetch detailed metadata for one table from Neo4j.

    Looks up the node by en_name, records its label/status/frequency/type,
    then resolves its upstream relationships (ORIGINATES_FROM for
    DataResource, DERIVED_FROM for DataModel) into source tables and
    script information.

    Args:
        table_name (str): Table's en_name in Neo4j.

    Returns:
        dict: table_info with keys such as 'target_table',
        'is_directly_schedule', 'target_table_label', 'frequency',
        'source_tables', 'script_name', 'script_type', 'script_exec_mode',
        and optionally 'scripts_info'. Partially filled on errors.
    """
    driver = get_neo4j_driver()
    # Whether the table is directly scheduled, taken from schedule_status.
    is_directly_schedule = check_table_directly_subscribed(table_name)
    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # value comes from schedule_status
    }
    try:
        with driver.session() as session:
            # Query the node's labels and scheduling attributes.
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency,
                       t.type AS type, t.storage_location AS storage_location
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()
            if record:
                labels = record.get("labels", [])
                # First label among the three known kinds, or None if no labels.
                table_info['target_table_label'] = [label for label in labels if label in ["DataResource", "DataModel", "DataSource"]][0] if labels else None
                table_info['target_table_status'] = record.get("status", True)  # defaults to True
                # table_info['default_update_frequency'] = record.get("frequency")
                table_info['frequency'] = record.get("frequency")
                table_info['target_type'] = record.get("type")  # 'type' property
                table_info['storage_location'] = record.get("storage_location")  # 'storage_location' property
                # Choose the relationship query based on the node label.
                if "DataResource" in labels:
                    # structure-type DataResources have no upstream: use fixed defaults.
                    if table_info.get('target_type') == "structure":
                        table_info['source_tables'] = []  # empty array = no source tables
                        table_info['script_name'] = "load_file.py"
                        table_info['script_type'] = "python"
                        # csv/structure DataResources have no upstream; default to append mode.
                        table_info['script_exec_mode'] = "append"
                        logger.info(f"表 {table_name} 为structure类型,使用默认执行模式: append")
                        return table_info
                    else:
                        query_rel = """
                            MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                            WITH source, rel,
                                 CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
                                 CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
                            RETURN source.en_name AS source_table, script_name AS script_name,
                                   script_type AS script_type, 'append' AS script_exec_mode
                        """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        WITH source, rel,
                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
                        RETURN source.en_name AS source_table, script_name AS script_name,
                               script_type AS script_type, 'append' AS script_exec_mode
                    """
                else:
                    logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
                    return table_info
                # Collect all relationship records.
                result = session.run(query_rel, table_name=table_name)
                # Defensive record extraction: some driver versions expose collect(),
                # others only support list()/data().  TODO confirm which driver API
                # is actually in use — the fallbacks may be dead code.
                try:
                    if hasattr(result, 'collect'):
                        records = result.collect()
                    else:
                        logger.info(f"表 {table_name} 的查询结果不支持collect方法,尝试使用其他方法")
                        try:
                            records = list(result)
                        except Exception as e1:
                            logger.warning(f"尝试列表转换失败: {str(e1)},尝试使用data方法")
                            try:
                                records = result.data()
                            except Exception as e2:
                                logger.warning(f"所有方法都失败,使用空列表: {str(e2)}")
                                records = []
                except Exception as e:
                    logger.warning(f"获取查询结果时出错: {str(e)},使用空列表")
                    records = []
                # Log the raw relationship records for debugging.
                logger.info(f"表 {table_name} 查询到 {len(records)} 条关系记录")
                for idx, rec in enumerate(records):
                    logger.info(f"关系记录[{idx}]: source_table={rec.get('source_table')}, script_name={rec.get('script_name')}, "
                                f"script_type={rec.get('script_type')}, script_exec_mode={rec.get('script_exec_mode')}")
                if records:
                    # Group source tables by script name.
                    scripts_info = {}
                    for record in records:
                        script_name = record.get("script_name")
                        source_table = record.get("source_table")
                        script_type = record.get("script_type", "python")
                        script_exec_mode = record.get("script_exec_mode", "append")
                        logger.info(f"处理记录: source_table={source_table}, script_name={script_name}")
                        # Missing script_name: synthesize a default.
                        if not script_name:
                            script_name = f"{table_name}_process.py"
                            logger.warning(f"表 {table_name} 的关系中没有script_name属性,使用默认值: {script_name}")
                        if script_name not in scripts_info:
                            scripts_info[script_name] = {
                                "sources": [],
                                "script_type": script_type,
                                "script_exec_mode": script_exec_mode
                            }
                        # Only add non-None, non-duplicate source tables.
                        if source_table and source_table not in scripts_info[script_name]["sources"]:
                            scripts_info[script_name]["sources"].append(source_table)
                            logger.debug(f"为表 {table_name} 的脚本 {script_name} 添加源表: {source_table}")
                    # Flatten the grouped info into the table_info dict.
                    if scripts_info:
                        # Keep the full per-script mapping for multi-script consumers.
                        table_info['scripts_info'] = scripts_info
                        if len(scripts_info) == 1:
                            # Single script: use it directly.
                            script_name = list(scripts_info.keys())[0]
                            script_info = scripts_info[script_name]
                            table_info['source_tables'] = script_info["sources"]  # list of source tables
                            table_info['script_name'] = script_name
                            table_info['script_type'] = script_info["script_type"]
                            table_info['script_exec_mode'] = script_info["script_exec_mode"]
                            logger.info(f"表 {table_name} 有单个脚本 {script_name},源表: {script_info['sources']}")
                        else:
                            # Multiple scripts: use the first one as the default entry.
                            logger.info(f"表 {table_name} 有多个不同脚本: {list(scripts_info.keys())}")
                            first_script = list(scripts_info.keys())[0]
                            table_info['source_tables'] = scripts_info[first_script]["sources"]
                            table_info['script_name'] = first_script
                            table_info['script_type'] = scripts_info[first_script]["script_type"]
                            table_info['script_exec_mode'] = scripts_info[first_script]["script_exec_mode"]
                    else:
                        logger.warning(f"表 {table_name} 未找到有效的脚本信息")
                        table_info['source_tables'] = []  # empty array
                else:
                    logger.warning(f"未找到表 {table_name} 的关系信息")
                    table_info['source_tables'] = []  # empty array
            else:
                logger.warning(f"在Neo4j中找不到表 {table_name} 的信息")
    except Exception as e:
        logger.error(f"获取表 {table_name} 的信息时出错: {str(e)}")
    finally:
        driver.close()
    return table_info
  253. def process_dependencies(tables_info):
  254. """处理表间依赖关系,添加被动调度的表"""
  255. # 存储所有表信息的字典
  256. all_tables = {t['target_table']: t for t in tables_info}
  257. driver = get_neo4j_driver()
  258. try:
  259. with driver.session() as session:
  260. for table_name, table_info in list(all_tables.items()):
  261. if table_info.get('target_table_label') == 'DataModel':
  262. # 查询其依赖表
  263. query = """
  264. MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
  265. RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
  266. dep.status AS dep_status, dep.frequency AS dep_frequency
  267. """
  268. result = session.run(query, table_name=table_name)
  269. for record in result:
  270. dep_name = record.get("dep_name")
  271. dep_labels = record.get("dep_labels", [])
  272. dep_status = record.get("dep_status", True)
  273. dep_frequency = record.get("dep_frequency")
  274. # 处理未被直接调度的依赖表
  275. if dep_name and dep_name not in all_tables:
  276. logger.info(f"发现被动依赖表: {dep_name}, 标签: {dep_labels}")
  277. # 获取依赖表详细信息
  278. dep_info = get_table_info_from_neo4j(dep_name)
  279. dep_info['is_directly_schedule'] = False
  280. # 处理调度频率继承
  281. if not dep_info.get('frequency'):
  282. dep_info['frequency'] = table_info.get('frequency')
  283. all_tables[dep_name] = dep_info
  284. except Exception as e:
  285. logger.error(f"处理依赖关系时出错: {str(e)}")
  286. finally:
  287. driver.close()
  288. return list(all_tables.values())
  289. def filter_invalid_tables(tables_info):
  290. """过滤无效表及其依赖,使用NetworkX构建依赖图"""
  291. # 构建表名到索引的映射
  292. table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}
  293. # 找出无效表
  294. invalid_tables = set()
  295. for table in tables_info:
  296. if table.get('target_table_status') is False:
  297. invalid_tables.add(table['target_table'])
  298. logger.info(f"表 {table['target_table']} 的状态为无效")
  299. # 构建依赖图
  300. G = nx.DiGraph()
  301. # 添加所有节点
  302. for table in tables_info:
  303. G.add_node(table['target_table'])
  304. # 查询并添加依赖边
  305. driver = get_neo4j_driver()
  306. try:
  307. with driver.session() as session:
  308. for table in tables_info:
  309. if table.get('target_table_label') == 'DataModel':
  310. query = """
  311. MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
  312. RETURN target.en_name AS target_name
  313. """
  314. result = session.run(query, table_name=table['target_table'])
  315. for record in result:
  316. target_name = record.get("target_name")
  317. if target_name and target_name in table_dict:
  318. # 添加从目标到源的边,表示目标依赖于源
  319. G.add_edge(table['target_table'], target_name)
  320. logger.debug(f"添加依赖边: {table['target_table']} -> {target_name}")
  321. except Exception as e:
  322. logger.error(f"构建依赖图时出错: {str(e)}")
  323. finally:
  324. driver.close()
  325. # 找出依赖于无效表的所有表
  326. downstream_invalid = set()
  327. for invalid_table in invalid_tables:
  328. # 获取可从无效表到达的所有节点
  329. try:
  330. descendants = nx.descendants(G, invalid_table)
  331. downstream_invalid.update(descendants)
  332. logger.info(f"表 {invalid_table} 的下游无效表: {descendants}")
  333. except Exception as e:
  334. logger.error(f"处理表 {invalid_table} 的下游依赖时出错: {str(e)}")
  335. # 合并所有无效表
  336. all_invalid = invalid_tables.union(downstream_invalid)
  337. logger.info(f"总共 {len(all_invalid)} 个表被标记为无效: {all_invalid}")
  338. # 过滤出有效表
  339. valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
  340. logger.info(f"过滤后保留 {len(valid_tables)} 个有效表")
  341. return valid_tables
  342. def touch_data_scheduler_file():
  343. """
  344. 更新数据调度器DAG文件的修改时间,触发重新解析
  345. 返回:
  346. bool: 是否成功更新
  347. """
  348. data_scheduler_path = os.path.join(os.path.dirname(__file__), 'dag_dataops_pipeline_data_scheduler.py')
  349. success = False
  350. try:
  351. if os.path.exists(data_scheduler_path):
  352. # 更新文件修改时间,触发Airflow重新解析
  353. os.utime(data_scheduler_path, None)
  354. logger.info(f"已触发数据调度器DAG重新解析: {data_scheduler_path}")
  355. success = True
  356. else:
  357. logger.warning(f"数据调度器DAG文件不存在: {data_scheduler_path}")
  358. return success
  359. except Exception as e:
  360. logger.error(f"触发DAG重新解析时出错: {str(e)}")
  361. return False
  362. def get_subscription_state_hash():
  363. """获取订阅表状态的哈希值"""
  364. conn = get_pg_conn()
  365. cursor = conn.cursor()
  366. try:
  367. cursor.execute("""
  368. SELECT table_name, schedule_is_enabled
  369. FROM schedule_status
  370. ORDER BY table_name
  371. """)
  372. rows = cursor.fetchall()
  373. # 将所有行拼接成一个字符串,然后计算哈希值
  374. data_str = '|'.join(f"{row[0]}:{row[1]}" for row in rows)
  375. return hashlib.md5(data_str.encode()).hexdigest()
  376. except Exception as e:
  377. logger.error(f"计算订阅表状态哈希值时出错: {str(e)}")
  378. return None
  379. finally:
  380. cursor.close()
  381. conn.close()
  382. def check_execution_plan_in_db(**kwargs):
  383. """
  384. 检查当天的执行计划是否存在于数据库中
  385. 返回False将阻止所有下游任务执行
  386. """
  387. # 获取执行日期
  388. dag_run = kwargs.get('dag_run')
  389. logical_date = dag_run.logical_date
  390. local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
  391. exec_date = local_logical_date.strftime('%Y-%m-%d')
  392. logger.info(f"logical_date: {logical_date} ")
  393. logger.info(f"local_logical_date {local_logical_date} ")
  394. logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
  395. # 检查数据库中是否存在执行计划
  396. conn = get_pg_conn()
  397. cursor = conn.cursor()
  398. try:
  399. cursor.execute("""
  400. SELECT plan
  401. FROM airflow_exec_plans
  402. WHERE exec_date = %s
  403. ORDER BY logical_date DESC
  404. LIMIT 1
  405. """, (exec_date,))
  406. result = cursor.fetchone()
  407. if not result:
  408. logger.error(f"数据库中不存在执行日期 {exec_date} 的执行计划")
  409. return False
  410. # 检查执行计划内容是否有效
  411. try:
  412. # PostgreSQL的jsonb类型会被psycopg2自动转换为Python字典,无需再使用json.loads
  413. plan_data = result[0]
  414. # 检查必要字段
  415. if "exec_date" not in plan_data:
  416. logger.error("执行计划缺少exec_date字段")
  417. return False
  418. if not isinstance(plan_data.get("resource_tasks", []), list):
  419. logger.error("执行计划的resource_tasks字段无效")
  420. return False
  421. if not isinstance(plan_data.get("model_tasks", []), list):
  422. logger.error("执行计划的model_tasks字段无效")
  423. return False
  424. # 检查是否有任务数据
  425. resource_tasks = plan_data.get("resource_tasks", [])
  426. model_tasks = plan_data.get("model_tasks", [])
  427. logger.info(f"执行计划验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
  428. return True
  429. except Exception as je:
  430. logger.error(f"处理执行计划数据时出错: {str(je)}")
  431. return False
  432. except Exception as e:
  433. logger.error(f"检查数据库中执行计划时出错: {str(e)}")
  434. return False
  435. finally:
  436. cursor.close()
  437. conn.close()
  438. def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
  439. """
  440. 将执行计划保存到airflow_exec_plans表
  441. 参数:
  442. execution_plan (dict): 执行计划字典
  443. dag_id (str): DAG的ID
  444. run_id (str): DAG运行的ID
  445. logical_date (datetime): 逻辑日期
  446. ds (str): 日期字符串,格式为YYYY-MM-DD
  447. 返回:
  448. bool: 操作是否成功
  449. """
  450. conn = get_pg_conn()
  451. cursor = conn.cursor()
  452. try:
  453. # 将执行计划转换为JSON字符串
  454. plan_json = json.dumps(execution_plan)
  455. # 获取本地时间
  456. local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
  457. # 插入记录
  458. cursor.execute("""
  459. INSERT INTO airflow_exec_plans
  460. (dag_id, run_id, logical_date, local_logical_date, exec_date, plan)
  461. VALUES (%s, %s, %s, %s, %s, %s)
  462. """, (dag_id, run_id, logical_date, local_logical_date, ds, plan_json))
  463. conn.commit()
  464. logger.info(f"成功将执行计划保存到airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, exec_date={ds}")
  465. return True
  466. except Exception as e:
  467. logger.error(f"保存执行计划到数据库时出错: {str(e)}")
  468. conn.rollback()
  469. return False
  470. finally:
  471. cursor.close()
  472. conn.close()
def prepare_pipeline_dag_schedule(**kwargs):
    """
    Main entry point: prepare the pipeline DAG's execution plan.

    Decides whether a new plan is needed (missing plan in DB, changed
    schedule_status hash, or manual trigger), then builds the plan from
    enabled tables + Neo4j metadata, persists the subscription hash,
    touches the data-scheduler DAG file, and saves the plan to the DB.

    Returns:
        int: number of valid tables in the new plan, or 0 when no plan
        was created.

    Raises:
        Exception: if building or saving the plan fails (fails the task
        so downstream DAGs are blocked).
    """
    # Detect manual-trigger mode from DAG params.
    is_manual_trigger = False
    params = kwargs.get('params', {})
    if params and 'MANUAL_TRIGGER' in params:
        is_manual_trigger = params.get('MANUAL_TRIGGER', False)
        if is_manual_trigger:
            logger.info(f"接收到手动触发参数: MANUAL_TRIGGER={is_manual_trigger}")
    # Derive exec_date (local time) from the DAG run's logical date.
    # NOTE(review): dag_run.logical_date is accessed unconditionally — this
    # raises if dag_run is missing; confirm the operator always provides it.
    dag_run = kwargs.get('dag_run')
    logical_date = dag_run.logical_date
    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
    exec_date = local_logical_date.strftime('%Y-%m-%d')
    logger.info(f"开始准备执行日期 {exec_date} 的Pipeline调度任务")
    # Decide whether a new execution plan must be created.
    need_create_plan = False
    # Condition 1: no plan for today's exec_date in the database.
    has_plan_in_db = check_execution_plan_in_db(**kwargs)
    if not has_plan_in_db:
        logger.info(f"数据库中不存在执行日期exec_date {exec_date} 的执行计划,需要创建新的执行计划")
        need_create_plan = True
    # Condition 2: the schedule_status table changed since last run.
    if not need_create_plan:
        # Current hash of the subscription table.
        current_hash = get_subscription_state_hash()
        # Previously recorded hash, stored next to this file.
        hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
        last_hash = None
        if os.path.exists(hash_file):
            try:
                with open(hash_file, 'r') as f:
                    last_hash = f.read().strip()
            except Exception as e:
                logger.warning(f"读取上次订阅状态哈希值失败: {str(e)}")
        # Differing hashes mean the subscription data changed.
        if current_hash != last_hash:
            logger.info(f"检测到schedule_status表数据变更。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
            need_create_plan = True
    # Manual trigger overrides the checks above.
    if is_manual_trigger:
        logger.info("手动触发模式,将创建新的执行计划")
        need_create_plan = True
    # Nothing to do: bail out early.
    if not need_create_plan:
        logger.info("无需创建新的执行计划")
        return 0
    # --- Build a new execution plan ---
    # 1. Enabled tables from schedule_status.
    enabled_tables = get_enabled_tables()
    logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
    if not enabled_tables:
        logger.warning("没有找到启用的表,准备工作结束")
        return 0
    # 2. Detailed metadata per table from Neo4j.
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)
    logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
    # 2.1 Keep only tables whose frequency says they run today.
    filtered_tables_info = []
    for table_info in tables_info:
        table_name = table_info['target_table']
        frequency = table_info.get('frequency')
        if should_execute_today(table_name, frequency, exec_date):
            filtered_tables_info.append(table_info)
            logger.info(f"表 {table_name} (频率: {frequency}) 将在今天{exec_date}执行")
        else:
            logger.info(f"表 {table_name} (频率: {frequency}) 今天{exec_date}不执行,已过滤")
    logger.info(f"按调度频率过滤后,今天{exec_date}需要执行的表有 {len(filtered_tables_info)} 个")
    # 3. Pull in passively scheduled dependency tables.
    enriched_tables = process_dependencies(filtered_tables_info)
    logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
    # 4. Drop invalid tables and their dependents.
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
    # Build the execution plan and persist it.
    try:
        resource_tasks = []
        model_tasks = []
        # Create a task entry for every valid table.
        for table in valid_tables:
            # Guarantee source_tables exists and is a list.
            if 'source_tables' not in table or not isinstance(table.get('source_tables'), list):
                logger.warning(f"表 {table['target_table']} 没有source_tables或不是列表,初始化为空列表")
                table['source_tables'] = []
            # DataResource tables -> resource_tasks.
            if table.get('target_table_label') == 'DataResource':
                task_info = {
                    "source_tables": table.get('source_tables', []),  # list of source tables
                    "target_table": table['target_table'],
                    "target_table_label": "DataResource",
                    "script_name": table.get('script_name'),
                    "script_exec_mode": table.get('script_exec_mode', 'append'),
                    "frequency": table.get('frequency')
                }
                # structure-type resources carry extra attributes.
                if table.get('target_type') == "structure":
                    task_info["target_type"] = "structure"
                    task_info["storage_location"] = table.get('storage_location')
                resource_tasks.append(task_info)
            # DataModel tables -> model_tasks.
            elif table.get('target_table_label') == 'DataModel':
                # Multiple scripts: one task per script.
                if 'scripts_info' in table and len(table['scripts_info']) > 1:
                    logger.info(f"表 {table['target_table']} 有多个脚本,单独处理每个脚本")
                    for script_name, script_info in table['scripts_info'].items():
                        model_tasks.append({
                            "source_tables": script_info.get("sources", []),  # list of source tables
                            "target_table": table['target_table'],
                            "target_table_label": "DataModel",
                            "script_name": script_name,
                            "script_exec_mode": script_info.get("script_exec_mode", 'append'),
                            "script_type": script_info.get("script_type", 'python'),
                            "frequency": table.get('frequency')
                        })
                else:
                    # Single-script case.
                    model_tasks.append({
                        "source_tables": table.get('source_tables', []),  # list of source tables
                        "target_table": table['target_table'],
                        "target_table_label": "DataModel",
                        "script_name": table.get('script_name'),
                        "script_exec_mode": table.get('script_exec_mode', 'append'),
                        "frequency": table.get('frequency')
                    })
        # Resolve dependencies for every model table.
        dependencies = {}
        model_table_names = [t['target_table'] for t in model_tasks]
        # Initialize the dependency map.
        for table_name in model_table_names:
            dependencies[table_name] = []
        # Query Neo4j for each model's upstream tables.
        driver = get_neo4j_driver()
        try:
            with driver.session() as session:
                for table_name in model_table_names:
                    query = """
                        MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                    """
                    try:
                        # Run the query and extract records defensively (driver API varies).
                        result = session.run(query, table_name=table_name)
                        records = []
                        try:
                            if hasattr(result, 'collect'):
                                records = result.collect()
                            else:
                                records = list(result)
                        except Exception as e:
                            logger.warning(f"获取表 {table_name} 的依赖关系记录失败: {str(e)}")
                            records = []
                        # Source tables gathered here update model_tasks below.
                        source_tables_list = []
                        for record in records:
                            target = record.get("target")
                            target_labels = record.get("target_labels", [])
                            if target:
                                # Classify the dependency's label.
                                table_type = next((label for label in target_labels
                                                   if label in ["DataModel", "DataResource"]), None)
                                dependencies[table_name].append({
                                    "table_name": target,
                                    "table_type": table_type
                                })
                                source_tables_list.append(target)
                                logger.info(f"添加其他依赖: {table_name} -> {target}")
                        # Merge discovered sources into the matching model task(s).
                        for mt in model_tasks:
                            if mt['target_table'] == table_name:
                                if not isinstance(mt.get('source_tables'), list):
                                    mt['source_tables'] = []
                                for source_table in source_tables_list:
                                    if source_table and source_table not in mt['source_tables']:
                                        mt['source_tables'].append(source_table)
                                        logger.info(f"从依赖关系中添加源表 {source_table} 到 {table_name}")
                    except Exception as e:
                        logger.error(f"处理表 {table_name} 的依赖关系时出错: {str(e)}")
        except Exception as e:
            logger.error(f"查询Neo4j依赖关系时出错: {str(e)}")
        finally:
            driver.close()
        # Assemble the final plan.
        execution_plan = {
            "exec_date": exec_date,
            "resource_tasks": resource_tasks,
            "model_tasks": model_tasks,
            "dependencies": dependencies
        }
        # Record the current subscription-state hash for the change check.
        # NOTE(review): current_hash may be None if the DB query failed —
        # f.write(None) would raise here; confirm whether that is acceptable.
        current_hash = get_subscription_state_hash()
        hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
        with open(hash_file, 'w') as f:
            f.write(current_hash)
        logger.info(f"已更新订阅表状态哈希值: {current_hash}")
        # Trigger a re-parse of the data-scheduler DAG.
        touch_data_scheduler_file()
        # Save the plan to the database.
        try:
            dag_run = kwargs.get('dag_run')
            if dag_run:
                dag_id = dag_run.dag_id
                run_id = dag_run.run_id
                logical_date = dag_run.logical_date
                local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
            else:
                # Fallback values when dag_run is unavailable.
                # NOTE(review): this branch's fresh `logical_date` is never used —
                # the save call below passes the earlier `local_logical_date`.
                dag_id = kwargs.get('dag').dag_id if 'dag' in kwargs else "dag_dataops_pipeline_prepare_scheduler"
                run_id = f"manual_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                logical_date = datetime.now()
            save_result = save_execution_plan_to_db(
                execution_plan=execution_plan,
                dag_id=dag_id,
                run_id=run_id,
                logical_date=local_logical_date,
                ds=exec_date
            )
            if save_result:
                logger.info("执行计划已成功保存到数据库")
            else:
                raise Exception("执行计划保存到数据库失败")
        except Exception as db_e:
            # Surface DB-save failures as task failures.
            error_msg = f"保存执行计划到数据库时出错: {str(db_e)}"
            logger.error(error_msg)
            raise Exception(error_msg)
    except Exception as e:
        error_msg = f"创建或保存执行计划时出错: {str(e)}"
        logger.error(error_msg)
        # Re-raise so the task fails and downstream DAGs are blocked.
        raise Exception(error_msg)
    return len(valid_tables)  # number of valid tables in the plan
# DAG definition: runs hourly and (re)builds the pipeline execution plan.
with DAG(
    "dag_dataops_pipeline_prepare_scheduler",
    start_date=datetime(2024, 1, 1),
    # Run at the top of every hour.
    schedule_interval="0 * * * *",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    params={
        # Set to True on a manual run to force rebuilding the plan.
        'MANUAL_TRIGGER': False,
    },
) as dag:
    # Marker: start of the preparation phase.
    start_preparation = EmptyOperator(
        task_id="start_preparation",
        dag=dag
    )
    # Build (or skip rebuilding) today's execution plan.
    prepare_task = PythonOperator(
        task_id="prepare_pipeline_dag_schedule",
        python_callable=prepare_pipeline_dag_schedule,
        provide_context=True,
        dag=dag
    )
    # Short-circuit: block downstream tasks unless a valid plan exists in the DB.
    check_plan_in_db = ShortCircuitOperator(
        task_id="check_execution_plan_in_db",
        python_callable=check_execution_plan_in_db,
        provide_context=True,
        dag=dag
    )
    # Marker: preparation finished.
    preparation_completed = EmptyOperator(
        task_id="preparation_completed",
        dag=dag
    )
    # Task ordering.
    start_preparation >> prepare_task >> check_plan_in_db >> preparation_completed