| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128 |
- # mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
- # mypy: no-warn-return-any, allow-any-generics
- from __future__ import annotations
- from io import StringIO
- import re
- from typing import Any
- from typing import cast
- from typing import Dict
- from typing import List
- from typing import Optional
- from typing import Tuple
- from typing import TYPE_CHECKING
- from typing import Union
- from mako.pygen import PythonPrinter
- from sqlalchemy import schema as sa_schema
- from sqlalchemy import sql
- from sqlalchemy import types as sqltypes
- from sqlalchemy.sql.elements import conv
- from sqlalchemy.sql.elements import Label
- from sqlalchemy.sql.elements import quoted_name
- from .. import util
- from ..operations import ops
- from ..util import sqla_compat
- if TYPE_CHECKING:
- from typing import Literal
- from sqlalchemy import Computed
- from sqlalchemy import Identity
- from sqlalchemy.sql.base import DialectKWArgs
- from sqlalchemy.sql.elements import ColumnElement
- from sqlalchemy.sql.elements import TextClause
- from sqlalchemy.sql.schema import CheckConstraint
- from sqlalchemy.sql.schema import Column
- from sqlalchemy.sql.schema import Constraint
- from sqlalchemy.sql.schema import FetchedValue
- from sqlalchemy.sql.schema import ForeignKey
- from sqlalchemy.sql.schema import ForeignKeyConstraint
- from sqlalchemy.sql.schema import Index
- from sqlalchemy.sql.schema import MetaData
- from sqlalchemy.sql.schema import PrimaryKeyConstraint
- from sqlalchemy.sql.schema import UniqueConstraint
- from sqlalchemy.sql.sqltypes import ARRAY
- from sqlalchemy.sql.type_api import TypeEngine
- from alembic.autogenerate.api import AutogenContext
- from alembic.config import Config
- from alembic.operations.ops import MigrationScript
- from alembic.operations.ops import ModifyTableOps
# Threshold on the number of rendered positional arguments to
# ``create_table()``; when exceeded, ``_add_table`` renders the arguments
# as a single unpacked list (``*[...]``) instead of individual arguments.
MAX_PYTHON_ARGS = 255
def _render_gen_name(
    autogen_context: AutogenContext,
    name: sqla_compat._ConstraintName,
) -> Optional[Union[quoted_name, str, _f_name]]:
    """Return the object used to render a constraint / index name.

    A ``conv`` name is wrapped in ``_f_name`` together with the current
    alembic prefix; any other value is passed through
    ``sqla_compat.constraint_name_or_none()``.
    """
    if not isinstance(name, conv):
        return sqla_compat.constraint_name_or_none(name)

    prefix = _alembic_autogenerate_prefix(autogen_context)
    return _f_name(prefix, name)
- def _indent(text: str) -> str:
- text = re.compile(r"^", re.M).sub(" ", text).strip()
- text = re.compile(r" +$", re.M).sub("", text)
- return text
def _render_python_into_templatevars(
    autogen_context: AutogenContext,
    migration_script: MigrationScript,
    template_args: Dict[str, Union[str, Config]],
) -> None:
    """Populate ``template_args`` with rendered upgrade / downgrade bodies.

    Each upgrade/downgrade pair of the migration script is rendered and
    stored under its own token; the sorted contents of
    ``autogen_context.imports`` are stored under ``"imports"``.
    """
    collected_imports = autogen_context.imports

    op_pairs = zip(
        migration_script.upgrade_ops_list,
        migration_script.downgrade_ops_list,
    )
    for up_ops, down_ops in op_pairs:
        up_body = _render_cmd_body(up_ops, autogen_context)
        template_args[up_ops.upgrade_token] = _indent(up_body)

        down_body = _render_cmd_body(down_ops, autogen_context)
        template_args[down_ops.downgrade_token] = _indent(down_body)

    template_args["imports"] = "\n".join(sorted(collected_imports))
# Dispatcher on which the operation renderers below are registered via
# ``@renderers.dispatch_for(...)``.  Both names refer to the same object
# at this point; ``renderers`` is rebound at the end of the module.
default_renderers = renderers = util.Dispatcher()
def _render_cmd_body(
    op_container: ops.OpContainer,
    autogen_context: AutogenContext,
) -> str:
    """Render all operations of ``op_container`` as a block of Python code.

    The block is framed by the Alembic begin / end marker comments; when
    no operation produces any line, a single ``pass`` is emitted instead.
    """
    out = StringIO()
    printer = PythonPrinter(out)

    printer.writeline(
        "# ### commands auto generated by Alembic - please adjust! ###"
    )

    wrote_any = False
    for op in op_container.ops:
        rendered = render_op(autogen_context, op)
        if rendered:
            wrote_any = True
        for line in rendered:
            printer.writeline(line)

    if not wrote_any:
        printer.writeline("pass")

    printer.writeline("# ### end Alembic commands ###")

    return out.getvalue()
def render_op(
    autogen_context: AutogenContext, op: ops.MigrateOperation
) -> List[str]:
    """Render a single operation to a list of source lines.

    The renderer is looked up on ``renderers`` by the operation and is
    called as ``renderer(autogen_context, op)``; its result is normalized
    with ``util.to_list()``.
    """
    render_fn = renderers.dispatch(op)
    return util.to_list(render_fn(autogen_context, op))
def render_op_text(
    autogen_context: AutogenContext, op: ops.MigrateOperation
) -> str:
    """Render a single operation as one newline-joined string."""
    rendered_lines = render_op(autogen_context, op)
    return "\n".join(rendered_lines)
