time_delta.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from typing import TYPE_CHECKING, Any, NoReturn
  20. from airflow.exceptions import AirflowSkipException
  21. from airflow.sensors.base import BaseSensorOperator
  22. from airflow.triggers.temporal import DateTimeTrigger
  23. from airflow.utils import timezone
  24. if TYPE_CHECKING:
  25. from airflow.utils.context import Context
  26. class TimeDeltaSensor(BaseSensorOperator):
  27. """
  28. Waits for a timedelta after the run's data interval.
  29. :param delta: time length to wait after the data interval before succeeding.
  30. .. seealso::
  31. For more information on how to use this sensor, take a look at the guide:
  32. :ref:`howto/operator:TimeDeltaSensor`
  33. """
  34. def __init__(self, *, delta, **kwargs):
  35. super().__init__(**kwargs)
  36. self.delta = delta
  37. def poke(self, context: Context):
  38. target_dttm = context["data_interval_end"]
  39. target_dttm += self.delta
  40. self.log.info("Checking if the time (%s) has come", target_dttm)
  41. return timezone.utcnow() > target_dttm
  42. class TimeDeltaSensorAsync(TimeDeltaSensor):
  43. """
  44. A deferrable drop-in replacement for TimeDeltaSensor.
  45. Will defers itself to avoid taking up a worker slot while it is waiting.
  46. :param delta: time length to wait after the data interval before succeeding.
  47. :param end_from_trigger: End the task directly from the triggerer without going into the worker.
  48. .. seealso::
  49. For more information on how to use this sensor, take a look at the guide:
  50. :ref:`howto/operator:TimeDeltaSensorAsync`
  51. """
  52. def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs) -> None:
  53. super().__init__(delta=delta, **kwargs)
  54. self.end_from_trigger = end_from_trigger
  55. def execute(self, context: Context) -> bool | NoReturn:
  56. target_dttm = context["data_interval_end"]
  57. target_dttm += self.delta
  58. if timezone.utcnow() > target_dttm:
  59. # If the target datetime is in the past, return immediately
  60. return True
  61. try:
  62. trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
  63. except (TypeError, ValueError) as e:
  64. if self.soft_fail:
  65. raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
  66. raise
  67. self.defer(trigger=trigger, method_name="execute_complete")
  68. def execute_complete(self, context: Context, event: Any = None) -> None:
  69. """Handle the event when the trigger fires and return immediately."""
  70. return None