render.py 35 KB


  1. # mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
  2. # mypy: no-warn-return-any, allow-any-generics
  3. from __future__ import annotations
  4. from io import StringIO
  5. import re
  6. from typing import Any
  7. from typing import cast
  8. from typing import Dict
  9. from typing import List
  10. from typing import Optional
  11. from typing import Tuple
  12. from typing import TYPE_CHECKING
  13. from typing import Union
  14. from mako.pygen import PythonPrinter
  15. from sqlalchemy import schema as sa_schema
  16. from sqlalchemy import sql
  17. from sqlalchemy import types as sqltypes
  18. from sqlalchemy.sql.elements import conv
  19. from sqlalchemy.sql.elements import Label
  20. from sqlalchemy.sql.elements import quoted_name
  21. from .. import util
  22. from ..operations import ops
  23. from ..util import sqla_compat
  24. if TYPE_CHECKING:
  25. from typing import Literal
  26. from sqlalchemy import Computed
  27. from sqlalchemy import Identity
  28. from sqlalchemy.sql.base import DialectKWArgs
  29. from sqlalchemy.sql.elements import ColumnElement
  30. from sqlalchemy.sql.elements import TextClause
  31. from sqlalchemy.sql.schema import CheckConstraint
  32. from sqlalchemy.sql.schema import Column
  33. from sqlalchemy.sql.schema import Constraint
  34. from sqlalchemy.sql.schema import FetchedValue
  35. from sqlalchemy.sql.schema import ForeignKey
  36. from sqlalchemy.sql.schema import ForeignKeyConstraint
  37. from sqlalchemy.sql.schema import Index
  38. from sqlalchemy.sql.schema import MetaData
  39. from sqlalchemy.sql.schema import PrimaryKeyConstraint
  40. from sqlalchemy.sql.schema import UniqueConstraint
  41. from sqlalchemy.sql.sqltypes import ARRAY
  42. from sqlalchemy.sql.type_api import TypeEngine
  43. from alembic.autogenerate.api import AutogenContext
  44. from alembic.config import Config
  45. from alembic.operations.ops import MigrationScript
  46. from alembic.operations.ops import ModifyTableOps
# Largest number of rendered column/constraint arguments that the
# create_table renderer in this module passes individually; above this it
# wraps them in a single ``*[...]`` argument instead.
MAX_PYTHON_ARGS = 255
  48. def _render_gen_name(
  49. autogen_context: AutogenContext,
  50. name: sqla_compat._ConstraintName,
  51. ) -> Optional[Union[quoted_name, str, _f_name]]:
  52. if isinstance(name, conv):
  53. return _f_name(_alembic_autogenerate_prefix(autogen_context), name)
  54. else:
  55. return sqla_compat.constraint_name_or_none(name)
  56. def _indent(text: str) -> str:
  57. text = re.compile(r"^", re.M).sub(" ", text).strip()
  58. text = re.compile(r" +$", re.M).sub("", text)
  59. return text
  60. def _render_python_into_templatevars(
  61. autogen_context: AutogenContext,
  62. migration_script: MigrationScript,
  63. template_args: Dict[str, Union[str, Config]],
  64. ) -> None:
  65. imports = autogen_context.imports
  66. for upgrade_ops, downgrade_ops in zip(
  67. migration_script.upgrade_ops_list, migration_script.downgrade_ops_list
  68. ):
  69. template_args[upgrade_ops.upgrade_token] = _indent(
  70. _render_cmd_body(upgrade_ops, autogen_context)
  71. )
  72. template_args[downgrade_ops.downgrade_token] = _indent(
  73. _render_cmd_body(downgrade_ops, autogen_context)
  74. )
  75. template_args["imports"] = "\n".join(sorted(imports))
