
Update config.py so that prepare_day.py modifies the system var Variable within the job.

wangxq 1 week ago
parent
revision
540676cf42

+ 4 - 1
dags/config.py

@@ -1,5 +1,7 @@
 # config.py
 
+import os
+
 # PostgreSQL connection settings
 PG_CONFIG = {
     "host": "localhost",
@@ -32,7 +34,8 @@ TASK_RETRY_CONFIG = {
 # Base path configuration for script files
 # Use this path when deploying to the Airflow environment
 AIRFLOW_BASE_PATH = '/opt/airflow'
-SCRIPTS_BASE_PATH = "/opt/airflow/dataops_scripts"
+DATAOPS_DAGS_PATH = os.path.join(AIRFLOW_BASE_PATH, 'dags')
+SCRIPTS_BASE_PATH = os.path.join(AIRFLOW_BASE_PATH, 'dataops_scripts')
 
 # Base upload path for uploaded CSV/EXCEL files
 STRUCTURE_UPLOAD_BASE_PATH = "/data/csv"
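Deriving both paths from AIRFLOW_BASE_PATH via os.path.join keeps them consistent if the base directory ever moves. A minimal sketch of how another module might consume the new constant (the helper name resolve_dag_file is hypothetical, not part of this commit):

import os
from config import DATAOPS_DAGS_PATH

def resolve_dag_file(filename):
    # Hypothetical helper: build an absolute path to a file under dags/
    return os.path.join(DATAOPS_DAGS_PATH, filename)

# -> '/opt/airflow/dags/dataops_productline_prepare_dag.py'
print(resolve_dag_file("dataops_productline_prepare_dag.py"))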

+ 18 - 1
dags/dataops_productline_prepare_dag.py

@@ -2,6 +2,7 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator, ShortCircuitOperator
 from airflow.operators.empty import EmptyOperator
+from airflow.models import Variable
 from datetime import datetime, timedelta
 import logging
 import networkx as nx
@@ -17,7 +18,7 @@ from utils import (
     get_neo4j_driver,
     get_today_date
 )
-from config import PG_CONFIG, NEO4J_CONFIG
+from config import PG_CONFIG, NEO4J_CONFIG, DATAOPS_DAGS_PATH
 
 # Create the logger
 logger = logging.getLogger(__name__)
@@ -1022,6 +1023,19 @@ def optimize_script_execution_order(scripts, script_dependencies, G):
         # On error, return the original list of script IDs without optimizing
         return [script['script_id'] for script in scripts] 
 
+def set_dataops_dags_path_variable():
+    """
+    Set DATAOPS_DAGS_PATH as an Airflow Variable
+    """
+    try:
+        # Write the DATAOPS_DAGS_PATH value taken from config
+        Variable.set("DATAOPS_DAGS_PATH", DATAOPS_DAGS_PATH)
+        logger.info(f"Successfully set Airflow Variable DATAOPS_DAGS_PATH to: {DATAOPS_DAGS_PATH}")
+        return True
+    except Exception as e:
+        logger.error(f"Failed to set Airflow Variable DATAOPS_DAGS_PATH: {str(e)}")
+        return False
+
 def prepare_productline_dag_schedule(**kwargs):
     """准备产品线DAG调度任务的主函数"""
     # 添加更严格的异常处理
@@ -1287,6 +1301,9 @@ def prepare_productline_dag_schedule(**kwargs):
                 else:
                     raise Exception("Failed to save execution plan to the database")
                 
+                # 13. Set the Airflow Variable DATAOPS_DAGS_PATH
+                set_dataops_dags_path_variable()
+                
             except Exception as db_e:
                 # Catch errors from saving to the database
                 error_msg = f"Error saving execution plan to the database: {str(db_e)}"
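With the Variable written, any other DAG or script running in the same Airflow instance can read the path back. A minimal consumer-side sketch, assuming the prepare task has already run at least once:

from airflow.models import Variable

# default_var avoids an error if the prepare DAG has not run yet
dags_path = Variable.get("DATAOPS_DAGS_PATH", default_var="/opt/airflow/dags")
print(f"DAG files are expected under: {dags_path}")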

+ 35 - 0
dags/var_test_dag.py

@@ -0,0 +1,35 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from airflow.models import Variable 
+from datetime import datetime
+
+def test_pg_conn():
+    hook = PostgresHook(postgres_conn_id="pg_dataops")
+    conn = hook.get_conn()
+    cursor = conn.cursor()
+    cursor.execute("SELECT 1;")
+    result = cursor.fetchone()
+    print(f"Query result: {result}")
+    cursor.close()
+    conn.close()
+
+    # Fetch and print Airflow Variables
+    upload_base_path = Variable.get("STRUCTURE_UPLOAD_BASE_PATH", default_var="Not Set")
+    archive_base_path = Variable.get("STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH", default_var="Not Set")
+
+    print(f"STRUCTURE_UPLOAD_BASE_PATH: {upload_base_path}")
+    print(f"STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH: {archive_base_path}")
+
+with DAG(
+    dag_id="test_pg_hook_connection",
+    start_date=datetime(2024, 1, 1),
+    schedule=None,
+    catchup=False,
+    tags=["postgres", "test"],
+) as dag:
+
+    run_test = PythonOperator(
+        task_id="test_pg_conn",
+        python_callable=test_pg_conn,
+    )

+ 1 - 1
dataops_scripts/execution_sql.py

@@ -36,7 +36,7 @@ except ImportError as e:
     # Fallback attempt 1: import via the full path
     try:
         sys.path.append(os.path.dirname(current_dir))  # add the parent directory
-        import dataops.scripts.script_utils as script_utils
+        import dataops_scripts.script_utils as script_utils
         logger.info("使用完整路径成功导入script_utils模块")
     except ImportError as e2:
         logger.error(f"使用完整路径导入失败: {str(e2)}")

+ 0 - 7
dataops_scripts/load_data.py

@@ -26,13 +26,6 @@ def load_data_from_source(source_name="default", execution_date=None, execution_
     if script_name is None:
         script_name = os.path.basename(__file__)
     
-    # Output all parameters using print
-    print(f"===== Parameter info (print output) =====")
-    print(f"table_name: {source_name}")
-    print(f"exec_date: {execution_date}")
-    print(f"execution_mode: {execution_mode}")
-    print(f"script_name: {script_name}")
-    print(f"================================")
     
     # Output all parameters using logger.info
     logger.info(f"===== Parameter info (logger output) =====")

+ 3 - 1
requirements.txt

@@ -1,9 +1,11 @@
 apache-airflow==2.10.5
-psycopg2-binary>=2.9.9
+psycopg2-binary>=2.9.10
 neo4j>=5.19.0
 pendulum>=3.0.0
 networkx>=3.4.2
 pandas>=2.2.3
 xlrd>=2.0.1
 openpyxl>=3.1.5
+apache-airflow-providers-postgres>=6.1.3
+apache-airflow-providers-common-sql>=1.26.0