@renderers.dispatch_for(ops.ModifyTableOps)
def _render_modify_table(
    autogen_context: AutogenContext, op: ModifyTableOps
) -> List[str]:
    """Render the operations contained in a ``ModifyTableOps``.

    With the ``render_as_batch`` option set, the child operations are
    rendered inside a ``with op.batch_alter_table(...)`` header and
    followed by an empty line; otherwise they are rendered one after
    another.  An empty container renders to an empty list.
    """
    use_batch = autogen_context.opts.get("render_as_batch", False)

    if not op.ops:
        return []

    rendered: List[str] = []

    if not use_batch:
        for child_op in op.ops:
            rendered.extend(render_op(autogen_context, child_op))
        return rendered

    with autogen_context._within_batch():
        rendered.append(
            "with op.batch_alter_table(%r, schema=%r) as batch_op:"
            % (op.table_name, op.schema)
        )
        for child_op in op.ops:
            rendered.extend(render_op(autogen_context, child_op))
        rendered.append("")

    return rendered
@renderers.dispatch_for(ops.CreateTableCommentOp)
def _render_create_table_comment(
    autogen_context: AutogenContext, op: ops.CreateTableCommentOp
) -> str:
    """Render a ``create_table_comment()`` call for ``op``.

    In batch mode only the comment and ``existing_comment`` are rendered;
    otherwise the table name and ``schema`` are rendered as well.

    The table name and schema are rendered through ``repr()`` rather than
    being wrapped in hand-written quotes, so that names containing quote
    characters or backslashes still yield a valid Python string literal.
    For ordinary names the output is unchanged.
    """
    if autogen_context._has_batch:
        templ = (
            "{prefix}create_table_comment(\n"
            "{indent}{comment},\n"
            "{indent}existing_comment={existing}\n"
            ")"
        )
    else:
        templ = (
            "{prefix}create_table_comment(\n"
            "{indent}{tname},\n"
            "{indent}{comment},\n"
            "{indent}existing_comment={existing},\n"
            "{indent}schema={schema}\n"
            ")"
        )
    # Values that are None are rendered as the bare word ``None``.
    return templ.format(
        prefix=_alembic_autogenerate_prefix(autogen_context),
        tname=repr(_ident(op.table_name)),
        comment="%r" % op.comment if op.comment is not None else None,
        existing=(
            "%r" % op.existing_comment
            if op.existing_comment is not None
            else None
        ),
        schema=repr(_ident(op.schema)) if op.schema is not None else None,
        indent=" ",
    )
@renderers.dispatch_for(ops.DropTableCommentOp)
def _render_drop_table_comment(
    autogen_context: AutogenContext, op: ops.DropTableCommentOp
) -> str:
    """Render a ``drop_table_comment()`` call for ``op``.

    In batch mode only ``existing_comment`` is rendered; otherwise the
    table name and ``schema`` are rendered as well.

    The table name and schema are rendered through ``repr()`` rather than
    being wrapped in hand-written quotes, so that names containing quote
    characters or backslashes still yield a valid Python string literal.
    For ordinary names the output is unchanged.
    """
    if autogen_context._has_batch:
        templ = (
            "{prefix}drop_table_comment(\n"
            "{indent}existing_comment={existing}\n"
            ")"
        )
    else:
        templ = (
            "{prefix}drop_table_comment(\n"
            "{indent}{tname},\n"
            "{indent}existing_comment={existing},\n"
            "{indent}schema={schema}\n"
            ")"
        )
    # Values that are None are rendered as the bare word ``None``.
    return templ.format(
        prefix=_alembic_autogenerate_prefix(autogen_context),
        tname=repr(_ident(op.table_name)),
        existing=(
            "%r" % op.existing_comment
            if op.existing_comment is not None
            else None
        ),
        schema=repr(_ident(op.schema)) if op.schema is not None else None,
        indent=" ",
    )
@renderers.dispatch_for(ops.CreateTableOp)
def _add_table(autogen_context: AutogenContext, op: ops.CreateTableOp) -> str:
    """Render a ``create_table()`` call for ``op``.

    Positional arguments are the rendered columns followed by the sorted
    rendered constraints; columns that render to an empty value and
    constraints that render to ``None`` are omitted.  When there are more
    than ``MAX_PYTHON_ARGS`` arguments they are rendered as one unpacked
    list.  Schema, comment, info, the entries of ``op.kw``, table
    prefixes and ``if_not_exists`` follow as keyword arguments when set.

    Table prefixes are rendered through ``repr()`` instead of hand-written
    quotes so that a prefix containing a quote or backslash still yields a
    valid Python literal.
    """
    table = op.to_table()

    column_args = [
        rendered
        for rendered in [
            _render_column(col, autogen_context) for col in table.columns
        ]
        if rendered
    ]
    constraint_args = sorted(
        rendered
        for rendered in [
            _render_constraint(cons, autogen_context, op._namespace_metadata)
            for cons in table.constraints
        ]
        if rendered is not None
    )
    args = column_args + constraint_args

    if len(args) > MAX_PYTHON_ARGS:
        args_str = "*[" + ",\n".join(args) + "]"
    else:
        args_str = ",\n".join(args)

    parts = [
        "%(prefix)screate_table(%(tablename)r,\n%(args)s"
        % {
            "tablename": _ident(op.table_name),
            "prefix": _alembic_autogenerate_prefix(autogen_context),
            "args": args_str,
        }
    ]

    if op.schema:
        parts.append(",\nschema=%r" % _ident(op.schema))

    comment = table.comment
    if comment:
        parts.append(",\ncomment=%r" % _ident(comment))

    info = table.info
    if info:
        parts.append(f",\ninfo={info!r}")

    # Keyword names may contain spaces; those are turned into underscores
    # so the rendered name is a valid Python identifier.
    for k in sorted(op.kw):
        parts.append(",\n%s=%r" % (k.replace(" ", "_"), op.kw[k]))

    if table._prefixes:
        prefixes = ", ".join(repr(str(p)) for p in table._prefixes)
        parts.append(",\nprefixes=[%s]" % prefixes)

    if op.if_not_exists is not None:
        parts.append(",\nif_not_exists=%r" % bool(op.if_not_exists))

    parts.append("\n)")
    return "".join(parts)
