latest_only.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """Contains an operator to run downstream tasks only for the latest scheduled DagRun."""
  19. from __future__ import annotations
  20. from typing import TYPE_CHECKING, Iterable
  21. import pendulum
  22. from airflow.operators.branch import BaseBranchOperator
  23. if TYPE_CHECKING:
  24. from airflow.models import DAG, DagRun
  25. from airflow.utils.context import Context
  26. class LatestOnlyOperator(BaseBranchOperator):
  27. """
  28. Skip tasks that are not running during the most recent schedule interval.
  29. If the task is run outside the latest schedule interval (i.e. external_trigger),
  30. all directly downstream tasks will be skipped.
  31. Note that downstream tasks are never skipped if the given DAG_Run is
  32. marked as externally triggered.
  33. """
  34. ui_color = "#e9ffdb" # nyanza
  35. def choose_branch(self, context: Context) -> str | Iterable[str]:
  36. # If the DAG Run is externally triggered, then return without
  37. # skipping downstream tasks
  38. dag_run: DagRun = context["dag_run"] # type: ignore[assignment]
  39. if dag_run.external_trigger:
  40. self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
  41. return list(context["task"].get_direct_relative_ids(upstream=False))
  42. dag: DAG = context["dag"]
  43. next_info = dag.next_dagrun_info(dag.get_run_data_interval(dag_run), restricted=False)
  44. now = pendulum.now("UTC")
  45. if next_info is None:
  46. self.log.info("Last scheduled execution: allowing execution to proceed.")
  47. return list(context["task"].get_direct_relative_ids(upstream=False))
  48. left_window, right_window = next_info.data_interval
  49. self.log.info(
  50. "Checking latest only with left_window: %s right_window: %s now: %s",
  51. left_window,
  52. right_window,
  53. now,
  54. )
  55. if not left_window < now <= right_window:
  56. self.log.info("Not latest execution, skipping downstream.")
  57. # we return an empty list, thus the parent BaseBranchOperator
  58. # won't exclude any downstream tasks from skipping.
  59. return []
  60. else:
  61. self.log.info("Latest, allowing execution to proceed.")
  62. return list(context["task"].get_direct_relative_ids(upstream=False))