temporal.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import asyncio
  19. import datetime
  20. from typing import Any, AsyncIterator
  21. import pendulum
  22. from airflow.triggers.base import BaseTrigger, TaskSuccessEvent, TriggerEvent
  23. from airflow.utils import timezone
  24. class DateTimeTrigger(BaseTrigger):
  25. """
  26. Trigger based on a datetime.
  27. A trigger that fires exactly once, at the given datetime, give or take
  28. a few seconds.
  29. The provided datetime MUST be in UTC.
  30. :param moment: when to yield event
  31. :param end_from_trigger: whether the trigger should mark the task successful after time condition
  32. reached or resume the task after time condition reached.
  33. """
  34. def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False) -> None:
  35. super().__init__()
  36. if not isinstance(moment, datetime.datetime):
  37. raise TypeError(f"Expected datetime.datetime type for moment. Got {type(moment)}")
  38. # Make sure it's in UTC
  39. elif moment.tzinfo is None:
  40. raise ValueError("You cannot pass naive datetimes")
  41. else:
  42. self.moment: pendulum.DateTime = timezone.convert_to_utc(moment)
  43. self.end_from_trigger = end_from_trigger
  44. def serialize(self) -> tuple[str, dict[str, Any]]:
  45. return (
  46. "airflow.triggers.temporal.DateTimeTrigger",
  47. {"moment": self.moment, "end_from_trigger": self.end_from_trigger},
  48. )
  49. async def run(self) -> AsyncIterator[TriggerEvent]:
  50. """
  51. Loop until the relevant time is met.
  52. We do have a two-phase delay to save some cycles, but sleeping is so
  53. cheap anyway that it's pretty loose. We also don't just sleep for
  54. "the number of seconds until the time" in case the system clock changes
  55. unexpectedly, or handles a DST change poorly.
  56. """
  57. # Sleep in successively smaller increments starting from 1 hour down to 10 seconds at a time
  58. self.log.info("trigger starting")
  59. for step in 3600, 60, 10:
  60. seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
  61. while seconds_remaining > 2 * step:
  62. self.log.info("%d seconds remaining; sleeping %s seconds", seconds_remaining, step)
  63. await asyncio.sleep(step)
  64. seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
  65. # Sleep a second at a time otherwise
  66. while self.moment > pendulum.instance(timezone.utcnow()):
  67. self.log.info("sleeping 1 second...")
  68. await asyncio.sleep(1)
  69. if self.end_from_trigger:
  70. self.log.info("Sensor time condition reached; marking task successful and exiting")
  71. yield TaskSuccessEvent()
  72. else:
  73. self.log.info("yielding event with payload %r", self.moment)
  74. yield TriggerEvent(self.moment)
  75. class TimeDeltaTrigger(DateTimeTrigger):
  76. """
  77. Create DateTimeTriggers based on delays.
  78. Subclass to create DateTimeTriggers based on time delays rather
  79. than exact moments.
  80. While this is its own distinct class here, it will serialise to a
  81. DateTimeTrigger class, since they're operationally the same.
  82. :param delta: how long to wait
  83. :param end_from_trigger: whether the trigger should mark the task successful after time condition
  84. reached or resume the task after time condition reached.
  85. """
  86. def __init__(self, delta: datetime.timedelta, *, end_from_trigger: bool = False) -> None:
  87. super().__init__(moment=timezone.utcnow() + delta, end_from_trigger=end_from_trigger)