@renderers.dispatch_for(ops.DropTableOp)
def _drop_table(autogen_context: AutogenContext, op: ops.DropTableOp) -> str:
    """Render a ``drop_table()`` call for ``op``.

    ``schema`` is rendered only when set, ``if_exists`` only when it is
    not ``None``.
    """
    pieces = [
        "%(prefix)sdrop_table(%(tname)r"
        % {
            "prefix": _alembic_autogenerate_prefix(autogen_context),
            "tname": _ident(op.table_name),
        }
    ]
    if op.schema:
        pieces.append(", schema=%r" % _ident(op.schema))
    if op.if_exists is not None:
        pieces.append(", if_exists=%r" % bool(op.if_exists))
    pieces.append(")")
    return "".join(pieces)
- def _render_dialect_kwargs_items(
- autogen_context: AutogenContext, item: DialectKWArgs
- ) -> list[str]:
- return [
- f"{key}={_render_potential_expr(val, autogen_context)}"
- for key, val in item.dialect_kwargs.items()
- ]
@renderers.dispatch_for(ops.CreateIndexOp)
def _add_index(autogen_context: AutogenContext, op: ops.CreateIndexOp) -> str:
    """Render a ``create_index()`` call for ``op``.

    Outside batch mode the table name and, when set, the schema are part
    of the call.  Dialect keyword arguments and ``if_not_exists`` (when
    not ``None``) are appended after ``unique``.
    """
    index = op.to_index()

    if autogen_context._has_batch:
        tmpl = (
            "%(prefix)screate_index(%(name)r, [%(columns)s], "
            "unique=%(unique)r%(kwargs)s)"
        )
    else:
        tmpl = (
            "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
            "unique=%(unique)r%(schema)s%(kwargs)s)"
        )

    assert index.table is not None

    extra_kwargs = _render_dialect_kwargs_items(autogen_context, index)
    if op.if_not_exists is not None:
        extra_kwargs.append("if_not_exists=%r" % bool(op.if_not_exists))

    prefix = _alembic_autogenerate_prefix(autogen_context)
    name = _render_gen_name(autogen_context, index.name)
    table_name = _ident(index.table.name)
    columns = ", ".join(
        _get_index_rendered_expressions(index, autogen_context)
    )
    unique = index.unique or False

    schema_kw = ""
    if index.table.schema:
        schema_kw = ", schema=%r" % _ident(index.table.schema)

    trailing_kw = ""
    if extra_kwargs:
        trailing_kw = ", " + ", ".join(extra_kwargs)

    return tmpl % {
        "prefix": prefix,
        "name": name,
        "table": table_name,
        "columns": columns,
        "unique": unique,
        "schema": schema_kw,
        "kwargs": trailing_kw,
    }
@renderers.dispatch_for(ops.DropIndexOp)
def _drop_index(autogen_context: AutogenContext, op: ops.DropIndexOp) -> str:
    """Render a ``drop_index()`` call for ``op``.

    Outside batch mode ``table_name`` and, when set, the schema are part
    of the call.  Dialect keyword arguments and ``if_exists`` (when not
    ``None``) are appended at the end.
    """
    index = op.to_index()

    if autogen_context._has_batch:
        tmpl = "%(prefix)sdrop_index(%(name)r%(kwargs)s)"
    else:
        tmpl = (
            "%(prefix)sdrop_index(%(name)r, "
            "table_name=%(table_name)r%(schema)s%(kwargs)s)"
        )

    extra_kwargs = _render_dialect_kwargs_items(autogen_context, index)
    if op.if_exists is not None:
        extra_kwargs.append("if_exists=%r" % bool(op.if_exists))

    prefix = _alembic_autogenerate_prefix(autogen_context)
    name = _render_gen_name(autogen_context, op.index_name)
    table_name = _ident(op.table_name)

    schema_kw = ""
    if op.schema:
        schema_kw = ", schema=%r" % _ident(op.schema)

    trailing_kw = ""
    if extra_kwargs:
        trailing_kw = ", " + ", ".join(extra_kwargs)

    return tmpl % {
        "prefix": prefix,
        "name": name,
        "table_name": table_name,
        "schema": schema_kw,
        "kwargs": trailing_kw,
    }
@renderers.dispatch_for(ops.CreateUniqueConstraintOp)
def _add_unique_constraint(
    autogen_context: AutogenContext, op: ops.CreateUniqueConstraintOp
) -> List[str]:
    """Render ``op`` as a one-element list holding the "alter" form of
    the unique constraint.
    """
    constraint = op.to_constraint()
    return [_uq_constraint(constraint, autogen_context, True)]
@renderers.dispatch_for(ops.CreateForeignKeyOp)
def _add_fk_constraint(
    autogen_context: AutogenContext, op: ops.CreateForeignKeyOp
) -> str:
    """Render a ``create_foreign_key()`` call for ``op``.

    The source table and ``source_schema`` are rendered only outside
    batch mode.  Keyword options are taken from ``op.kw`` and rendered
    only when present and not ``None``.
    """
    in_batch = autogen_context._has_batch

    rendered_args = [
        repr(_render_gen_name(autogen_context, op.constraint_name))
    ]
    if not in_batch:
        rendered_args.append(repr(_ident(op.source_table)))

    rendered_args.append(repr(_ident(op.referent_table)))
    rendered_args.append(repr([_ident(col) for col in op.local_cols]))
    rendered_args.append(repr([_ident(col) for col in op.remote_cols]))

    option_names = [
        "referent_schema",
        "onupdate",
        "ondelete",
        "initially",
        "deferrable",
        "use_alter",
        "match",
    ]
    if not in_batch:
        option_names.insert(0, "source_schema")

    for option in option_names:
        value = op.kw.get(option)
        if value is not None:
            rendered_args.append("%s=%r" % (option, value))

    return "%(prefix)screate_foreign_key(%(args)s)" % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "args": ", ".join(rendered_args),
    }