# Dispatcher holding the render function for each operation class; the
# functions in this module register themselves through
# ``@renderers.dispatch_for(...)`` and are looked up via ``dispatch()``.
default_renderers = renderers = util.Dispatcher()
  77. def _render_cmd_body(
  78. op_container: ops.OpContainer,
  79. autogen_context: AutogenContext,
  80. ) -> str:
  81. buf = StringIO()
  82. printer = PythonPrinter(buf)
  83. printer.writeline(
  84. "# ### commands auto generated by Alembic - please adjust! ###"
  85. )
  86. has_lines = False
  87. for op in op_container.ops:
  88. lines = render_op(autogen_context, op)
  89. has_lines = has_lines or bool(lines)
  90. for line in lines:
  91. printer.writeline(line)
  92. if not has_lines:
  93. printer.writeline("pass")
  94. printer.writeline("# ### end Alembic commands ###")
  95. return buf.getvalue()
  96. def render_op(
  97. autogen_context: AutogenContext, op: ops.MigrateOperation
  98. ) -> List[str]:
  99. renderer = renderers.dispatch(op)
  100. lines = util.to_list(renderer(autogen_context, op))
  101. return lines
  102. def render_op_text(
  103. autogen_context: AutogenContext, op: ops.MigrateOperation
  104. ) -> str:
  105. return "\n".join(render_op(autogen_context, op))
  106. @renderers.dispatch_for(ops.ModifyTableOps)
  107. def _render_modify_table(
  108. autogen_context: AutogenContext, op: ModifyTableOps
  109. ) -> List[str]:
  110. opts = autogen_context.opts
  111. render_as_batch = opts.get("render_as_batch", False)
  112. if op.ops:
  113. lines = []
  114. if render_as_batch:
  115. with autogen_context._within_batch():
  116. lines.append(
  117. "with op.batch_alter_table(%r, schema=%r) as batch_op:"
  118. % (op.table_name, op.schema)
  119. )
  120. for t_op in op.ops:
  121. t_lines = render_op(autogen_context, t_op)
  122. lines.extend(t_lines)
  123. lines.append("")
  124. else:
  125. for t_op in op.ops:
  126. t_lines = render_op(autogen_context, t_op)
  127. lines.extend(t_lines)
  128. return lines
  129. else:
  130. return []
  131. @renderers.dispatch_for(ops.CreateTableCommentOp)
  132. def _render_create_table_comment(
  133. autogen_context: AutogenContext, op: ops.CreateTableCommentOp
  134. ) -> str:
  135. if autogen_context._has_batch:
  136. templ = (
  137. "{prefix}create_table_comment(\n"
  138. "{indent}{comment},\n"
  139. "{indent}existing_comment={existing}\n"
  140. ")"
  141. )
  142. else:
  143. templ = (
  144. "{prefix}create_table_comment(\n"
  145. "{indent}'{tname}',\n"
  146. "{indent}{comment},\n"
  147. "{indent}existing_comment={existing},\n"
  148. "{indent}schema={schema}\n"
  149. ")"
  150. )
  151. return templ.format(
  152. prefix=_alembic_autogenerate_prefix(autogen_context),
  153. tname=op.table_name,
  154. comment="%r" % op.comment if op.comment is not None else None,
  155. existing=(
  156. "%r" % op.existing_comment
  157. if op.existing_comment is not None
  158. else None
  159. ),
  160. schema="'%s'" % op.schema if op.schema is not None else None,
  161. indent=" ",
  162. )
  163. @renderers.dispatch_for(ops.DropTableCommentOp)
  164. def _render_drop_table_comment(
  165. autogen_context: AutogenContext, op: ops.DropTableCommentOp
  166. ) -> str:
  167. if autogen_context._has_batch:
  168. templ = (
  169. "{prefix}drop_table_comment(\n"
  170. "{indent}existing_comment={existing}\n"
  171. ")"
  172. )
  173. else:
  174. templ = (
  175. "{prefix}drop_table_comment(\n"
  176. "{indent}'{tname}',\n"
  177. "{indent}existing_comment={existing},\n"
  178. "{indent}schema={schema}\n"
  179. ")"
  180. )
  181. return templ.format(
  182. prefix=_alembic_autogenerate_prefix(autogen_context),
  183. tname=op.table_name,
  184. existing=(
  185. "%r" % op.existing_comment
  186. if op.existing_comment is not None
  187. else None
  188. ),
  189. schema="'%s'" % op.schema if op.schema is not None else None,
  190. indent=" ",
  191. )
  192. @renderers.dispatch_for(ops.CreateTableOp)
  193. def _add_table(autogen_context: AutogenContext, op: ops.CreateTableOp) -> str:
  194. table = op.to_table()
  195. args = [
  196. col
  197. for col in [
  198. _render_column(col, autogen_context) for col in table.columns
  199. ]
  200. if col
  201. ] + sorted(
  202. [
  203. rcons
  204. for rcons in [
  205. _render_constraint(
  206. cons, autogen_context, op._namespace_metadata
  207. )
  208. for cons in table.constraints
  209. ]
  210. if rcons is not None
  211. ]
  212. )
  213. if len(args) > MAX_PYTHON_ARGS:
  214. args_str = "*[" + ",\n".join(args) + "]"
  215. else:
  216. args_str = ",\n".join(args)
  217. text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
  218. "tablename": _ident(op.table_name),
  219. "prefix": _alembic_autogenerate_prefix(autogen_context),
  220. "args": args_str,
  221. }
  222. if op.schema:
  223. text += ",\nschema=%r" % _ident(op.schema)
  224. comment = table.comment
  225. if comment:
  226. text += ",\ncomment=%r" % _ident(comment)
  227. info = table.info
  228. if info:
  229. text += f",\ninfo={info!r}"
  230. for k in sorted(op.kw):
  231. text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k])
  232. if table._prefixes:
  233. prefixes = ", ".join("'%s'" % p for p in table._prefixes)
  234. text += ",\nprefixes=[%s]" % prefixes
  235. if op.if_not_exists is not None:
  236. text += ",\nif_not_exists=%r" % bool(op.if_not_exists)
  237. text += "\n)"
  238. return text
  239. @renderers.dispatch_for(ops.DropTableOp)
  240. def _drop_table(autogen_context: AutogenContext, op: ops.DropTableOp) -> str:
  241. text = "%(prefix)sdrop_table(%(tname)r" % {
  242. "prefix": _alembic_autogenerate_prefix(autogen_context),
  243. "tname": _ident(op.table_name),
  244. }
  245. if op.schema:
  246. text += ", schema=%r" % _ident(op.schema)
  247. if op.if_exists is not None:
  248. text += ", if_exists=%r" % bool(op.if_exists)
  249. text += ")"
  250. return text
  251. def _render_dialect_kwargs_items(
  252. autogen_context: AutogenContext, item: DialectKWArgs
  253. ) -> list[str]:
  254. return [
  255. f"{key}={_render_potential_expr(val, autogen_context)}"
  256. for key, val in item.dialect_kwargs.items()
  257. ]
  258. @renderers.dispatch_for(ops.CreateIndexOp)
  259. def _add_index(autogen_context: AutogenContext, op: ops.CreateIndexOp) -> str:
  260. index = op.to_index()
  261. has_batch = autogen_context._has_batch
  262. if has_batch:
  263. tmpl = (
  264. "%(prefix)screate_index(%(name)r, [%(columns)s], "
  265. "unique=%(unique)r%(kwargs)s)"
  266. )
  267. else:
  268. tmpl = (
  269. "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
  270. "unique=%(unique)r%(schema)s%(kwargs)s)"
  271. )
  272. assert index.table is not None
  273. opts = _render_dialect_kwargs_items(autogen_context, index)
  274. if op.if_not_exists is not None:
  275. opts.append("if_not_exists=%r" % bool(op.if_not_exists))
  276. text = tmpl % {
  277. "prefix": _alembic_autogenerate_prefix(autogen_context),
  278. "name": _render_gen_name(autogen_context, index.name),
  279. "table": _ident(index.table.name),
  280. "columns": ", ".join(
  281. _get_index_rendered_expressions(index, autogen_context)
  282. ),
  283. "unique": index.unique or False,
  284. "schema": (
  285. (", schema=%r" % _ident(index.table.schema))
  286. if index.table.schema
  287. else ""
  288. ),
  289. "kwargs": ", " + ", ".join(opts) if opts else "",
  290. }
  291. return text
  292. @renderers.dispatch_for(ops.DropIndexOp)
  293. def _drop_index(autogen_context: AutogenContext, op: ops.DropIndexOp) -> str:
  294. index = op.to_index()
  295. has_batch = autogen_context._has_batch
  296. if has_batch:
  297. tmpl = "%(prefix)sdrop_index(%(name)r%(kwargs)s)"
  298. else:
  299. tmpl = (
  300. "%(prefix)sdrop_index(%(name)r, "
  301. "table_name=%(table_name)r%(schema)s%(kwargs)s)"
  302. )
  303. opts = _render_dialect_kwargs_items(autogen_context, index)
  304. if op.if_exists is not None:
  305. opts.append("if_exists=%r" % bool(op.if_exists))
  306. text = tmpl % {
  307. "prefix": _alembic_autogenerate_prefix(autogen_context),
  308. "name": _render_gen_name(autogen_context, op.index_name),
  309. "table_name": _ident(op.table_name),
  310. "schema": ((", schema=%r" % _ident(op.schema)) if op.schema else ""),
  311. "kwargs": ", " + ", ".join(opts) if opts else "",
  312. }
  313. return text
  314. @renderers.dispatch_for(ops.CreateUniqueConstraintOp)
  315. def _add_unique_constraint(
  316. autogen_context: AutogenContext, op: ops.CreateUniqueConstraintOp
  317. ) -> List[str]:
  318. return [_uq_constraint(op.to_constraint(), autogen_context, True)]
  319. @renderers.dispatch_for(ops.CreateForeignKeyOp)
  320. def _add_fk_constraint(
  321. autogen_context: AutogenContext, op: ops.CreateForeignKeyOp
  322. ) -> str:
  323. args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
  324. if not autogen_context._has_batch:
  325. args.append(repr(_ident(op.source_table)))
  326. args.extend(
  327. [
  328. repr(_ident(op.referent_table)),
  329. repr([_ident(col) for col in op.local_cols]),
  330. repr([_ident(col) for col in op.remote_cols]),
  331. ]
  332. )
  333. kwargs = [
  334. "referent_schema",
  335. "onupdate",
  336. "ondelete",
  337. "initially",
  338. "deferrable",
  339. "use_alter",
  340. "match",
  341. ]
  342. if not autogen_context._has_batch:
  343. kwargs.insert(0, "source_schema")
  344. for k in kwargs:
  345. if k in op.kw:
  346. value = op.kw[k]
  347. if value is not None:
  348. args.append("%s=%r" % (k, value))
  349. return "%(prefix)screate_foreign_key(%(args)s)" % {
  350. "prefix": _alembic_autogenerate_prefix(autogen_context),
  351. "args": ", ".join(args),
  352. }
  353. @renderers.dispatch_for(ops.CreatePrimaryKeyOp)
  354. def _add_pk_constraint(constraint, autogen_context):
  355. raise NotImplementedError()
  356. @renderers.dispatch_for(ops.CreateCheckConstraintOp)
  357. def _add_check_constraint(constraint, autogen_context):
  358. raise NotImplementedError()
  359. @renderers.dispatch_for(ops.DropConstraintOp)
  360. def _drop_constraint(
  361. autogen_context: AutogenContext, op: ops.DropConstraintOp
  362. ) -> str:
  363. prefix = _alembic_autogenerate_prefix(autogen_context)
  364. name = _render_gen_name(autogen_context, op.constraint_name)
  365. schema = _ident(op.schema) if op.schema else None
  366. type_ = _ident(op.constraint_type) if op.constraint_type else None
  367. params_strs = []
  368. params_strs.append(repr(name))
  369. if not autogen_context._has_batch:
  370. params_strs.append(repr(_ident(op.table_name)))
  371. if schema is not None:
  372. params_strs.append(f"schema={schema!r}")
  373. if type_ is not None:
  374. params_strs.append(f"type_={type_!r}")
  375. return f"{prefix}drop_constraint({', '.join(params_strs)})"
  376. @renderers.dispatch_for(ops.AddColumnOp)
  377. def _add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
  378. schema, tname, column = op.schema, op.table_name, op.column
  379. if autogen_context._has_batch:
  380. template = "%(prefix)sadd_column(%(column)s)"
  381. else:
  382. template = "%(prefix)sadd_column(%(tname)r, %(column)s"
  383. if schema:
  384. template += ", schema=%(schema)r"
  385. template += ")"
  386. text = template % {
  387. "prefix": _alembic_autogenerate_prefix(autogen_context),
  388. "tname": tname,
  389. "column": _render_column(column, autogen_context),
  390. "schema": schema,
  391. }
  392. return text
  393. @renderers.dispatch_for(ops.DropColumnOp)
  394. def _drop_column(autogen_context: AutogenContext, op: ops.DropColumnOp) -> str:
  395. schema, tname, column_name = op.schema, op.table_name, op.column_name
  396. if autogen_context._has_batch:
  397. template = "%(prefix)sdrop_column(%(cname)r)"
  398. else:
  399. template = "%(prefix)sdrop_column(%(tname)r, %(cname)r"
  400. if schema:
  401. template += ", schema=%(schema)r"
  402. template += ")"
  403. text = template % {
  404. "prefix": _alembic_autogenerate_prefix(autogen_context),
  405. "tname": _ident(tname),
  406. "cname": _ident(column_name),
  407. "schema": _ident(schema),
  408. }
  409. return text
  410. @renderers.dispatch_for(ops.AlterColumnOp)
  411. def _alter_column(
  412. autogen_context: AutogenContext, op: ops.AlterColumnOp
  413. ) -> str:
  414. tname = op.table_name
  415. cname = op.column_name
  416. server_default = op.modify_server_default
  417. type_ = op.modify_type
  418. nullable = op.modify_nullable
  419. comment = op.modify_comment
  420. newname = op.modify_name
  421. autoincrement = op.kw.get("autoincrement", None)
  422. existing_type = op.existing_type
  423. existing_nullable = op.existing_nullable
  424. existing_comment = op.existing_comment
  425. existing_server_default = op.existing_server_default
  426. schema = op.schema
  427. indent = " " * 11
  428. if autogen_context._has_batch:
  429. template = "%(prefix)salter_column(%(cname)r"
  430. else:
  431. template = "%(prefix)salter_column(%(tname)r, %(cname)r"
  432. text = template % {
  433. "prefix": _alembic_autogenerate_prefix(autogen_context),
  434. "tname": tname,
  435. "cname": cname,
  436. }
  437. if existing_type is not None:
  438. text += ",\n%sexisting_type=%s" % (
  439. indent,
  440. _repr_type(existing_type, autogen_context),
  441. )
  442. if server_default is not False:
  443. rendered = _render_server_default(server_default, autogen_context)
  444. text += ",\n%sserver_default=%s" % (indent, rendered)
  445. if newname is not None:
  446. text += ",\n%snew_column_name=%r" % (indent, newname)
  447. if type_ is not None:
  448. text += ",\n%stype_=%s" % (indent, _repr_type(type_, autogen_context))
  449. if nullable is not None:
  450. text += ",\n%snullable=%r" % (indent, nullable)
  451. if comment is not False:
  452. text += ",\n%scomment=%r" % (indent, comment)
  453. if existing_comment is not None:
  454. text += ",\n%sexisting_comment=%r" % (indent, existing_comment)
  455. if nullable is None and existing_nullable is not None:
  456. text += ",\n%sexisting_nullable=%r" % (indent, existing_nullable)
  457. if autoincrement is not None:
  458. text += ",\n%sautoincrement=%r" % (indent, autoincrement)
  459. if server_default is False and existing_server_default:
  460. rendered = _render_server_default(
  461. existing_server_default, autogen_context
  462. )
  463. text += ",\n%sexisting_server_default=%s" % (indent, rendered)
  464. if schema and not autogen_context._has_batch:
  465. text += ",\n%sschema=%r" % (indent, schema)
  466. text += ")"
  467. return text
  468. class _f_name:
  469. def __init__(self, prefix: str, name: conv) -> None:
  470. self.prefix = prefix
  471. self.name = name
  472. def __repr__(self) -> str:
  473. return "%sf(%r)" % (self.prefix, _ident(self.name))
  474. def _ident(name: Optional[Union[quoted_name, str]]) -> Optional[str]:
  475. """produce a __repr__() object for a string identifier that may
  476. use quoted_name() in SQLAlchemy 0.9 and greater.
  477. The issue worked around here is that quoted_name() doesn't have
  478. very good repr() behavior by itself when unicode is involved.
  479. """
  480. if name is None:
  481. return name
  482. elif isinstance(name, quoted_name):
  483. return str(name)
  484. elif isinstance(name, str):
  485. return name
  486. def _render_potential_expr(
  487. value: Any,
  488. autogen_context: AutogenContext,
  489. *,
  490. wrap_in_element: bool = True,
  491. is_server_default: bool = False,
  492. is_index: bool = False,
  493. ) -> str:
  494. if isinstance(value, sql.ClauseElement):
  495. sql_text = autogen_context.migration_context.impl.render_ddl_sql_expr(
  496. value, is_server_default=is_server_default, is_index=is_index
  497. )
  498. if wrap_in_element:
  499. prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
  500. element = "literal_column" if is_index else "text"
  501. value_str = f"{prefix}{element}({sql_text!r})"
  502. if (
  503. is_index
  504. and isinstance(value, Label)
  505. and type(value.name) is str
  506. ):
  507. return value_str + f".label({value.name!r})"
  508. else:
  509. return value_str
  510. else:
  511. return repr(sql_text)
  512. else:
  513. return repr(value)
  514. def _get_index_rendered_expressions(
  515. idx: Index, autogen_context: AutogenContext
  516. ) -> List[str]:
  517. return [
  518. (
  519. repr(_ident(getattr(exp, "name", None)))
  520. if isinstance(exp, sa_schema.Column)
  521. else _render_potential_expr(exp, autogen_context, is_index=True)
  522. )
  523. for exp in idx.expressions
  524. ]
  525. def _uq_constraint(
  526. constraint: UniqueConstraint,
  527. autogen_context: AutogenContext,
  528. alter: bool,
  529. ) -> str:
  530. opts: List[Tuple[str, Any]] = []
  531. has_batch = autogen_context._has_batch
  532. if constraint.deferrable:
  533. opts.append(("deferrable", constraint.deferrable))
  534. if constraint.initially:
  535. opts.append(("initially", constraint.initially))
  536. if not has_batch and alter and constraint.table.schema:
  537. opts.append(("schema", _ident(constraint.table.schema)))
  538. if not alter and constraint.name:
  539. opts.append(
  540. ("name", _render_gen_name(autogen_context, constraint.name))
  541. )
  542. dialect_options = _render_dialect_kwargs_items(autogen_context, constraint)
  543. if alter:
  544. args = [repr(_render_gen_name(autogen_context, constraint.name))]
  545. if not has_batch:
  546. args += [repr(_ident(constraint.table.name))]
  547. args.append(repr([_ident(col.name) for col in constraint.columns]))
  548. args.extend(["%s=%r" % (k, v) for k, v in opts])
  549. args.extend(dialect_options)
  550. return "%(prefix)screate_unique_constraint(%(args)s)" % {
  551. "prefix": _alembic_autogenerate_prefix(autogen_context),
  552. "args": ", ".join(args),
  553. }
  554. else:
  555. args = [repr(_ident(col.name)) for col in constraint.columns]
  556. args.extend(["%s=%r" % (k, v) for k, v in opts])
  557. args.extend(dialect_options)
  558. return "%(prefix)sUniqueConstraint(%(args)s)" % {
  559. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  560. "args": ", ".join(args),
  561. }
  562. def _user_autogenerate_prefix(autogen_context, target):
  563. prefix = autogen_context.opts["user_module_prefix"]
  564. if prefix is None:
  565. return "%s." % target.__module__
  566. else:
  567. return prefix
  568. def _sqlalchemy_autogenerate_prefix(autogen_context: AutogenContext) -> str:
  569. return autogen_context.opts["sqlalchemy_module_prefix"] or ""
  570. def _alembic_autogenerate_prefix(autogen_context: AutogenContext) -> str:
  571. if autogen_context._has_batch:
  572. return "batch_op."
  573. else:
  574. return autogen_context.opts["alembic_module_prefix"] or ""
  575. def _user_defined_render(
  576. type_: str, object_: Any, autogen_context: AutogenContext
  577. ) -> Union[str, Literal[False]]:
  578. if "render_item" in autogen_context.opts:
  579. render = autogen_context.opts["render_item"]
  580. if render:
  581. rendered = render(type_, object_, autogen_context)
  582. if rendered is not False:
  583. return rendered
  584. return False
  585. def _render_column(
  586. column: Column[Any], autogen_context: AutogenContext
  587. ) -> str:
  588. rendered = _user_defined_render("column", column, autogen_context)
  589. if rendered is not False:
  590. return rendered
  591. args: List[str] = []
  592. opts: List[Tuple[str, Any]] = []
  593. if column.server_default:
  594. rendered = _render_server_default( # type:ignore[assignment]
  595. column.server_default, autogen_context
  596. )
  597. if rendered:
  598. if _should_render_server_default_positionally(
  599. column.server_default
  600. ):
  601. args.append(rendered)
  602. else:
  603. opts.append(("server_default", rendered))
  604. if (
  605. column.autoincrement is not None
  606. and column.autoincrement != sqla_compat.AUTOINCREMENT_DEFAULT
  607. ):
  608. opts.append(("autoincrement", column.autoincrement))
  609. if column.nullable is not None:
  610. opts.append(("nullable", column.nullable))
  611. if column.system:
  612. opts.append(("system", column.system))
  613. comment = column.comment
  614. if comment:
  615. opts.append(("comment", "%r" % comment))
  616. # TODO: for non-ascii colname, assign a "key"
  617. return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
  618. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  619. "name": _ident(column.name),
  620. "type": _repr_type(column.type, autogen_context),
  621. "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
  622. "kwargs": (
  623. ", ".join(
  624. ["%s=%s" % (kwname, val) for kwname, val in opts]
  625. + [
  626. "%s=%s"
  627. % (key, _render_potential_expr(val, autogen_context))
  628. for key, val in column.kwargs.items()
  629. ]
  630. )
  631. ),
  632. }
  633. def _should_render_server_default_positionally(server_default: Any) -> bool:
  634. return sqla_compat._server_default_is_computed(
  635. server_default
  636. ) or sqla_compat._server_default_is_identity(server_default)
  637. def _render_server_default(
  638. default: Optional[
  639. Union[FetchedValue, str, TextClause, ColumnElement[Any]]
  640. ],
  641. autogen_context: AutogenContext,
  642. repr_: bool = True,
  643. ) -> Optional[str]:
  644. rendered = _user_defined_render("server_default", default, autogen_context)
  645. if rendered is not False:
  646. return rendered
  647. if sqla_compat._server_default_is_computed(default):
  648. return _render_computed(cast("Computed", default), autogen_context)
  649. elif sqla_compat._server_default_is_identity(default):
  650. return _render_identity(cast("Identity", default), autogen_context)
  651. elif isinstance(default, sa_schema.DefaultClause):
  652. if isinstance(default.arg, str):
  653. default = default.arg
  654. else:
  655. return _render_potential_expr(
  656. default.arg, autogen_context, is_server_default=True
  657. )
  658. if isinstance(default, str) and repr_:
  659. default = repr(re.sub(r"^'|'$", "", default))
  660. return cast(str, default)
  661. def _render_computed(
  662. computed: Computed, autogen_context: AutogenContext
  663. ) -> str:
  664. text = _render_potential_expr(
  665. computed.sqltext, autogen_context, wrap_in_element=False
  666. )
  667. kwargs = {}
  668. if computed.persisted is not None:
  669. kwargs["persisted"] = computed.persisted
  670. return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
  671. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  672. "text": text,
  673. "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
  674. }
  675. def _render_identity(
  676. identity: Identity, autogen_context: AutogenContext
  677. ) -> str:
  678. kwargs = sqla_compat._get_identity_options_dict(
  679. identity, dialect_kwargs=True
  680. )
  681. return "%(prefix)sIdentity(%(kwargs)s)" % {
  682. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  683. "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
  684. }
  685. def _repr_type(
  686. type_: TypeEngine,
  687. autogen_context: AutogenContext,
  688. _skip_variants: bool = False,
  689. ) -> str:
  690. rendered = _user_defined_render("type", type_, autogen_context)
  691. if rendered is not False:
  692. return rendered
  693. if hasattr(autogen_context.migration_context, "impl"):
  694. impl_rt = autogen_context.migration_context.impl.render_type(
  695. type_, autogen_context
  696. )
  697. else:
  698. impl_rt = None
  699. mod = type(type_).__module__
  700. imports = autogen_context.imports
  701. if not _skip_variants and sqla_compat._type_has_variants(type_):
  702. return _render_Variant_type(type_, autogen_context)
  703. elif mod.startswith("sqlalchemy.dialects"):
  704. match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
  705. assert match is not None
  706. dname = match.group(1)
  707. if imports is not None:
  708. imports.add("from sqlalchemy.dialects import %s" % dname)
  709. if impl_rt:
  710. return impl_rt
  711. else:
  712. return "%s.%r" % (dname, type_)
  713. elif impl_rt:
  714. return impl_rt
  715. elif mod.startswith("sqlalchemy."):
  716. if "_render_%s_type" % type_.__visit_name__ in globals():
  717. fn = globals()["_render_%s_type" % type_.__visit_name__]
  718. return fn(type_, autogen_context)
  719. else:
  720. prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
  721. return "%s%r" % (prefix, type_)
  722. else:
  723. prefix = _user_autogenerate_prefix(autogen_context, type_)
  724. return "%s%r" % (prefix, type_)
  725. def _render_ARRAY_type(type_: ARRAY, autogen_context: AutogenContext) -> str:
  726. return cast(
  727. str,
  728. _render_type_w_subtype(
  729. type_, autogen_context, "item_type", r"(.+?\()"
  730. ),
  731. )
  732. def _render_Variant_type(
  733. type_: TypeEngine, autogen_context: AutogenContext
  734. ) -> str:
  735. base_type, variant_mapping = sqla_compat._get_variant_mapping(type_)
  736. base = _repr_type(base_type, autogen_context, _skip_variants=True)
  737. assert base is not None and base is not False # type: ignore[comparison-overlap] # noqa:E501
  738. for dialect in sorted(variant_mapping):
  739. typ = variant_mapping[dialect]
  740. base += ".with_variant(%s, %r)" % (
  741. _repr_type(typ, autogen_context, _skip_variants=True),
  742. dialect,
  743. )
  744. return base
  745. def _render_type_w_subtype(
  746. type_: TypeEngine,
  747. autogen_context: AutogenContext,
  748. attrname: str,
  749. regexp: str,
  750. prefix: Optional[str] = None,
  751. ) -> Union[Optional[str], Literal[False]]:
  752. outer_repr = repr(type_)
  753. inner_type = getattr(type_, attrname, None)
  754. if inner_type is None:
  755. return False
  756. inner_repr = repr(inner_type)
  757. inner_repr = re.sub(r"([\(\)])", r"\\\1", inner_repr)
  758. sub_type = _repr_type(getattr(type_, attrname), autogen_context)
  759. outer_type = re.sub(regexp + inner_repr, r"\1%s" % sub_type, outer_repr)
  760. if prefix:
  761. return "%s%s" % (prefix, outer_type)
  762. mod = type(type_).__module__
  763. if mod.startswith("sqlalchemy.dialects"):
  764. match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
  765. assert match is not None
  766. dname = match.group(1)
  767. return "%s.%s" % (dname, outer_type)
  768. elif mod.startswith("sqlalchemy"):
  769. prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
  770. return "%s%s" % (prefix, outer_type)
  771. else:
  772. return None
