瀏覽代碼

准备按照线上AI的建议,修改代码

wangxq 1 月之前
父節點
當前提交
aa65954786
共有 2 個文件被更改,包括 126 次插入11 次删除
  1. 20 2
      dags/dag_data_model_monthly.py
  2. 106 9
      dags/utils.py

+ 20 - 2
dags/dag_data_model_monthly.py

@@ -3,7 +3,7 @@ from airflow.operators.python import PythonOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskSensor
 from datetime import datetime
-from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph
+from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph, check_table_relationship
 from config import NEO4J_CONFIG
 import pendulum
 import logging
@@ -96,6 +96,12 @@ with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_int
                 # 获取表名列表
                 table_names = [t['table_name'] for t in model_tables]
                 
+                # 特别检查两个表之间的关系
+                if 'book_sale_amt_yearly' in table_names and 'book_sale_amt_monthly' in table_names:
+                    logger.info("特别检查 book_sale_amt_yearly 和 book_sale_amt_monthly 之间的关系")
+                    relationship = check_table_relationship('book_sale_amt_yearly', 'book_sale_amt_monthly')
+                    logger.info(f"关系检查结果: {relationship}")
+                
                 # 使用优化函数生成执行顺序,可以处理循环依赖
                 optimized_table_order = generate_optimized_execution_order(table_names)
                 logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
@@ -132,22 +138,34 @@ with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_int
 
                 # 建立任务依赖(基于 DERIVED_FROM 图)
                 dependency_count = 0
+                logger.info("开始建立任务依赖关系...")
                 for target, upstream_list in dependency_graph.items():
+                    logger.info(f"处理目标表 {target} 的依赖关系")
                     for upstream in upstream_list:
                         if upstream in task_dict and target in task_dict:
+                            logger.info(f"建立依赖边: {upstream} >> {target}")
                             task_dict[upstream] >> task_dict[target]
                             dependency_count += 1
                             logger.debug(f"建立依赖关系: {upstream} >> {target}")
                         else:
-                            logger.warning(f"无法建立依赖关系,缺少任务: {upstream} 或 {target}")
+                            missing = []
+                            if upstream not in task_dict:
+                                missing.append(f"上游表 {upstream}")
+                            if target not in task_dict:
+                                missing.append(f"目标表 {target}")
+                            missing_str = " 和 ".join(missing)
+                            logger.warning(f"无法建立依赖关系: {upstream} >> {target},缺少任务: {missing_str}")
 
                 logger.info(f"总共建立了 {dependency_count} 个任务依赖关系")
+                logger.info(f"任务字典中的所有表: {list(task_dict.keys())}")
 
                 # 最顶层的 task(没有任何上游)需要依赖周模型任务完成
                 all_upstreams = set()
                 for upstreams in dependency_graph.values():
                     all_upstreams.update(upstreams)
                 top_level_tasks = [t for t in table_names if t not in all_upstreams]
+                logger.info(f"所有上游表集合: {all_upstreams}")
+                logger.info(f"识别出的顶层表: {top_level_tasks}")
                 
                 if top_level_tasks:
                     logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")

+ 106 - 9
dags/utils.py

@@ -222,26 +222,43 @@ def get_script_name_from_neo4j(table_name):
     uri = NEO4J_CONFIG['uri']
     auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
     driver = GraphDatabase.driver(uri, auth=auth)
-    query = """
-        MATCH (target:DataModel {en_name: $table_name})-[r:DERIVED_FROM]->(n)
-        WHERE n:DataModel OR n:DataResource
-        RETURN r.script_name AS script_name
+    logger.info(f"从Neo4j查询表 {table_name} 的脚本名称")
+    
+    # 检查查询的是 DERIVED_FROM 关系的方向
+    check_query = """
+        MATCH (a:DataModel {en_name: $table_name})-[r:DERIVED_FROM]->(b)
+        RETURN b.en_name AS upstream_name LIMIT 5
     """
+    
     try:
         with driver.session() as session:
+            # 先检查依赖关系
+            logger.info(f"检查表 {table_name} 的上游依赖方向")
+            check_result = session.run(check_query, table_name=table_name)
+            upstreams = [record['upstream_name'] for record in check_result if 'upstream_name' in record]
+            logger.info(f"表 {table_name} 的上游依赖: {upstreams}")
+            
+            # 查询脚本名称
+            query = """
+                MATCH (target:DataModel {en_name: $table_name})-[r:DERIVED_FROM]->(n)
+                WHERE n:DataModel OR n:DataResource
+                RETURN r.script_name AS script_name
+            """
             result = session.run(query, table_name=table_name)
             record = result.single()
             if record:
                 try:
                     script_name = record['script_name']
+                    logger.info(f"找到表 {table_name} 的脚本名称: {script_name}")
                     return script_name
                 except (KeyError, TypeError) as e:
-                    print(f"[WARN] 记录中不包含script_name字段: {e}")
+                    logger.warning(f"记录中不包含script_name字段: {e}")
                     return None
             else:
+                logger.warning(f"没有找到表 {table_name} 的脚本名称")
                 return None
     except Exception as e:
-        print(f"[ERROR] 查询表 {table_name} 的脚本名称时出错: {str(e)}")
+        logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
         return None
     finally:
         driver.close()
@@ -390,6 +407,7 @@ def get_model_dependency_graph(table_names: list) -> dict:
     返回:
         dict: 依赖关系字典 {目标表: [上游依赖表1, 上游依赖表2, ...]}
     """
