# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Config sub-commands."""

from __future__ import annotations

import locale
import logging
import os
import platform
import subprocess
import sys
from enum import Enum
from urllib.parse import urlsplit, urlunsplit

import httpx
import tenacity

from airflow import configuration
from airflow.cli.simple_table import AirflowConsole
from airflow.providers_manager import ProvidersManager
from airflow.typing_compat import Protocol
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.platform import getuser
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.version import version as airflow_version

log = logging.getLogger(__name__)


class Anonymizer(Protocol):
    """Anonymizer protocol."""

    def process_path(self, value) -> str:
        """Remove pii from paths."""

    def process_username(self, value) -> str:
        """Remove pii from username."""

    def process_url(self, value) -> str:
        """Remove pii from URL."""


class NullAnonymizer(Anonymizer):
    """Do nothing."""

    def _identity(self, value) -> str:
        return value

    # All three protocol methods share the same pass-through implementation.
    process_path = process_username = process_url = _identity

    # Drop the helper name so only the protocol methods remain on the class.
    del _identity


class PiiAnonymizer(Anonymizer):
    """Remove personally identifiable info from paths, usernames and URLs."""

    def __init__(self):
        home_path = os.path.expanduser("~")
        username = getuser()
        # Insertion order matters: the home directory is substituted before the bare username.
        self._path_replacements = {home_path: "${HOME}", username: "${USER}"}

    def process_path(self, value) -> str:
        """Replace the home directory and the username in ``value`` with placeholders."""
        if not value:
            return value
        for src, target in self._path_replacements.items():
            # ``str.replace("", target)`` would insert the placeholder between every
            # character, so an empty search key is skipped.
            if src:
                value = value.replace(src, target)
        return value

    def process_username(self, value) -> str:
        """Keep only the first and last character of ``value``."""
        if not value:
            return value
        return f"{value[0]}...{value[-1]}"

    def process_url(self, value) -> str:
        """
        Anonymize the credentials embedded in a URL.

        The username is shortened with :meth:`process_username` and a non-empty password is
        replaced by the literal ``PASSWORD``; all other URL components are returned unchanged.
        """
        if not value:
            return value

        url_parts = urlsplit(value)
        netloc = url_parts.netloc
        if netloc:
            # unpack
            username = None
            password = None
            # Split on the LAST "@": an unescaped "@" inside the password must stay on the
            # credentials side, otherwise part of the password would be reported as host.
            userinfo, _, host = netloc.rpartition("@")
            if userinfo:
                if ":" in userinfo:
                    username, _, password = userinfo.partition(":")
                else:
                    username = userinfo

            # anonymize
            username = self.process_username(username) if username else None
            password = "PASSWORD" if password else None

            # pack
            if username and password and host:
                netloc = f"{username}:{password}@{host}"
            elif username and host:
                netloc = f"{username}@{host}"
            elif password and host:
                netloc = f":{password}@{host}"
            elif host:
                netloc = host
            else:
                netloc = ""

        return urlunsplit((url_parts.scheme, netloc, url_parts.path, url_parts.query, url_parts.fragment))


class OperatingSystem(Enum):
    """Operating system."""

    WINDOWS = "Windows"
    LINUX = "Linux"
    MACOSX = "Mac OS"
    CYGWIN = "Cygwin"
    UNKNOWN = "Unknown"

    @staticmethod
    def get_current() -> OperatingSystem:
        """Get current operating system."""
        if os.name == "nt":
            return OperatingSystem.WINDOWS
        elif "linux" in sys.platform:
            return OperatingSystem.LINUX
        elif "darwin" in sys.platform:
            return OperatingSystem.MACOSX
        elif "cygwin" in sys.platform:
            return OperatingSystem.CYGWIN
        return OperatingSystem.UNKNOWN


class Architecture(Enum):
    """Compute architecture."""

    X86_64 = "x86_64"
    X86 = "x86"
    PPC = "ppc"
    ARM = "arm"
    UNKNOWN = "unknown"

    @staticmethod
    def get_current() -> Architecture:
        """Get architecture."""
        # The lookup table is defined below the class; it is only resolved at call time.
        current_architecture = _MACHINE_TO_ARCHITECTURE.get(platform.machine().lower())
        return current_architecture or Architecture.UNKNOWN


