# dag_dataops_unified_scheduler.py
# Combines the functionality of the prepare, data and summary DAGs
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta, date
import logging
import networkx as nx
import json
from decimal import Decimal
from common import (
    get_pg_conn,
    get_neo4j_driver,
    execute_with_monitoring,
    get_today_date
)
from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG
import os

# Create the logger
logger = logging.getLogger(__name__)

# Date serializer
def json_serial(obj):
    """JSON serializer that converts date objects to ISO-format strings"""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

# Custom JSON encoder that handles Decimal serialization
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date types
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Let the parent class handle other types
        return super(DecimalEncoder, self).default(obj)

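# Illustrative usage of the two serialization helpers above (values are made up):
#   json.dumps({"exec_date": date(2024, 1, 1)}, default=json_serial)
#       -> '{"exec_date": "2024-01-01"}'
#   json.dumps({"avg_duration": Decimal("1.5")}, cls=DecimalEncoder)
#       -> '{"avg_duration": 1.5}'
# json_serial is used when dumping execution plans; DecimalEncoder is used when
# dumping execution statistics that may contain Decimal values from PostgreSQL.
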
#############################################
# Phase 1: functions for the Prepare Phase
#############################################
def get_enabled_tables():
    """Get all enabled tables"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT owner_id, table_name
            FROM schedule_status
            WHERE schedule_is_enabled = TRUE
        """)
        result = cursor.fetchall()
        return [row[1] for row in result]  # return table names only
    except Exception as e:
        logger.error(f"Failed to get enabled tables: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()

def check_table_directly_subscribed(table_name):
    """Check whether the table is directly subscribed in the schedule_status table"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT schedule_is_enabled
            FROM schedule_status
            WHERE table_name = %s
        """, (table_name,))
        result = cursor.fetchone()
        return result and result[0] is True
    except Exception as e:
        logger.error(f"Failed to check table subscription status: {str(e)}")
        return False
    finally:
        cursor.close()
        conn.close()

def get_table_info_from_neo4j(table_name):
    """Get detailed table information from Neo4j"""
    driver = get_neo4j_driver()
    # Check whether the table is directly subscribed
    is_directly_schedule = check_table_directly_subscribed(table_name)

    table_info = {
        'target_table': table_name,
        'is_directly_schedule': is_directly_schedule,  # obtained from the schedule_status table
    }
    try:
        with driver.session() as session:
            # Query the table's labels and status
            query_table = """
                MATCH (t {en_name: $table_name})
                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
            """
            result = session.run(query_table, table_name=table_name)
            record = result.single()
            if record:
                labels = record.get("labels", [])
                table_info['target_table_label'] = next((label for label in labels if label in ["DataResource", "DataModel", "DataSource"]), None)
                table_info['target_table_status'] = record.get("status", True)  # defaults to True
                table_info['default_update_frequency'] = record.get("frequency")

                # Query relationship and script information according to the label type
                if "DataResource" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                elif "DataModel" in labels:
                    query_rel = """
                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                    """
                else:
                    logger.warning(f"Table {table_name} is neither a DataResource nor a DataModel")
                    return table_info

                result = session.run(query_rel, table_name=table_name)
                record = result.single()
                if record:
                    table_info['source_table'] = record.get("source_table")
                    # Check whether script_name is empty
                    script_name = record.get("script_name")
                    if not script_name:
                        logger.warning(f"The relationship of table {table_name} has no script_name property, which may break later processing")
                    table_info['script_name'] = script_name
                    # Set defaults so the fields exist even when the properties are empty
                    table_info['script_type'] = record.get("script_type", "python")  # defaults to python
                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # defaults to append
                else:
                    logger.warning(f"No relationship information found for table {table_name}")
            else:
                logger.warning(f"Table {table_name} not found in Neo4j")
    except Exception as e:
        logger.error(f"Error while getting information for table {table_name}: {str(e)}")
    finally:
        driver.close()
    return table_info

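# Illustrative shape of the dict returned by get_table_info_from_neo4j(); the actual
# values depend on the Neo4j graph:
#   {
#       "target_table": "...", "is_directly_schedule": bool,
#       "target_table_label": "DataResource" | "DataModel" | None,
#       "target_table_status": bool, "default_update_frequency": "...",
#       "source_table": "...", "script_name": "...",
#       "script_type": "python", "script_exec_mode": "append"
#   }
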
def process_dependencies(tables_info):
    """Process dependencies between tables and add passively scheduled tables"""
    # Dict holding the information of all tables
    all_tables = {t['target_table']: t for t in tables_info}
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name, table_info in list(all_tables.items()):
                if table_info.get('target_table_label') == 'DataModel':
                    # Query the tables it depends on
                    query = """
                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels,
                               dep.status AS dep_status, dep.frequency AS dep_frequency
                    """
                    result = session.run(query, table_name=table_name)
                    for record in result:
                        dep_name = record.get("dep_name")
                        dep_labels = record.get("dep_labels", [])
                        dep_status = record.get("dep_status", True)
                        dep_frequency = record.get("dep_frequency")

                        # Handle dependency tables that are not directly scheduled
                        if dep_name and dep_name not in all_tables:
                            logger.info(f"Found passively scheduled dependency table: {dep_name}, labels: {dep_labels}")
                            # Get detailed information for the dependency table
                            dep_info = get_table_info_from_neo4j(dep_name)
                            dep_info['is_directly_schedule'] = False
                            # Inherit the scheduling frequency when it is missing
                            if not dep_info.get('default_update_frequency'):
                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
                            all_tables[dep_name] = dep_info
    except Exception as e:
        logger.error(f"Error while processing dependencies: {str(e)}")
    finally:
        driver.close()
    return list(all_tables.values())

def filter_invalid_tables(tables_info):
    """Filter out invalid tables and their downstream dependents, using a NetworkX dependency graph"""
    # Map table names to indexes
    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}

    # Find invalid tables
    invalid_tables = set()
    for table in tables_info:
        if table.get('target_table_status') is False:
            invalid_tables.add(table['target_table'])
            logger.info(f"Table {table['target_table']} is marked as invalid")

    # Build the dependency graph
    G = nx.DiGraph()

    # Add all nodes
    for table in tables_info:
        G.add_node(table['target_table'])

    # Query and add dependency edges
    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table in tables_info:
                if table.get('target_table_label') == 'DataModel':
                    query = """
                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
                        RETURN target.en_name AS target_name
                    """
                    result = session.run(query, table_name=table['target_table'])
                    for record in result:
                        target_name = record.get("target_name")
                        if target_name and target_name in table_dict:
                            # Add an edge from the upstream dependency to the dependent table,
                            # so downstream consumers are reachable from an invalid upstream table
                            G.add_edge(target_name, table['target_table'])
                            logger.debug(f"Added dependency edge: {target_name} -> {table['target_table']}")
    except Exception as e:
        logger.error(f"Error while building the dependency graph: {str(e)}")
    finally:
        driver.close()

    # Find all tables that depend on an invalid table
    downstream_invalid = set()
    for invalid_table in invalid_tables:
        # Get all nodes reachable from the invalid table
        try:
            descendants = nx.descendants(G, invalid_table)
            downstream_invalid.update(descendants)
            logger.info(f"Downstream invalid tables of {invalid_table}: {descendants}")
        except Exception as e:
            logger.error(f"Error while processing downstream dependents of table {invalid_table}: {str(e)}")

    # Merge all invalid tables
    all_invalid = invalid_tables.union(downstream_invalid)
    logger.info(f"A total of {len(all_invalid)} tables are marked as invalid: {all_invalid}")

    # Keep only valid tables
    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")
    return valid_tables

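# Example of the propagation above (hypothetical tables): if table_a has
# target_table_status = False and table_b is DERIVED_FROM table_a, the edge
# table_a -> table_b makes table_b a descendant of table_a, so both tables are
# dropped from the day's schedule.
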
def write_to_airflow_dag_schedule(exec_date, tables_info):
    """Write table information into the airflow_dag_schedule table"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Clear the current day's data to avoid duplicates
        cursor.execute("""
            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        logger.info(f"Cleared existing data for execution date {exec_date}")

        # Insert the new records
        inserted_count = 0
        for table in tables_info:
            cursor.execute("""
                INSERT INTO airflow_dag_schedule (
                    exec_date, source_table, target_table, target_table_label,
                    target_table_status, is_directly_schedule, default_update_frequency,
                    script_name, script_type, script_exec_mode
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                exec_date,
                table.get('source_table'),
                table['target_table'],
                table.get('target_table_label'),
                table.get('target_table_status', True),
                table.get('is_directly_schedule', False),
                table.get('default_update_frequency'),
                table.get('script_name'),
                table.get('script_type', 'python'),
                table.get('script_exec_mode', 'append')
            ))
            inserted_count += 1
        conn.commit()
        logger.info(f"Inserted {inserted_count} records into the airflow_dag_schedule table")
        return inserted_count
    except Exception as e:
        logger.error(f"Error while writing to the airflow_dag_schedule table: {str(e)}")
        conn.rollback()
        # Re-raise so the error propagates correctly
        raise
    finally:
        cursor.close()
        conn.close()

def prepare_dag_schedule(**kwargs):
    """Main function that prepares the DAG scheduling tasks"""
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"Start preparing the unified scheduling tasks for execution date {exec_date}")

    # 1. Get the enabled tables
    enabled_tables = get_enabled_tables()
    logger.info(f"Got {len(enabled_tables)} enabled tables from the schedule_status table")
    if not enabled_tables:
        logger.warning("No enabled tables found, preparation finished")
        return 0

    # 2. Get detailed information for each table
    tables_info = []
    for table_name in enabled_tables:
        table_info = get_table_info_from_neo4j(table_name)
        if table_info:
            tables_info.append(table_info)
    logger.info(f"Successfully fetched details for {len(tables_info)} tables")

    # 3. Process dependencies and add passively scheduled tables
    enriched_tables = process_dependencies(tables_info)
    logger.info(f"{len(enriched_tables)} tables in total after processing dependencies")

    # 4. Filter out invalid tables and their downstream dependents
    valid_tables = filter_invalid_tables(enriched_tables)
    logger.info(f"{len(valid_tables)} valid tables remain after filtering")

    # 5. Write to the airflow_dag_schedule table
    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)

    # 6. If the insert failed, raise an exception
    if inserted_count == 0 and valid_tables:
        error_msg = f"Insert failed: no records were written to airflow_dag_schedule although {len(valid_tables)} valid tables need processing"
        logger.error(error_msg)
        raise Exception(error_msg)

    # 7. Build the execution plan data
    resource_tasks = []
    model_tasks = []

    for table in valid_tables:
        if table.get('target_table_label') == 'DataResource':
            resource_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataResource",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })
        elif table.get('target_table_label') == 'DataModel':
            model_tasks.append({
                "source_table": table.get('source_table'),
                "target_table": table['target_table'],
                "target_table_label": "DataModel",
                "script_name": table.get('script_name'),
                "script_exec_mode": table.get('script_exec_mode', 'append')
            })

    # Collect dependencies
    model_table_names = [t['target_table'] for t in model_tasks]
    dependencies = {}

    driver = get_neo4j_driver()
    try:
        with driver.session() as session:
            for table_name in model_table_names:
                query = """
                    MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                    RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                """
                result = session.run(query, table_name=table_name)

                deps = []
                for record in result:
                    target = record.get("target")
                    target_labels = record.get("target_labels", [])
                    if target:
                        table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                        deps.append({
                            "table_name": target,
                            "table_type": table_type
                        })
                dependencies[table_name] = deps
    finally:
        driver.close()

    # Build the execution plan
    execution_plan = {
        "exec_date": exec_date,
        "resource_tasks": resource_tasks,
        "model_tasks": model_tasks,
        "dependencies": dependencies
    }

    # Push the execution plan to XCom
    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
    logger.info(f"Prepared an execution plan with {len(resource_tasks)} resource table tasks and {len(model_tasks)} model table tasks")

    # Save the execution plan to a file
    try:
        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
        with open(plan_path, 'w') as f:
            json.dump(execution_plan, f, default=json_serial, indent=2)
        logger.info(f"Saved the execution plan to file: {plan_path}")
    except Exception as file_e:
        logger.error(f"Error while saving the execution plan to file: {str(file_e)}")

    return inserted_count

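# Illustrative shape of last_execution_plan.json / the 'execution_plan' XCom value
# (table and script names are made up):
#   {
#     "exec_date": "2025-04-12",
#     "resource_tasks": [{"source_table": "src", "target_table": "res_a",
#                         "target_table_label": "DataResource",
#                         "script_name": "load_res_a.py", "script_exec_mode": "append"}],
#     "model_tasks": [...],
#     "dependencies": {"model_b": [{"table_name": "res_a", "table_type": "DataResource"}]}
#   }
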
#############################################
# Phase 2: functions for the Data Processing Phase
#############################################
def get_latest_date():
    """Get the most recent date that has records in the database"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Found the most recent date with records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with records found, falling back to today's date")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while looking up the most recent date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()

def get_all_tasks(exec_date):
    """Get all tasks that need to run (DataResource and DataModel)"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Query all resource table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
        """, (exec_date,))
        resource_results = cursor.fetchall()

        # Query all model table tasks
        cursor.execute("""
            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
        """, (exec_date,))
        model_results = cursor.fetchall()

        # Collect the resource table tasks
        resource_tasks = []
        for row in resource_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                resource_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        # Collect the model table tasks
        model_tasks = []
        for row in model_results:
            source_table, target_table, target_table_label, script_name, script_exec_mode = row
            if script_name:  # make sure the script name is not empty
                model_tasks.append({
                    "source_table": source_table,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_name": script_name,
                    "script_exec_mode": script_exec_mode or "append"
                })

        logger.info(f"Got {len(resource_tasks)} resource table tasks and {len(model_tasks)} model table tasks")
        return resource_tasks, model_tasks
    except Exception as e:
        logger.error(f"Error while getting task information: {str(e)}")
        return [], []
    finally:
        cursor.close()
        conn.close()

def get_table_dependencies_for_data_phase(table_names):
    """Get the dependencies between tables"""
    driver = get_neo4j_driver()
    dependency_dict = {name: [] for name in table_names}
    try:
        with driver.session() as session:
            # Get the dependencies between all model tables
            query = """
                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
                WHERE source.en_name IN $table_names
                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
            """
            result = session.run(query, table_names=table_names)

            for record in result:
                source = record.get("source")
                target = record.get("target")
                target_labels = record.get("target_labels", [])

                if source and target:
                    # Append the target table to the source table's dependency list
                    dependency_dict[source].append({
                        "table_name": target,
                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
                    })
                    logger.debug(f"Dependency: {source} depends on {target}")
    except Exception as e:
        logger.error(f"Error while getting dependencies from Neo4j: {str(e)}")
    finally:
        driver.close()
    return dependency_dict

def create_execution_plan(**kwargs):
    """Prepare the execution plan, using the data passed from the prepare phase"""
    try:
        # Get the execution plan from XCom
        execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')

        # If no execution plan is found, build one from the database
        if not execution_plan:
            # Get the execution date
            exec_date = get_latest_date()
            logger.info(f"No execution plan found, building one from the database. Using execution date: {exec_date}")

            # Get all tasks
            resource_tasks, model_tasks = get_all_tasks(exec_date)
            if not resource_tasks and not model_tasks:
                logger.warning(f"No tasks found for execution date {exec_date}")
                return 0

            # Get dependencies for all model tables
            model_table_names = [task["target_table"] for task in model_tasks]
            dependencies = get_table_dependencies_for_data_phase(model_table_names)

            # Build the execution plan
            new_execution_plan = {
                "exec_date": exec_date,
                "resource_tasks": resource_tasks,
                "model_tasks": model_tasks,
                "dependencies": dependencies
            }

            # Save the execution plan
            kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(new_execution_plan, default=json_serial))
            logger.info(f"Created a new execution plan with {len(resource_tasks)} resource table tasks and {len(model_tasks)} model table tasks")

            # Save the execution plan to a file
            try:
                plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
                with open(plan_path, 'w') as f:
                    json.dump(new_execution_plan, f, default=json_serial, indent=2)
                logger.info(f"Saved the execution plan to file: {plan_path}")
            except Exception as file_e:
                logger.error(f"Error while saving the execution plan to file: {str(file_e)}")

            return json.dumps(new_execution_plan, default=json_serial)

        # If the execution plan came from XCom, also save it to the file
        try:
            plan_json = json.loads(execution_plan) if isinstance(execution_plan, str) else execution_plan
            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
            with open(plan_path, 'w') as f:
                json.dump(plan_json, f, default=json_serial, indent=2)
            logger.info(f"Saved the execution plan from XCom to file: {plan_path}")
        except Exception as file_e:
            logger.error(f"Error while saving the execution plan from XCom to file: {str(file_e)}")

        logger.info("Successfully fetched the execution plan")
        return execution_plan
    except Exception as e:
        logger.error(f"Error while creating the execution plan: {str(e)}")
        # Return an empty execution plan
        empty_plan = {
            "exec_date": get_today_date(),
            "resource_tasks": [],
            "model_tasks": [],
            "dependencies": {}
        }

        # Try to save the empty execution plan to the file
        try:
            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
            with open(plan_path, 'w') as f:
                json.dump(empty_plan, f, default=json_serial, indent=2)
            logger.info(f"Saved the empty execution plan to file: {plan_path}")
        except Exception as file_e:
            logger.error(f"Error while saving the empty execution plan to file: {str(file_e)}")

        return json.dumps(empty_plan, default=json_serial)

def process_resource(target_table, script_name, script_exec_mode, exec_date):
    """Process a single resource table"""
    logger.info(f"Running script {script_name} for resource table {target_table}")
    # Check whether exec_date is a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error while parsing the exec_date JSON: {str(e)}")
    return execute_with_monitoring(
        target_table=target_table,
        script_name=script_name,
        script_exec_mode=script_exec_mode,
        exec_date=exec_date
    )

def process_model(target_table, script_name, script_exec_mode, exec_date):
    """Process a single model table"""
    logger.info(f"Running script {script_name} for model table {target_table}")
    # Check whether exec_date is a JSON string
    if isinstance(exec_date, str) and exec_date.startswith('{'):
        try:
            # Try to parse the JSON string
            exec_date_data = json.loads(exec_date)
            exec_date = exec_date_data.get("exec_date")
            logger.info(f"Extracted execution date from JSON: {exec_date}")
        except Exception as e:
            logger.error(f"Error while parsing the exec_date JSON: {str(e)}")
    return execute_with_monitoring(
        target_table=target_table,
        script_name=script_name,
        script_exec_mode=script_exec_mode,
        exec_date=exec_date
    )

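# The JSON check in the two helpers above exists because the dynamically created
# tasks below pass exec_date via a Jinja template that pulls the return value of
# prepare_phase.create_execution_plan, which may be the whole execution-plan JSON
# rather than a bare date string.
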
#############################################
# Phase 3: functions for the Summary Phase
#############################################
def get_execution_stats(exec_date):
    """Get execution statistics for the day"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total number of tasks
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        result = cursor.fetchone()
        total_tasks = result[0] if result else 0

        # Number of tasks per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}

        # Execution result statistics
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        result = cursor.fetchone()
        success_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        result = cursor.fetchone()
        fail_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        result = cursor.fetchone()
        pending_count = result[0] if result else 0

        # Execution duration statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()

        # Make sure the duration statistics are not None
        if time_stats and time_stats[0] is not None:
            avg_duration = float(time_stats[0])
            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
        else:
            avg_duration = None
            min_duration = None
            max_duration = None

        # Details of the failed tasks
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = []
        for row in cursor.fetchall():
            task_dict = {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
            }
            if row[3] is not None:
                task_dict["exec_duration"] = float(row[3])
            else:
                task_dict["exec_duration"] = None
            failed_tasks.append(task_dict)

        # Compute the success rate, avoiding division by zero
        success_rate = 0
        if total_tasks > 0:
            success_rate = (success_count / total_tasks) * 100

        # Aggregate the statistics
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }
        return stats
    except Exception as e:
        logger.error(f"Error while getting execution statistics: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()

def update_missing_results(exec_date):
    """Update tasks with missing execution results"""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Query all tasks with missing execution results
        cursor.execute("""
            SELECT target_table, script_name
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()

        update_count = 0
        for row in missing_results:
            target_table, script_name = row

            # If there is a start time but no end time, assume the execution failed
            cursor.execute("""
                SELECT exec_start_time
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))
            start_time = cursor.fetchone()

            if start_time and start_time[0]:
                # Has a start time but no result: mark it as failed
                now = datetime.now()
                duration = (now - start_time[0]).total_seconds()

                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))
                logger.warning(f"Marked script {script_name} for table {target_table} as failed, start time: {start_time[0]}")
                update_count += 1
            else:
                # No start time and no result: assume it never ran
                logger.warning(f"Script {script_name} for table {target_table} was never executed")

        conn.commit()
        logger.info(f"Updated {update_count} tasks with missing results")
        return update_count
    except Exception as e:
        logger.error(f"Error while updating missing execution results: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()

def generate_unified_execution_report(exec_date, stats):
    """Generate the unified execution report"""
    # Build the report
    report = []
    report.append("========== Unified DataOps System Execution Report ==========")
    report.append(f"Execution date: {exec_date}")
    report.append(f"Total tasks: {stats['total_tasks']}")

    # Task type distribution
    report.append("\n--- Task type distribution ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} tasks: {count}")

    # Execution result statistics
    report.append("\n--- Execution result statistics ---")
    report.append(f"Successful tasks: {stats.get('success_count', 0)}")
    report.append(f"Failed tasks: {stats.get('fail_count', 0)}")
    report.append(f"Pending tasks: {stats.get('pending_count', 0)}")
    report.append(f"Success rate: {stats.get('success_rate', 0):.2f}%")

    # Execution duration statistics
    report.append("\n--- Execution duration statistics (seconds) ---")
    avg_duration = stats.get('avg_duration')
    min_duration = stats.get('min_duration')
    max_duration = stats.get('max_duration')

    report.append(f"Average duration: {avg_duration:.2f}" if avg_duration is not None else "Average duration: N/A")
    report.append(f"Minimum duration: {min_duration:.2f}" if min_duration is not None else "Minimum duration: N/A")
    report.append(f"Maximum duration: {max_duration:.2f}" if max_duration is not None else "Maximum duration: N/A")

    # Details of the failed tasks
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- Failed task details ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. Table: {task['target_table']}")
            report.append(f"   Script: {task['script_name']}")
            report.append(f"   Type: {task['target_table_label']}")
            exec_duration = task.get('exec_duration')
            if exec_duration is not None:
                report.append(f"   Duration: {exec_duration:.2f} seconds")
            else:
                report.append("   Duration: N/A")

    report.append("\n========== End of report ==========")

    # Join the report into a single string
    report_str = "\n".join(report)

    # Write it to the log
    logger.info("\n" + report_str)

    return report_str

def summarize_execution(**kwargs):
    """Main function that summarizes the execution"""
    try:
        exec_date = kwargs.get('ds') or get_today_date()
        logger.info(f"Start summarizing the unified execution for execution date {exec_date}")

        # 1. Update missing execution results
        try:
            update_count = update_missing_results(exec_date)
            logger.info(f"Updated {update_count} missing execution results")
        except Exception as e:
            logger.error(f"Error while updating missing execution results: {str(e)}")
            update_count = 0

        # 2. Get the execution statistics
        try:
            stats = get_execution_stats(exec_date)
            if not stats:
                logger.warning("Could not get execution statistics, falling back to defaults")
                stats = {
                    "exec_date": exec_date,
                    "total_tasks": 0,
                    "type_counts": {},
                    "success_count": 0,
                    "fail_count": 0,
                    "pending_count": 0,
                    "success_rate": 0,
                    "avg_duration": None,
                    "min_duration": None,
                    "max_duration": None,
                    "failed_tasks": []
                }
        except Exception as e:
            logger.error(f"Error while getting execution statistics: {str(e)}")
            stats = {
                "exec_date": exec_date,
                "total_tasks": 0,
                "type_counts": {},
                "success_count": 0,
                "fail_count": 0,
                "pending_count": 0,
                "success_rate": 0,
                "avg_duration": None,
                "min_duration": None,
                "max_duration": None,
                "failed_tasks": []
            }

        # 3. Generate the execution report
        try:
            report = generate_unified_execution_report(exec_date, stats)
        except Exception as e:
            logger.error(f"Error while generating the execution report: {str(e)}")
            report = f"Error while generating the execution report: {str(e)}\nBasic statistics: total tasks: {stats.get('total_tasks', 0)}, succeeded: {stats.get('success_count', 0)}, failed: {stats.get('fail_count', 0)}"

        # Pass the report and statistics to downstream tasks
        try:
            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
            kwargs['ti'].xcom_push(key='execution_report', value=report)
        except Exception as e:
            logger.error(f"Error while pushing the report to XCom: {str(e)}")
        return report
    except Exception as e:
        logger.error(f"Unhandled error while summarizing the execution: {str(e)}")
        # Return a simple error report so the task itself does not fail
        return f"Error while summarizing the execution: {str(e)}"

# Create the DAG
with DAG(
    "dag_dataops_unified_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:

    #############################################
    # Phase 1: Prepare Phase
    #############################################
    with TaskGroup("prepare_phase") as prepare_group:
        # Start marker
        start_preparation = EmptyOperator(
            task_id="start_preparation"
        )

        # Prepare the scheduling tasks
        prepare_task = PythonOperator(
            task_id="prepare_dag_schedule",
            python_callable=prepare_dag_schedule,
            provide_context=True
        )

        # Create the execution plan (moved here from data_processing_phase)
        create_plan = PythonOperator(
            task_id="create_execution_plan",
            python_callable=create_execution_plan,
            provide_context=True
        )

        # Preparation-completed marker
        preparation_completed = EmptyOperator(
            task_id="preparation_completed"
        )

        # Task dependencies, now including create_plan
        start_preparation >> prepare_task >> create_plan >> preparation_completed

    #############################################
    # Phase 2: Data Processing Phase
    #############################################
    with TaskGroup("data_processing_phase") as data_group:
        # Processing-completed marker
        processing_completed = EmptyOperator(
            task_id="processing_completed"
        )

    #############################################
    # Phase 3: Summary Phase
    #############################################
    with TaskGroup("summary_phase") as summary_group:
        # Summarize the execution
        summarize_task = PythonOperator(
            task_id="summarize_execution",
            python_callable=summarize_execution,
            provide_context=True
        )

        # Summary-completed marker
        summary_completed = EmptyOperator(
            task_id="summary_completed"
        )

        # Task dependencies
        summarize_task >> summary_completed

    # Dependencies between the three phases, expressed as simple TaskGroup dependencies
    prepare_group >> data_group >> summary_group

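    # Note on the design below: the per-table tasks are created at DAG parse time
    # from last_execution_plan.json (a snapshot written by a previous run), so the
    # rendered graph reflects the last known plan, while the exec_date each task
    # actually uses is pulled from XCom at run time.
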
    # Dynamic creation of the actual data processing tasks.
    # This code runs when the DAG file is parsed and creates tasks from the database data and the execution plan.

    # Default execution plan JSON, used when no plan file is available
    execution_plan_json = '''{"exec_date": "2025-04-12", "resource_tasks": [], "model_tasks": [], "dependencies": {}}'''

    try:
        # Try to read the latest execution plan from the file; it is only used to build the DAG view
        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
        if os.path.exists(plan_path):
            with open(plan_path, 'r') as f:
                execution_plan_json = f.read()
    except Exception as e:
        logger.warning(f"Error while reading the default execution plan: {str(e)}")

    # Parse the execution plan to get the task information
    try:
        execution_plan = json.loads(execution_plan_json)
        exec_date = execution_plan.get("exec_date", get_today_date())
        resource_tasks = execution_plan.get("resource_tasks", [])
        model_tasks = execution_plan.get("model_tasks", [])
        dependencies = execution_plan.get("dependencies", {})

        # Task dict used to wire up dependencies
        task_dict = {}

        # 1. Create the resource table tasks
        for task_info in resource_tasks:
            table_name = task_info["target_table"]
            script_name = task_info["script_name"]
            exec_mode = task_info.get("script_exec_mode", "append")

            # Build a safe task ID: use the table name directly, which is concise and readable
            safe_table_name = table_name.replace(".", "_").replace("-", "_")

            # Make sure every task is part of data_processing_phase
            with data_group:
                resource_task = PythonOperator(
                    task_id=f"resource_{safe_table_name}",  # no extra prefix needed, the TaskGroup adds its own
                    python_callable=process_resource,
                    op_kwargs={
                        "target_table": table_name,
                        "script_name": script_name,
                        "script_exec_mode": exec_mode,
                        "exec_date": """{{ ti.xcom_pull(task_ids='prepare_phase.create_execution_plan') }}"""
                    },
                    retries=TASK_RETRY_CONFIG["retries"],
                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                )

            # Register the task in the dict
            task_dict[table_name] = resource_task

            # Wire up the dependency
            preparation_completed >> resource_task

        # Build a directed graph to detect dependencies between the model tables
        G = nx.DiGraph()

        # Add all model tables as nodes
        for task_info in model_tasks:
            table_name = task_info["target_table"]
            G.add_node(table_name)

        # Add dependency edges between model tables
        for source, deps in dependencies.items():
            for dep in deps:
                if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
                    G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> dependent

        # Detect and break cyclic dependencies
        cycles = list(nx.simple_cycles(G))
        if cycles:
            logger.warning(f"Detected cyclic dependencies: {cycles}")
            for cycle in cycles:
                G.remove_edge(cycle[-1], cycle[0])
                logger.info(f"Broke cyclic dependency: removed the edge {cycle[-1]} -> {cycle[0]}")

        # Produce a topological order to determine the execution order
        try:
            execution_order = list(nx.topological_sort(G))
            logger.info(f"Computed execution order: {execution_order}")
        except Exception as e:
            logger.error(f"Topological sort failed: {str(e)}, falling back to the original order")
            execution_order = [task_info["target_table"] for task_info in model_tasks]

        # 2. Create the model table tasks in topological order
        for table_name in execution_order:
            task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
            if not task_info:
                continue

            script_name = task_info["script_name"]
            exec_mode = task_info.get("script_exec_mode", "append")

            # Build a safe task ID
            safe_table_name = table_name.replace(".", "_").replace("-", "_")

            # Make sure every task is part of data_processing_phase
            with data_group:
                model_task = PythonOperator(
                    task_id=f"model_{safe_table_name}",
                    python_callable=process_model,
                    op_kwargs={
                        "target_table": table_name,
                        "script_name": script_name,
                        "script_exec_mode": exec_mode,
                        "exec_date": """{{ ti.xcom_pull(task_ids='prepare_phase.create_execution_plan') }}"""
                    },
                    retries=TASK_RETRY_CONFIG["retries"],
                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                )

            # Register the task in the dict
            task_dict[table_name] = model_task

            # Wire up the dependencies
            deps = dependencies.get(table_name, [])
            has_dependency = False

            # Dependencies between model tables
            for dep in deps:
                dep_table = dep.get("table_name")
                dep_type = dep.get("table_type")
                if dep_table in task_dict:
                    task_dict[dep_table] >> model_task
                    has_dependency = True
                    logger.info(f"Set dependency: {dep_table} >> {table_name}")

            # If the task has no dependencies, make it depend on the resource table tasks
            if not has_dependency:
                # Depend on the completion of the prepare phase
                preparation_completed >> model_task

                # Also connect it to all resource table tasks
                for resource_table in resource_tasks:
                    resource_name = resource_table["target_table"]
                    if resource_name in task_dict:
                        task_dict[resource_name] >> model_task
                        logger.info(f"Set resource dependency: {resource_name} >> {table_name}")

        # If there are no model table tasks, treat all resource table tasks as terminal tasks
        if not model_tasks and resource_tasks:
            terminal_tasks = [task["target_table"] for task in resource_tasks]
        else:
            # Find all terminal tasks (tasks with no downstream dependents)
            terminal_tasks = []

            # Check all model table tasks
            for table_name in execution_order:
                # Check whether the table has downstream tasks
                has_downstream = False
                for source, deps in dependencies.items():
                    if source == table_name:  # skip itself
                        continue
                    for dep in deps:
                        if dep.get("table_name") == table_name:
                            has_downstream = True
                            break
                    if has_downstream:
                        break

                # If there is no downstream task, add it to the terminal task list
                if not has_downstream and table_name in task_dict:
                    terminal_tasks.append(table_name)

        # If there are neither model table tasks nor resource table tasks, the default dependency chain already exists
        if not terminal_tasks:
            logger.warning("No tasks found, using the default dependency chain")
        else:
            # Connect all terminal tasks to the completion marker
            for table_name in terminal_tasks:
                if table_name in task_dict:
                    task_dict[table_name] >> processing_completed
                    logger.info(f"Set terminal task: {table_name} >> processing_completed")
    except Exception as e:
        logger.error(f"Error while building the task DAG: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

        # Even on error there is still a clear execution path:
        # the default dependency chain already exists, nothing extra needs to be added