db_command.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Database sub-commands."""
  18. from __future__ import annotations
  19. import logging
  20. import os
  21. import textwrap
  22. import warnings
  23. from tempfile import NamedTemporaryFile
  24. from typing import TYPE_CHECKING
  25. from packaging.version import InvalidVersion, parse as parse_version
  26. from tenacity import Retrying, stop_after_attempt, wait_fixed
  27. from airflow import settings
  28. from airflow.api_internal.internal_api_call import InternalApiConfig
  29. from airflow.exceptions import AirflowException
  30. from airflow.utils import cli as cli_utils, db
  31. from airflow.utils.db import _REVISION_HEADS_MAP
  32. from airflow.utils.db_cleanup import config_dict, drop_archived_tables, export_archived_records, run_cleanup
  33. from airflow.utils.process_utils import execute_interactive
  34. from airflow.utils.providers_configuration_loader import providers_configuration_loaded
  35. if TYPE_CHECKING:
  36. from tenacity import RetryCallState
  37. log = logging.getLogger(__name__)
  38. @providers_configuration_loaded
  39. def initdb(args):
  40. """Initialize the metadata database."""
  41. warnings.warn(
  42. "`db init` is deprecated. Use `db migrate` instead to migrate the db and/or "
  43. "airflow connections create-default-connections to create the default connections",
  44. DeprecationWarning,
  45. stacklevel=2,
  46. )
  47. print(f"DB: {settings.engine.url!r}")
  48. db.initdb()
  49. print("Initialization done")
  50. @providers_configuration_loaded
  51. def resetdb(args):
  52. """Reset the metadata database."""
  53. print(f"DB: {settings.engine.url!r}")
  54. if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
  55. raise SystemExit("Cancelled")
  56. db.resetdb(skip_init=args.skip_init, use_migration_files=args.use_migration_files)
  57. def upgradedb(args):
  58. """Upgrades the metadata database."""
  59. warnings.warn("`db upgrade` is deprecated. Use `db migrate` instead.", DeprecationWarning, stacklevel=2)
  60. migratedb(args)
  61. def get_version_revision(version: str, recursion_limit=10) -> str | None:
  62. """
  63. Recursively search for the revision of the given version.
  64. This searches REVISION_HEADS_MAP for the revision of the given version, recursively
  65. searching for the previous version if the given version is not found.
  66. """
  67. if version in _REVISION_HEADS_MAP:
  68. return _REVISION_HEADS_MAP[version]
  69. try:
  70. major, minor, patch = map(int, version.split("."))
  71. except ValueError:
  72. return None
  73. new_version = f"{major}.{minor}.{patch - 1}"
  74. recursion_limit -= 1
  75. if recursion_limit <= 0:
  76. # Prevent infinite recursion as I can't imagine 10 successive versions without migration
  77. return None
  78. return get_version_revision(new_version, recursion_limit)
  79. @cli_utils.action_cli(check_db=False)
  80. @providers_configuration_loaded
  81. def migratedb(args):
  82. """Migrates the metadata database."""
  83. print(f"DB: {settings.engine.url!r}")
  84. if args.to_revision and args.to_version:
  85. raise SystemExit("Cannot supply both `--to-revision` and `--to-version`.")
  86. if args.from_version and args.from_revision:
  87. raise SystemExit("Cannot supply both `--from-revision` and `--from-version`")
  88. if (args.from_revision or args.from_version) and not args.show_sql_only:
  89. raise SystemExit(
  90. "Args `--from-revision` and `--from-version` may only be used with `--show-sql-only`"
  91. )
  92. to_revision = None
  93. from_revision = None
  94. if args.from_revision:
  95. from_revision = args.from_revision
  96. elif args.from_version:
  97. try:
  98. parsed_version = parse_version(args.from_version)
  99. except InvalidVersion:
  100. raise SystemExit(f"Invalid version {args.from_version!r} supplied as `--from-version`.")
  101. if parsed_version < parse_version("2.0.0"):
  102. raise SystemExit("--from-version must be greater or equal to than 2.0.0")
  103. from_revision = get_version_revision(args.from_version)
  104. if not from_revision:
  105. raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.")
  106. if args.to_version:
  107. try:
  108. parse_version(args.to_version)
  109. except InvalidVersion:
  110. raise SystemExit(f"Invalid version {args.to_version!r} supplied as `--to-version`.")
  111. to_revision = get_version_revision(args.to_version)
  112. if not to_revision:
  113. raise SystemExit(f"Unknown version {args.to_version!r} supplied as `--to-version`.")
  114. elif args.to_revision:
  115. to_revision = args.to_revision
  116. if not args.show_sql_only:
  117. print(f"Performing upgrade to the metadata database {settings.engine.url!r}")
  118. else:
  119. print("Generating sql for upgrade -- upgrade commands will *not* be submitted.")
  120. db.upgradedb(
  121. to_revision=to_revision,
  122. from_revision=from_revision,
  123. show_sql_only=args.show_sql_only,
  124. reserialize_dags=args.reserialize_dags,
  125. use_migration_files=args.use_migration_files,
  126. )
  127. if not args.show_sql_only:
  128. print("Database migrating done!")
  129. @cli_utils.action_cli(check_db=False)
  130. @providers_configuration_loaded
  131. def downgrade(args):
  132. """Downgrades the metadata database."""
  133. if args.to_revision and args.to_version:
  134. raise SystemExit("Cannot supply both `--to-revision` and `--to-version`.")
  135. if args.from_version and args.from_revision:
  136. raise SystemExit("`--from-revision` may not be combined with `--from-version`")
  137. if (args.from_revision or args.from_version) and not args.show_sql_only:
  138. raise SystemExit(
  139. "Args `--from-revision` and `--from-version` may only be used with `--show-sql-only`"
  140. )
  141. if not (args.to_version or args.to_revision):
  142. raise SystemExit("Must provide either --to-revision or --to-version.")
  143. from_revision = None
  144. if args.from_revision:
  145. from_revision = args.from_revision
  146. elif args.from_version:
  147. from_revision = get_version_revision(args.from_version)
  148. if not from_revision:
  149. raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.")
  150. if args.to_version:
  151. to_revision = get_version_revision(args.to_version)
  152. if not to_revision:
  153. raise SystemExit(f"Downgrading to version {args.to_version} is not supported.")
  154. elif args.to_revision:
  155. to_revision = args.to_revision
  156. if not args.show_sql_only:
  157. print(f"Performing downgrade with database {settings.engine.url!r}")
  158. else:
  159. print("Generating sql for downgrade -- downgrade commands will *not* be submitted.")
  160. if args.show_sql_only or (
  161. args.yes
  162. or input(
  163. "\nWarning: About to reverse schema migrations for the airflow metastore. "
  164. "Please ensure you have backed up your database before any upgrade or "
  165. "downgrade operation. Proceed? (y/n)\n"
  166. ).upper()
  167. == "Y"
  168. ):
  169. db.downgrade(to_revision=to_revision, from_revision=from_revision, show_sql_only=args.show_sql_only)
  170. if not args.show_sql_only:
  171. print("Downgrade complete")
  172. else:
  173. raise SystemExit("Cancelled")
  174. @providers_configuration_loaded
  175. def check_migrations(args):
  176. """Wait for all airflow migrations to complete. Used for launching airflow in k8s."""
  177. db.check_migrations(timeout=args.migration_wait_timeout)
  178. @cli_utils.action_cli(check_db=False)
  179. @providers_configuration_loaded
  180. def shell(args):
  181. """Run a shell that allows to access metadata database."""
  182. url = settings.engine.url
  183. print(f"DB: {url!r}")
  184. if url.get_backend_name() == "mysql":
  185. with NamedTemporaryFile(suffix="my.cnf") as f:
  186. content = textwrap.dedent(
  187. f"""
  188. [client]
  189. host = {url.host}
  190. user = {url.username}
  191. password = {url.password or ""}
  192. port = {url.port or "3306"}
  193. database = {url.database}
  194. """
  195. ).strip()
  196. f.write(content.encode())
  197. f.flush()
  198. execute_interactive(["mysql", f"--defaults-extra-file={f.name}"])
  199. elif url.get_backend_name() == "sqlite":
  200. execute_interactive(["sqlite3", url.database])
  201. elif url.get_backend_name() == "postgresql":
  202. env = os.environ.copy()
  203. env["PGHOST"] = url.host or ""
  204. env["PGPORT"] = str(url.port or "5432")
  205. env["PGUSER"] = url.username or ""
  206. # PostgreSQL does not allow the use of PGPASSFILE if the current user is root.
  207. env["PGPASSWORD"] = url.password or ""
  208. env["PGDATABASE"] = url.database
  209. execute_interactive(["psql"], env=env)
  210. else:
  211. raise AirflowException(f"Unknown driver: {url.drivername}")
  212. @cli_utils.action_cli(check_db=False)
  213. @providers_configuration_loaded
  214. def check(args):
  215. """Run a check command that checks if db is available."""
  216. if InternalApiConfig.get_use_internal_api():
  217. return
  218. retries: int = args.retry
  219. retry_delay: int = args.retry_delay
  220. def _warn_remaining_retries(retrystate: RetryCallState):
  221. remain = retries - retrystate.attempt_number
  222. log.warning("%d retries remain. Will retry in %d seconds", remain, retry_delay)
  223. for attempt in Retrying(
  224. stop=stop_after_attempt(1 + retries),
  225. wait=wait_fixed(retry_delay),
  226. reraise=True,
  227. before_sleep=_warn_remaining_retries,
  228. ):
  229. with attempt:
  230. db.check()
  231. # lazily imported by CLI parser for `help` command
  232. all_tables = sorted(config_dict)
  233. @cli_utils.action_cli(check_db=False)
  234. @providers_configuration_loaded
  235. def cleanup_tables(args):
  236. """Purges old records in metadata database."""
  237. run_cleanup(
  238. table_names=args.tables,
  239. dry_run=args.dry_run,
  240. clean_before_timestamp=args.clean_before_timestamp,
  241. verbose=args.verbose,
  242. confirm=not args.yes,
  243. skip_archive=args.skip_archive,
  244. )
  245. @cli_utils.action_cli(check_db=False)
  246. @providers_configuration_loaded
  247. def export_archived(args):
  248. """Export archived records from metadata database."""
  249. export_archived_records(
  250. export_format=args.export_format,
  251. output_path=args.output_path,
  252. table_names=args.tables,
  253. drop_archives=args.drop_archives,
  254. needs_confirm=not args.yes,
  255. )
  256. @cli_utils.action_cli(check_db=False)
  257. @providers_configuration_loaded
  258. def drop_archived(args):
  259. """Drop archived tables from metadata database."""
  260. drop_archived_tables(
  261. table_names=args.tables,
  262. needs_confirm=not args.yes,
  263. )