view.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import sqlalchemy as sa
  2. from sqlalchemy.ext import compiler
  3. from sqlalchemy.schema import DDLElement, PrimaryKeyConstraint
  4. from sqlalchemy.sql.expression import ClauseElement, Executable
  5. from sqlalchemy_utils.functions import get_columns
  6. class CreateView(DDLElement):
  7. def __init__(self, name, selectable, materialized=False, replace=False):
  8. if materialized and replace:
  9. raise ValueError("Cannot use CREATE OR REPLACE with materialized views")
  10. self.name = name
  11. self.selectable = selectable
  12. self.materialized = materialized
  13. self.replace = replace
  14. @compiler.compiles(CreateView)
  15. def compile_create_materialized_view(element, compiler, **kw):
  16. return 'CREATE {}{}VIEW {} AS {}'.format(
  17. 'OR REPLACE ' if element.replace else '',
  18. 'MATERIALIZED ' if element.materialized else '',
  19. compiler.dialect.identifier_preparer.quote(element.name),
  20. compiler.sql_compiler.process(element.selectable, literal_binds=True),
  21. )
  22. class DropView(DDLElement):
  23. def __init__(self, name, materialized=False, cascade=True):
  24. self.name = name
  25. self.materialized = materialized
  26. self.cascade = cascade
  27. @compiler.compiles(DropView)
  28. def compile_drop_materialized_view(element, compiler, **kw):
  29. return 'DROP {}VIEW IF EXISTS {} {}'.format(
  30. 'MATERIALIZED ' if element.materialized else '',
  31. compiler.dialect.identifier_preparer.quote(element.name),
  32. 'CASCADE' if element.cascade else ''
  33. )
  34. def create_table_from_selectable(
  35. name,
  36. selectable,
  37. indexes=None,
  38. metadata=None,
  39. aliases=None,
  40. **kwargs
  41. ):
  42. if indexes is None:
  43. indexes = []
  44. if metadata is None:
  45. metadata = sa.MetaData()
  46. if aliases is None:
  47. aliases = {}
  48. args = [
  49. sa.Column(
  50. c.name,
  51. c.type,
  52. key=aliases.get(c.name, c.name),
  53. primary_key=c.primary_key
  54. )
  55. for c in get_columns(selectable)
  56. ] + indexes
  57. table = sa.Table(name, metadata, *args, **kwargs)
  58. if not any([c.primary_key for c in get_columns(selectable)]):
  59. table.append_constraint(
  60. PrimaryKeyConstraint(*[c.name for c in get_columns(selectable)])
  61. )
  62. return table
  63. def create_materialized_view(
  64. name,
  65. selectable,
  66. metadata,
  67. indexes=None,
  68. aliases=None
  69. ):
  70. """ Create a view on a given metadata
  71. :param name: The name of the view to create.
  72. :param selectable: An SQLAlchemy selectable e.g. a select() statement.
  73. :param metadata:
  74. An SQLAlchemy Metadata instance that stores the features of the
  75. database being described.
  76. :param indexes: An optional list of SQLAlchemy Index instances.
  77. :param aliases:
  78. An optional dictionary containing with keys as column names and values
  79. as column aliases.
  80. Same as for ``create_view`` except that a ``CREATE MATERIALIZED VIEW``
  81. statement is emitted instead of a ``CREATE VIEW``.
  82. """
  83. table = create_table_from_selectable(
  84. name=name,
  85. selectable=selectable,
  86. indexes=indexes,
  87. metadata=None,
  88. aliases=aliases
  89. )
  90. sa.event.listen(
  91. metadata,
  92. 'after_create',
  93. CreateView(name, selectable, materialized=True)
  94. )
  95. @sa.event.listens_for(metadata, 'after_create')
  96. def create_indexes(target, connection, **kw):
  97. for idx in table.indexes:
  98. idx.create(connection)
  99. sa.event.listen(
  100. metadata,
  101. 'before_drop',
  102. DropView(name, materialized=True)
  103. )
  104. return table
  105. def create_view(
  106. name,
  107. selectable,
  108. metadata,
  109. cascade_on_drop=True,
  110. replace=False,
  111. ):
  112. """ Create a view on a given metadata
  113. :param name: The name of the view to create.
  114. :param selectable: An SQLAlchemy selectable e.g. a select() statement.
  115. :param metadata:
  116. An SQLAlchemy Metadata instance that stores the features of the
  117. database being described.
  118. :param cascade_on_drop: If ``True`` the view will be dropped with
  119. ``CASCADE``, deleting all dependent objects as well.
  120. :param replace: If ``True`` the view will be created with ``OR REPLACE``,
  121. replacing an existing view with the same name.
  122. The process for creating a view is similar to the standard way that a
  123. table is constructed, except that a selectable is provided instead of
  124. a set of columns. The view is created once a ``CREATE`` statement is
  125. executed against the supplied metadata (e.g. ``metadata.create_all(..)``),
  126. and dropped when a ``DROP`` is executed against the metadata.
  127. To create a view that performs basic filtering on a table. ::
  128. metadata = MetaData()
  129. users = Table('users', metadata,
  130. Column('id', Integer, primary_key=True),
  131. Column('name', String),
  132. Column('fullname', String),
  133. Column('premium_user', Boolean, default=False),
  134. )
  135. premium_members = select(users).where(users.c.premium_user == True)
  136. # sqlalchemy 1.3:
  137. # premium_members = select([users]).where(users.c.premium_user == True)
  138. create_view('premium_users', premium_members, metadata)
  139. metadata.create_all(engine) # View is created at this point
  140. """
  141. table = create_table_from_selectable(
  142. name=name,
  143. selectable=selectable,
  144. metadata=None
  145. )
  146. sa.event.listen(
  147. metadata,
  148. 'after_create',
  149. CreateView(name, selectable, replace=replace),
  150. )
  151. @sa.event.listens_for(metadata, 'after_create')
  152. def create_indexes(target, connection, **kw):
  153. for idx in table.indexes:
  154. idx.create(connection)
  155. sa.event.listen(
  156. metadata,
  157. 'before_drop',
  158. DropView(name, cascade=cascade_on_drop)
  159. )
  160. return table
  161. class RefreshMaterializedView(Executable, ClauseElement):
  162. inherit_cache = True
  163. def __init__(self, name, concurrently):
  164. self.name = name
  165. self.concurrently = concurrently
  166. @compiler.compiles(RefreshMaterializedView)
  167. def compile_refresh_materialized_view(element, compiler):
  168. return 'REFRESH MATERIALIZED VIEW {concurrently}{name}'.format(
  169. concurrently='CONCURRENTLY ' if element.concurrently else '',
  170. name=compiler.dialect.identifier_preparer.quote(element.name),
  171. )
  172. def refresh_materialized_view(session, name, concurrently=False):
  173. """ Refreshes an already existing materialized view
  174. :param session: An SQLAlchemy Session instance.
  175. :param name: The name of the materialized view to refresh.
  176. :param concurrently:
  177. Optional flag that causes the ``CONCURRENTLY`` parameter
  178. to be specified when the materialized view is refreshed.
  179. """
  180. # Since session.execute() bypasses autoflush, we must manually flush in
  181. # order to include newly-created/modified objects in the refresh.
  182. session.flush()
  183. session.execute(RefreshMaterializedView(name, concurrently))