@renderers.dispatch_for(ops.CreatePrimaryKeyOp)
def _add_pk_constraint(constraint, autogen_context):
    """Placeholder renderer for ``CreatePrimaryKeyOp``; always raises.

    NOTE(review): ``render_op`` invokes renderers as
    ``renderer(autogen_context, op)``, so the parameter names here look
    swapped relative to the values actually passed -- confirm before
    implementing.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
@renderers.dispatch_for(ops.CreateCheckConstraintOp)
def _add_check_constraint(constraint, autogen_context):
    """Placeholder renderer for ``CreateCheckConstraintOp``; always raises.

    NOTE(review): ``render_op`` invokes renderers as
    ``renderer(autogen_context, op)``, so the parameter names here look
    swapped relative to the values actually passed -- confirm before
    implementing.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
@renderers.dispatch_for(ops.DropConstraintOp)
def _drop_constraint(
    autogen_context: AutogenContext, op: ops.DropConstraintOp
) -> str:
    """Render a ``drop_constraint()`` call for ``op``.

    The table name and schema are rendered only outside batch mode;
    ``type_`` is rendered whenever a constraint type is set.
    """
    prefix = _alembic_autogenerate_prefix(autogen_context)
    name = _render_gen_name(autogen_context, op.constraint_name)
    schema = _ident(op.schema) if op.schema else None
    type_ = _ident(op.constraint_type) if op.constraint_type else None

    rendered_params = [repr(name)]
    if not autogen_context._has_batch:
        rendered_params.append(repr(_ident(op.table_name)))
        if schema is not None:
            rendered_params.append(f"schema={schema!r}")
    if type_ is not None:
        rendered_params.append(f"type_={type_!r}")

    joined = ", ".join(rendered_params)
    return f"{prefix}drop_constraint({joined})"
@renderers.dispatch_for(ops.AddColumnOp)
def _add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
    """Render an ``add_column()`` call for ``op``.

    In batch mode only the column is rendered; otherwise the table name
    and, when set, the schema are rendered as well.

    The table name and schema are passed through ``_ident()`` before
    being rendered with ``%r``, matching ``_drop_column`` and avoiding
    the ``quoted_name`` repr problem that ``_ident()`` works around.
    """
    schema, tname, column = op.schema, op.table_name, op.column
    if autogen_context._has_batch:
        template = "%(prefix)sadd_column(%(column)s)"
    else:
        template = "%(prefix)sadd_column(%(tname)r, %(column)s"
        if schema:
            template += ", schema=%(schema)r"
        template += ")"
    text = template % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": _ident(tname),
        "column": _render_column(column, autogen_context),
        "schema": _ident(schema),
    }
    return text
@renderers.dispatch_for(ops.DropColumnOp)
def _drop_column(autogen_context: AutogenContext, op: ops.DropColumnOp) -> str:
    """Render a ``drop_column()`` call for ``op``.

    In batch mode only the column name is rendered; otherwise the table
    name and, when set, the schema are rendered as well.
    """
    if autogen_context._has_batch:
        template = "%(prefix)sdrop_column(%(cname)r)"
    else:
        closing = ")"
        if op.schema:
            closing = ", schema=%(schema)r" + closing
        template = "%(prefix)sdrop_column(%(tname)r, %(cname)r" + closing

    values = {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": _ident(op.table_name),
        "cname": _ident(op.column_name),
        "schema": _ident(op.schema),
    }
    return template % values
@renderers.dispatch_for(ops.AlterColumnOp)
def _alter_column(
    autogen_context: AutogenContext, op: ops.AlterColumnOp
) -> str:
    """Render an ``alter_column()`` call for ``op``.

    The call starts with the column name (preceded by the table name
    outside batch mode); each further keyword argument is appended on its
    own line, prefixed by ``indent``, only when the corresponding value
    on ``op`` is set as tested below.
    """
    tname = op.table_name
    cname = op.column_name
    server_default = op.modify_server_default
    type_ = op.modify_type
    nullable = op.modify_nullable
    comment = op.modify_comment
    newname = op.modify_name
    autoincrement = op.kw.get("autoincrement", None)
    existing_type = op.existing_type
    existing_nullable = op.existing_nullable
    existing_comment = op.existing_comment
    existing_server_default = op.existing_server_default
    schema = op.schema

    indent = " " * 11

    # NOTE(review): unlike _drop_column, the names rendered with %r below
    # are not passed through _ident().
    if autogen_context._has_batch:
        template = "%(prefix)salter_column(%(cname)r"
    else:
        template = "%(prefix)salter_column(%(tname)r, %(cname)r"

    text = template % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": tname,
        "cname": cname,
    }
    if existing_type is not None:
        text += ",\n%sexisting_type=%s" % (
            indent,
            _repr_type(existing_type, autogen_context),
        )
    # False (not None) is the "unchanged" marker for server_default, so a
    # value of None is still rendered.
    if server_default is not False:
        rendered = _render_server_default(server_default, autogen_context)
        text += ",\n%sserver_default=%s" % (indent, rendered)

    if newname is not None:
        text += ",\n%snew_column_name=%r" % (indent, newname)
    if type_ is not None:
        text += ",\n%stype_=%s" % (indent, _repr_type(type_, autogen_context))
    if nullable is not None:
        text += ",\n%snullable=%r" % (indent, nullable)
    # As with server_default, False means "unchanged" for the comment.
    if comment is not False:
        text += ",\n%scomment=%r" % (indent, comment)
    if existing_comment is not None:
        text += ",\n%sexisting_comment=%r" % (indent, existing_comment)
    # existing_nullable is rendered only when nullable itself is not
    # being rendered.
    if nullable is None and existing_nullable is not None:
        text += ",\n%sexisting_nullable=%r" % (indent, existing_nullable)
    if autoincrement is not None:
        text += ",\n%sautoincrement=%r" % (indent, autoincrement)
    # existing_server_default is rendered only when server_default itself
    # is not being rendered.
    if server_default is False and existing_server_default:
        rendered = _render_server_default(
            existing_server_default, autogen_context
        )
        text += ",\n%sexisting_server_default=%s" % (indent, rendered)
    if schema and not autogen_context._has_batch:
        text += ",\n%sschema=%r" % (indent, schema)
    text += ")"
    return text
