aiomysql.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. # dialects/mysql/aiomysql.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors <see AUTHORS
  3. # file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. r"""
  8. .. dialect:: mysql+aiomysql
  9. :name: aiomysql
  10. :dbapi: aiomysql
  11. :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...]
  12. :url: https://github.com/aio-libs/aiomysql
  13. The aiomysql dialect is SQLAlchemy's second Python asyncio dialect.
  14. Using a special asyncio mediation layer, the aiomysql dialect is usable
  15. as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
  16. extension package.
  17. This dialect should normally be used only with the
  18. :func:`_asyncio.create_async_engine` engine creation function::
  19. from sqlalchemy.ext.asyncio import create_async_engine
  20. engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4")
  21. """ # noqa
  22. from .pymysql import MySQLDialect_pymysql
  23. from ... import pool
  24. from ... import util
  25. from ...engine import AdaptedConnection
  26. from ...util.concurrency import asyncio
  27. from ...util.concurrency import await_fallback
  28. from ...util.concurrency import await_only
  29. class AsyncAdapt_aiomysql_cursor:
  30. server_side = False
  31. __slots__ = (
  32. "_adapt_connection",
  33. "_connection",
  34. "await_",
  35. "_cursor",
  36. "_rows",
  37. )
  38. def __init__(self, adapt_connection):
  39. self._adapt_connection = adapt_connection
  40. self._connection = adapt_connection._connection
  41. self.await_ = adapt_connection.await_
  42. cursor = self._connection.cursor(adapt_connection.dbapi.Cursor)
  43. # see https://github.com/aio-libs/aiomysql/issues/543
  44. self._cursor = self.await_(cursor.__aenter__())
  45. self._rows = []
  46. @property
  47. def description(self):
  48. return self._cursor.description
  49. @property
  50. def rowcount(self):
  51. return self._cursor.rowcount
  52. @property
  53. def arraysize(self):
  54. return self._cursor.arraysize
  55. @arraysize.setter
  56. def arraysize(self, value):
  57. self._cursor.arraysize = value
  58. @property
  59. def lastrowid(self):
  60. return self._cursor.lastrowid
  61. def close(self):
  62. # note we aren't actually closing the cursor here,
  63. # we are just letting GC do it. to allow this to be async
  64. # we would need the Result to change how it does "Safe close cursor".
  65. # MySQL "cursors" don't actually have state to be "closed" besides
  66. # exhausting rows, which we already have done for sync cursor.
  67. # another option would be to emulate aiosqlite dialect and assign
  68. # cursor only if we are doing server side cursor operation.
  69. self._rows[:] = []
  70. def execute(self, operation, parameters=None):
  71. return self.await_(self._execute_async(operation, parameters))
  72. def executemany(self, operation, seq_of_parameters):
  73. return self.await_(
  74. self._executemany_async(operation, seq_of_parameters)
  75. )
  76. async def _execute_async(self, operation, parameters):
  77. async with self._adapt_connection._execute_mutex:
  78. result = await self._cursor.execute(operation, parameters)
  79. if not self.server_side:
  80. # aiomysql has a "fake" async result, so we have to pull it out
  81. # of that here since our default result is not async.
  82. # we could just as easily grab "_rows" here and be done with it
  83. # but this is safer.
  84. self._rows = list(await self._cursor.fetchall())
  85. return result
  86. async def _executemany_async(self, operation, seq_of_parameters):
  87. async with self._adapt_connection._execute_mutex:
  88. return await self._cursor.executemany(operation, seq_of_parameters)
  89. def setinputsizes(self, *inputsizes):
  90. pass
  91. def __iter__(self):
  92. while self._rows:
  93. yield self._rows.pop(0)
  94. def fetchone(self):
  95. if self._rows:
  96. return self._rows.pop(0)
  97. else:
  98. return None
  99. def fetchmany(self, size=None):
  100. if size is None:
  101. size = self.arraysize
  102. retval = self._rows[0:size]
  103. self._rows[:] = self._rows[size:]
  104. return retval
  105. def fetchall(self):
  106. retval = self._rows[:]
  107. self._rows[:] = []
  108. return retval
  109. class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
  110. __slots__ = ()
  111. server_side = True
  112. def __init__(self, adapt_connection):
  113. self._adapt_connection = adapt_connection
  114. self._connection = adapt_connection._connection
  115. self.await_ = adapt_connection.await_
  116. cursor = self._connection.cursor(adapt_connection.dbapi.SSCursor)
  117. self._cursor = self.await_(cursor.__aenter__())
  118. def close(self):
  119. if self._cursor is not None:
  120. self.await_(self._cursor.close())
  121. self._cursor = None
  122. def fetchone(self):
  123. return self.await_(self._cursor.fetchone())
  124. def fetchmany(self, size=None):
  125. return self.await_(self._cursor.fetchmany(size=size))
  126. def fetchall(self):
  127. return self.await_(self._cursor.fetchall())
  128. class AsyncAdapt_aiomysql_connection(AdaptedConnection):
  129. await_ = staticmethod(await_only)
  130. __slots__ = ("dbapi", "_connection", "_execute_mutex")
  131. def __init__(self, dbapi, connection):
  132. self.dbapi = dbapi
  133. self._connection = connection
  134. self._execute_mutex = asyncio.Lock()
  135. def ping(self, reconnect):
  136. return self.await_(self._connection.ping(reconnect))
  137. def character_set_name(self):
  138. return self._connection.character_set_name()
  139. def autocommit(self, value):
  140. self.await_(self._connection.autocommit(value))
  141. def cursor(self, server_side=False):
  142. if server_side:
  143. return AsyncAdapt_aiomysql_ss_cursor(self)
  144. else:
  145. return AsyncAdapt_aiomysql_cursor(self)
  146. def rollback(self):
  147. self.await_(self._connection.rollback())
  148. def commit(self):
  149. self.await_(self._connection.commit())
  150. def close(self):
  151. # it's not awaitable.
  152. self._connection.close()
  153. class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
  154. __slots__ = ()
  155. await_ = staticmethod(await_fallback)
  156. class AsyncAdapt_aiomysql_dbapi:
  157. def __init__(self, aiomysql, pymysql):
  158. self.aiomysql = aiomysql
  159. self.pymysql = pymysql
  160. self.paramstyle = "format"
  161. self._init_dbapi_attributes()
  162. self.Cursor, self.SSCursor = self._init_cursors_subclasses()
  163. def _init_dbapi_attributes(self):
  164. for name in (
  165. "Warning",
  166. "Error",
  167. "InterfaceError",
  168. "DataError",
  169. "DatabaseError",
  170. "OperationalError",
  171. "InterfaceError",
  172. "IntegrityError",
  173. "ProgrammingError",
  174. "InternalError",
  175. "NotSupportedError",
  176. ):
  177. setattr(self, name, getattr(self.aiomysql, name))
  178. for name in (
  179. "NUMBER",
  180. "STRING",
  181. "DATETIME",
  182. "BINARY",
  183. "TIMESTAMP",
  184. "Binary",
  185. ):
  186. setattr(self, name, getattr(self.pymysql, name))
  187. def connect(self, *arg, **kw):
  188. async_fallback = kw.pop("async_fallback", False)
  189. if util.asbool(async_fallback):
  190. return AsyncAdaptFallback_aiomysql_connection(
  191. self,
  192. await_fallback(self.aiomysql.connect(*arg, **kw)),
  193. )
  194. else:
  195. return AsyncAdapt_aiomysql_connection(
  196. self,
  197. await_only(self.aiomysql.connect(*arg, **kw)),
  198. )
  199. def _init_cursors_subclasses(self):
  200. # suppress unconditional warning emitted by aiomysql
  201. class Cursor(self.aiomysql.Cursor):
  202. async def _show_warnings(self, conn):
  203. pass
  204. class SSCursor(self.aiomysql.SSCursor):
  205. async def _show_warnings(self, conn):
  206. pass
  207. return Cursor, SSCursor
  208. class MySQLDialect_aiomysql(MySQLDialect_pymysql):
  209. driver = "aiomysql"
  210. supports_statement_cache = True
  211. supports_server_side_cursors = True
  212. _sscursor = AsyncAdapt_aiomysql_ss_cursor
  213. is_async = True
  214. @classmethod
  215. def dbapi(cls):
  216. return AsyncAdapt_aiomysql_dbapi(
  217. __import__("aiomysql"), __import__("pymysql")
  218. )
  219. @classmethod
  220. def get_pool_class(cls, url):
  221. async_fallback = url.query.get("async_fallback", False)
  222. if util.asbool(async_fallback):
  223. return pool.FallbackAsyncAdaptedQueuePool
  224. else:
  225. return pool.AsyncAdaptedQueuePool
  226. def create_connect_args(self, url):
  227. return super(MySQLDialect_aiomysql, self).create_connect_args(
  228. url, _translate_args=dict(username="user", database="db")
  229. )
  230. def is_disconnect(self, e, connection, cursor):
  231. if super(MySQLDialect_aiomysql, self).is_disconnect(
  232. e, connection, cursor
  233. ):
  234. return True
  235. else:
  236. str_e = str(e).lower()
  237. return "not connected" in str_e
  238. def _found_rows_client_flag(self):
  239. from pymysql.constants import CLIENT
  240. return CLIENT.FOUND_ROWS
  241. def get_driver_connection(self, connection):
  242. return connection._connection
  243. dialect = MySQLDialect_aiomysql