sqlite.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  #
  # Licensed to the Apache Software Foundation (ASF) under one
  # or more contributor license agreements. See the NOTICE file
  # distributed with this work for additional information
  # regarding copyright ownership. The ASF licenses this file
  # to you under the Apache License, Version 2.0 (the
  # "License"); you may not use this file except in compliance
  # with the License. You may obtain a copy of the License at
  #
  # http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing,
  # software distributed under the License is distributed on an
  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  # KIND, either express or implied. See the License for the
  # specific language governing permissions and limitations
  # under the License.
  18. from __future__ import annotations
  19. import sqlite3
  20. from urllib.parse import unquote
  21. from airflow.providers.common.sql.hooks.sql import DbApiHook
  class SqliteHook(DbApiHook):
      """Interact with SQLite.

      The hook reads an Airflow connection and exposes it in two forms:
      a SQLAlchemy-style URI (:meth:`get_uri`) and a DB-API connection
      opened with the standard-library ``sqlite3`` module (:meth:`get_conn`).
      """

      conn_name_attr = "sqlite_conn_id"
      default_conn_name = "sqlite_default"
      conn_type = "sqlite"
      hook_name = "Sqlite"

      def __init__(self, *args, **kwargs):
          """Initialize the hook, forwarding all arguments to ``DbApiHook``."""
          super().__init__(*args, **kwargs)
          # sqlite3 uses the "qmark" DB-API parameter style.
          # NOTE(review): presumably read by DbApiHook when it builds
          # parameterized statements -- confirm against the base class.
          self._placeholder: str = "?"

      def get_conn(self) -> sqlite3.dbapi2.Connection:
          """Return SQLite connection object.

          The URI produced by :meth:`get_uri` is converted to the ``file:``
          form and opened with ``sqlite3.connect(..., uri=True)``.

          :return: an open ``sqlite3`` connection.
          """
          sqlalchemy_uri = self.get_uri()
          # The sqlite3 connection does not use the sqlite scheme.
          # See https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#uri-connections for details.
          # Only the leading scheme is rewritten (count=1) so that the same text
          # appearing later in the path or query string is left untouched.
          sqlite_uri = sqlalchemy_uri.replace("sqlite:///", "file:", 1)
          return sqlite3.connect(sqlite_uri, uri=True)

      def get_uri(self) -> str:
          """Override DbApiHook get_uri method for get_sqlalchemy_engine().

          :return: the connection URI with the ``sqlite://`` scheme rewritten to
              ``sqlite:///``.
          """
          conn_id = self.get_conn_id()
          airflow_conn = self.get_connection(conn_id)
          # Fall back to this hook's connection type when the stored
          # connection does not declare one.
          if airflow_conn.conn_type is None:
              airflow_conn.conn_type = self.conn_type
          airflow_uri = unquote(airflow_conn.get_uri())
          # For sqlite, there is no schema in the connection URI.
          # So we need to drop the trailing slash.
          airflow_sqlite_uri = airflow_uri.replace("/?", "?")
          # The sqlite connection has one more slash for path specification.
          # See https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#connect-strings for details.
          # Only the leading scheme is rewritten (count=1); a later "sqlite://"
          # inside the path or a query value must not gain an extra slash.
          return airflow_sqlite_uri.replace("sqlite://", "sqlite:///", 1)