Browse Source

统一使用create_time,修改原来使用insert_time的字段。

wangxq 3 weeks ago
parent
commit
08feb8b784
1 changed files with 4 additions and 4 deletions
  1. 4 4
      dags/dataops_productline_prepare_dag.py

+ 4 - 4
dags/dataops_productline_prepare_dag.py

@@ -775,16 +775,16 @@ def check_execution_plan_in_db(**kwargs):
                 if EXECUTION_PLAN_KEEP_COUNT > 0:
                     cursor.execute(f"""
                         WITH to_keep AS (
-                            SELECT dag_id, run_id, exec_date, insert_time
+                            SELECT dag_id, run_id, exec_date, create_time
                             FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                             WHERE dag_id = %s AND exec_date = %s
-                            ORDER BY insert_time DESC
+                            ORDER BY create_time DESC
                             LIMIT %s
                         )
                         DELETE FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                         WHERE dag_id = %s AND exec_date = %s
-                        AND (dag_id, run_id, exec_date, insert_time) NOT IN (
-                            SELECT dag_id, run_id, exec_date, insert_time FROM to_keep
+                        AND (dag_id, run_id, exec_date, create_time) NOT IN (
+                            SELECT dag_id, run_id, exec_date, create_time FROM to_keep
                         )
                     """, (dag_id, ds, EXECUTION_PLAN_KEEP_COUNT, dag_id, ds))