# dataops_productline_manual_trigger_dag.py
"""
Manually triggered DAG for executing data product line scripts.

Features:
- Flexible parameter combinations:
  - provide only the script name (the target table is looked up automatically)
  - provide only the target table (the associated scripts are resolved automatically)
  - provide both the script name and the target table
- Three dependency levels:
  - 'self': run only the current script, ignoring upstream dependencies
  - 'resource': resolve dependencies up to the Resource layer
  - 'source': resolve dependencies down to the Source layer
- Three script types:
  - 'python_script': execute a physical Python script file
  - 'python': fetch Python script content from the data_transform_scripts table and execute it
  - 'sql': fetch SQL script content from the data_transform_scripts table and execute it
- Logical date support:
  - a specific logical date can be chosen in the Airflow UI
  - the chosen date is converted into an execution date and passed to the scripts

Parameters:
- script_name: [optional] target script name
- target_table: [optional] target table name
- dependency_level: dependency level

Usage examples:
1. Script name only:
{
    "conf": {
        "script_name": "book_sale_amt_monthly_process.py",
        "dependency_level": "resource"
    }
}
2. Target table only:
{
    "conf": {
        "target_table": "book_sale_amt_monthly",
        "dependency_level": "resource"
    }
}
3. Both script name and target table:
{
    "conf": {
        "script_name": "book_sale_amt_monthly_process.py",
        "target_table": "book_sale_amt_monthly",
        "dependency_level": "resource"
    }
}
"""
  48. from airflow import DAG
  49. from airflow.operators.python import PythonOperator, ShortCircuitOperator
  50. from airflow.operators.empty import EmptyOperator
  51. from datetime import datetime, timedelta
  52. import logging
  53. import importlib.util
  54. import os
  55. from pathlib import Path
  56. from neo4j import GraphDatabase
  57. import psycopg2
  58. import networkx as nx
  59. import json
  60. from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
  61. import traceback
  62. import pendulum
  63. import pytz
  64. from utils import get_pg_conn, get_cn_exec_date, check_script_exists
  65. # 设置logger
  66. logger = logging.getLogger(__name__)
  67. # DAG参数
  68. default_args = {
  69. 'owner': 'airflow',
  70. 'depends_on_past': False,
  71. 'start_date': datetime(2024, 1, 1),
  72. 'email_on_failure': False,
  73. 'email_on_retry': False,
  74. 'retries': 1,
  75. 'retry_delay': timedelta(minutes=1),
  76. }
  77. def get_execution_mode(table_name):
  78. """
  79. 从Neo4j获取表的执行模式
  80. 参数:
  81. table_name (str): 表名
  82. 返回:
  83. str: 执行模式,如果未找到则返回"append"作为默认值
  84. """
  85. try:
  86. driver = GraphDatabase.driver(
  87. NEO4J_CONFIG['uri'],
  88. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  89. )
  90. # 先检查是否为structure类型的DataResource
  91. with driver.session() as session:
  92. query_structure = """
  93. MATCH (n:DataResource {en_name: $table_name})
  94. RETURN n.type AS type
  95. """
  96. result = session.run(query_structure, table_name=table_name)
  97. record = result.single()
  98. if record and record.get("type") == "structure":
  99. logger.info(f"表 {table_name} 是structure类型的DataResource,使用默认执行模式'append'")
  100. return "append"
  101. # 查询执行模式,分别尝试DataModel和DataResource
  102. with driver.session() as session:
  103. # 首先检查DataModel类型表
  104. query_model = """
  105. MATCH (n:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->()
  106. RETURN rel.exec_mode AS execution_mode LIMIT 1
  107. """
  108. result = session.run(query_model, table_name=table_name)
  109. record = result.single()
  110. if record and record.get("execution_mode"):
  111. return record.get("execution_mode")
  112. # 然后检查DataResource类型表
  113. query_resource = """
  114. MATCH (n:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->()
  115. RETURN rel.exec_mode AS execution_mode LIMIT 1
  116. """
  117. result = session.run(query_resource, table_name=table_name)
  118. record = result.single()
  119. if record and record.get("execution_mode"):
  120. return record.get("execution_mode")
  121. # 如果上面两种方式都找不到,使用默认值
  122. logger.warning(f"未在Neo4j中找到表 {table_name} 的执行模式,使用默认值 'append'")
  123. return "append"
  124. except Exception as e:
  125. logger.error(f"获取表 {table_name} 的执行模式时出错: {str(e)}")
  126. return "append"
  127. finally:
  128. if driver:
  129. driver.close()
  130. def get_dag_params(**context):
  131. """获取DAG运行参数"""
  132. #params = context.get('params', {})
  133. dag_run = context.get('dag_run')
  134. params = dag_run.conf if dag_run and dag_run.conf else {}
  135. script_name = params.get('script_name', '')
  136. target_table = params.get('target_table', '')
  137. # 记录原始参数信息
  138. logger.info(f"接收到的原始参数: {params}")
  139. # 获取依赖级别参数
  140. dependency_level = params.get('dependency_level', 'self')
  141. logger.info(f"获取的依赖级别值: {dependency_level}")
  142. # 获取 logical_date
  143. dag_run = context.get('dag_run')
  144. logical_date = dag_run.logical_date if dag_run else datetime.now()
  145. exec_date, local_logical_date = get_cn_exec_date(logical_date)
  146. logger.info(f"【时间参数】get_dag_params: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  147. # 验证参数组合
  148. if not script_name and not target_table:
  149. logger.error("必须至少提供script_name或target_table参数之一")
  150. raise ValueError("必须至少提供script_name或target_table参数之一")
  151. # 验证dependency_level参数
  152. if dependency_level not in ['self', 'resource', 'source']:
  153. logger.warning(f"无效的依赖级别参数: {dependency_level},使用默认值'self'")
  154. dependency_level = 'self'
  155. logger.info(f"最终使用的参数 - 脚本名称: {script_name}, 目标表: {target_table}, 依赖级别: {dependency_level}, 执行日期: {exec_date}")
  156. return script_name, target_table, dependency_level, exec_date, logical_date
  157. def get_table_label(table_name):
  158. """确定表的标签类型(DataModel or DataResource)"""
  159. driver = GraphDatabase.driver(
  160. NEO4J_CONFIG['uri'],
  161. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  162. )
  163. query = """
  164. MATCH (n {en_name: $table_name})
  165. RETURN labels(n) AS labels
  166. """
  167. try:
  168. with driver.session() as session:
  169. result = session.run(query, table_name=table_name)
  170. record = result.single()
  171. if record and record.get("labels"):
  172. labels = record.get("labels")
  173. if "DataModel" in labels:
  174. return "DataModel"
  175. elif "DataResource" in labels:
  176. return "DataResource"
  177. elif "DataSource" in labels:
  178. return "DataSource"
  179. return None
  180. except Exception as e:
  181. logger.error(f"获取表 {table_name} 的标签时出错: {str(e)}")
  182. return None
  183. finally:
  184. driver.close()
  185. def find_target_table_for_script(script_name):
  186. """
  187. 根据脚本名称查找对应的目标表
  188. 参数:
  189. script_name (str): 脚本名称
  190. 返回:
  191. str: 目标表名,如果找不到则返回None
  192. """
  193. driver = GraphDatabase.driver(
  194. NEO4J_CONFIG['uri'],
  195. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  196. )
  197. # 首先查找DataModel表中的脚本关系
  198. query_datamodel = """
  199. MATCH (source)-[rel:DERIVED_FROM]->(target)
  200. WHERE rel.script_name = $script_name
  201. RETURN source.en_name AS target_table LIMIT 1
  202. """
  203. # 如果在DataModel中找不到,尝试查找DataResource表
  204. query_dataresource = """
  205. MATCH (source)-[rel:ORIGINATES_FROM]->(target)
  206. WHERE rel.script_name = $script_name
  207. RETURN source.en_name AS target_table LIMIT 1
  208. """
  209. try:
  210. with driver.session() as session:
  211. # 先查找DataModel
  212. result = session.run(query_datamodel, script_name=script_name)
  213. record = result.single()
  214. if record and record.get("target_table"):
  215. return record.get("target_table")
  216. # 如果在DataModel中找不到,尝试DataResource
  217. result = session.run(query_dataresource, script_name=script_name)
  218. record = result.single()
  219. if record and record.get("target_table"):
  220. return record.get("target_table")
  221. logger.warning(f"未找到脚本 {script_name} 对应的目标表")
  222. return None
  223. except Exception as e:
  224. logger.error(f"查找脚本 {script_name} 对应的目标表时出错: {str(e)}")
  225. return None
  226. finally:
  227. driver.close()
def find_scripts_for_table(table_name):
    """
    Find the scripts associated with a table.

    Args:
        table_name (str): table name

    Returns:
        list: list of {"script_name", "script_type"} dicts; empty when nothing
              is found or the table label is unsupported.
    """
    driver = GraphDatabase.driver(
        NEO4J_CONFIG['uri'],
        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
    )
    table_label = get_table_label(table_name)
    scripts = []
    try:
        with driver.session() as session:
            # structure-type DataResources are always loaded by the fixed
            # load_file.py script — short-circuit here.
            if table_label == "DataResource":
                query_type = """
                    MATCH (n:DataResource {en_name: $table_name})
                    RETURN n.type AS type
                """
                result = session.run(query_type, table_name=table_name)
                record = result.single()
                if record and record.get("type") == "structure":
                    logger.info(f"表 {table_name} 是structure类型的DataResource,使用默认脚本'load_file.py'")
                    scripts.append({
                        "script_name": "load_file.py",
                        "script_type": "python_script"
                    })
                    return scripts
            if table_label == "DataModel":
                # All script relationships of a DataModel.
                query = """
                    MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                    RETURN rel.script_name AS script_name, rel.script_type AS script_type
                """
            elif table_label == "DataResource":
                # All script relationships of a (non-structure) DataResource.
                query = """
                    MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                    RETURN rel.script_name AS script_name, rel.script_type AS script_type
                """
            else:
                logger.warning(f"表 {table_name} 不是DataModel或DataResource类型")
                return scripts
            result = session.run(query, table_name=table_name)
            for record in result:
                script_name = record.get("script_name")
                script_type = record.get("script_type", "python_script")
                if script_name:
                    scripts.append({
                        "script_name": script_name,
                        "script_type": script_type
                    })
            # Fall back to the conventional "<table>_process.py" file when no
            # script relationship is registered and such a file exists on disk.
            if not scripts:
                logger.warning(f"表 {table_name} 没有关联的脚本,尝试使用默认脚本名")
                default_script_name = f"{table_name}_process.py"
                script_path = os.path.join(SCRIPTS_BASE_PATH, default_script_name)
                if os.path.exists(script_path):
                    logger.info(f"发现默认脚本文件: {default_script_name}")
                    scripts.append({
                        "script_name": default_script_name,
                        "script_type": "python_script"
                    })
    except Exception as e:
        logger.error(f"查找表 {table_name} 对应的脚本时出错: {str(e)}")
    finally:
        driver.close()
    logger.info(f"表 {table_name} 关联的脚本: {scripts}")
    return scripts
def get_script_info_from_neo4j(script_name, target_table):
    """
    Fetch detailed script and table information from Neo4j.

    Args:
        script_name: script name
        target_table: target table name

    Returns:
        dict: script/table details — script_id, target_table_label,
              source_tables, script_type, execution_mode, and for structure
              DataResources also target_type / storage_location / frequency.
    """
    driver = GraphDatabase.driver(
        NEO4J_CONFIG['uri'],
        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
    )
    # Determine the node label of the target table.
    table_label = get_table_label(target_table)
    script_info = {
        'script_name': script_name,
        'target_table': target_table,
        'script_id': f"{script_name.replace('.', '_')}_{target_table}",
        'target_table_label': table_label,
        'source_tables': [],
        'script_type': 'python_script'  # default: a physical Python script file
    }
    # Short-circuit check for structure-type DataResources (loaded via load_file.py).
    try:
        with driver.session() as session:
            if table_label == 'DataResource':
                query_structure = """
                    MATCH (n:DataResource {en_name: $table_name})
                    RETURN n.type AS type, n.storage_location AS storage_location, n.frequency AS frequency
                """
                result = session.run(query_structure, table_name=target_table)
                record = result.single()
                if record and record.get("type") == "structure":
                    logger.info(f"表 {target_table} 是structure类型的DataResource")
                    # Mark as structure and carry over its storage attributes.
                    script_info['target_type'] = 'structure'
                    storage_location = record.get("storage_location")
                    frequency = record.get("frequency", "daily")
                    if storage_location:
                        script_info['storage_location'] = storage_location
                    script_info['frequency'] = frequency
                    # No script name (or the default) means the fixed load_file.py loader.
                    if not script_name or script_name.lower() == 'default' or script_name == 'load_file.py':
                        script_info['script_name'] = 'load_file.py'
                        script_info['script_id'] = f"load_file_py_{target_table}"
                        script_info['execution_mode'] = "append"
                        logger.info(f"对于structure类型的DataResource表 {target_table},使用默认脚本'load_file.py'")
                        # NOTE(review): early return appears to apply only to the
                        # default-script case; structure tables with a custom script
                        # fall through to the dependency lookup below — confirm intended.
                        return script_info
    except Exception as e:
        logger.error(f"检查表 {target_table} 是否为structure类型时出错: {str(e)}")
        logger.error(traceback.format_exc())
    # Query script info and upstream dependencies according to the table label.
    try:
        with driver.session() as session:
            if script_info['target_table_label'] == 'DataModel':
                # Upstream dependencies of a DataModel.
                query = """
                    MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                    RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                """
                result = session.run(query, table_name=target_table)
                for record in result:
                    source_table = record.get("source_table")
                    # NOTE(review): the query returns no "source_labels" column,
                    # so this is always [] and the variable is unused.
                    source_labels = record.get("source_labels", [])
                    db_script_name = record.get("script_name")
                    script_type = record.get("script_type", "python_script")
                    # Only count relationships belonging to the requested script.
                    if db_script_name and db_script_name == script_name:
                        if source_table and source_table not in script_info['source_tables']:
                            script_info['source_tables'].append(source_table)
                        script_info['script_type'] = script_type
            elif script_info['target_table_label'] == 'DataResource':
                # Upstream dependencies of a DataResource.
                query = """
                    MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                    RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                """
                result = session.run(query, table_name=target_table)
                for record in result:
                    source_table = record.get("source_table")
                    # NOTE(review): same as above — "source_labels" is never returned.
                    source_labels = record.get("source_labels", [])
                    db_script_name = record.get("script_name")
                    script_type = record.get("script_type", "python_script")
                    # Only count relationships belonging to the requested script.
                    if db_script_name and db_script_name == script_name:
                        if source_table and source_table not in script_info['source_tables']:
                            script_info['source_tables'].append(source_table)
                        script_info['script_type'] = script_type
            # Warn when no dependency relationship matched the script.
            if not script_info['source_tables']:
                logger.warning(f"未找到脚本 {script_name} 和表 {target_table} 的依赖关系")
            # Fetch DataResource-specific attributes (type, storage, frequency).
            if script_info['target_table_label'] == 'DataResource':
                query = """
                    MATCH (n:DataResource {en_name: $table_name})
                    RETURN n.type AS target_type, n.storage_location AS storage_location, n.frequency AS frequency
                """
                result = session.run(query, table_name=target_table)
                record = result.single()
                if record:
                    target_type = record.get("target_type")
                    storage_location = record.get("storage_location")
                    frequency = record.get("frequency")
                    if target_type:
                        script_info['target_type'] = target_type
                    if storage_location:
                        script_info['storage_location'] = storage_location
                    if frequency:
                        script_info['frequency'] = frequency
                    # Re-check the default-script rule for structure tables; this
                    # also covers the case where the first structure probe above errored.
                    if target_type == 'structure' and (not script_name or script_name.lower() == 'default' or script_name == 'load_file.py'):
                        script_info['script_name'] = 'load_file.py'
                        script_info['script_id'] = f"load_file_py_{target_table}"
                        script_info['execution_mode'] = "append"
                        logger.info(f"对于structure类型的DataResource表 {target_table},使用默认脚本'load_file.py'")
    except Exception as e:
        logger.error(f"从Neo4j获取脚本 {script_name} 和表 {target_table} 的信息时出错: {str(e)}")
        logger.error(traceback.format_exc())
    finally:
        driver.close()
    # Resolve the table's execution mode (may overwrite the "append" set above).
    script_info['execution_mode'] = get_execution_mode(target_table)
    logger.info(f"获取到脚本信息: {script_info}")
    return script_info
def get_upstream_script_dependencies(script_info, dependency_level='resource'):
    """
    Build the upstream dependency chain of a script.

    Args:
        script_info: script information dict (as produced by get_script_info_from_neo4j)
        dependency_level: how far upstream to resolve
            - self: only the current script
            - resource: stop at the Resource layer
            - source: continue down to the Source layer

    Returns:
        list: script-info dicts in execution order (upstream scripts first);
              falls back to [script_info] when topological sorting fails.
    """
    # 'self' level: no traversal needed.
    if dependency_level == 'self':
        logger.info(f"依赖级别为'self',只包含当前脚本: {script_info['script_name']}")
        return [script_info]
    # Dependency graph; an edge A -> B means "A depends on B".
    G = nx.DiGraph()
    # All script infos discovered so far, keyed by script_id.
    processed_scripts = {}
    script_id = script_info['script_id']
    # Seed the graph with the starting script.
    G.add_node(script_id, **script_info)
    processed_scripts[script_id] = script_info
    # BFS over the dependency tree.
    queue = [script_info]
    visited = set([script_id])
    while queue:
        current = queue.pop(0)
        current_id = current['script_id']
        # Scripts without source tables have nothing upstream.
        if not current.get('source_tables'):
            logger.info(f"脚本 {current['script_name']} 没有源表依赖")
            continue
        # Resolve the script that produces each source table.
        for source_table in current.get('source_tables', []):
            # NOTE(review): assumes each table has one primary script (LIMIT 1 below);
            # a fresh driver is opened per source table — confirm this churn is intended.
            driver = GraphDatabase.driver(
                NEO4J_CONFIG['uri'],
                auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
            )
            try:
                with driver.session() as session:
                    # Determine the label of the upstream table.
                    table_label = get_table_label(source_table)
                    # At 'resource' level, DataSource tables terminate the traversal.
                    if dependency_level == 'resource' and table_label == 'DataSource':
                        logger.info(f"依赖级别为'resource',跳过DataSource类型表: {source_table}")
                        continue
                    # structure-type DataResources always use load_file.py and
                    # have no further upstream dependencies.
                    if table_label == 'DataResource':
                        query_structure = """
                            MATCH (n:DataResource {en_name: $table_name})
                            RETURN n.type AS type, n.storage_location AS storage_location, n.frequency AS frequency
                        """
                        result = session.run(query_structure, table_name=source_table)
                        record = result.single()
                        if record and record.get("type") == "structure":
                            logger.info(f"上游表 {source_table} 是structure类型的DataResource,使用默认脚本'load_file.py'")
                            # Build the script info for the structure table directly.
                            upstream_script_name = "load_file.py"
                            upstream_target_table = source_table
                            upstream_id = f"load_file_py_{upstream_target_table}"
                            # Only process each script once.
                            if upstream_id not in visited:
                                upstream_info = {
                                    'script_name': upstream_script_name,
                                    'target_table': upstream_target_table,
                                    'script_id': upstream_id,
                                    'target_table_label': 'DataResource',
                                    'target_type': 'structure',
                                    'source_tables': [],
                                    'script_type': 'python_script',
                                    'execution_mode': 'append',
                                    'frequency': record.get("frequency", "daily")
                                }
                                # Carry over storage_location when present.
                                if record.get("storage_location"):
                                    upstream_info["storage_location"] = record.get("storage_location")
                                # Add to the graph and bookkeeping structures.
                                G.add_node(upstream_id, **upstream_info)
                                G.add_edge(current_id, upstream_id)
                                processed_scripts[upstream_id] = upstream_info
                                visited.add(upstream_id)
                                queue.append(upstream_info)
                            # Done with this structure-type source table.
                            continue
                    # Pick the relationship type matching the table label.
                    if table_label == 'DataModel':
                        query = """
                            MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
                            RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                            LIMIT 1
                        """
                    elif table_label == 'DataResource':
                        query = """
                            MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
                            RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                            LIMIT 1
                        """
                    else:
                        logger.warning(f"表 {source_table} 类型未知或不受支持: {table_label}")
                        continue
                    result = session.run(query, table_name=source_table)
                    record = result.single()
                    if record and record.get("script_name"):
                        upstream_script_name = record.get("script_name")
                        upstream_target_table = source_table
                        # NOTE(review): upstream_script_type is assigned but never used.
                        upstream_script_type = record.get("script_type", "python_script")
                        source_table_from_db = record.get("source_table")
                        # Log the next table further upstream, if any.
                        if source_table_from_db:
                            logger.info(f"表 {source_table} 的上游源表: {source_table_from_db}")
                        # Same script_id scheme as get_script_info_from_neo4j.
                        upstream_id = f"{upstream_script_name.replace('.', '_')}_{upstream_target_table}"
                        if upstream_id not in visited:
                            # Fetch the full upstream script info (source tables etc.).
                            upstream_info = get_script_info_from_neo4j(upstream_script_name, upstream_target_table)
                            # Add to the graph and bookkeeping structures.
                            G.add_node(upstream_id, **upstream_info)
                            G.add_edge(current_id, upstream_id)
                            processed_scripts[upstream_id] = upstream_info
                            # Continue the BFS from this upstream script.
                            visited.add(upstream_id)
                            queue.append(upstream_info)
                    else:
                        logger.warning(f"未找到表 {source_table} 对应的脚本信息")
            except Exception as e:
                logger.error(f"获取表 {source_table} 对应的脚本信息时出错: {str(e)}")
                logger.error(traceback.format_exc())
            finally:
                driver.close()
    # Break any circular dependencies before sorting.
    cycles = list(nx.simple_cycles(G))
    if cycles:
        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
        for cycle in cycles:
            if len(cycle) > 1:
                G.remove_edge(cycle[0], cycle[1])
                logger.info(f"打破循环依赖: 移除 {cycle[0]} -> {cycle[1]} 的依赖")
    # Produce the execution order via topological sort.
    try:
        # Edges point "depends on" (downstream -> upstream); topological sort
        # needs "runs before", so reverse the graph first.
        reverse_G = G.reverse()
        execution_order = list(nx.topological_sort(reverse_G))
        # Materialize the final dependency chain.
        dependency_chain = []
        for script_id in execution_order:
            script_info = processed_scripts[script_id]
            dependency_chain.append(script_info)
        logger.info(f"最终依赖链长度: {len(dependency_chain)}")
        logger.info(f"最终依赖链: {[info['script_name'] for info in dependency_chain]}")
        return dependency_chain
    except Exception as e:
        logger.error(f"生成拓扑排序时出错: {str(e)}")
        logger.error(traceback.format_exc())
        # On failure, at least return the current script.
        return [script_info]
  589. def execute_python_script(script_info):
  590. """
  591. 执行Python脚本文件
  592. 参数:
  593. script_info: 脚本信息字典
  594. 返回:
  595. bool: 执行成功返回True,失败返回False
  596. """
  597. script_name = script_info.get('script_name')
  598. target_table = script_info.get('target_table')
  599. execution_mode = script_info.get('execution_mode', 'append')
  600. target_table_label = script_info.get('target_table_label')
  601. source_tables = script_info.get('source_tables', [])
  602. frequency = script_info.get('frequency', 'daily')
  603. # 使用传入的执行日期,如果不存在则使用当前日期
  604. exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
  605. # 记录开始执行
  606. logger.info(f"===== 开始执行物理Python脚本文件: {script_name} =====")
  607. logger.info(f"目标表: {target_table}")
  608. logger.info(f"执行模式: {execution_mode}")
  609. logger.info(f"表标签: {target_table_label}")
  610. logger.info(f"源表: {source_tables}")
  611. logger.info(f"频率: {frequency}")
  612. logger.info(f"执行日期: {exec_date}")
  613. # 检查脚本文件是否存在
  614. exists, script_path = check_script_exists(script_name)
  615. if not exists:
  616. return False
  617. try:
  618. # 创建执行开始时间
  619. start_time = datetime.now()
  620. # 动态导入模块
  621. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  622. module = importlib.util.module_from_spec(spec)
  623. spec.loader.exec_module(module)
  624. # 检查并调用标准入口函数run
  625. if hasattr(module, "run"):
  626. logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
  627. # 构建函数参数
  628. run_kwargs = {
  629. "table_name": target_table,
  630. "execution_mode": execution_mode,
  631. "frequency": frequency,
  632. "exec_date": exec_date # 使用传入的执行日期而不是当前日期
  633. }
  634. # 如果是structure类型,添加特殊参数
  635. if target_table_label == 'DataResource' and script_info.get('target_type') == 'structure':
  636. run_kwargs["target_type"] = script_info.get('target_type')
  637. run_kwargs["storage_location"] = script_info.get('storage_location')
  638. # 添加源表
  639. if source_tables:
  640. run_kwargs["source_tables"] = source_tables
  641. # 执行脚本
  642. result = module.run(**run_kwargs)
  643. # 记录结束时间
  644. end_time = datetime.now()
  645. duration = (end_time - start_time).total_seconds()
  646. # 确保结果是布尔值
  647. if not isinstance(result, bool):
  648. result = bool(result)
  649. logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  650. return result
  651. else:
  652. logger.error(f"脚本 {script_name} 没有定义标准入口函数 run()")
  653. return False
  654. except Exception as e:
  655. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  656. logger.error(traceback.format_exc())
  657. return False
  658. def execute_sql(script_info):
  659. """
  660. 执行SQL脚本(从data_transform_scripts表获取)
  661. 参数:
  662. script_info: 脚本信息字典
  663. 返回:
  664. bool: 执行成功返回True,失败返回False
  665. """
  666. script_name = script_info.get('script_name')
  667. target_table = script_info.get('target_table')
  668. execution_mode = script_info.get('execution_mode', 'append')
  669. target_table_label = script_info.get('target_table_label')
  670. frequency = script_info.get('frequency', 'daily')
  671. # 使用传入的执行日期,如果不存在则使用当前日期
  672. exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
  673. # 记录开始执行
  674. logger.info(f"===== 开始执行SQL脚本: {script_name} =====")
  675. logger.info(f"目标表: {target_table}")
  676. logger.info(f"执行模式: {execution_mode}")
  677. logger.info(f"表标签: {target_table_label}")
  678. logger.info(f"频率: {frequency}")
  679. logger.info(f"执行日期: {exec_date}")
  680. try:
  681. # 记录执行开始时间
  682. start_time = datetime.now()
  683. # 导入execution_sql模块
  684. exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
  685. if not os.path.exists(exec_sql_path):
  686. logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
  687. return False
  688. # 动态导入execution_sql模块
  689. spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
  690. exec_sql_module = importlib.util.module_from_spec(spec)
  691. spec.loader.exec_module(exec_sql_module)
  692. # 检查并调用标准入口函数run
  693. if hasattr(exec_sql_module, "run"):
  694. logger.info(f"调用SQL执行脚本的标准入口函数 run()")
  695. # 构建函数参数
  696. run_kwargs = {
  697. "script_type": "sql",
  698. "target_table": target_table,
  699. "script_name": script_name,
  700. "exec_date": exec_date, # 使用传入的执行日期而不是当前日期
  701. "frequency": frequency,
  702. "target_table_label": target_table_label,
  703. "execution_mode": execution_mode
  704. }
  705. # 如果是structure类型,添加特殊参数
  706. if target_table_label == 'DataResource' and script_info.get('target_type') == 'structure':
  707. run_kwargs["target_type"] = script_info.get('target_type')
  708. run_kwargs["storage_location"] = script_info.get('storage_location')
  709. # 添加源表
  710. if 'source_tables' in script_info and script_info['source_tables']:
  711. run_kwargs["source_tables"] = script_info['source_tables']
  712. # 执行脚本
  713. result = exec_sql_module.run(**run_kwargs)
  714. # 记录结束时间
  715. end_time = datetime.now()
  716. duration = (end_time - start_time).total_seconds()
  717. # 确保结果是布尔值
  718. if not isinstance(result, bool):
  719. result = bool(result)
  720. logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  721. return result
  722. else:
  723. logger.error(f"SQL执行模块没有定义标准入口函数 run()")
  724. return False
  725. except Exception as e:
  726. logger.error(f"执行SQL脚本 {script_name} 时出错: {str(e)}")
  727. logger.error(traceback.format_exc())
  728. return False
  729. def execute_python(script_info):
  730. """
  731. 执行Python脚本(从data_transform_scripts表获取)
  732. 参数:
  733. script_info: 脚本信息字典
  734. 返回:
  735. bool: 执行成功返回True,失败返回False
  736. """
  737. script_name = script_info.get('script_name')
  738. target_table = script_info.get('target_table')
  739. execution_mode = script_info.get('execution_mode', 'append')
  740. target_table_label = script_info.get('target_table_label')
  741. frequency = script_info.get('frequency', 'daily')
  742. # 使用传入的执行日期,如果不存在则使用当前日期
  743. exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
  744. # 记录开始执行
  745. logger.info(f"===== 开始执行Python脚本(data_transform_scripts): {script_name} =====")
  746. logger.info(f"目标表: {target_table}")
  747. logger.info(f"执行模式: {execution_mode}")
  748. logger.info(f"表标签: {target_table_label}")
  749. logger.info(f"频率: {frequency}")
  750. logger.info(f"执行日期: {exec_date}")
  751. try:
  752. # 记录执行开始时间
  753. start_time = datetime.now()
  754. # 导入execution_python模块
  755. exec_python_path = os.path.join(SCRIPTS_BASE_PATH, "execution_python.py")
  756. if not os.path.exists(exec_python_path):
  757. logger.error(f"Python执行脚本文件不存在: {exec_python_path}")
  758. return False
  759. # 动态导入execution_python模块
  760. spec = importlib.util.spec_from_file_location("execution_python", exec_python_path)
  761. exec_python_module = importlib.util.module_from_spec(spec)
  762. spec.loader.exec_module(exec_python_module)
  763. # 检查并调用标准入口函数run
  764. if hasattr(exec_python_module, "run"):
  765. logger.info(f"调用Python执行脚本的标准入口函数 run()")
  766. # 构建函数参数
  767. run_kwargs = {
  768. "script_type": "python",
  769. "target_table": target_table,
  770. "script_name": script_name,
  771. "exec_date": exec_date, # 使用传入的执行日期而不是当前日期
  772. "frequency": frequency,
  773. "target_table_label": target_table_label,
  774. "execution_mode": execution_mode
  775. }
  776. # 如果是structure类型,添加特殊参数
  777. if target_table_label == 'DataResource' and script_info.get('target_type') == 'structure':
  778. run_kwargs["target_type"] = script_info.get('target_type')
  779. run_kwargs["storage_location"] = script_info.get('storage_location')
  780. # 添加源表
  781. if 'source_tables' in script_info and script_info['source_tables']:
  782. run_kwargs["source_tables"] = script_info['source_tables']
  783. # 执行脚本
  784. result = exec_python_module.run(**run_kwargs)
  785. # 记录结束时间
  786. end_time = datetime.now()
  787. duration = (end_time - start_time).total_seconds()
  788. # 确保结果是布尔值
  789. if not isinstance(result, bool):
  790. result = bool(result)
  791. logger.info(f"Python脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  792. return result
  793. else:
  794. logger.error(f"Python执行模块没有定义标准入口函数 run()")
  795. return False
  796. except Exception as e:
  797. logger.error(f"执行Python脚本 {script_name} 时出错: {str(e)}")
  798. logger.error(traceback.format_exc())
  799. return False
  800. def choose_executor(script_info):
  801. """
  802. 根据脚本类型选择合适的执行函数
  803. 参数:
  804. script_info: 脚本信息字典
  805. 返回:
  806. function: 执行函数
  807. """
  808. script_type = script_info.get('script_type', 'python_script').lower()
  809. target_table_label = script_info.get('target_table_label')
  810. # 根据脚本类型和目标表标签选择执行函数
  811. if script_type == 'sql' and target_table_label == 'DataModel':
  812. # 使用SQL脚本执行函数
  813. logger.info(f"脚本 {script_info['script_id']} 是SQL类型且目标表标签为DataModel,使用execute_sql函数执行")
  814. return execute_sql
  815. elif script_type == 'python' and target_table_label == 'DataModel':
  816. # 使用Python脚本执行函数
  817. logger.info(f"脚本 {script_info['script_id']} 是Python类型且目标表标签为DataModel,使用execute_python函数执行")
  818. return execute_python
  819. elif script_type == 'python_script':
  820. # 使用Python脚本文件执行函数
  821. logger.info(f"脚本 {script_info['script_id']} 是python_script类型,使用execute_python_script函数执行")
  822. return execute_python_script
  823. else:
  824. # 默认使用Python脚本文件执行函数
  825. logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
  826. return execute_python_script
  827. def prepare_script_info(script_name=None, target_table=None, dependency_level=None):
  828. """
  829. 准备脚本信息,根据输入的参数组合智能处理
  830. 参数:
  831. script_name: [可选] 脚本名称
  832. target_table: [可选] 目标表名
  833. dependency_level: 依赖级别
  834. 返回:
  835. list: 脚本信息列表
  836. """
  837. all_script_infos = []
  838. # 如果script_name和target_table都为空或None
  839. if not script_name and not target_table:
  840. logger.error("script_name和target_table参数都为空,无法确定要执行的脚本")
  841. raise ValueError("必须至少提供script_name或target_table参数之一")
  842. # 情况1: 同时提供脚本名称和目标表名
  843. if script_name and target_table:
  844. logger.info(f"方案1: 同时提供了脚本名称和目标表名")
  845. script_info = get_script_info_from_neo4j(script_name, target_table)
  846. if script_info:
  847. all_script_infos.append(script_info)
  848. # 情况2: 只提供脚本名称,自动查找目标表
  849. elif script_name and not target_table:
  850. logger.info(f"方案2: 只提供了脚本名称,自动查找目标表")
  851. target_table = find_target_table_for_script(script_name)
  852. if target_table:
  853. logger.info(f"找到脚本 {script_name} 对应的目标表: {target_table}")
  854. script_info = get_script_info_from_neo4j(script_name, target_table)
  855. if script_info:
  856. all_script_infos.append(script_info)
  857. else:
  858. logger.error(f"未找到脚本 {script_name} 对应的目标表")
  859. # 情况3: 只提供目标表名,查找并处理相关的脚本
  860. elif not script_name and target_table:
  861. logger.info(f"方案3: 只提供了目标表名,查找相关的脚本")
  862. # 首先检查是否为structure类型的DataResource表
  863. table_label = get_table_label(target_table)
  864. if table_label == 'DataResource':
  865. driver = GraphDatabase.driver(
  866. NEO4J_CONFIG['uri'],
  867. auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  868. )
  869. try:
  870. with driver.session() as session:
  871. query = """
  872. MATCH (n:DataResource {en_name: $table_name})
  873. RETURN n.type AS type
  874. """
  875. result = session.run(query, table_name=target_table)
  876. record = result.single()
  877. if record and record.get("type") == "structure":
  878. logger.info(f"表 {target_table} 是structure类型的DataResource,使用默认脚本'load_file.py'")
  879. script_info = get_script_info_from_neo4j('load_file.py', target_table)
  880. if script_info:
  881. all_script_infos.append(script_info)
  882. return all_script_infos
  883. finally:
  884. driver.close()
  885. # 如果不是structure类型,使用原有逻辑查找脚本
  886. scripts = find_scripts_for_table(target_table)
  887. if not scripts:
  888. logger.warning(f"未找到表 {target_table} 关联的脚本")
  889. # 如果是DataResource的表,再次检查是否为structure类型
  890. if table_label == 'DataResource':
  891. logger.info(f"尝试使用默认脚本'load_file.py'处理表 {target_table}")
  892. script_info = get_script_info_from_neo4j('load_file.py', target_table)
  893. if script_info:
  894. all_script_infos.append(script_info)
  895. return all_script_infos
  896. # 查看是否所有脚本名称都相同
  897. script_names = set(script['script_name'] for script in scripts)
  898. if len(script_names) == 1:
  899. # 如果只有一个不同的脚本名称,处理为单个脚本
  900. single_script_name = next(iter(script_names))
  901. logger.info(f"表 {target_table} 只关联了一个脚本: {single_script_name}")
  902. script_info = get_script_info_from_neo4j(single_script_name, target_table)
  903. if script_info:
  904. all_script_infos.append(script_info)
  905. else:
  906. # 如果有多个不同的脚本名称,分别处理每个脚本
  907. logger.info(f"表 {target_table} 关联了多个不同脚本: {script_names}")
  908. for script in scripts:
  909. script_name = script['script_name']
  910. script_info = get_script_info_from_neo4j(script_name, target_table)
  911. if script_info:
  912. all_script_infos.append(script_info)
  913. return all_script_infos
  914. def prepare_dependency_chain(**context):
  915. """
  916. 准备依赖链并保存到XCom
  917. """
  918. # 获取脚本和表名参数
  919. script_name, target_table, dependency_level, exec_date, logical_date = get_dag_params(**context)
  920. # 记录依赖级别信息
  921. logger.info(f"依赖级别说明:")
  922. logger.info(f"- self: 只执行当前脚本,不处理上游依赖")
  923. logger.info(f"- resource: 到Resource层")
  924. logger.info(f"- source: 到Source层")
  925. logger.info(f"当前依赖级别: {dependency_level}")
  926. logger.info(f"执行日期: {exec_date}")
  927. # 准备脚本信息
  928. script_infos = prepare_script_info(script_name, target_table, dependency_level)
  929. if not script_infos:
  930. logger.error(f"未能获取有效的脚本信息")
  931. return False
  932. # 获取完整的依赖链
  933. all_dependencies = []
  934. for script_info in script_infos:
  935. # 验证脚本信息
  936. if not script_info.get('target_table_label'):
  937. logger.warning(f"未能确定表 {script_info.get('target_table')} 的类型")
  938. continue
  939. # 获取脚本依赖链
  940. dependency_chain = get_upstream_script_dependencies(script_info, dependency_level)
  941. if dependency_chain:
  942. all_dependencies.extend(dependency_chain)
  943. else:
  944. logger.warning(f"没有找到脚本 {script_info.get('script_name')} 的依赖链")
  945. # 去重
  946. unique_dependencies = []
  947. seen_script_ids = set()
  948. for dep in all_dependencies:
  949. script_id = dep.get('script_id')
  950. if script_id and script_id not in seen_script_ids:
  951. seen_script_ids.add(script_id)
  952. unique_dependencies.append(dep)
  953. if not unique_dependencies:
  954. logger.error("没有找到任何有效的依赖链")
  955. return False
  956. # 保存依赖链和执行日期到XCom
  957. ti = context['ti']
  958. ti.xcom_push(key='dependency_chain', value=unique_dependencies)
  959. ti.xcom_push(key='exec_date', value=exec_date)
  960. ti.xcom_push(key='logical_date', value=logical_date.isoformat() if isinstance(logical_date, datetime) else logical_date)
  961. logger.info(f"成功准备了 {len(unique_dependencies)} 个脚本的依赖链")
  962. return True
  963. def execute_script_chain(**context):
  964. """
  965. 执行依赖链中的所有脚本
  966. """
  967. # 获取依赖链和执行日期
  968. ti = context['ti']
  969. dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
  970. exec_date = ti.xcom_pull(task_ids='prepare_dependency_chain', key='exec_date')
  971. logical_date_str = ti.xcom_pull(task_ids='prepare_dependency_chain', key='logical_date')
  972. # 转换logical_date为datetime对象(如果是字符串)
  973. if isinstance(logical_date_str, str):
  974. try:
  975. logical_date = pendulum.parse(logical_date_str)
  976. except:
  977. logical_date = datetime.now()
  978. else:
  979. logical_date = datetime.now()
  980. logger.info(f"【时间参数】execute_script_chain: exec_date={exec_date}, logical_date={logical_date}")
  981. if not dependency_chain:
  982. logger.error("没有找到依赖链,无法执行脚本")
  983. return False
  984. # 记录依赖链信息
  985. logger.info(f"准备执行依赖链中的 {len(dependency_chain)} 个脚本")
  986. for idx, script_info in enumerate(dependency_chain, 1):
  987. logger.info(f"脚本[{idx}]: {script_info['script_name']} -> {script_info['target_table']} (类型: {script_info['script_type']})")
  988. # 逐个执行脚本
  989. all_success = True
  990. results = []
  991. for idx, script_info in enumerate(dependency_chain, 1):
  992. script_name = script_info['script_name']
  993. target_table = script_info['target_table']
  994. script_type = script_info.get('script_type', 'python_script')
  995. logger.info(f"===== 执行脚本 {idx}/{len(dependency_chain)}: {script_name} -> {target_table} (类型: {script_type}) =====")
  996. # 根据脚本类型选择执行函数
  997. executor = choose_executor(script_info)
  998. # 将执行日期添加到脚本信息中
  999. script_info['exec_date'] = exec_date
  1000. script_info['logical_date'] = logical_date
  1001. # 执行脚本
  1002. success = executor(script_info)
  1003. # 记录结果
  1004. result = {
  1005. "script_name": script_name,
  1006. "target_table": target_table,
  1007. "script_type": script_type,
  1008. "success": success
  1009. }
  1010. results.append(result)
  1011. # 如果任何一个脚本执行失败,标记整体失败
  1012. if not success:
  1013. all_success = False
  1014. logger.error(f"脚本 {script_name} 执行失败,中断执行链")
  1015. break
  1016. # 保存执行结果
  1017. ti.xcom_push(key='execution_results', value=results)
  1018. return all_success
  1019. def generate_execution_report(**context):
  1020. """
  1021. 生成执行报告
  1022. """
  1023. # 获取执行结果
  1024. ti = context['ti']
  1025. results = ti.xcom_pull(task_ids='execute_script_chain', key='execution_results')
  1026. dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
  1027. exec_date = ti.xcom_pull(task_ids='prepare_dependency_chain', key='exec_date')
  1028. if not results:
  1029. report = "未找到执行结果,无法生成报告"
  1030. ti.xcom_push(key='execution_report', value=report)
  1031. return report
  1032. # 计算执行统计信息
  1033. total = len(results)
  1034. success_count = sum(1 for r in results if r['success'])
  1035. fail_count = total - success_count
  1036. # 统计不同类型脚本数量
  1037. script_types = {}
  1038. for result in results:
  1039. script_type = result.get('script_type', 'python_script')
  1040. if script_type not in script_types:
  1041. script_types[script_type] = 0
  1042. script_types[script_type] += 1
  1043. # 构建报告
  1044. report = []
  1045. report.append("\n========== 脚本执行报告 ==========")
  1046. report.append(f"执行日期: {exec_date}")
  1047. report.append(f"报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  1048. report.append(f"总脚本数: {total}")
  1049. report.append(f"成功数: {success_count}")
  1050. report.append(f"失败数: {fail_count}")
  1051. report.append(f"成功率: {success_count / total * 100:.2f}%")
  1052. # 添加脚本类型统计
  1053. report.append("\n--- 脚本类型统计 ---")
  1054. for script_type, count in script_types.items():
  1055. report.append(f"{script_type}: {count} 个")
  1056. report.append("\n--- 执行详情 ---")
  1057. for idx, result in enumerate(results, 1):
  1058. script_name = result['script_name']
  1059. target_table = result['target_table']
  1060. script_type = result.get('script_type', 'python_script')
  1061. success = result['success']
  1062. status = "✓ 成功" if success else "✗ 失败"
  1063. report.append(f"{idx}. {script_name} -> {target_table} ({script_type}): {status}")
  1064. report.append("\n========== 报告结束 ==========")
  1065. # 转换为字符串
  1066. report_str = "\n".join(report)
  1067. # 保存报告
  1068. ti.xcom_push(key='execution_report', value=report_str)
  1069. # 记录到日志
  1070. logger.info(report_str)
  1071. return report_str
# Create the DAG. It accepts script_name OR target_table (at least one) plus a
# dependency_level of self / resource / source, and can only be triggered
# manually (schedule_interval=None).
with DAG(
    'dataops_productline_manual_trigger_dag',
    default_args=default_args,
    description='script_name和target_table可以二选一,支持三种依赖级别:self(仅当前表或脚本)、resource(到Resource层)、source(到Source层)',
    schedule_interval=None,  # None means manual trigger only
    catchup=False,
    is_paused_upon_creation=False,
    # Default trigger parameters shown in the Airflow UI; script_name and
    # target_table are mutually optional (see prepare_script_info).
    params={
        "script_name": "",
        "target_table": "",
        "dependency_level": "self"
    },
) as dag:
    # Task 1: resolve the dependency chain. ShortCircuitOperator skips the
    # downstream tasks when prepare_dependency_chain returns False.
    prepare_task = ShortCircuitOperator(
        task_id='prepare_dependency_chain',
        python_callable=prepare_dependency_chain,
        provide_context=True,
    )
    # Task 2: execute every script in the prepared chain, in order.
    execute_task = PythonOperator(
        task_id='execute_script_chain',
        python_callable=execute_script_chain,
        provide_context=True,
    )
    # Task 3: generate the execution report.
    report_task = PythonOperator(
        task_id='generate_execution_report',
        python_callable=generate_execution_report,
        provide_context=True,
        trigger_rule='all_done'  # generate the report whether upstream tasks succeeded or failed
    )
    # Task 4: terminal marker task.
    completed_task = EmptyOperator(
        task_id='execution_completed',
        trigger_rule='all_done'  # mark completion regardless of upstream state
    )
    # Wire the linear pipeline.
    prepare_task >> execute_task >> report_task >> completed_task