var_test_dag.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator
  3. from airflow.providers.postgres.hooks.postgres import PostgresHook
  4. from airflow.models import Variable
  5. from datetime import datetime
  6. def test_pg_conn():
  7. hook = PostgresHook(postgres_conn_id="pg_dataops")
  8. conn = hook.get_conn()
  9. cursor = conn.cursor()
  10. cursor.execute("SELECT 1;")
  11. result = cursor.fetchone()
  12. print(f"Query result: {result}")
  13. cursor.close()
  14. conn.close()
  15. # 获取并打印 Airflow Variables
  16. upload_base_path = Variable.get("STRUCTURE_UPLOAD_BASE_PATH", default_var="Not Set")
  17. archive_base_path = Variable.get("STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH", default_var="Not Set")
  18. print(f"STRUCTURE_UPLOAD_BASE_PATH: {upload_base_path}")
  19. print(f"STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH: {archive_base_path}")
  20. with DAG(
  21. dag_id="test_pg_hook_connection",
  22. start_date=datetime(2024, 1, 1),
  23. schedule_interval=None,
  24. catchup=False,
  25. tags=["postgres", "test"],
  26. ) as dag:
  27. run_test = PythonOperator(
  28. task_id="test_pg_conn",
  29. python_callable=test_pg_conn,
  30. )