bash.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import os
  20. from subprocess import PIPE, STDOUT, Popen
  21. from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
  22. from typing import TYPE_CHECKING, Sequence
  23. from airflow.exceptions import AirflowFailException
  24. from airflow.sensors.base import BaseSensorOperator
  25. if TYPE_CHECKING:
  26. from airflow.utils.context import Context
  27. class BashSensor(BaseSensorOperator):
  28. """
  29. Executes a bash command/script.
  30. Return True if and only if the return code is 0.
  31. :param bash_command: The command, set of commands or reference to a
  32. bash script (must be '.sh') to be executed.
  33. :param env: If env is not None, it must be a mapping that defines the
  34. environment variables for the new process; these are used instead
  35. of inheriting the current process environment, which is the default
  36. behavior. (templated)
  37. :param output_encoding: output encoding of bash command.
  38. :param retry_exit_code: If task exits with this code, treat the sensor
  39. as not-yet-complete and retry the check later according to the
  40. usual retry/timeout settings. Any other non-zero return code will
  41. be treated as an error, and cause the sensor to fail. If set to
  42. ``None`` (the default), any non-zero exit code will cause a retry
  43. and the task will never raise an error except on time-out.
  44. .. seealso::
  45. For more information on how to use this sensor,take a look at the guide:
  46. :ref:`howto/operator:BashSensor`
  47. """
  48. template_fields: Sequence[str] = ("bash_command", "env")
  49. def __init__(
  50. self, *, bash_command, env=None, output_encoding="utf-8", retry_exit_code: int | None = None, **kwargs
  51. ):
  52. super().__init__(**kwargs)
  53. self.bash_command = bash_command
  54. self.env = env
  55. self.output_encoding = output_encoding
  56. self.retry_exit_code = retry_exit_code
  57. def poke(self, context: Context):
  58. """Execute the bash command in a temporary directory."""
  59. bash_command = self.bash_command
  60. self.log.info("Tmp dir root location: %s", gettempdir())
  61. with TemporaryDirectory(prefix="airflowtmp") as tmp_dir, NamedTemporaryFile(
  62. dir=tmp_dir, prefix=self.task_id
  63. ) as f:
  64. f.write(bytes(bash_command, "utf_8"))
  65. f.flush()
  66. fname = f.name
  67. script_location = tmp_dir + "/" + fname
  68. self.log.info("Temporary script location: %s", script_location)
  69. self.log.info("Running command: %s", bash_command)
  70. with Popen(
  71. ["bash", fname],
  72. stdout=PIPE,
  73. stderr=STDOUT,
  74. close_fds=True,
  75. cwd=tmp_dir,
  76. env=self.env,
  77. preexec_fn=os.setsid,
  78. ) as resp:
  79. if resp.stdout:
  80. self.log.info("Output:")
  81. for line in iter(resp.stdout.readline, b""):
  82. self.log.info(line.decode(self.output_encoding).strip())
  83. resp.wait()
  84. self.log.info("Command exited with return code %s", resp.returncode)
  85. # zero code means success, the sensor can go green
  86. if resp.returncode == 0:
  87. return True
  88. # we have a retry exit code, sensor retries if return code matches, otherwise error
  89. elif self.retry_exit_code is not None:
  90. if resp.returncode == self.retry_exit_code:
  91. self.log.info("Return code matches retry code, will retry later")
  92. return False
  93. else:
  94. raise AirflowFailException(f"Command exited with return code {resp.returncode}")
  95. # backwards compatibility: sensor retries no matter the error code
  96. else:
  97. self.log.info("Non-zero return code and no retry code set, will retry later")
  98. return False