# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
from logging import getLogger
from logging.config import fileConfig

from alembic import context

from airflow import settings
from airflow.providers.fab.auth_manager.models.db import FABDBManager

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
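
# The FAB provider tracks its Alembic revisions in its own version table,
# separate from core Airflow's, so the two migration histories do not clash.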
version_table = FABDBManager.version_table_name

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if not getLogger().handlers and config.config_file_name:
    fileConfig(config.config_file_name, disable_existing_loggers=False)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = FABDBManager.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
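

# Restrict comparisons and autogenerate to tables owned by the FAB metadata,
# so other tables sharing the database (e.g. core Airflow tables) are ignored.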
def include_object(_, name, type_, *args):
    if type_ == "table" and name not in target_metadata.tables:
        return False
    return True


def run_migrations_offline():
    """
    Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    context.configure(
        url=settings.SQL_ALCHEMY_CONN,
        target_metadata=target_metadata,
        literal_binds=True,
        compare_type=True,
        compare_server_default=True,
        render_as_batch=True,
        version_table=version_table,
        include_object=include_object,
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """
    Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    def process_revision_directives(context, revision, directives):
        if getattr(config.cmd_opts, "autogenerate", False):
            script = directives[0]
            if script.upgrade_ops and script.upgrade_ops.is_empty():
                directives[:] = []
                print("No change detected in ORM schema, skipping revision.")
    with contextlib.ExitStack() as stack:
        connection = config.attributes.get("connection", None)

        if not connection:
            connection = stack.push(settings.engine.connect())

        context.configure(
            connection=connection,
            transaction_per_migration=True,
            target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True,
            include_object=include_object,
            render_as_batch=True,
            process_revision_directives=process_revision_directives,
            version_table=version_table,
        )

        with context.begin_transaction():
            context.run_migrations()
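

# In offline mode (invocations with --sql) the migration SQL is written to the
# script output instead of being executed against a database connection.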
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()