+    logger.info(f"开始构建依赖关系图,表列表: {table_names}")
     # 创建有向图
     G = nx.DiGraph()
     
@@ -404,16 +422,29 @@ def get_model_dependency_graph(table_names: list) -> dict:
     try:
         with driver.session() as session:
             for table_name in table_names:
+                # 修改查询,移除对节点类型的限制,但保留对表名集合的过滤
                 query = """
-                    MATCH (t:DataModel {en_name: $table_name})<-[:DERIVED_FROM]-(up:DataModel)
+                    MATCH (t {en_name: $table_name})-[:DERIVED_FROM]->(up)
                     WHERE up.en_name IN $all_tables
                     RETURN up.en_name AS upstream
                 """
+                logger.info(f"执行Neo4j查询: 查找 {table_name} 在当前批次中的上游依赖")
                 result = session.run(query, table_name=table_name, all_tables=table_names)
                 deps = [record['upstream'] for record in result if 'upstream' in record]
+                logger.info(f"表 {table_name} 的上游依赖(当前批次内): {deps}")
+                
+                # 同时查询所有上游依赖(不限于当前批次),用于日志记录
+                all_deps_query = """
+                    MATCH (t {en_name: $table_name})-[:DERIVED_FROM]->(up)
+                    RETURN up.en_name AS upstream
+                """
+                all_deps_result = session.run(all_deps_query, table_name=table_name)
+                all_deps = [record['upstream'] for record in all_deps_result if 'upstream' in record]
+                logger.info(f"表 {table_name} 的所有上游依赖: {all_deps}")
                 
                 # 添加依赖边
                 for dep in deps:
+                    logger.info(f"添加依赖边: {dep} -> {table_name}")
                     G.add_edge(dep, table_name)
     finally:
         driver.close()
@@ -429,8 +460,11 @@ def get_model_dependency_graph(table_names: list) -> dict:
     # 转换为字典格式返回
     dependency_dict = {}
     for table_name in table_names:
-        dependency_dict[table_name] = list(G.predecessors(table_name))
+        predecessors = list(G.predecessors(table_name))
+        dependency_dict[table_name] = predecessors
+        logger.info(f"最终依赖关系 - 表 {table_name} 依赖于: {predecessors}")
     
+    logger.info(f"完整依赖图: {dependency_dict}")
     return dependency_dict
 
 
@@ -533,4 +567,67 @@ def identify_common_paths(table_names: list) -> dict:
     # 按使用次数排序
     common_paths = dict(sorted(common_paths.items(), key=lambda x: x[1], reverse=True))
     
-    return common_paths
+    return common_paths
+
+def check_table_relationship(table1, table2):
+    """
+    直接检查Neo4j中两个表之间的关系
+    
+    参数:
+        table1: 第一个表名
+        table2: 第二个表名
+    
+    返回:
+        关系信息字典
+    """
+    uri = NEO4J_CONFIG['uri']
+    auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+    driver = GraphDatabase.driver(uri, auth=auth)
+    
+    relationship_info = {}
+    
+    try:
+        with driver.session() as session:
+            # 检查 table1 -> table2 方向
+            forward_query = """
+                MATCH (a:DataModel {en_name: $table1})-[r:DERIVED_FROM]->(b:DataModel {en_name: $table2})
+                RETURN count(r) > 0 AS has_relationship, r.script_name AS script_name
+            """
+            forward_result = session.run(forward_query, table1=table1, table2=table2)
+            forward_record = forward_result.single()
+            
+            if forward_record and forward_record['has_relationship']:
+                relationship_info['forward'] = {
+                    'exists': True,
+                    'direction': f"{table1} -> {table2}",
+                    'script_name': forward_record.get('script_name')
+                }
+                logger.info(f"发现关系: {table1} -[:DERIVED_FROM]-> {table2}, 脚本: {forward_record.get('script_name')}")
+            else:
+                relationship_info['forward'] = {'exists': False}
+                
+            # 检查 table2 -> table1 方向
+            backward_query = """
+                MATCH (a:DataModel {en_name: $table2})-[r:DERIVED_FROM]->(b:DataModel {en_name: $table1})
+                RETURN count(r) > 0 AS has_relationship, r.script_name AS script_name
+            """
+            backward_result = session.run(backward_query, table1=table1, table2=table2)
+            backward_record = backward_result.single()
+            
+            if backward_record and backward_record['has_relationship']:
+                relationship_info['backward'] = {
+                    'exists': True,
+                    'direction': f"{table2} -> {table1}",
+                    'script_name': backward_record.get('script_name')
+                }
+                logger.info(f"发现关系: {table2} -[:DERIVED_FROM]-> {table1}, 脚本: {backward_record.get('script_name')}")
+            else:
+                relationship_info['backward'] = {'exists': False}
+                
+    except Exception as e:
+        logger.error(f"检查表关系时出错: {str(e)}")
+        relationship_info['error'] = str(e)
+    finally:
+        driver.close()
+        
+    return relationship_info