123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import os
- from subprocess import PIPE, STDOUT, Popen
- from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
- from typing import TYPE_CHECKING, Sequence
- from airflow.exceptions import AirflowFailException
- from airflow.sensors.base import BaseSensorOperator
- if TYPE_CHECKING:
- from airflow.utils.context import Context
- class BashSensor(BaseSensorOperator):
- """
- Executes a bash command/script.
- Return True if and only if the return code is 0.
- :param bash_command: The command, set of commands or reference to a
- bash script (must be '.sh') to be executed.
- :param env: If env is not None, it must be a mapping that defines the
- environment variables for the new process; these are used instead
- of inheriting the current process environment, which is the default
- behavior. (templated)
- :param output_encoding: output encoding of bash command.
- :param retry_exit_code: If task exits with this code, treat the sensor
- as not-yet-complete and retry the check later according to the
- usual retry/timeout settings. Any other non-zero return code will
- be treated as an error, and cause the sensor to fail. If set to
- ``None`` (the default), any non-zero exit code will cause a retry
- and the task will never raise an error except on time-out.
- .. seealso::
- For more information on how to use this sensor,take a look at the guide:
- :ref:`howto/operator:BashSensor`
- """
- template_fields: Sequence[str] = ("bash_command", "env")
- def __init__(
- self, *, bash_command, env=None, output_encoding="utf-8", retry_exit_code: int | None = None, **kwargs
- ):
- super().__init__(**kwargs)
- self.bash_command = bash_command
- self.env = env
- self.output_encoding = output_encoding
- self.retry_exit_code = retry_exit_code
- def poke(self, context: Context):
- """Execute the bash command in a temporary directory."""
- bash_command = self.bash_command
- self.log.info("Tmp dir root location: %s", gettempdir())
- with TemporaryDirectory(prefix="airflowtmp") as tmp_dir, NamedTemporaryFile(
- dir=tmp_dir, prefix=self.task_id
- ) as f:
- f.write(bytes(bash_command, "utf_8"))
- f.flush()
- fname = f.name
- script_location = tmp_dir + "/" + fname
- self.log.info("Temporary script location: %s", script_location)
- self.log.info("Running command: %s", bash_command)
- with Popen(
- ["bash", fname],
- stdout=PIPE,
- stderr=STDOUT,
- close_fds=True,
- cwd=tmp_dir,
- env=self.env,
- preexec_fn=os.setsid,
- ) as resp:
- if resp.stdout:
- self.log.info("Output:")
- for line in iter(resp.stdout.readline, b""):
- self.log.info(line.decode(self.output_encoding).strip())
- resp.wait()
- self.log.info("Command exited with return code %s", resp.returncode)
- # zero code means success, the sensor can go green
- if resp.returncode == 0:
- return True
- # we have a retry exit code, sensor retries if return code matches, otherwise error
- elif self.retry_exit_code is not None:
- if resp.returncode == self.retry_exit_code:
- self.log.info("Return code matches retry code, will retry later")
- return False
- else:
- raise AirflowFailException(f"Command exited with return code {resp.returncode}")
- # backwards compatibility: sensor retries no matter the error code
- else:
- self.log.info("Non-zero return code and no retry code set, will retry later")
- return False
|