info_command.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Info sub-commands."""
  18. from __future__ import annotations
  19. import locale
  20. import logging
  21. import os
  22. import platform
  23. import subprocess
  24. import sys
  25. from enum import Enum
  26. from urllib.parse import urlsplit, urlunsplit
  27. import httpx
  28. import tenacity
  29. from airflow import configuration
  30. from airflow.cli.simple_table import AirflowConsole
  31. from airflow.providers_manager import ProvidersManager
  32. from airflow.typing_compat import Protocol
  33. from airflow.utils.cli import suppress_logs_and_warning
  34. from airflow.utils.platform import getuser
  35. from airflow.utils.providers_configuration_loader import providers_configuration_loaded
  36. from airflow.version import version as airflow_version
# Module-level logger; it is also handed to the tenacity before/after retry hooks
# and used for debug output when a file.io response cannot be parsed.
log = logging.getLogger(__name__)
  38. class Anonymizer(Protocol):
  39. """Anonymizer protocol."""
  40. def process_path(self, value) -> str:
  41. """Remove pii from paths."""
  42. def process_username(self, value) -> str:
  43. """Remove pii from username."""
  44. def process_url(self, value) -> str:
  45. """Remove pii from URL."""
  46. class NullAnonymizer(Anonymizer):
  47. """Do nothing."""
  48. def _identity(self, value) -> str:
  49. return value
  50. process_path = process_username = process_url = _identity
  51. del _identity
  52. class PiiAnonymizer(Anonymizer):
  53. """Remove personally identifiable info from path."""
  54. def __init__(self):
  55. home_path = os.path.expanduser("~")
  56. username = getuser()
  57. self._path_replacements = {home_path: "${HOME}", username: "${USER}"}
  58. def process_path(self, value) -> str:
  59. if not value:
  60. return value
  61. for src, target in self._path_replacements.items():
  62. value = value.replace(src, target)
  63. return value
  64. def process_username(self, value) -> str:
  65. if not value:
  66. return value
  67. return f"{value[0]}...{value[-1]}"
  68. def process_url(self, value) -> str:
  69. if not value:
  70. return value
  71. url_parts = urlsplit(value)
  72. netloc = None
  73. if url_parts.netloc:
  74. # unpack
  75. userinfo = None
  76. username = None
  77. password = None
  78. if "@" in url_parts.netloc:
  79. userinfo, _, host = url_parts.netloc.partition("@")
  80. else:
  81. host = url_parts.netloc
  82. if userinfo:
  83. if ":" in userinfo:
  84. username, _, password = userinfo.partition(":")
  85. else:
  86. username = userinfo
  87. # anonymize
  88. username = self.process_username(username) if username else None
  89. password = "PASSWORD" if password else None
  90. # pack
  91. if username and password and host:
  92. netloc = f"{username}:{password}@{host}"
  93. elif username and host:
  94. netloc = f"{username}@{host}"
  95. elif password and host:
  96. netloc = f":{password}@{host}"
  97. elif host:
  98. netloc = host
  99. else:
  100. netloc = ""
  101. return urlunsplit((url_parts.scheme, netloc, url_parts.path, url_parts.query, url_parts.fragment))
  102. class OperatingSystem(Enum):
  103. """Operating system."""
  104. WINDOWS = "Windows"
  105. LINUX = "Linux"
  106. MACOSX = "Mac OS"
  107. CYGWIN = "Cygwin"
  108. UNKNOWN = "Unknown"
  109. @staticmethod
  110. def get_current() -> OperatingSystem:
  111. """Get current operating system."""
  112. if os.name == "nt":
  113. return OperatingSystem.WINDOWS
  114. elif "linux" in sys.platform:
  115. return OperatingSystem.LINUX
  116. elif "darwin" in sys.platform:
  117. return OperatingSystem.MACOSX
  118. elif "cygwin" in sys.platform:
  119. return OperatingSystem.CYGWIN
  120. return OperatingSystem.UNKNOWN
  121. class Architecture(Enum):
  122. """Compute architecture."""
  123. X86_64 = "x86_64"
  124. X86 = "x86"
  125. PPC = "ppc"
  126. ARM = "arm"
  127. UNKNOWN = "unknown"
  128. @staticmethod
  129. def get_current() -> Architecture:
  130. """Get architecture."""
  131. current_architecture = _MACHINE_TO_ARCHITECTURE.get(platform.machine().lower())
  132. return current_architecture or Architecture.UNKNOWN
  133. _MACHINE_TO_ARCHITECTURE: dict[str, Architecture] = {
  134. "amd64": Architecture.X86_64,
  135. "x86_64": Architecture.X86_64,
  136. "i686-64": Architecture.X86_64,
  137. "i386": Architecture.X86,
  138. "i686": Architecture.X86,
  139. "x86": Architecture.X86,
  140. "ia64": Architecture.X86, # Itanium is different x64 arch, treat it as the common x86.
  141. "powerpc": Architecture.PPC,
  142. "power macintosh": Architecture.PPC,
  143. "ppc64": Architecture.PPC,
  144. "armv6": Architecture.ARM,
  145. "armv6l": Architecture.ARM,
  146. "arm64": Architecture.ARM,
  147. "armv7": Architecture.ARM,
  148. "armv7l": Architecture.ARM,
  149. "aarch64": Architecture.ARM,
  150. }
  151. class AirflowInfo:
  152. """Renders information about Airflow instance."""
  153. def __init__(self, anonymizer):
  154. self.anonymizer = anonymizer
  155. @staticmethod
  156. def _get_version(cmd: list[str], grep: bytes | None = None):
  157. """Return tools version."""
  158. try:
  159. with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
  160. stdoutdata, _ = proc.communicate()
  161. data = [f for f in stdoutdata.split(b"\n") if f]
  162. if grep:
  163. data = [line for line in data if grep in line]
  164. if len(data) != 1:
  165. return "NOT AVAILABLE"
  166. else:
  167. return data[0].decode()
  168. except OSError:
  169. return "NOT AVAILABLE"
  170. @staticmethod
  171. def _task_logging_handler():
  172. """Return task logging handler."""
  173. def get_fullname(o):
  174. module = o.__class__.__module__
  175. if module is None or module == str.__class__.__module__:
  176. return o.__class__.__name__ # Avoid reporting __builtin__
  177. else:
  178. return f"{module}.{o.__class__.__name__}"
  179. try:
  180. handler_names = [get_fullname(handler) for handler in logging.getLogger("airflow.task").handlers]
  181. return ", ".join(handler_names)
  182. except Exception:
  183. return "NOT AVAILABLE"
  184. @property
  185. def _airflow_info(self):
  186. executor = configuration.conf.get("core", "executor")
  187. sql_alchemy_conn = self.anonymizer.process_url(
  188. configuration.conf.get("database", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE")
  189. )
  190. dags_folder = self.anonymizer.process_path(
  191. configuration.conf.get("core", "dags_folder", fallback="NOT AVAILABLE")
  192. )
  193. plugins_folder = self.anonymizer.process_path(
  194. configuration.conf.get("core", "plugins_folder", fallback="NOT AVAILABLE")
  195. )
  196. base_log_folder = self.anonymizer.process_path(
  197. configuration.conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE")
  198. )
  199. remote_base_log_folder = self.anonymizer.process_path(
  200. configuration.conf.get("logging", "remote_base_log_folder", fallback="NOT AVAILABLE")
  201. )
  202. return [
  203. ("version", airflow_version),
  204. ("executor", executor),
  205. ("task_logging_handler", self._task_logging_handler()),
  206. ("sql_alchemy_conn", sql_alchemy_conn),
  207. ("dags_folder", dags_folder),
  208. ("plugins_folder", plugins_folder),
  209. ("base_log_folder", base_log_folder),
  210. ("remote_base_log_folder", remote_base_log_folder),
  211. ]
  212. @property
  213. def _system_info(self):
  214. operating_system = OperatingSystem.get_current()
  215. arch = Architecture.get_current()
  216. uname = platform.uname()
  217. _locale = locale.getlocale()
  218. python_location = self.anonymizer.process_path(sys.executable)
  219. python_version = sys.version.replace("\n", " ")
  220. return [
  221. ("OS", operating_system.value),
  222. ("architecture", arch.value),
  223. ("uname", str(uname)),
  224. ("locale", str(_locale)),
  225. ("python_version", python_version),
  226. ("python_location", python_location),
  227. ]
  228. @property
  229. def _tools_info(self):
  230. git_version = self._get_version(["git", "--version"])
  231. ssh_version = self._get_version(["ssh", "-V"])
  232. kubectl_version = self._get_version(["kubectl", "version", "--short=True", "--client=True"])
  233. gcloud_version = self._get_version(["gcloud", "version"], grep=b"Google Cloud SDK")
  234. cloud_sql_proxy_version = self._get_version(["cloud_sql_proxy", "--version"])
  235. mysql_version = self._get_version(["mysql", "--version"])
  236. sqlite3_version = self._get_version(["sqlite3", "--version"])
  237. psql_version = self._get_version(["psql", "--version"])
  238. return [
  239. ("git", git_version),
  240. ("ssh", ssh_version),
  241. ("kubectl", kubectl_version),
  242. ("gcloud", gcloud_version),
  243. ("cloud_sql_proxy", cloud_sql_proxy_version),
  244. ("mysql", mysql_version),
  245. ("sqlite3", sqlite3_version),
  246. ("psql", psql_version),
  247. ]
  248. @property
  249. def _paths_info(self):
  250. system_path = os.environ.get("PATH", "").split(os.pathsep)
  251. airflow_home = self.anonymizer.process_path(configuration.get_airflow_home())
  252. system_path = [self.anonymizer.process_path(p) for p in system_path]
  253. python_path = [self.anonymizer.process_path(p) for p in sys.path]
  254. airflow_on_path = any(os.path.exists(os.path.join(path_elem, "airflow")) for path_elem in system_path)
  255. return [
  256. ("airflow_home", airflow_home),
  257. ("system_path", os.pathsep.join(system_path)),
  258. ("python_path", os.pathsep.join(python_path)),
  259. ("airflow_on_path", str(airflow_on_path)),
  260. ]
  261. @property
  262. def _providers_info(self):
  263. return [(p.data["package-name"], p.version) for p in ProvidersManager().providers.values()]
  264. def show(self, output: str, console: AirflowConsole | None = None) -> None:
  265. """Show information about Airflow instance."""
  266. all_info = {
  267. "Apache Airflow": self._airflow_info,
  268. "System info": self._system_info,
  269. "Tools info": self._tools_info,
  270. "Paths info": self._paths_info,
  271. "Providers info": self._providers_info,
  272. }
  273. console = console or AirflowConsole(show_header=False)
  274. if output in ("table", "plain"):
  275. # Show each info as table with key, value column
  276. for key, info in all_info.items():
  277. console.print(f"\n[bold][green]{key}[/bold][/green]", highlight=False)
  278. console.print_as(data=[{"key": k, "value": v} for k, v in info], output=output)
  279. else:
  280. # Render info in given format, change keys to snake_case
  281. console.print_as(
  282. data=[{k.lower().replace(" ", "_"): dict(v)} for k, v in all_info.items()], output=output
  283. )
  284. def render_text(self, output: str) -> str:
  285. """Export the info to string."""
  286. console = AirflowConsole(record=True)
  287. with console.capture():
  288. self.show(output=output, console=console)
  289. return console.export_text()
class FileIoException(Exception):
    """Raised when an error happens in the File.io integration."""
  292. @tenacity.retry(
  293. stop=tenacity.stop_after_attempt(5),
  294. wait=tenacity.wait_exponential(multiplier=1, max=10),
  295. retry=tenacity.retry_if_exception_type(FileIoException),
  296. before=tenacity.before_log(log, logging.DEBUG),
  297. after=tenacity.after_log(log, logging.DEBUG),
  298. )
  299. def _upload_text_to_fileio(content):
  300. """Upload text file to File.io service and return link."""
  301. resp = httpx.post("https://file.io", content=content)
  302. if resp.status_code not in [200, 201]:
  303. print(resp.json())
  304. raise FileIoException("Failed to send report to file.io service.")
  305. try:
  306. return resp.json()["link"]
  307. except ValueError as e:
  308. log.debug(e)
  309. raise FileIoException("Failed to send report to file.io service.")
  310. def _send_report_to_fileio(info):
  311. print("Uploading report to file.io service.")
  312. try:
  313. link = _upload_text_to_fileio(str(info))
  314. print("Report uploaded.")
  315. print(link)
  316. print()
  317. except FileIoException as ex:
  318. print(str(ex))
  319. @suppress_logs_and_warning
  320. @providers_configuration_loaded
  321. def show_info(args):
  322. """Show information related to Airflow, system and other."""
  323. # Enforce anonymization, when file_io upload is tuned on.
  324. anonymizer = PiiAnonymizer() if args.anonymize or args.file_io else NullAnonymizer()
  325. info = AirflowInfo(anonymizer)
  326. if args.file_io:
  327. _send_report_to_fileio(info.render_text(args.output))
  328. else:
  329. info.show(args.output)