- class _f_name:
- def __init__(self, prefix: str, name: conv) -> None:
- self.prefix = prefix
- self.name = name
- def __repr__(self) -> str:
- return "%sf(%r)" % (self.prefix, _ident(self.name))
- def _ident(name: Optional[Union[quoted_name, str]]) -> Optional[str]:
- """produce a __repr__() object for a string identifier that may
- use quoted_name() in SQLAlchemy 0.9 and greater.
- The issue worked around here is that quoted_name() doesn't have
- very good repr() behavior by itself when unicode is involved.
- """
- if name is None:
- return name
- elif isinstance(name, quoted_name):
- return str(name)
- elif isinstance(name, str):
- return name
def _render_potential_expr(
    value: Any,
    autogen_context: AutogenContext,
    *,
    wrap_in_element: bool = True,
    is_server_default: bool = False,
    is_index: bool = False,
) -> str:
    """Render ``value``, which may or may not be a SQL expression.

    A non-``ClauseElement`` value is rendered with ``repr()``.  A
    ``ClauseElement`` is compiled to SQL text by the migration impl; the
    text is returned as a repr'd string, or, with ``wrap_in_element``,
    wrapped in ``text(...)`` (``literal_column(...)`` for indexes).  For
    index expressions that are a ``Label`` with a plain ``str`` name, a
    ``.label(...)`` call is appended.
    """
    if not isinstance(value, sql.ClauseElement):
        return repr(value)

    sql_text = autogen_context.migration_context.impl.render_ddl_sql_expr(
        value, is_server_default=is_server_default, is_index=is_index
    )
    if not wrap_in_element:
        return repr(sql_text)

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    element = "literal_column" if is_index else "text"
    value_str = f"{prefix}{element}({sql_text!r})"

    has_plain_label = (
        is_index and isinstance(value, Label) and type(value.name) is str
    )
    if has_plain_label:
        return value_str + f".label({value.name!r})"
    return value_str
def _get_index_rendered_expressions(
    idx: Index, autogen_context: AutogenContext
) -> List[str]:
    """Render each expression of ``idx``.

    ``Column`` expressions are rendered as their repr'd name; everything
    else goes through ``_render_potential_expr()`` in index mode.
    """
    rendered: List[str] = []
    for exp in idx.expressions:
        if isinstance(exp, sa_schema.Column):
            rendered.append(repr(_ident(getattr(exp, "name", None))))
        else:
            rendered.append(
                _render_potential_expr(exp, autogen_context, is_index=True)
            )
    return rendered
def _uq_constraint(
    constraint: UniqueConstraint,
    autogen_context: AutogenContext,
    alter: bool,
) -> str:
    """Render a unique constraint.

    With ``alter`` true a ``create_unique_constraint()`` call is rendered
    (name, table name unless in batch mode, list of column names);
    otherwise an inline ``UniqueConstraint(...)`` is rendered with the
    column names as positional arguments and the name, if any, as a
    keyword.  ``deferrable`` / ``initially`` and dialect keyword
    arguments are appended in both forms.
    """
    in_batch = autogen_context._has_batch

    keyword_opts: List[Tuple[str, Any]] = []
    if constraint.deferrable:
        keyword_opts.append(("deferrable", constraint.deferrable))
    if constraint.initially:
        keyword_opts.append(("initially", constraint.initially))
    if not in_batch and alter and constraint.table.schema:
        keyword_opts.append(("schema", _ident(constraint.table.schema)))
    if not alter and constraint.name:
        keyword_opts.append(
            ("name", _render_gen_name(autogen_context, constraint.name))
        )

    dialect_options = _render_dialect_kwargs_items(autogen_context, constraint)

    if not alter:
        inline_args = [repr(_ident(col.name)) for col in constraint.columns]
        inline_args.extend("%s=%r" % (k, v) for k, v in keyword_opts)
        inline_args.extend(dialect_options)
        return "%(prefix)sUniqueConstraint(%(args)s)" % {
            "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
            "args": ", ".join(inline_args),
        }

    call_args = [repr(_render_gen_name(autogen_context, constraint.name))]
    if not in_batch:
        call_args.append(repr(_ident(constraint.table.name)))
    call_args.append(repr([_ident(col.name) for col in constraint.columns]))
    call_args.extend("%s=%r" % (k, v) for k, v in keyword_opts)
    call_args.extend(dialect_options)
    return "%(prefix)screate_unique_constraint(%(args)s)" % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "args": ", ".join(call_args),
    }