# Dispatcher holding the render function for each constraint class; it is
# consulted by ``_render_constraint`` and filled through
# ``@_constraint_renderers.dispatch_for(...)``.
_constraint_renderers = util.Dispatcher()
  774. def _render_constraint(
  775. constraint: Constraint,
  776. autogen_context: AutogenContext,
  777. namespace_metadata: Optional[MetaData],
  778. ) -> Optional[str]:
  779. try:
  780. renderer = _constraint_renderers.dispatch(constraint)
  781. except ValueError:
  782. util.warn("No renderer is established for object %r" % constraint)
  783. return "[Unknown Python object %r]" % constraint
  784. else:
  785. return renderer(constraint, autogen_context, namespace_metadata)
  786. @_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
  787. def _render_primary_key(
  788. constraint: PrimaryKeyConstraint,
  789. autogen_context: AutogenContext,
  790. namespace_metadata: Optional[MetaData],
  791. ) -> Optional[str]:
  792. rendered = _user_defined_render("primary_key", constraint, autogen_context)
  793. if rendered is not False:
  794. return rendered
  795. if not constraint.columns:
  796. return None
  797. opts = []
  798. if constraint.name:
  799. opts.append(
  800. ("name", repr(_render_gen_name(autogen_context, constraint.name)))
  801. )
  802. return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
  803. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  804. "args": ", ".join(
  805. [repr(c.name) for c in constraint.columns]
  806. + ["%s=%s" % (kwname, val) for kwname, val in opts]
  807. ),
  808. }
  809. def _fk_colspec(
  810. fk: ForeignKey,
  811. metadata_schema: Optional[str],
  812. namespace_metadata: MetaData,
  813. ) -> str:
  814. """Implement a 'safe' version of ForeignKey._get_colspec() that
  815. won't fail if the remote table can't be resolved.
  816. """
  817. colspec = fk._get_colspec()
  818. tokens = colspec.split(".")
  819. tname, colname = tokens[-2:]
  820. if metadata_schema is not None and len(tokens) == 2:
  821. table_fullname = "%s.%s" % (metadata_schema, tname)
  822. else:
  823. table_fullname = ".".join(tokens[0:-1])
  824. if (
  825. not fk.link_to_name
  826. and fk.parent is not None
  827. and fk.parent.table is not None
  828. ):
  829. # try to resolve the remote table in order to adjust for column.key.
  830. # the FK constraint needs to be rendered in terms of the column
  831. # name.
  832. if table_fullname in namespace_metadata.tables:
  833. col = namespace_metadata.tables[table_fullname].c.get(colname)
  834. if col is not None:
  835. colname = _ident(col.name) # type: ignore[assignment]
  836. colspec = "%s.%s" % (table_fullname, colname)
  837. return colspec
  838. def _populate_render_fk_opts(
  839. constraint: ForeignKeyConstraint, opts: List[Tuple[str, str]]
  840. ) -> None:
  841. if constraint.onupdate:
  842. opts.append(("onupdate", repr(constraint.onupdate)))
  843. if constraint.ondelete:
  844. opts.append(("ondelete", repr(constraint.ondelete)))
  845. if constraint.initially:
  846. opts.append(("initially", repr(constraint.initially)))
  847. if constraint.deferrable:
  848. opts.append(("deferrable", repr(constraint.deferrable)))
  849. if constraint.use_alter:
  850. opts.append(("use_alter", repr(constraint.use_alter)))
  851. if constraint.match:
  852. opts.append(("match", repr(constraint.match)))
  853. @_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
  854. def _render_foreign_key(
  855. constraint: ForeignKeyConstraint,
  856. autogen_context: AutogenContext,
  857. namespace_metadata: MetaData,
  858. ) -> Optional[str]:
  859. rendered = _user_defined_render("foreign_key", constraint, autogen_context)
  860. if rendered is not False:
  861. return rendered
  862. opts = []
  863. if constraint.name:
  864. opts.append(
  865. ("name", repr(_render_gen_name(autogen_context, constraint.name)))
  866. )
  867. _populate_render_fk_opts(constraint, opts)
  868. apply_metadata_schema = namespace_metadata.schema
  869. return (
  870. "%(prefix)sForeignKeyConstraint([%(cols)s], "
  871. "[%(refcols)s], %(args)s)"
  872. % {
  873. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  874. "cols": ", ".join(
  875. repr(_ident(f.parent.name)) for f in constraint.elements
  876. ),
  877. "refcols": ", ".join(
  878. repr(_fk_colspec(f, apply_metadata_schema, namespace_metadata))
  879. for f in constraint.elements
  880. ),
  881. "args": ", ".join(
  882. ["%s=%s" % (kwname, val) for kwname, val in opts]
  883. ),
  884. }
  885. )
  886. @_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
  887. def _render_unique_constraint(
  888. constraint: UniqueConstraint,
  889. autogen_context: AutogenContext,
  890. namespace_metadata: Optional[MetaData],
  891. ) -> str:
  892. rendered = _user_defined_render("unique", constraint, autogen_context)
  893. if rendered is not False:
  894. return rendered
  895. return _uq_constraint(constraint, autogen_context, False)
  896. @_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
  897. def _render_check_constraint(
  898. constraint: CheckConstraint,
  899. autogen_context: AutogenContext,
  900. namespace_metadata: Optional[MetaData],
  901. ) -> Optional[str]:
  902. rendered = _user_defined_render("check", constraint, autogen_context)
  903. if rendered is not False:
  904. return rendered
  905. # detect the constraint being part of
  906. # a parent type which is probably in the Table already.
  907. # ideally SQLAlchemy would give us more of a first class
  908. # way to detect this.
  909. if (
  910. constraint._create_rule
  911. and hasattr(constraint._create_rule, "target")
  912. and isinstance(
  913. constraint._create_rule.target,
  914. sqltypes.TypeEngine,
  915. )
  916. ):
  917. return None
  918. opts = []
  919. if constraint.name:
  920. opts.append(
  921. ("name", repr(_render_gen_name(autogen_context, constraint.name)))
  922. )
  923. return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
  924. "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
  925. "opts": (
  926. ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
  927. if opts
  928. else ""
  929. ),
  930. "sqltext": _render_potential_expr(
  931. constraint.sqltext, autogen_context, wrap_in_element=False
  932. ),
  933. }
  934. @renderers.dispatch_for(ops.ExecuteSQLOp)
  935. def _execute_sql(autogen_context: AutogenContext, op: ops.ExecuteSQLOp) -> str:
  936. if not isinstance(op.sqltext, str):
  937. raise NotImplementedError(
  938. "Autogenerate rendering of SQL Expression language constructs "
  939. "not supported here; please use a plain SQL string"
  940. )
  941. return "op.execute(%r)" % op.sqltext
  942. renderers = default_renderers.branch()