weekday.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import warnings
  20. from typing import TYPE_CHECKING, Iterable
  21. from airflow.exceptions import RemovedInAirflow3Warning
  22. from airflow.operators.branch import BaseBranchOperator
  23. from airflow.utils import timezone
  24. from airflow.utils.weekday import WeekDay
  25. if TYPE_CHECKING:
  26. from airflow.utils.context import Context
  27. class BranchDayOfWeekOperator(BaseBranchOperator):
  28. """
  29. Branches into one of two lists of tasks depending on the current day.
  30. For more information on how to use this operator, take a look at the guide:
  31. :ref:`howto/operator:BranchDayOfWeekOperator`
  32. **Example** (with single day):
  33. .. code-block:: python
  34. from airflow.operators.empty import EmptyOperator
  35. monday = EmptyOperator(task_id="monday")
  36. other_day = EmptyOperator(task_id="other_day")
  37. monday_check = BranchDayOfWeekOperator(
  38. task_id="monday_check",
  39. week_day="Monday",
  40. use_task_logical_date=True,
  41. follow_task_ids_if_true="monday",
  42. follow_task_ids_if_false="other_day",
  43. )
  44. monday_check >> [monday, other_day]
  45. **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum):
  46. .. code-block:: python
  47. # import WeekDay Enum
  48. from airflow.utils.weekday import WeekDay
  49. from airflow.operators.empty import EmptyOperator
  50. workday = EmptyOperator(task_id="workday")
  51. weekend = EmptyOperator(task_id="weekend")
  52. weekend_check = BranchDayOfWeekOperator(
  53. task_id="weekend_check",
  54. week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
  55. use_task_logical_date=True,
  56. follow_task_ids_if_true="weekend",
  57. follow_task_ids_if_false="workday",
  58. )
  59. # add downstream dependencies as you would do with any branch operator
  60. weekend_check >> [workday, weekend]
  61. :param follow_task_ids_if_true: task_id, task_group_id, or a list of task_ids and/or task_group_ids
  62. to follow if criteria met.
  63. :param follow_task_ids_if_false: task_id, task_group_id, or a list of task_ids and/or task_group_ids
  64. to follow if criteria not met.
  65. :param week_day: Day of the week to check (full name). Optionally, a set
  66. of days can also be provided using a set. Example values:
  67. * ``"MONDAY"``,
  68. * ``{"Saturday", "Sunday"}``
  69. * ``{WeekDay.TUESDAY}``
  70. * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
  71. To use `WeekDay` enum, import it from `airflow.utils.weekday`
  72. :param use_task_logical_date: If ``True``, uses task's logical date to compare
  73. with is_today. Execution Date is Useful for backfilling.
  74. If ``False``, uses system's day of the week.
  75. :param use_task_execution_day: deprecated parameter, same effect as `use_task_logical_date`
  76. """
  77. def __init__(
  78. self,
  79. *,
  80. follow_task_ids_if_true: str | Iterable[str],
  81. follow_task_ids_if_false: str | Iterable[str],
  82. week_day: str | Iterable[str] | WeekDay | Iterable[WeekDay],
  83. use_task_logical_date: bool = False,
  84. use_task_execution_day: bool = False,
  85. **kwargs,
  86. ) -> None:
  87. super().__init__(**kwargs)
  88. self.follow_task_ids_if_true = follow_task_ids_if_true
  89. self.follow_task_ids_if_false = follow_task_ids_if_false
  90. self.week_day = week_day
  91. self.use_task_logical_date = use_task_logical_date
  92. if use_task_execution_day:
  93. self.use_task_logical_date = use_task_execution_day
  94. warnings.warn(
  95. "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
  96. RemovedInAirflow3Warning,
  97. stacklevel=2,
  98. )
  99. self._week_day_num = WeekDay.validate_week_day(week_day)
  100. def choose_branch(self, context: Context) -> str | Iterable[str]:
  101. if self.use_task_logical_date:
  102. now = context["logical_date"]
  103. else:
  104. now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
  105. if now.isoweekday() in self._week_day_num:
  106. return self.follow_task_ids_if_true
  107. return self.follow_task_ids_if_false