- def _user_autogenerate_prefix(autogen_context, target):
- prefix = autogen_context.opts["user_module_prefix"]
- if prefix is None:
- return "%s." % target.__module__
- else:
- return prefix
- def _sqlalchemy_autogenerate_prefix(autogen_context: AutogenContext) -> str:
- return autogen_context.opts["sqlalchemy_module_prefix"] or ""
- def _alembic_autogenerate_prefix(autogen_context: AutogenContext) -> str:
- if autogen_context._has_batch:
- return "batch_op."
- else:
- return autogen_context.opts["alembic_module_prefix"] or ""
- def _user_defined_render(
- type_: str, object_: Any, autogen_context: AutogenContext
- ) -> Union[str, Literal[False]]:
- if "render_item" in autogen_context.opts:
- render = autogen_context.opts["render_item"]
- if render:
- rendered = render(type_, object_, autogen_context)
- if rendered is not False:
- return rendered
- return False
def _render_column(
    column: Column[Any], autogen_context: AutogenContext
) -> str:
    """Render a ``Column(...)`` construct for ``column``.

    A user-defined rendering (``render_item`` hook) takes precedence.
    Otherwise the column name and type are rendered, followed by the
    server default (positionally or as ``server_default=``), then
    ``autoincrement``, ``nullable``, ``system`` and ``comment`` when set,
    and finally the entries of ``column.kwargs``.
    """
    rendered = _user_defined_render("column", column, autogen_context)
    if rendered is not False:
        return rendered

    args: List[str] = []
    opts: List[Tuple[str, Any]] = []

    if column.server_default:
        rendered = _render_server_default(  # type:ignore[assignment]
            column.server_default, autogen_context
        )
        if rendered:
            # Computed / Identity defaults are rendered as positional
            # arguments rather than as server_default=...
            if _should_render_server_default_positionally(
                column.server_default
            ):
                args.append(rendered)
            else:
                opts.append(("server_default", rendered))

    # autoincrement is rendered only when it differs from the default.
    if (
        column.autoincrement is not None
        and column.autoincrement != sqla_compat.AUTOINCREMENT_DEFAULT
    ):
        opts.append(("autoincrement", column.autoincrement))

    if column.nullable is not None:
        opts.append(("nullable", column.nullable))

    if column.system:
        opts.append(("system", column.system))

    comment = column.comment
    if comment:
        # opts values are interpolated with %s below, so the comment is
        # repr'd here.
        opts.append(("comment", "%r" % comment))

    # TODO: for non-ascii colname, assign a "key"
    return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "name": _ident(column.name),
        "type": _repr_type(column.type, autogen_context),
        # The conditional applies to the whole "join + ', '" expression:
        # positional args, when present, are followed by a separator.
        "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
        "kwargs": (
            ", ".join(
                ["%s=%s" % (kwname, val) for kwname, val in opts]
                + [
                    "%s=%s"
                    % (key, _render_potential_expr(val, autogen_context))
                    for key, val in column.kwargs.items()
                ]
            )
        ),
    }
def _should_render_server_default_positionally(server_default: Any) -> bool:
    """Tell whether ``server_default`` is a computed or identity construct,
    which is rendered as a positional ``Column`` argument.
    """
    is_computed = sqla_compat._server_default_is_computed(server_default)
    if is_computed:
        return is_computed
    return sqla_compat._server_default_is_identity(server_default)
def _render_server_default(
    default: Optional[
        Union[FetchedValue, str, TextClause, ColumnElement[Any]]
    ],
    autogen_context: AutogenContext,
    repr_: bool = True,
) -> Optional[str]:
    """Render a server default.

    A user-defined rendering (``render_item`` hook) takes precedence.
    Computed and identity defaults are rendered by their own helpers; a
    ``DefaultClause`` is unwrapped to its string argument or rendered as
    a SQL expression.  A string default is returned as a ``repr()`` when
    ``repr_`` is true; anything else is returned as is.
    """
    rendered = _user_defined_render("server_default", default, autogen_context)
    if rendered is not False:
        return rendered

    if sqla_compat._server_default_is_computed(default):
        return _render_computed(cast("Computed", default), autogen_context)
    elif sqla_compat._server_default_is_identity(default):
        return _render_identity(cast("Identity", default), autogen_context)
    elif isinstance(default, sa_schema.DefaultClause):
        if isinstance(default.arg, str):
            # fall through to the string handling below
            default = default.arg
        else:
            return _render_potential_expr(
                default.arg, autogen_context, is_server_default=True
            )

    if isinstance(default, str) and repr_:
        # strip one leading and one trailing single quote, if present,
        # before taking the repr
        default = repr(re.sub(r"^'|'$", "", default))

    return cast(str, default)
def _render_computed(
    computed: Computed, autogen_context: AutogenContext
) -> str:
    """Render a ``Computed(...)`` construct.

    The SQL text is rendered as a plain repr'd string; ``persisted`` is
    added as a keyword only when it is not ``None``.
    """
    sqltext = _render_potential_expr(
        computed.sqltext, autogen_context, wrap_in_element=False
    )

    keyword_items = []
    if computed.persisted is not None:
        keyword_items.append(("persisted", computed.persisted))

    rendered_kwargs = ", ".join("%s=%s" % item for item in keyword_items)
    return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "text": sqltext,
        "kwargs": rendered_kwargs,
    }
def _render_identity(
    identity: Identity, autogen_context: AutogenContext
) -> str:
    """Render an ``Identity(...)`` construct from the options returned by
    ``sqla_compat._get_identity_options_dict()``.
    """
    options = sqla_compat._get_identity_options_dict(
        identity, dialect_kwargs=True
    )
    rendered_options = ", ".join(
        "%s=%s" % (key, value) for key, value in options.items()
    )
    return "%(prefix)sIdentity(%(kwargs)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "kwargs": rendered_options,
    }
