core.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. # Copyright Amethyst Reese
  2. # Licensed under the MIT license
  3. """
  4. Core implementation of aiosqlite proxies
  5. """
  6. import asyncio
  7. import logging
  8. import sqlite3
  9. from collections.abc import AsyncIterator, Generator, Iterable
  10. from functools import partial
  11. from pathlib import Path
  12. from queue import Empty, Queue, SimpleQueue
  13. from threading import Thread
  14. from typing import Any, Callable, Literal, Optional, Union
  15. from warnings import warn
  16. from .context import contextmanager
  17. from .cursor import Cursor
  18. __all__ = ["connect", "Connection", "Cursor"]
  19. LOG = logging.getLogger("aiosqlite")
  20. IsolationLevel = Optional[Literal["DEFERRED", "IMMEDIATE", "EXCLUSIVE"]]
  21. def set_result(fut: asyncio.Future, result: Any) -> None:
  22. """Set the result of a future if it hasn't been set already."""
  23. if not fut.done():
  24. fut.set_result(result)
  25. def set_exception(fut: asyncio.Future, e: BaseException) -> None:
  26. """Set the exception of a future if it hasn't been set already."""
  27. if not fut.done():
  28. fut.set_exception(e)
  29. _STOP_RUNNING_SENTINEL = object()
  30. class Connection(Thread):
  31. def __init__(
  32. self,
  33. connector: Callable[[], sqlite3.Connection],
  34. iter_chunk_size: int,
  35. loop: Optional[asyncio.AbstractEventLoop] = None,
  36. ) -> None:
  37. super().__init__()
  38. self._running = True
  39. self._connection: Optional[sqlite3.Connection] = None
  40. self._connector = connector
  41. self._tx: SimpleQueue[tuple[asyncio.Future, Callable[[], Any]]] = SimpleQueue()
  42. self._iter_chunk_size = iter_chunk_size
  43. if loop is not None:
  44. warn(
  45. "aiosqlite.Connection no longer uses the `loop` parameter",
  46. DeprecationWarning,
  47. )
  48. def _stop_running(self):
  49. self._running = False
  50. # PEP 661 is not accepted yet, so we cannot type a sentinel
  51. self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type]
  52. @property
  53. def _conn(self) -> sqlite3.Connection:
  54. if self._connection is None:
  55. raise ValueError("no active connection")
  56. return self._connection
  57. def _execute_insert(self, sql: str, parameters: Any) -> Optional[sqlite3.Row]:
  58. cursor = self._conn.execute(sql, parameters)
  59. cursor.execute("SELECT last_insert_rowid()")
  60. return cursor.fetchone()
  61. def _execute_fetchall(self, sql: str, parameters: Any) -> Iterable[sqlite3.Row]:
  62. cursor = self._conn.execute(sql, parameters)
  63. return cursor.fetchall()
  64. def run(self) -> None:
  65. """
  66. Execute function calls on a separate thread.
  67. :meta private:
  68. """
  69. while True:
  70. # Continues running until all queue items are processed,
  71. # even after connection is closed (so we can finalize all
  72. # futures)
  73. tx_item = self._tx.get()
  74. if tx_item is _STOP_RUNNING_SENTINEL:
  75. break
  76. future, function = tx_item
  77. try:
  78. LOG.debug("executing %s", function)
  79. result = function()
  80. LOG.debug("operation %s completed", function)
  81. future.get_loop().call_soon_threadsafe(set_result, future, result)
  82. except BaseException as e: # noqa B036
  83. LOG.debug("returning exception %s", e)
  84. future.get_loop().call_soon_threadsafe(set_exception, future, e)
  85. async def _execute(self, fn, *args, **kwargs):
  86. """Queue a function with the given arguments for execution."""
  87. if not self._running or not self._connection:
  88. raise ValueError("Connection closed")
  89. function = partial(fn, *args, **kwargs)
  90. future = asyncio.get_event_loop().create_future()
  91. self._tx.put_nowait((future, function))
  92. return await future
  93. async def _connect(self) -> "Connection":
  94. """Connect to the actual sqlite database."""
  95. if self._connection is None:
  96. try:
  97. future = asyncio.get_event_loop().create_future()
  98. self._tx.put_nowait((future, self._connector))
  99. self._connection = await future
  100. except BaseException:
  101. self._stop_running()
  102. self._connection = None
  103. raise
  104. return self
  105. def __await__(self) -> Generator[Any, None, "Connection"]:
  106. self.start()
  107. return self._connect().__await__()
  108. async def __aenter__(self) -> "Connection":
  109. return await self
  110. async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
  111. await self.close()
  112. @contextmanager
  113. async def cursor(self) -> Cursor:
  114. """Create an aiosqlite cursor wrapping a sqlite3 cursor object."""
  115. return Cursor(self, await self._execute(self._conn.cursor))
  116. async def commit(self) -> None:
  117. """Commit the current transaction."""
  118. await self._execute(self._conn.commit)
  119. async def rollback(self) -> None:
  120. """Roll back the current transaction."""
  121. await self._execute(self._conn.rollback)
  122. async def close(self) -> None:
  123. """Complete queued queries/cursors and close the connection."""
  124. if self._connection is None:
  125. return
  126. try:
  127. await self._execute(self._conn.close)
  128. except Exception:
  129. LOG.info("exception occurred while closing connection")
  130. raise
  131. finally:
  132. self._stop_running()
  133. self._connection = None
  134. @contextmanager
  135. async def execute(
  136. self, sql: str, parameters: Optional[Iterable[Any]] = None
  137. ) -> Cursor:
  138. """Helper to create a cursor and execute the given query."""
  139. if parameters is None:
  140. parameters = []
  141. cursor = await self._execute(self._conn.execute, sql, parameters)
  142. return Cursor(self, cursor)
  143. @contextmanager
  144. async def execute_insert(
  145. self, sql: str, parameters: Optional[Iterable[Any]] = None
  146. ) -> Optional[sqlite3.Row]:
  147. """Helper to insert and get the last_insert_rowid."""
  148. if parameters is None:
  149. parameters = []
  150. return await self._execute(self._execute_insert, sql, parameters)
  151. @contextmanager
  152. async def execute_fetchall(
  153. self, sql: str, parameters: Optional[Iterable[Any]] = None
  154. ) -> Iterable[sqlite3.Row]:
  155. """Helper to execute a query and return all the data."""
  156. if parameters is None:
  157. parameters = []
  158. return await self._execute(self._execute_fetchall, sql, parameters)
  159. @contextmanager
  160. async def executemany(
  161. self, sql: str, parameters: Iterable[Iterable[Any]]
  162. ) -> Cursor:
  163. """Helper to create a cursor and execute the given multiquery."""
  164. cursor = await self._execute(self._conn.executemany, sql, parameters)
  165. return Cursor(self, cursor)
  166. @contextmanager
  167. async def executescript(self, sql_script: str) -> Cursor:
  168. """Helper to create a cursor and execute a user script."""
  169. cursor = await self._execute(self._conn.executescript, sql_script)
  170. return Cursor(self, cursor)
  171. async def interrupt(self) -> None:
  172. """Interrupt pending queries."""
  173. return self._conn.interrupt()
  174. async def create_function(
  175. self, name: str, num_params: int, func: Callable, deterministic: bool = False
  176. ) -> None:
  177. """
  178. Create user-defined function that can be later used
  179. within SQL statements. Must be run within the same thread
  180. that query executions take place so instead of executing directly
  181. against the connection, we defer this to `run` function.
  182. If ``deterministic`` is true, the created function is marked as deterministic,
  183. which allows SQLite to perform additional optimizations. This flag is supported
  184. by SQLite 3.8.3 or higher, ``NotSupportedError`` will be raised if used with
  185. older versions.
  186. """
  187. await self._execute(
  188. self._conn.create_function,
  189. name,
  190. num_params,
  191. func,
  192. deterministic=deterministic,
  193. )
  194. @property
  195. def in_transaction(self) -> bool:
  196. return self._conn.in_transaction
  197. @property
  198. def isolation_level(self) -> Optional[str]:
  199. return self._conn.isolation_level
  200. @isolation_level.setter
  201. def isolation_level(self, value: IsolationLevel) -> None:
  202. self._conn.isolation_level = value
  203. @property
  204. def row_factory(self) -> Optional[type]:
  205. return self._conn.row_factory
  206. @row_factory.setter
  207. def row_factory(self, factory: Optional[type]) -> None:
  208. self._conn.row_factory = factory
  209. @property
  210. def text_factory(self) -> Callable[[bytes], Any]:
  211. return self._conn.text_factory
  212. @text_factory.setter
  213. def text_factory(self, factory: Callable[[bytes], Any]) -> None:
  214. self._conn.text_factory = factory
  215. @property
  216. def total_changes(self) -> int:
  217. return self._conn.total_changes
  218. async def enable_load_extension(self, value: bool) -> None:
  219. await self._execute(self._conn.enable_load_extension, value) # type: ignore
  220. async def load_extension(self, path: str):
  221. await self._execute(self._conn.load_extension, path) # type: ignore
  222. async def set_progress_handler(
  223. self, handler: Callable[[], Optional[int]], n: int
  224. ) -> None:
  225. await self._execute(self._conn.set_progress_handler, handler, n)
  226. async def set_trace_callback(self, handler: Callable) -> None:
  227. await self._execute(self._conn.set_trace_callback, handler)
  228. async def iterdump(self) -> AsyncIterator[str]:
  229. """
  230. Return an async iterator to dump the database in SQL text format.
  231. Example::
  232. async for line in db.iterdump():
  233. ...
  234. """
  235. dump_queue: Queue = Queue()
  236. def dumper():
  237. try:
  238. for line in self._conn.iterdump():
  239. dump_queue.put_nowait(line)
  240. dump_queue.put_nowait(None)
  241. except Exception:
  242. LOG.exception("exception while dumping db")
  243. dump_queue.put_nowait(None)
  244. raise
  245. fut = self._execute(dumper)
  246. task = asyncio.ensure_future(fut)
  247. while True:
  248. try:
  249. line: Optional[str] = dump_queue.get_nowait()
  250. if line is None:
  251. break
  252. yield line
  253. except Empty:
  254. if task.done():
  255. LOG.warning("iterdump completed unexpectedly")
  256. break
  257. await asyncio.sleep(0.01)
  258. await task
  259. async def backup(
  260. self,
  261. target: Union["Connection", sqlite3.Connection],
  262. *,
  263. pages: int = 0,
  264. progress: Optional[Callable[[int, int, int], None]] = None,
  265. name: str = "main",
  266. sleep: float = 0.250,
  267. ) -> None:
  268. """
  269. Make a backup of the current database to the target database.
  270. Takes either a standard sqlite3 or aiosqlite Connection object as the target.
  271. """
  272. if isinstance(target, Connection):
  273. target = target._conn
  274. await self._execute(
  275. self._conn.backup,
  276. target,
  277. pages=pages,
  278. progress=progress,
  279. name=name,
  280. sleep=sleep,
  281. )
  282. def connect(
  283. database: Union[str, Path],
  284. *,
  285. iter_chunk_size=64,
  286. loop: Optional[asyncio.AbstractEventLoop] = None,
  287. **kwargs: Any,
  288. ) -> Connection:
  289. """Create and return a connection proxy to the sqlite database."""
  290. if loop is not None:
  291. warn(
  292. "aiosqlite.connect() no longer uses the `loop` parameter",
  293. DeprecationWarning,
  294. )
  295. def connector() -> sqlite3.Connection:
  296. if isinstance(database, str):
  297. loc = database
  298. elif isinstance(database, bytes):
  299. loc = database.decode("utf-8")
  300. else:
  301. loc = str(database)
  302. return sqlite3.connect(loc, **kwargs)
  303. return Connection(connector, iter_chunk_size)