# dag_dataops_unified_scheduler.py
# Combines the functionality of the prepare, data and summary DAGs
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
import os
from decimal import Decimal
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG, AIRFLOW_BASE_PATH, SCRIPTS_BASE_PATH

# Create the logger
logger = logging.getLogger(__name__)


# Date-aware JSON serializer
def json_serial(obj):
    """JSON serializer that renders date objects as ISO-format strings"""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")


# Custom JSON encoder that also handles Decimal serialization
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date types
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Let the parent class handle everything else
        return super(DecimalEncoder, self).default(obj)
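
# Example (sketch): json_serial handles date/datetime values, while DecimalEncoder
# additionally converts Decimal, e.g.
#   json.dumps({"day": date(2024, 1, 1)}, default=json_serial)
#   json.dumps({"day": date(2024, 1, 1), "amount": Decimal("1.50")}, cls=DecimalEncoder)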

#############################################
# Phase 1: Prepare Phase functions
#############################################

def get_enabled_tables():
    """Get all enabled tables"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT owner_id, table_name
            FROM schedule_status
            WHERE schedule_is_enabled = TRUE
        """)
        result = cursor.fetchall()
        return [row[1] for row in result]  # return table names only
    except Exception as e:
        logger.error(f"Failed to fetch enabled tables: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()


def check_table_directly_subscribed(table_name):
    """Check whether the table is directly subscribed in schedule_status"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT schedule_is_enabled
            FROM schedule_status
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        return result and result[0] is True
    except Exception as e:
        logger.error(f"Failed to check table subscription status: {str(e)}")
        return False
    finally:
        cursor.close()
        conn.close()

def get_table_info_from_neo4j(table_name):
    """Fetch detailed table information from Neo4j"""
    driver = get_neo4j_driver()
    # Check whether the table is directly subscribed
    is_directly_schedule = check_table_directly_subscribed(table_name)
    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # taken from the schedule_status table
    }
    try:
        with driver.session() as session:
            # Query the table's labels and status
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()
            if record:
                labels = record.get("labels", [])
                # next() avoids the IndexError a bare [...][0] would raise when no label matches
                table_info['target_table_label'] = next(
                    (label for label in labels if label in ["DataResource", "DataModel", "DataSource"]),
                    None
                )
                table_info['target_table_status'] = record.get("status", True)  # defaults to True
                table_info['default_update_frequency'] = record.get("frequency")
                # Query relationship and script info according to the label type
                if "DataResource" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                else:
                    logger.warning(f"Table {table_name} is neither DataResource nor DataModel")
                    return table_info
                result = session.run(query_rel, table_name=table_name)
                record = result.single()
                if record:
                    table_info['source_table'] = record.get("source_table")
                    # Check whether script_name is empty
                    script_name = record.get("script_name")
                    if not script_name:
                        logger.warning(f"The relationship for table {table_name} has no script_name property; downstream processing may fail")
                    table_info['script_name'] = script_name
                    # Provide defaults so the fields are set even when the properties are missing
                    table_info['script_type'] = record.get("script_type", "python")  # defaults to python
                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # defaults to append
                else:
                    logger.warning(f"No relationship information found for table {table_name}")
            else:
                logger.warning(f"Table {table_name} not found in Neo4j")
    except Exception as e:
        logger.error(f"Error fetching information for table {table_name}: {str(e)}")
    finally:
        driver.close()
    return table_info

def process_dependencies(tables_info):
    """Process inter-table dependencies and add passively scheduled tables"""
    # Dictionary holding the info of all tables
    all_tables = {t['target_table']: t for t in tables_info}
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name, table_info in list(all_tables.items()):
                if table_info.get('target_table_label') == 'DataModel':
                    # Query the tables it depends on
                    query = """
                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
                               dep.status AS dep_status, dep.frequency AS dep_frequency
                    """
                    result = session.run(query, table_name=table_name)
                    for record in result:
                        dep_name = record.get("dep_name")
                        dep_labels = record.get("dep_labels", [])
                        dep_status = record.get("dep_status", True)
                        dep_frequency = record.get("dep_frequency")
                        # Handle dependency tables that are not directly scheduled
                        if dep_name and dep_name not in all_tables:
                            logger.info(f"Found passively scheduled dependency table: {dep_name}, labels: {dep_labels}")
                            # Fetch detailed info for the dependency table
                            dep_info = get_table_info_from_neo4j(dep_name)
                            dep_info['is_directly_schedule'] = False
                            # Inherit the schedule frequency when it is missing
                            if not dep_info.get('default_update_frequency'):
                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
                            all_tables[dep_name] = dep_info
    except Exception as e:
        logger.error(f"Error processing dependencies: {str(e)}")
    finally:
        driver.close()
    return list(all_tables.values())

def filter_invalid_tables(tables_info):
    """Filter out invalid tables and their dependents, using a NetworkX dependency graph"""
    # Map table names to their index
    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}
    # Find the invalid tables
    invalid_tables = set()
    for table in tables_info:
        if table.get('target_table_status') is False:
            invalid_tables.add(table['target_table'])
            logger.info(f"Table {table['target_table']} is marked invalid")
    # Build the dependency graph
    G = nx.DiGraph()
    # Add all nodes
    for table in tables_info:
        G.add_node(table['target_table'])
    # Query and add the dependency edges
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table in tables_info:
                if table.get('target_table_label') == 'DataModel':
                    query = """
                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN target.en_name AS target_name
                    """
                    result = session.run(query, table_name=table['target_table'])
                    for record in result:
                        target_name = record.get("target_name")
                        if target_name and target_name in table_dict:
                            # Edge from the dependency to the dependent table, so that
                            # invalidity can be propagated downstream via descendants
                            G.add_edge(target_name, table['target_table'])
                            logger.debug(f"Added dependency edge: {target_name} -> {table['target_table']}")
    except Exception as e:
        logger.error(f"Error building the dependency graph: {str(e)}")
    finally:
        driver.close()
    # Find every table that depends on an invalid table
    downstream_invalid = set()
    for invalid_table in invalid_tables:
        # All nodes reachable from the invalid table
        try:
            descendants = nx.descendants(G, invalid_table)
            downstream_invalid.update(descendants)
            logger.info(f"Downstream invalid tables of {invalid_table}: {descendants}")
        except Exception as e:
            logger.error(f"Error processing downstream dependents of table {invalid_table}: {str(e)}")
    # Merge all invalid tables
    all_invalid = invalid_tables.union(downstream_invalid)
    logger.info(f"{len(all_invalid)} tables marked invalid in total: {all_invalid}")
    # Keep only the valid tables
    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")
    return valid_tables
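
# Sketch of the propagation above, with hypothetical tables: if the graph holds
# edges res_a -> dm_b -> dm_c (dependency -> dependent) and res_a is invalid,
# nx.descendants(G, "res_a") returns {"dm_b", "dm_c"}, so both dependents are
# dropped together with res_a.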

def write_to_airflow_dag_schedule(exec_date, tables_info):
    """Write table info into the airflow_dag_schedule table"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Clear the current day's data to avoid duplicates
        cursor.execute("""
            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        logger.info(f"Cleared existing data for execution date {exec_date}")
        # Insert the new rows
        inserted_count = 0
        for table in tables_info:
            cursor.execute("""
                INSERT INTO airflow_dag_schedule (
                    exec_date, source_table, target_table, target_table_label,
                    target_table_status, is_directly_schedule, default_update_frequency,
                    script_name, script_type, script_exec_mode
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                exec_date,
                table.get('source_table'),
                table['target_table'],
                table.get('target_table_label'),
                table.get('target_table_status', True),
                table.get('is_directly_schedule', False),
                table.get('default_update_frequency'),
                table.get('script_name'),
                table.get('script_type', 'python'),
                table.get('script_exec_mode', 'append')
            ))
            inserted_count += 1
        conn.commit()
        logger.info(f"Inserted {inserted_count} rows into airflow_dag_schedule")
        return inserted_count
    except Exception as e:
        logger.error(f"Error writing to airflow_dag_schedule: {str(e)}")
        conn.rollback()
        # Re-raise so the error propagates correctly
        raise
    finally:
        cursor.close()
        conn.close()

def prepare_dag_schedule(**kwargs):
    """Main function that prepares the scheduling tasks"""
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"Preparing unified scheduling tasks for execution date {exec_date}")
    # 1. Get the enabled tables
    enabled_tables = get_enabled_tables()
    logger.info(f"Got {len(enabled_tables)} enabled tables from schedule_status")
    if not enabled_tables:
        logger.warning("No enabled tables found; preparation finished")
        return 0
    # 2. Fetch detailed table info
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)
    logger.info(f"Fetched detailed info for {len(tables_info)} tables")
    # 3. Process dependencies and add passively scheduled tables
    enriched_tables = process_dependencies(tables_info)
    logger.info(f"{len(enriched_tables)} tables in total after dependency processing")
    # 4. Filter invalid tables and their dependents
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")
    # 5. Write to airflow_dag_schedule
    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
    # 6. Fail loudly if the insert did not write anything although there were valid tables
    if inserted_count == 0 and valid_tables:
        error_msg = f"Insert failed: no rows written to airflow_dag_schedule although {len(valid_tables)} valid tables needed processing"
        logger.error(error_msg)
        raise Exception(error_msg)
    # 7. Build the execution plan data
    resource_tasks = []
    model_tasks = []
    for table in valid_tables:
        if table.get('target_table_label') == 'DataResource':
            resource_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataResource",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })
        elif table.get('target_table_label') == 'DataModel':
            model_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataModel",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })
    # Fetch the dependencies
    model_table_names = [t['target_table'] for t in model_tasks]
    dependencies = {}
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name in model_table_names:
                query = """
                    MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                    RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                """
                result = session.run(query, table_name=table_name)
                deps = []
                for record in result:
                    target = record.get("target")
                    target_labels = record.get("target_labels", [])
                    if target:
                        table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                        deps.append({
                            "table_name": target,
                            "table_type": table_type
                        })
                dependencies[table_name] = deps
    finally:
        driver.close()
    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }
    # Save the execution plan to XCom
    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
    logger.info(f"Prepared an execution plan with {len(resource_tasks)} resource tasks and {len(model_tasks)} model tasks")
    return inserted_count
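
# Illustrative shape of the execution plan built above (hypothetical names):
# {
#     "exec_date": "2024-01-01",
#     "resource_tasks": [{"source_table": "ods_orders", "target_table": "res_orders",
#                         "target_table_label": "DataResource",
#                         "script_name": "load_res_orders.py", "script_exec_mode": "append"}],
#     "model_tasks": [{"source_table": "res_orders", "target_table": "dm_orders",
#                      "target_table_label": "DataModel",
#                      "script_name": "build_dm_orders.py", "script_exec_mode": "append"}],
#     "dependencies": {"dm_orders": [{"table_name": "res_orders", "table_type": "DataResource"}]}
# }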

def create_execution_plan(**kwargs):
    """Prepare the execution plan from the data passed by the prepare phase, and write it to a JSON file"""
    try:
        # Get the execution plan from prepare_dag_schedule
        execution_plan_json = kwargs['ti'].xcom_pull(task_ids='prepare_dag_schedule', key='execution_plan')
        if not execution_plan_json:
            # Nothing in XCom (the push may have failed); rebuild from the database
            exec_date = kwargs.get('ds') or get_today_date()
            logger.info(f"No execution plan found in XCom; rebuilding from the database for execution date {exec_date}")
            # Get all tasks
            resource_tasks, model_tasks = get_all_tasks(exec_date)
            if not resource_tasks and not model_tasks:
                logger.warning(f"No tasks found for execution date {exec_date}")
                # Create an empty execution plan
                execution_plan = {
                    "exec_date": exec_date,
                    "resource_tasks": [],
                    "model_tasks": [],
                    "dependencies": {}
                }
            else:
                # Fetch dependencies for all model tables
                model_table_names = [task["target_table"] for task in model_tasks]
                dependencies = get_table_dependencies_for_data_phase(model_table_names)
                # Build the execution plan
                execution_plan = {
                    "exec_date": exec_date,
                    "resource_tasks": resource_tasks,
                    "model_tasks": model_tasks,
                    "dependencies": dependencies
                }
            # Serialize to JSON
            execution_plan_json = json.dumps(execution_plan, default=json_serial)
        else:
            # If it is a string, parse it to make sure the format is valid
            if isinstance(execution_plan_json, str):
                execution_plan = json.loads(execution_plan_json)
            else:
                execution_plan = execution_plan_json
                execution_plan_json = json.dumps(execution_plan, default=json_serial)
        # Save the plan as a JSON file, going through a temp file to guarantee a complete write
        try:
            import os
            import time
            import tempfile
            from datetime import datetime
            # File paths
            plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
            plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
            temp_plan_path = os.path.join(plan_dir, f'temp_last_execution_plan_{int(time.time())}.json')
            ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
            logger.info(f"=== Creating execution plan file - timestamp: {datetime.now().isoformat()} ===")
            logger.info(f"Plan directory: {plan_dir}")
            logger.info(f"Final file path: {plan_path}")
            logger.info(f"Temp file path: {temp_plan_path}")
            logger.info(f"Ready-flag file path: {ready_flag_path}")
            # List the existing files in the directory
            existing_files = os.listdir(plan_dir)
            plan_related_files = [f for f in existing_files if 'execution_plan' in f or f.endswith('.ready')]
            logger.info(f"Related files in directory before creation: {len(plan_related_files)}")
            for f in plan_related_files:
                file_path = os.path.join(plan_dir, f)
                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
                logger.info(f"Existing file: {f} (size: {file_size} bytes, modified: {file_time})")
            # Write to the temp file first
            with open(temp_plan_path, 'w') as f:
                if isinstance(execution_plan_json, str):
                    f.write(execution_plan_json)
                else:
                    json.dump(execution_plan_json, f, indent=2, default=json_serial)
                f.flush()
                os.fsync(f.fileno())  # make sure the data reaches disk
            # Validate the temp file
            temp_size = os.path.getsize(temp_plan_path)
            temp_time = datetime.fromtimestamp(os.path.getmtime(temp_plan_path)).isoformat()
            logger.info(f"Created temp file: {temp_plan_path} (size: {temp_size} bytes, modified: {temp_time})")
            with open(temp_plan_path, 'r') as f:
                test_content = json.load(f)  # check that it parses
            logger.info("Temp file validated: content parses as JSON")
            # Rename to the final file
            if os.path.exists(plan_path):
                old_size = os.path.getsize(plan_path)
                old_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
                logger.info(f"Removing existing file: {plan_path} (size: {old_size} bytes, modified: {old_time})")
                os.remove(plan_path)  # remove the old file first
            logger.info(f"Renaming temp file: {temp_plan_path} -> {plan_path}")
            os.rename(temp_plan_path, plan_path)
            # Confirm the final file
            if os.path.exists(plan_path):
                final_size = os.path.getsize(plan_path)
                final_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
                logger.info(f"Final file created: {plan_path} (size: {final_size} bytes, modified: {final_time})")
            else:
                logger.error(f"Final file was not created: {plan_path}")
            # Write the ready-flag file
            with open(ready_flag_path, 'w') as f:
                flag_content = f"Generated at {datetime.now().isoformat()}"
                f.write(flag_content)
                f.flush()
                os.fsync(f.fileno())  # make sure the data reaches disk
            # Confirm the ready-flag file
            if os.path.exists(ready_flag_path):
                flag_size = os.path.getsize(ready_flag_path)
                flag_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()
                logger.info(f"Ready-flag file created: {ready_flag_path} (size: {flag_size} bytes, modified: {flag_time}, content: {flag_content})")
            else:
                logger.error(f"Ready-flag file was not created: {ready_flag_path}")
            # Check the directory again
            final_files = os.listdir(plan_dir)
            final_plan_files = [f for f in final_files if 'execution_plan' in f or f.endswith('.ready')]
            logger.info(f"Related files in directory after creation: {len(final_plan_files)}")
            for f in final_plan_files:
                file_path = os.path.join(plan_dir, f)
                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
                logger.info(f"Final file: {f} (size: {file_size} bytes, modified: {file_time})")
            logger.info(f"=== Execution plan file created - timestamp: {datetime.now().isoformat()} ===")
        except Exception as e:
            logger.error(f"Error saving the execution plan to file: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            raise  # re-raise so the task fails
        return execution_plan_json
    except Exception as e:
        logger.error(f"Error creating the execution plan: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        raise  # re-raise so the task fails
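
# The write path above is a write-temp-then-rename handshake: the JSON is flushed
# and fsync'ed to a temp file, renamed into place, and only then is the .ready
# flag written, so a reader that waits for the flag never sees a partial plan.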

def bridge_prepare_to_data_func(**kwargs):
    """Bridge the prepare and data phases, making sure the execution plan file is ready"""
    import os
    import time
    from datetime import datetime
    logger.info(f"=== Validating execution plan file - timestamp: {datetime.now().isoformat()} ===")
    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
    logger.info(f"Plan directory: {plan_dir}")
    logger.info(f"Plan file path: {plan_path}")
    logger.info(f"Ready-flag file path: {ready_flag_path}")
    # List the files in the directory
    all_files = os.listdir(plan_dir)
    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
    logger.info(f"Related files in directory: {len(related_files)}")
    for idx, file in enumerate(related_files, 1):
        file_path = os.path.join(plan_dir, file)
        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
        logger.info(f"Related file {idx}: {file} (size: {file_size} bytes, modified: {file_time})")
    # Wait for the ready-flag file to appear
    logger.info(f"Waiting for ready-flag file: {ready_flag_path}")
    waiting_start = datetime.now()
    max_attempts = 30  # wait at most 5 minutes
    for attempt in range(max_attempts):
        if os.path.exists(ready_flag_path):
            wait_duration = (datetime.now() - waiting_start).total_seconds()
            file_size = os.path.getsize(ready_flag_path)
            file_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()
            # Read the ready-flag content
            try:
                with open(ready_flag_path, 'r') as f:
                    ready_content = f.read()
            except Exception as e:
                ready_content = f"[read error: {str(e)}]"
            logger.info(f"Found execution plan ready flag: {ready_flag_path} (attempts: {attempt+1}, waited: {wait_duration:.2f}s, size: {file_size} bytes, modified: {file_time}, content: {ready_content})")
            break
        logger.info(f"Waiting for the execution plan to become ready (attempt: {attempt+1}/{max_attempts}, waited: {(datetime.now() - waiting_start).total_seconds():.2f}s)...")
        time.sleep(10)  # wait 10 seconds
    if not os.path.exists(ready_flag_path):
        error_msg = f"Ready-flag file does not exist: {ready_flag_path}; wait timed out (waited: {(datetime.now() - waiting_start).total_seconds():.2f}s)"
        logger.error(error_msg)
        raise Exception(error_msg)
    # Validate the execution plan file
    logger.info(f"Validating execution plan file: {plan_path}")
    if not os.path.exists(plan_path):
        error_msg = f"Execution plan file does not exist: {plan_path}"
        logger.error(error_msg)
        raise Exception(error_msg)
    try:
        file_size = os.path.getsize(plan_path)
        file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
        logger.info(f"Reading execution plan file: {plan_path} (size: {file_size} bytes, modified: {file_time})")
        with open(plan_path, 'r') as f:
            execution_plan = json.load(f)
        logger.info("Execution plan file read and parsed as JSON")
        # Validate the basic structure
        if not isinstance(execution_plan, dict):
            logger.error(f"Bad execution plan format: not a dict but {type(execution_plan)}")
            raise ValueError("Execution plan is not a valid dict")
        else:
            logger.info("Basic structure check passed: the execution plan is a dict")
        # Validate the required fields
        required_fields = ["exec_date", "resource_tasks", "model_tasks"]
        missing_fields = [field for field in required_fields if field not in execution_plan]
        if missing_fields:
            error_msg = f"Execution plan is missing required fields: {missing_fields}"
            logger.error(error_msg)
            raise ValueError(error_msg)
        else:
            logger.info(f"Required-field check passed: all of {required_fields} present")
        # Log a summary of the plan
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        exec_date = execution_plan.get("exec_date", "unknown")
        logger.info(f"Execution plan summary: date={exec_date}, resource tasks={len(resource_tasks)}, model tasks={len(model_tasks)}")
        # With few tasks, log them individually
        if len(resource_tasks) + len(model_tasks) < 10:
            for idx, task in enumerate(resource_tasks, 1):
                logger.info(f"Resource task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
            for idx, task in enumerate(model_tasks, 1):
                logger.info(f"Model task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
        # Warn if there are no tasks at all
        if not resource_tasks and not model_tasks:
            logger.warning("The execution plan contains no tasks; the data processing phase may have nothing to do")
        logger.info(f"=== Execution plan file validated - timestamp: {datetime.now().isoformat()} ===")
        return True
    except Exception as e:
        error_msg = f"Error validating the execution plan file: {str(e)}"
        logger.error(error_msg)
        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"=== Execution plan file validation failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)

def init_data_processing_phase(**kwargs):
    """Initialization for the data processing phase: reload the execution plan file"""
    import os
    from datetime import datetime
    logger.info(f"=== Initializing data processing phase - timestamp: {datetime.now().isoformat()} ===")
    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
    logger.info(f"Plan directory: {plan_dir}")
    logger.info(f"Plan file path: {plan_path}")
    logger.info(f"Ready-flag file path: {ready_flag_path}")
    # Inspect the files in the directory
    all_files = os.listdir(plan_dir)
    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
    logger.info(f"Related files in directory: {len(related_files)}")
    for idx, file in enumerate(related_files, 1):
        file_path = os.path.join(plan_dir, file)
        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
        logger.info(f"Related file {idx}: {file} (size: {file_size} bytes, modified: {file_time})")
    # Make sure the plan file exists
    if not os.path.exists(plan_path):
        error_msg = f"Execution plan file does not exist: {plan_path}"
        logger.error(error_msg)
        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)
    file_size = os.path.getsize(plan_path)
    file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
    logger.info(f"Reading execution plan file: {plan_path} (size: {file_size} bytes, modified: {file_time})")
    try:
        # Record when the read starts
        read_start = datetime.now()
        with open(plan_path, 'r') as f:
            file_content = f.read()
        logger.info(f"Read file content, {len(file_content)} bytes")
        # Parse the JSON
        parse_start = datetime.now()
        execution_plan = json.loads(file_content)
        parse_duration = (datetime.now() - parse_start).total_seconds()
        logger.info(f"Parsed JSON content in {parse_duration:.4f} seconds")
        read_duration = (datetime.now() - read_start).total_seconds()
        logger.info(f"Total read-and-parse time: {read_duration:.4f} seconds")
        # Validate the basic structure of the plan
        if not isinstance(execution_plan, dict):
            error_msg = f"Execution plan is not a valid dict, actual type: {type(execution_plan)}"
            logger.error(error_msg)
            raise ValueError(error_msg)
        # Store it in XCom for downstream tasks
        push_start = datetime.now()
        # Serialize to a JSON string first
        execution_plan_json = json.dumps(execution_plan, default=json_serial)
        logger.info(f"Serialized execution plan to a JSON string of {len(execution_plan_json)} bytes")
        # Push to XCom
        kwargs['ti'].xcom_push(key='data_phase_execution_plan', value=execution_plan_json)
        push_duration = (datetime.now() - push_start).total_seconds()
        logger.info(f"Pushed execution plan to XCom in {push_duration:.4f} seconds")
        # Log a summary of the plan
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        exec_date = execution_plan.get("exec_date", "unknown")
        logger.info(f"Execution plan summary: date={exec_date}, resource tasks={len(resource_tasks)}, model tasks={len(model_tasks)}")
        # With few tasks, log the details
        if len(resource_tasks) + len(model_tasks) < 10:
            for idx, task in enumerate(resource_tasks, 1):
                logger.info(f"Resource task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
            for idx, task in enumerate(model_tasks, 1):
                logger.info(f"Model task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
        result = {
            "exec_date": exec_date,
            "resource_count": len(resource_tasks),
            "model_count": len(model_tasks)
        }
        logger.info(f"=== Data processing phase initialization finished - timestamp: {datetime.now().isoformat()} ===")
        return result
    except json.JSONDecodeError as e:
        error_msg = f"Failed to parse the execution plan file as JSON: {str(e)}"
        logger.error(error_msg)
        # Log a snippet of the file content to help debugging
        try:
            with open(plan_path, 'r') as f:
                content = f.read(1000)  # first 1000 characters only
            logger.error(f"First 1000 characters of the file: {content}...")
        except Exception as read_error:
            logger.error(f"Error while reading the file content: {str(read_error)}")
        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)
    except Exception as e:
        error_msg = f"Data processing phase initialization failed: {str(e)}"
        logger.error(error_msg)
        import traceback
        logger.error(traceback.format_exc())
        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
        raise Exception(error_msg)

#############################################
# Phase 2: Data Processing Phase
#############################################

def get_latest_date():
    """Get the most recent date that has records in the database"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Most recent date with records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with records found; falling back to today")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error looking up the most recent date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()

def get_all_tasks(exec_date):
    """Get all tasks to execute (DataResource and DataModel)"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Query all resource-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()
        # Query all model-table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()
        # Collect the resource-table info
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })
        # Collect the model-table info
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })
        logger.info(f"Got {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error fetching task info: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()

def get_table_dependencies_for_data_phase(table_names):
    """Get the dependencies between tables"""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}
    try:
        with driver.session() as session:
            # Fetch all dependencies between the model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)
            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])
                if source and target:
                    # Append the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error fetching dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()
    return dependency_dict
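
# Illustrative shape of the returned dependency dict (hypothetical tables):
#   {"dm_orders": [{"table_name": "res_orders", "table_type": "DataResource"}],
#    "dm_daily":  [{"table_name": "dm_orders", "table_type": "DataModel"}]}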

def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table"""
    logger.info(f"Running script {script_name} for resource table {target_table}")
    # exec_date may arrive as a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse it as JSON
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")
    try:
        # Call the monitoring wrapper directly so the script is actually executed
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Resource table {target_table} processed, result: {result}")
        return f"Processed resource table {target_table}, result: {result}"
    except Exception as e:
        logger.error(f"Error processing resource table {target_table}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Return the error instead of raising, so the DAG can keep running
        return f"Failed to process resource table {target_table}: {str(e)}"

# NOTE: this local definition shadows the execute_with_monitoring imported from
# common at the top of the module, so all calls in this file resolve to this version.
def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
    """Execute a script and monitor its execution"""
    from pathlib import Path
    import importlib.util
    import sys
    logger.info(f"=== Starting task for {target_table}, script {script_name} - timestamp: {datetime.now().isoformat()} ===")
    # Check that script_name is not empty
    if not script_name:
        logger.error(f"script_name is empty for table {target_table}; cannot execute")
        # Record the failure in the database
        now = datetime.now()
        update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
        return False
    # Record the start time
    start_time = datetime.now()
    update_task_start_time(exec_date, target_table, script_name, start_time)
    logger.info(f"Task start time: {start_time.isoformat()}")
    try:
        # Run the actual script
        script_path = Path(SCRIPTS_BASE_PATH) / script_name
        logger.info(f"Full script path: {script_path}")
        if not script_path.exists():
            logger.error(f"Script file does not exist: {script_path}")
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
            return False
        try:
            # Import the module dynamically
            module_name = f"dynamic_module_{abs(hash(script_name))}"
            spec = importlib.util.spec_from_file_location(module_name, script_path)
            module = importlib.util.module_from_spec(spec)
            sys.modules[module_name] = module
            spec.loader.exec_module(module)
            # Use the standard entry point run()
            if hasattr(module, "run"):
                logger.info(f"Calling the standard entry point run() of script {script_name}")
                result = module.run(table_name=target_table, execution_mode=script_exec_mode)
                logger.info(f"Script {script_name} returned: {result}")
                success = True if result else False
            else:
                logger.warning(f"Script {script_name} does not define the standard entry point run(); trying main()")
                if hasattr(module, "main"):
                    logger.info(f"Calling main() of script {script_name}")
                    result = module.main(table_name=target_table, execution_mode=script_exec_mode)
                    logger.info(f"Script {script_name} returned: {result}")
                    success = True if result else False
                else:
                    logger.error(f"Script {script_name} defines neither run() nor main()")
                    success = False
        except Exception as script_e:
            logger.error(f"Error while executing script {script_name}: {str(script_e)}")
            import traceback
            logger.error(traceback.format_exc())
            success = False
        # Record the end time and result
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
        logger.info(f"Task end time: {end_time.isoformat()}, duration: {duration:.2f}s, result: {success}")
        logger.info(f"=== Finished task for {target_table}, script {script_name} - timestamp: {datetime.now().isoformat()} ===")
        return success
    except Exception as e:
        # Handle any other exception
        logger.error(f"Error executing the task: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
        logger.info(f"=== Task for {target_table}, script {script_name} failed - timestamp: {datetime.now().isoformat()} ===")
        return False
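
# The dynamic-import contract above, restated as a sketch: a target script must
# expose run(table_name, execution_mode) (with main(...) as a fallback) and
# return a truthy value on success. A minimal conforming script might look like
# this (hypothetical file under SCRIPTS_BASE_PATH):
#
#     def run(table_name, execution_mode="append"):
#         # load or transform data for `table_name` here
#         return True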

def update_task_start_time(exec_date, target_table, script_name, start_time):
    """Update the task start time"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            UPDATE airflow_dag_schedule
            SET exec_start_time = %s
            WHERE exec_date = %s AND target_table = %s AND script_name = %s
        """, (start_time, exec_date, target_table, script_name))
        conn.commit()
        logger.info(f"Updated start time of script {script_name} for table {target_table}: {start_time}")
    except Exception as e:
        logger.error(f"Failed to update the task start time: {str(e)}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()


def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
    """Update the task completion info"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            UPDATE airflow_dag_schedule
            SET exec_result = %s, exec_end_time = %s, exec_duration = %s
            WHERE exec_date = %s AND target_table = %s AND script_name = %s
        """, (success, end_time, duration, exec_date, target_table, script_name))
        conn.commit()
        logger.info(f"Updated completion status of script {script_name} for table {target_table}: result={success}, end time={end_time}, duration={duration}s")
    except Exception as e:
        logger.error(f"Failed to update the task completion info: {str(e)}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table"""
    logger.info(f"Running script {script_name} for model table {target_table}")
    # exec_date may arrive as a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse it as JSON
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error parsing exec_date JSON: {str(e)}")
    try:
        # Call the monitoring wrapper directly so the script is actually executed
        result = execute_with_monitoring(
            target_table=target_table,
            script_name=script_name,
            script_exec_mode=script_exec_mode,
            exec_date=exec_date
        )
        logger.info(f"Model table {target_table} processed, result: {result}")
        return f"Processed model table {target_table}, result: {result}"
    except Exception as e:
        logger.error(f"Error processing model table {target_table}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Return the error instead of raising, so the DAG can keep running
        return f"Failed to process model table {target_table}: {str(e)}"

#############################################
# Phase 3: Summary Phase functions
#############################################

def get_execution_stats(exec_date):
    """Get the execution statistics for the day"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total number of tasks
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        result = cursor.fetchone()
        total_tasks = result[0] if result else 0
        # Task counts per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}
        # Execution result statistics
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        result = cursor.fetchone()
        success_count = result[0] if result else 0
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        result = cursor.fetchone()
        fail_count = result[0] if result else 0
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        result = cursor.fetchone()
        pending_count = result[0] if result else 0
        # Execution time statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()
        # Guard against missing time statistics
        if time_stats and time_stats[0] is not None:
            avg_duration = float(time_stats[0])
            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
        else:
            avg_duration = None
            min_duration = None
            max_duration = None
        # Failed task details
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = []
        for row in cursor.fetchall():
            task_dict = {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
            }
            if row[3] is not None:
                task_dict["exec_duration"] = float(row[3])
            else:
                task_dict["exec_duration"] = None
            failed_tasks.append(task_dict)
        # Success rate, guarding against division by zero
        success_rate = 0
        if total_tasks > 0:
            success_rate = (success_count / total_tasks) * 100
        # Assemble the statistics
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }
        return stats
    except Exception as e:
        logger.error(f"Error fetching execution statistics: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()
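
# Illustrative shape of the stats dict (hypothetical numbers):
#   {"exec_date": "2024-01-01", "total_tasks": 5,
#    "type_counts": {"DataResource": 2, "DataModel": 3},
#    "success_count": 4, "fail_count": 1, "pending_count": 0, "success_rate": 80.0,
#    "avg_duration": 12.3, "min_duration": 1.2, "max_duration": 30.5,
#    "failed_tasks": [{"target_table": "dm_orders", "script_name": "build_dm_orders.py",
#                      "target_table_label": "DataModel", "exec_duration": 30.5}]}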

def update_missing_results(exec_date):
    """Fill in missing execution results"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Find all tasks whose execution result is missing
        cursor.execute("""
            SELECT target_table, script_name
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()
        update_count = 0
        for row in missing_results:
            target_table, script_name = row
            # A start time without an end time is assumed to be a failure
            cursor.execute("""
                SELECT exec_start_time
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))
            start_time = cursor.fetchone()
            if start_time and start_time[0]:
                # Start time but no result: mark as failed
                now = datetime.now()
                duration = (now - start_time[0]).total_seconds()
                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))
                logger.warning(f"Script {script_name} for task {target_table} marked failed, start time: {start_time[0]}")
                update_count += 1
            else:
                # No start time and no result: assume it never ran
                logger.warning(f"Script {script_name} for task {target_table} was never executed")
        conn.commit()
        logger.info(f"Updated {update_count} tasks with missing results")
        return update_count
    except Exception as e:
        logger.error(f"Error updating missing execution results: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()

def generate_unified_execution_report(exec_date, stats):
    """Generate the unified execution report"""
    # Build the report
    report = []
    report.append("========== Unified DataOps Execution Report ==========")
    report.append(f"Execution date: {exec_date}")
    report.append(f"Total tasks: {stats['total_tasks']}")
    # Task type distribution
    report.append("\n--- Task type distribution ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} tasks: {count}")
    # Execution result statistics
    report.append("\n--- Execution results ---")
    report.append(f"Successful tasks: {stats.get('success_count', 0)}")
    report.append(f"Failed tasks: {stats.get('fail_count', 0)}")
    report.append(f"Pending tasks: {stats.get('pending_count', 0)}")
    report.append(f"Success rate: {stats.get('success_rate', 0):.2f}%")
    # Execution time statistics
    report.append("\n--- Execution times (seconds) ---")
    avg_duration = stats.get('avg_duration')
    min_duration = stats.get('min_duration')
    max_duration = stats.get('max_duration')
    report.append(f"Average: {avg_duration:.2f}" if avg_duration is not None else "Average: N/A")
    report.append(f"Minimum: {min_duration:.2f}" if min_duration is not None else "Minimum: N/A")
    report.append(f"Maximum: {max_duration:.2f}" if max_duration is not None else "Maximum: N/A")
    # Failed task details
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- Failed task details ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. Table: {task['target_table']}")
            report.append(f"   Script: {task['script_name']}")
            report.append(f"   Type: {task['target_table_label']}")
            exec_duration = task.get('exec_duration')
            if exec_duration is not None:
                report.append(f"   Duration: {exec_duration:.2f} seconds")
            else:
                report.append("   Duration: N/A")
    report.append("\n========== End of report ==========")
    # Join the report into a single string
    report_str = "\n".join(report)
    # Write it to the log
    logger.info("\n" + report_str)
    return report_str
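
# Illustrative rendering of the report (hypothetical values):
#   ========== Unified DataOps Execution Report ==========
#   Execution date: 2024-01-01
#   Total tasks: 5
#   --- Execution results ---
#   Successful tasks: 4
#   Failed tasks: 1
#   Success rate: 80.00%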

def summarize_execution(**context):
    """
    Summarize how the execution plan ran and generate a report
    """
    logger.info(f"=== Summarizing execution - timestamp: {datetime.now().isoformat()} ===")
    try:
        # Get the execution date
        execution_date = context.get('execution_date', datetime.now())
        exec_date = execution_date.strftime('%Y-%m-%d')
        # Load the execution plan from the local file
        plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
        plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
        if not os.path.exists(plan_path):
            logger.warning(f"Execution plan file does not exist: {plan_path}")
            return "Execution plan file does not exist; cannot generate the summary report"
        with open(plan_path, 'r') as f:
            execution_plan = json.loads(f.read())
        # Get the task lists
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        all_tasks = resource_tasks + model_tasks
        # Connect to the database and fetch task execution status
        conn = get_pg_conn()
        cursor = conn.cursor()
        # Classify the tasks by execution status
        successful_tasks = []
        failed_tasks = []
        skipped_tasks = []
        for task in all_tasks:
            table_name = task["target_table"]
            table_type = "resource table" if task in resource_tasks else "model table"
            # Query the task's execution status
            cursor.execute("""
                SELECT status FROM airflow_task_execution
                WHERE table_name = %s AND exec_date = %s
                ORDER BY execution_time DESC LIMIT 1
            """, (table_name, exec_date))
            result = cursor.fetchone()
            # The status literals ("成功" = success, "失败" = failure, "未执行" = not executed)
            # match the values stored in airflow_task_execution, so they are kept as-is
            status = result[0] if result else "未执行"
            task_info = {
                "table_name": table_name,
                "table_type": table_type,
                "script_name": task["script_name"],
                "status": status
            }
            if status == "成功":
                successful_tasks.append(task_info)
            elif status == "失败":
                failed_tasks.append(task_info)
            else:
                skipped_tasks.append(task_info)
        # Build the summary report
        total_tasks = len(all_tasks)
        success_count = len(successful_tasks)
        fail_count = len(failed_tasks)
        skip_count = len(skipped_tasks)
        summary = f"""
Execution date: {exec_date}
Total tasks: {total_tasks}
Successful: {success_count}
Failed: {fail_count}
Skipped: {skip_count}

=== Successful tasks ===
"""
        for task in successful_tasks:
            summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
        if failed_tasks:
            summary += "\n=== Failed tasks ===\n"
            for task in failed_tasks:
                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
        if skipped_tasks:
            summary += "\n=== Skipped tasks ===\n"
            for task in skipped_tasks:
                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
        # Upsert into the summary table
        cursor.execute("""
            INSERT INTO airflow_execution_summary
            (exec_date, total_tasks, success_count, fail_count, skip_count, summary_text, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (exec_date)
            DO UPDATE SET
                total_tasks = EXCLUDED.total_tasks,
                success_count = EXCLUDED.success_count,
                fail_count = EXCLUDED.fail_count,
                skip_count = EXCLUDED.skip_count,
                summary_text = EXCLUDED.summary_text,
                updated_at = CURRENT_TIMESTAMP
        """, (
            exec_date, total_tasks, success_count, fail_count, skip_count,
            summary, datetime.now()
        ))
        conn.commit()
        cursor.close()
        conn.close()
        logger.info(f"=== Execution summary finished - timestamp: {datetime.now().isoformat()} ===")
        return summary
    except Exception as e:
        logger.error(f"Unhandled error while summarizing execution: {str(e)}")
        # Return a short error report so the task itself does not fail
        return f"Error while summarizing execution: {str(e)}"

# Create the DAG
with DAG(
    "dag_dataops_unified_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Initialize module-level globals so DAG parsing never hits an undefined name
    globals()['_resource_tasks'] = []
    globals()['_task_dict'] = {}

    # DAG start task
    dag_start = EmptyOperator(task_id="dag_start")

    # DAG end task
    dag_end = EmptyOperator(
        task_id="dag_end",
        trigger_rule="all_done"  # let the DAG finish regardless of upstream success
    )

    # Prepare-phase task
    prepare_task = PythonOperator(
        task_id="prepare_dag_schedule",
        python_callable=prepare_dag_schedule,
        provide_context=True
    )

    # Execution summary task
    summarize_task = PythonOperator(
        task_id='summarize_execution',
        python_callable=summarize_execution,
        provide_context=True,
        trigger_rule='all_done',  # run whether the preceding tasks succeeded or failed
        retries=2,  # extra retries
        retry_delay=timedelta(minutes=1)  # retry delay
    )

    # Data processing phase
    # Fetch all tasks to execute (real tasks, not TaskGroup wrappers)
    exec_date = get_latest_date()
    resource_tasks, model_tasks = get_all_tasks(exec_date)

    # Task dictionary used for wiring dependencies
    task_dict = {}

    # Create the resource-table tasks
    for task_info in resource_tasks:
        table_name = task_info["target_table"]
        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")
        # Build a safe task id
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"resource_{safe_table_name}"
        # Call execute_with_monitoring directly so the script really runs
        resource_task = PythonOperator(
            task_id=task_id,
            python_callable=execute_with_monitoring,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date
            },
            retries=2,
            retry_delay=timedelta(minutes=1),
            trigger_rule="all_done"  # run regardless of upstream success or failure
        )
        # Register the task
        task_dict[table_name] = resource_task
        # Dependency: prepare_task -> resource_task
        prepare_task >> resource_task

    # Fetch dependencies for all model tables
    model_table_names = [task["target_table"] for task in model_tasks]
    dependencies = get_table_dependencies_for_data_phase(model_table_names)

    # Build a directed graph to determine the execution order
    G = nx.DiGraph()
    # Add every model table as a node
    for task_info in model_tasks:
        G.add_node(task_info["target_table"])
    # Add the dependency edges
    for source, deps in dependencies.items():
        for dep in deps:
            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                G.add_edge(dep.get("table_name"), source)
    # Handle cyclic dependencies
    cycles = list(nx.simple_cycles(G))
    if cycles:
        for cycle in cycles:
            G.remove_edge(cycle[-1], cycle[0])
    # Determine the execution order
    try:
        execution_order = list(nx.topological_sort(G))
    except Exception as e:
        execution_order = [task["target_table"] for task in model_tasks]
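
    # Note on the cycle handling above: it is a pragmatic sketch that breaks each
    # simple cycle by dropping one edge (cycle[-1] -> cycle[0]) so a topological
    # order exists; an alternative would be to fail fast and surface the cyclic
    # tables instead of silently removing an edge.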

    # Create the model-table tasks
    for table_name in execution_order:
        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
        if not task_info:
            continue
        script_name = task_info["script_name"]
        exec_mode = task_info.get("script_exec_mode", "append")
        # Build a safe task id
        safe_table_name = table_name.replace(".", "_").replace("-", "_")
        task_id = f"model_{safe_table_name}"
        # Call execute_with_monitoring directly to run the script
        model_task = PythonOperator(
            task_id=task_id,
            python_callable=execute_with_monitoring,
            op_kwargs={
                "target_table": table_name,
                "script_name": script_name,
                "script_exec_mode": exec_mode,
                "exec_date": exec_date
            },
            retries=2,
            retry_delay=timedelta(minutes=1),
            trigger_rule="all_done"  # run regardless of upstream success or failure
        )
        # Register the task
        task_dict[table_name] = model_task
        # Wire the dependencies
        deps = dependencies.get(table_name, [])
        has_dependency = False
        # Dependencies between model tables
        for dep in deps:
            dep_table = dep.get("table_name")
            if dep_table in task_dict:
                task_dict[dep_table] >> model_task
                has_dependency = True
        # With no dependency of its own, depend on all resource-table tasks
        if not has_dependency and resource_tasks:
            for resource_task_info in resource_tasks:
                resource_name = resource_task_info["target_table"]
                if resource_name in task_dict:
                    task_dict[resource_name] >> model_task
        # With no dependency and no resource tables either, depend on prepare_task directly
        if not has_dependency and not resource_tasks:
            prepare_task >> model_task

    # Every processing task is upstream of summarize_task
    for task in task_dict.values():
        task >> summarize_task

    # Main flow
    dag_start >> prepare_task

    # Task that creates the execution plan file
    create_plan_task = PythonOperator(
        task_id="create_execution_plan",
        python_callable=create_execution_plan,
        provide_context=True
    )

    # Wire the remaining dependencies
    prepare_task >> create_plan_task >> summarize_task >> dag_end

    logger.info(f"DAG dag_dataops_unified_scheduler defined; created {len(task_dict)} script execution tasks")

    # Try to read the latest execution plan from the database for WebUI display
    try:
        # A simple query that runs once at DAG parse time to fetch table info.
        # This is only for UI display and does not affect actual execution.
        conn = get_pg_conn()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT COUNT(*) FROM airflow_dag_schedule
            """)
            count = cursor.fetchone()
            if count and count[0] > 0:
                logger.info(f"{count[0]} task records are available in the database for scheduling")
            else:
                logger.info("No task records found in the database; the DAG's first run will create the initial plan")
        except Exception as e:
            logger.warning(f"Error querying the database: {str(e)}; this does not affect actual DAG execution")
        finally:
            cursor.close()
            conn.close()
    except Exception as e:
        logger.warning(f"Error during DAG initialization: {str(e)}; this does not affect actual DAG execution")
        # Even on error there is a clear execution path:
        # the default dependency chain above is already in place, nothing extra is needed