def _repr_type(
    type_: TypeEngine,
    autogen_context: AutogenContext,
    _skip_variants: bool = False,
) -> str:
    """Render a type object as Python source.

    Precedence, as implemented below: a user-defined rendering; variant
    types (unless ``_skip_variants``); dialect-specific types, for which
    an import of the dialect module is recorded; the migration impl's
    own rendering; a module-level ``_render_<visit_name>_type`` function
    for other SQLAlchemy types; and finally the type's ``repr()`` with
    the SQLAlchemy or user module prefix.
    """
    rendered = _user_defined_render("type", type_, autogen_context)
    if rendered is not False:
        return rendered

    if hasattr(autogen_context.migration_context, "impl"):
        impl_rt = autogen_context.migration_context.impl.render_type(
            type_, autogen_context
        )
    else:
        impl_rt = None

    mod = type(type_).__module__
    imports = autogen_context.imports

    if not _skip_variants and sqla_compat._type_has_variants(type_):
        return _render_Variant_type(type_, autogen_context)
    elif mod.startswith("sqlalchemy.dialects"):
        match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
        assert match is not None
        dname = match.group(1)
        # the import is recorded whether or not the impl supplied its
        # own rendering
        if imports is not None:
            imports.add("from sqlalchemy.dialects import %s" % dname)
        if impl_rt:
            return impl_rt
        else:
            return "%s.%r" % (dname, type_)
    elif impl_rt:
        return impl_rt
    elif mod.startswith("sqlalchemy."):
        # look up a type-specific renderer defined in this module by name
        if "_render_%s_type" % type_.__visit_name__ in globals():
            fn = globals()["_render_%s_type" % type_.__visit_name__]
            return fn(type_, autogen_context)
        else:
            prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
            return "%s%r" % (prefix, type_)
    else:
        prefix = _user_autogenerate_prefix(autogen_context, type_)
        return "%s%r" % (prefix, type_)
def _render_ARRAY_type(type_: ARRAY, autogen_context: AutogenContext) -> str:
    """Render an ``ARRAY`` type together with its ``item_type``."""
    rendered = _render_type_w_subtype(
        type_, autogen_context, "item_type", r"(.+?\()"
    )
    return cast(str, rendered)
def _render_Variant_type(
    type_: TypeEngine, autogen_context: AutogenContext
) -> str:
    """Render a type with variants as the base type followed by one
    ``.with_variant(...)`` call per dialect, in sorted dialect order.
    """
    base_type, variant_mapping = sqla_compat._get_variant_mapping(type_)
    rendered = _repr_type(base_type, autogen_context, _skip_variants=True)
    assert rendered is not None and rendered is not False  # type: ignore[comparison-overlap] # noqa:E501

    for dialect in sorted(variant_mapping):
        variant_repr = _repr_type(
            variant_mapping[dialect], autogen_context, _skip_variants=True
        )
        rendered += ".with_variant(%s, %r)" % (variant_repr, dialect)
    return rendered
def _render_type_w_subtype(
    type_: TypeEngine,
    autogen_context: AutogenContext,
    attrname: str,
    regexp: str,
    prefix: Optional[str] = None,
) -> Union[Optional[str], Literal[False]]:
    """Render a type that wraps another type stored in ``attrname``.

    The ``repr()`` of the inner type is located inside the ``repr()`` of
    the outer type (preceded by ``regexp``, whose first group is kept)
    and replaced by the fully rendered inner type.

    :param type_: the outer type.
    :param attrname: attribute of ``type_`` holding the inner type.
    :param regexp: pattern with one capturing group matching the text
     that precedes the inner type's repr.
    :param prefix: optional explicit prefix for the result; when not
     given, the prefix is derived from the module of ``type_``.
    :return: the rendered string; ``False`` when the inner type is
     ``None``; ``None`` when ``type_`` is not a SQLAlchemy type and no
     ``prefix`` was given.
    """
    outer_repr = repr(type_)
    inner_type = getattr(type_, attrname, None)
    if inner_type is None:
        return False

    # The inner repr must be matched literally: escape every regex
    # metacharacter, not just parentheses.
    inner_pattern = re.escape(repr(inner_type))

    sub_type = _repr_type(inner_type, autogen_context)

    # A callable replacement keeps backslashes in the rendered subtype
    # from being interpreted as replacement-template escapes.
    outer_type = re.sub(
        regexp + inner_pattern,
        lambda m: (m.group(1) or "") + sub_type,
        outer_repr,
    )

    if prefix:
        return "%s%s" % (prefix, outer_type)

    mod = type(type_).__module__
    if mod.startswith("sqlalchemy.dialects"):
        match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
        assert match is not None
        dname = match.group(1)
        return "%s.%s" % (dname, outer_type)
    elif mod.startswith("sqlalchemy"):
        prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
        return "%s%s" % (prefix, outer_type)
    else:
        return None