# Maps lower-cased ``platform.machine()`` values to an Architecture member.
_MACHINE_TO_ARCHITECTURE: dict[str, Architecture] = {
    "amd64": Architecture.X86_64,
    "x86_64": Architecture.X86_64,
    "i686-64": Architecture.X86_64,
    "i386": Architecture.X86,
    "i686": Architecture.X86,
    "x86": Architecture.X86,
    "ia64": Architecture.X86,  # Itanium is different x64 arch, treat it as the common x86.
    "powerpc": Architecture.PPC,
    "power macintosh": Architecture.PPC,
    "ppc64": Architecture.PPC,
    "armv6": Architecture.ARM,
    "armv6l": Architecture.ARM,
    "arm64": Architecture.ARM,
    "armv7": Architecture.ARM,
    "armv7l": Architecture.ARM,
    "aarch64": Architecture.ARM,
}


class AirflowInfo:
    """Renders information about Airflow instance."""

    def __init__(self, anonymizer):
        self.anonymizer = anonymizer

    @staticmethod
    def _get_version(cmd: list[str], grep: bytes | None = None):
        """
        Return tools version.

        Runs ``cmd`` and returns its combined stdout/stderr output when that output is exactly
        one non-empty line (after optionally keeping only lines that contain ``grep``).
        Returns ``"NOT AVAILABLE"`` otherwise, or when the command cannot be started.
        """
        try:
            with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
                stdoutdata, _ = proc.communicate()
                data = [f for f in stdoutdata.split(b"\n") if f]
                if grep:
                    data = [line for line in data if grep in line]
                if len(data) != 1:
                    return "NOT AVAILABLE"
                else:
                    # Tool output is not guaranteed to be valid UTF-8; never fail the report on it.
                    return data[0].decode(errors="replace")
        except OSError:
            return "NOT AVAILABLE"

    @staticmethod
    def _task_logging_handler():
        """Return task logging handler."""

        def get_fullname(o):
            module = o.__class__.__module__
            if module is None or module == str.__class__.__module__:
                return o.__class__.__name__  # Avoid reporting __builtin__
            else:
                return f"{module}.{o.__class__.__name__}"

        try:
            handler_names = [get_fullname(handler) for handler in logging.getLogger("airflow.task").handlers]
            return ", ".join(handler_names)
        except Exception:
            # Best effort: this is diagnostic output and must not break the command.
            return "NOT AVAILABLE"

    @property
    def _airflow_info(self):
        """Return ``(key, value)`` pairs describing the Airflow configuration."""
        executor = configuration.conf.get("core", "executor")
        sql_alchemy_conn = self.anonymizer.process_url(
            configuration.conf.get("database", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE")
        )
        dags_folder = self.anonymizer.process_path(
            configuration.conf.get("core", "dags_folder", fallback="NOT AVAILABLE")
        )
        plugins_folder = self.anonymizer.process_path(
            configuration.conf.get("core", "plugins_folder", fallback="NOT AVAILABLE")
        )
        base_log_folder = self.anonymizer.process_path(
            configuration.conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE")
        )
        remote_base_log_folder = self.anonymizer.process_path(
            configuration.conf.get("logging", "remote_base_log_folder", fallback="NOT AVAILABLE")
        )

        return [
            ("version", airflow_version),
            ("executor", executor),
            ("task_logging_handler", self._task_logging_handler()),
            ("sql_alchemy_conn", sql_alchemy_conn),
            ("dags_folder", dags_folder),
            ("plugins_folder", plugins_folder),
            ("base_log_folder", base_log_folder),
            ("remote_base_log_folder", remote_base_log_folder),
        ]

    @property
    def _system_info(self):
        """Return ``(key, value)`` pairs describing the OS, architecture, locale and Python."""
        operating_system = OperatingSystem.get_current()
        arch = Architecture.get_current()
        uname = platform.uname()
        _locale = locale.getlocale()
        python_location = self.anonymizer.process_path(sys.executable)
        python_version = sys.version.replace("\n", " ")

        return [
            ("OS", operating_system.value),
            ("architecture", arch.value),
            ("uname", str(uname)),
            ("locale", str(_locale)),
            ("python_version", python_version),
            ("python_location", python_location),
        ]

    @property
    def _tools_info(self):
        """Return ``(key, value)`` pairs with the versions reported by external tools."""
        git_version = self._get_version(["git", "--version"])
        ssh_version = self._get_version(["ssh", "-V"])
        kubectl_version = self._get_version(["kubectl", "version", "--short=True", "--client=True"])
        gcloud_version = self._get_version(["gcloud", "version"], grep=b"Google Cloud SDK")
        cloud_sql_proxy_version = self._get_version(["cloud_sql_proxy", "--version"])
        mysql_version = self._get_version(["mysql", "--version"])
        sqlite3_version = self._get_version(["sqlite3", "--version"])
        psql_version = self._get_version(["psql", "--version"])

        return [
            ("git", git_version),
            ("ssh", ssh_version),
            ("kubectl", kubectl_version),
            ("gcloud", gcloud_version),
            ("cloud_sql_proxy", cloud_sql_proxy_version),
            ("mysql", mysql_version),
            ("sqlite3", sqlite3_version),
            ("psql", psql_version),
        ]

    @property
    def _paths_info(self):
        """Return ``(key, value)`` pairs describing Airflow home, ``PATH`` and ``sys.path``."""
        raw_system_path = os.environ.get("PATH", "").split(os.pathsep)
        airflow_home = self.anonymizer.process_path(configuration.get_airflow_home())
        # Probe the filesystem with the real entries: anonymized entries such as
        # "${HOME}/bin" do not exist on disk and would always report False.
        airflow_on_path = any(
            os.path.exists(os.path.join(path_elem, "airflow")) for path_elem in raw_system_path
        )
        system_path = [self.anonymizer.process_path(p) for p in raw_system_path]
        python_path = [self.anonymizer.process_path(p) for p in sys.path]

        return [
            ("airflow_home", airflow_home),
            ("system_path", os.pathsep.join(system_path)),
            ("python_path", os.pathsep.join(python_path)),
            ("airflow_on_path", str(airflow_on_path)),
        ]

    @property
    def _providers_info(self):
        """Return ``(package name, version)`` pairs for the providers known to ProvidersManager."""
        return [(p.data["package-name"], p.version) for p in ProvidersManager().providers.values()]

    def show(self, output: str, console: AirflowConsole | None = None) -> None:
        """Show information about Airflow instance."""
        all_info = {
            "Apache Airflow": self._airflow_info,
            "System info": self._system_info,
            "Tools info": self._tools_info,
            "Paths info": self._paths_info,
            "Providers info": self._providers_info,
        }

        console = console or AirflowConsole(show_header=False)
        if output in ("table", "plain"):
            # Show each info as table with key, value column
            for key, info in all_info.items():
                console.print(f"\n[bold][green]{key}[/bold][/green]", highlight=False)
                console.print_as(data=[{"key": k, "value": v} for k, v in info], output=output)
        else:
            # Render info in given format, change keys to snake_case
            console.print_as(
                data=[{k.lower().replace(" ", "_"): dict(v)} for k, v in all_info.items()], output=output
            )

    def render_text(self, output: str) -> str:
        """Export the info to string."""
        console = AirflowConsole(record=True)
        with console.capture():
            self.show(output=output, console=console)
        return console.export_text()


class FileIoException(Exception):
    """Raises when error happens in FileIo.io integration."""


@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=1, max=10),
    retry=tenacity.retry_if_exception_type(FileIoException),
    before=tenacity.before_log(log, logging.DEBUG),
    after=tenacity.after_log(log, logging.DEBUG),
    # Re-raise the last FileIoException once retries are exhausted; without this tenacity
    # raises RetryError, which callers catching FileIoException would not handle.
    reraise=True,
)
def _upload_text_to_fileio(content):
    """
    Upload text file to File.io service and return link.

    :param content: body of the report to upload
    :return: the ``link`` value of the JSON response
    :raises FileIoException: when the request fails, the service answers with a status other
        than 200/201, or the response carries no usable ``link``
    """
    failure_message = "Failed to send report to file.io service."
    try:
        resp = httpx.post("https://file.io", content=content)
    except httpx.HTTPError as e:
        # Network-level failures are reported the same way as service-side failures.
        log.debug(e)
        raise FileIoException(failure_message) from e
    if resp.status_code not in [200, 201]:
        try:
            print(resp.json())
        except ValueError:
            # Error responses are not guaranteed to be JSON; show the raw body instead.
            print(resp.text)
        raise FileIoException(failure_message)
    try:
        return resp.json()["link"]
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: body is not JSON; KeyError/TypeError: JSON without a "link" entry.
        log.debug(e)
        raise FileIoException(failure_message) from e


def _send_report_to_fileio(info):
    """Upload ``info`` to file.io and print the resulting link, or the failure message."""
    print("Uploading report to file.io service.")
    try:
        link = _upload_text_to_fileio(str(info))
        print("Report uploaded.")
        print(link)
        print()
    except FileIoException as ex:
        print(str(ex))


@suppress_logs_and_warning
@providers_configuration_loaded
def show_info(args):
    """Show information related to Airflow, system and other."""
    # Enforce anonymization, when file_io upload is turned on.
    anonymizer = PiiAnonymizer() if args.anonymize or args.file_io else NullAnonymizer()
    info = AirflowInfo(anonymizer)
    if args.file_io:
        _send_report_to_fileio(info.render_text(args.output))
    else:
        info.show(args.output)