filesystem.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import datetime
import os
from functools import cached_property
from glob import glob
from typing import TYPE_CHECKING, Any, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.filesystem import FSHook
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.triggers.file import FileTrigger

if TYPE_CHECKING:
    from airflow.utils.context import Context


class FileSensor(BaseSensorOperator):
    """
    Waits for a file or folder to land in a filesystem.

    If the path given is a directory then this sensor will only return true if
    any files exist inside it (either directly, or within a subdirectory).

    :param fs_conn_id: reference to the File (path) connection id
    :param filepath: File or folder name (relative to
        the base path set within the connection), can be a glob.
    :param recursive: when set to ``True``, enables recursive directory matching behavior of
        ``**`` in glob filepath parameter. Defaults to ``False``.
    :param deferrable: If waiting for completion, whether to defer the task until done,
        default is ``False``.
    :param start_from_trigger: Start the task directly from the triggerer without going into the worker.
    :param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True
        during dynamic task mapping. This argument is not used in standard usage.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:FileSensor`
    """

    template_fields: Sequence[str] = ("filepath",)
    ui_color = "#91818a"
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.file.FileTrigger",
        trigger_kwargs={},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = False

    def __init__(
        self,
        *,
        filepath,
        fs_conn_id="fs_default",
        recursive=False,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        start_from_trigger: bool = False,
        trigger_kwargs: dict[str, Any] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.fs_conn_id = fs_conn_id
        self.recursive = recursive
        self.deferrable = deferrable
        self.start_from_trigger = start_from_trigger
        if self.deferrable and self.start_from_trigger:
            # Resolve the trigger arguments up front so the task can be started
            # directly from the triggerer, without first running on a worker.
            self.start_trigger_args.timeout = datetime.timedelta(seconds=self.timeout)
            self.start_trigger_args.trigger_kwargs = dict(
                filepath=self.path,
                recursive=self.recursive,
                poke_interval=self.poke_interval,
            )

    @cached_property
    def path(self) -> str:
        """Full path to poke: the connection's base path joined with ``filepath``."""
        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = os.path.join(basepath, self.filepath)
        return full_path

    def poke(self, context: Context) -> bool:
        self.log.info("Poking for file %s", self.path)
        for path in glob(self.path, recursive=self.recursive):
            if os.path.isfile(path):
                mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S")
                self.log.info("Found File %s last modified: %s", path, mod_time)
                return True

            # A matched directory counts as soon as it contains at least one
            # file, either directly or anywhere in its subdirectories.
            for _, _, files in os.walk(path):
                if files:
                    return True
        return False

    def execute(self, context: Context) -> None:
        if not self.deferrable:
            # Classic mode: run the standard poke loop on the worker.
            super().execute(context=context)
        if not self.poke(context=context):
            # Nothing matched yet: defer to a FileTrigger on the triggerer
            # instead of holding a worker slot while waiting.
            self.defer(
                timeout=datetime.timedelta(seconds=self.timeout),
                trigger=FileTrigger(
                    filepath=self.path,
                    recursive=self.recursive,
                    poke_interval=self.poke_interval,
                ),
                method_name="execute_complete",
            )

    def execute_complete(self, context: Context, event: bool | None = None) -> None:
        if not event:
            raise AirflowException("%s task failed as %s not found." % (self.task_id, self.filepath))
        self.log.info("%s completed successfully as %s found.", self.task_id, self.filepath)
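

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the upstream module): a minimal DAG
# wiring up FileSensor in classic poke mode and in deferrable mode. The DAG id,
# file path, and interval/timeout values are assumptions chosen for the
# example; "fs_default" is simply the sensor's default connection id.
#
#     import datetime
#
#     from airflow import DAG
#     from airflow.sensors.filesystem import FileSensor
#
#     with DAG(
#         dag_id="example_file_sensor",
#         start_date=datetime.datetime(2024, 1, 1),
#         schedule=None,
#         catchup=False,
#     ):
#         # Poke mode: the task holds a worker slot and re-runs poke() every
#         # poke_interval seconds until the glob matches or timeout expires.
#         wait_for_csv = FileSensor(
#             task_id="wait_for_csv",
#             fs_conn_id="fs_default",
#             filepath="incoming/*.csv",  # relative to the connection's base path
#             poke_interval=30,
#             timeout=60 * 60,
#         )
#
#         # Deferrable mode: execute() defers to a FileTrigger on the triggerer,
#         # freeing the worker slot while waiting for a match.
#         wait_for_csv_deferred = FileSensor(
#             task_id="wait_for_csv_deferred",
#             filepath="incoming/*.csv",
#             deferrable=True,
#             timeout=60 * 60,
#         )
# ---------------------------------------------------------------------------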