# Dispatcher mapping constraint classes to the functions that render them
# inline, e.g. within ``create_table()``; used by ``_render_constraint``.
_constraint_renderers = util.Dispatcher()
def _render_constraint(
    constraint: Constraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> Optional[str]:
    """Render ``constraint`` using the renderer registered for its type.

    When no renderer is registered, a warning is emitted and a
    placeholder string is returned instead.
    """
    try:
        render_fn = _constraint_renderers.dispatch(constraint)
    except ValueError:
        util.warn("No renderer is established for object %r" % constraint)
        return "[Unknown Python object %r]" % constraint

    return render_fn(constraint, autogen_context, namespace_metadata)
@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
def _render_primary_key(
    constraint: PrimaryKeyConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> Optional[str]:
    """Render an inline ``PrimaryKeyConstraint(...)``.

    A user-defined rendering takes precedence.  A constraint without
    columns renders to ``None``.
    """
    custom = _user_defined_render("primary_key", constraint, autogen_context)
    if custom is not False:
        return custom

    if not constraint.columns:
        return None

    keyword_args = []
    if constraint.name:
        rendered_name = repr(
            _render_gen_name(autogen_context, constraint.name)
        )
        keyword_args.append("%s=%s" % ("name", rendered_name))

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    column_args = [repr(c.name) for c in constraint.columns]
    return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
        "prefix": prefix,
        "args": ", ".join(column_args + keyword_args),
    }
def _fk_colspec(
    fk: ForeignKey,
    metadata_schema: Optional[str],
    namespace_metadata: MetaData,
) -> str:
    """Implement a 'safe' version of ForeignKey._get_colspec() that
    won't fail if the remote table can't be resolved.

    :param fk: the foreign key whose target is rendered.
    :param metadata_schema: schema applied to the table name when the
     colspec consists of only ``table.column``.
    :param namespace_metadata: metadata whose ``tables`` collection is
     consulted to translate a column key into the column name.
    :return: the ``<table>.<column>`` target specification.
    """
    colspec = fk._get_colspec()
    tokens = colspec.split(".")
    # the last two tokens are table and column; anything before them is
    # treated as part of the table's full name
    tname, colname = tokens[-2:]

    if metadata_schema is not None and len(tokens) == 2:
        table_fullname = "%s.%s" % (metadata_schema, tname)
    else:
        table_fullname = ".".join(tokens[0:-1])

    if (
        not fk.link_to_name
        and fk.parent is not None
        and fk.parent.table is not None
    ):
        # try to resolve the remote table in order to adjust for column.key.
        # the FK constraint needs to be rendered in terms of the column
        # name.

        if table_fullname in namespace_metadata.tables:
            col = namespace_metadata.tables[table_fullname].c.get(colname)
            if col is not None:
                colname = _ident(col.name)  # type: ignore[assignment]

    colspec = "%s.%s" % (table_fullname, colname)

    return colspec
- def _populate_render_fk_opts(
- constraint: ForeignKeyConstraint, opts: List[Tuple[str, str]]
- ) -> None:
- if constraint.onupdate:
- opts.append(("onupdate", repr(constraint.onupdate)))
- if constraint.ondelete:
- opts.append(("ondelete", repr(constraint.ondelete)))
- if constraint.initially:
- opts.append(("initially", repr(constraint.initially)))
- if constraint.deferrable:
- opts.append(("deferrable", repr(constraint.deferrable)))
- if constraint.use_alter:
- opts.append(("use_alter", repr(constraint.use_alter)))
- if constraint.match:
- opts.append(("match", repr(constraint.match)))
@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
def _render_foreign_key(
    constraint: ForeignKeyConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: MetaData,
) -> Optional[str]:
    """Render an inline ``ForeignKeyConstraint(...)``.

    A user-defined rendering takes precedence.  Otherwise the local
    column names, the referred column specs and the keyword options
    (name first, then the foreign key options) are rendered.
    """
    custom = _user_defined_render("foreign_key", constraint, autogen_context)
    if custom is not False:
        return custom

    keyword_opts = []
    if constraint.name:
        keyword_opts.append(
            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
        )
    _populate_render_fk_opts(constraint, keyword_opts)

    metadata_schema = namespace_metadata.schema

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    local_cols = ", ".join(
        repr(_ident(elem.parent.name)) for elem in constraint.elements
    )
    remote_cols = ", ".join(
        repr(_fk_colspec(elem, metadata_schema, namespace_metadata))
        for elem in constraint.elements
    )
    rendered_opts = ", ".join(
        "%s=%s" % (kwname, val) for kwname, val in keyword_opts
    )

    template = (
        "%(prefix)sForeignKeyConstraint([%(cols)s], "
        "[%(refcols)s], %(args)s)"
    )
    return template % {
        "prefix": prefix,
        "cols": local_cols,
        "refcols": remote_cols,
        "args": rendered_opts,
    }
@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
def _render_unique_constraint(
    constraint: UniqueConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> str:
    """Render an inline ``UniqueConstraint(...)``; a user-defined
    rendering takes precedence.
    """
    custom = _user_defined_render("unique", constraint, autogen_context)
    if custom is False:
        return _uq_constraint(constraint, autogen_context, False)
    return custom
@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
def _render_check_constraint(
    constraint: CheckConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> Optional[str]:
    """Render an inline ``CheckConstraint(...)``.

    A user-defined rendering takes precedence.  A constraint whose
    create rule targets a type object renders to ``None``.
    """
    custom = _user_defined_render("check", constraint, autogen_context)
    if custom is not False:
        return custom

    # detect the constraint being part of
    # a parent type which is probably in the Table already.
    # ideally SQLAlchemy would give us more of a first class
    # way to detect this.
    create_rule = constraint._create_rule
    belongs_to_type = (
        create_rule
        and hasattr(create_rule, "target")
        and isinstance(create_rule.target, sqltypes.TypeEngine)
    )
    if belongs_to_type:
        return None

    keyword_opts = []
    if constraint.name:
        keyword_opts.append(
            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
        )

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)

    rendered_opts = ""
    if keyword_opts:
        rendered_opts = ", " + ", ".join(
            "%s=%s" % (k, v) for k, v in keyword_opts
        )

    sqltext = _render_potential_expr(
        constraint.sqltext, autogen_context, wrap_in_element=False
    )
    return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
        "prefix": prefix,
        "opts": rendered_opts,
        "sqltext": sqltext,
    }
@renderers.dispatch_for(ops.ExecuteSQLOp)
def _execute_sql(autogen_context: AutogenContext, op: ops.ExecuteSQLOp) -> str:
    """Render an ``op.execute()`` call for a plain SQL string.

    :raises NotImplementedError: when ``op.sqltext`` is not a string.
    """
    sqltext = op.sqltext
    if isinstance(sqltext, str):
        return "op.execute(%r)" % sqltext

    raise NotImplementedError(
        "Autogenerate rendering of SQL Expression language constructs "
        "not supported here; please use a plain SQL string"
    )
# Rebind ``renderers`` to a branch of the default dispatcher now that all
# built-in renderers above have been registered on ``default_renderers``.
renderers = default_renderers.branch()
|