url.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. # engine/url.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates
  8. information about a database connection specification.
  9. The URL object is created automatically when
  10. :func:`~sqlalchemy.engine.create_engine` is called with a string
  11. argument; alternatively, the URL is a public-facing construct which can
  12. be used directly and is also accepted directly by ``create_engine()``.
  13. """
  14. import re
  15. from .interfaces import Dialect
  16. from .. import exc
  17. from .. import util
  18. from ..dialects import plugins
  19. from ..dialects import registry
  20. from ..util import collections_abc
  21. from ..util import compat
  22. class URL(
  23. util.namedtuple(
  24. "URL",
  25. [
  26. "drivername",
  27. "username",
  28. "password",
  29. "host",
  30. "port",
  31. "database",
  32. "query",
  33. ],
  34. )
  35. ):
  36. """
  37. Represent the components of a URL used to connect to a database.
  38. URLs are typically constructed from a fully formatted URL string, where the
  39. :func:`.make_url` function is used internally by the
  40. :func:`_sa.create_engine` function in order to parse the URL string into
  41. its individual components, which are then used to construct a new
  42. :class:`.URL` object. When parsing from a formatted URL string, the parsing
  43. format generally follows
  44. `RFC-1738 <https://www.ietf.org/rfc/rfc1738.txt>`_, with some exceptions.
  45. A :class:`_engine.URL` object may also be produced directly, either by
  46. using the :func:`.make_url` function with a fully formed URL string, or
  47. by using the :meth:`_engine.URL.create` constructor in order
  48. to construct a :class:`_engine.URL` programmatically given individual
  49. fields. The resulting :class:`.URL` object may be passed directly to
  50. :func:`_sa.create_engine` in place of a string argument, which will bypass
  51. the usage of :func:`.make_url` within the engine's creation process.
  52. .. versionchanged:: 1.4
  53. The :class:`_engine.URL` object is now an immutable object. To
  54. create a URL, use the :func:`_engine.make_url` or
  55. :meth:`_engine.URL.create` function / method. To modify
  56. a :class:`_engine.URL`, use methods like
  57. :meth:`_engine.URL.set` and
  58. :meth:`_engine.URL.update_query_dict` to return a new
  59. :class:`_engine.URL` object with modifications. See notes for this
  60. change at :ref:`change_5526`.
  61. :class:`_engine.URL` contains the following attributes:
  62. * :attr:`_engine.URL.drivername`: database backend and driver name, such as
  63. ``postgresql+psycopg2``
  64. * :attr:`_engine.URL.username`: username string
  65. * :attr:`_engine.URL.password`: password string
  66. * :attr:`_engine.URL.host`: string hostname
  67. * :attr:`_engine.URL.port`: integer port number
  68. * :attr:`_engine.URL.database`: string database name
  69. * :attr:`_engine.URL.query`: an immutable mapping representing the query
  70. string. contains strings for keys and either strings or tuples of
  71. strings for values.
  72. """
  73. def __new__(self, *arg, **kw):
  74. if kw.pop("_new_ok", False):
  75. return super(URL, self).__new__(self, *arg, **kw)
  76. else:
  77. util.warn_deprecated(
  78. "Calling URL() directly is deprecated and will be disabled "
  79. "in a future release. The public constructor for URL is "
  80. "now the URL.create() method.",
  81. "1.4",
  82. )
  83. return URL.create(*arg, **kw)
  84. @classmethod
  85. def create(
  86. cls,
  87. drivername,
  88. username=None,
  89. password=None,
  90. host=None,
  91. port=None,
  92. database=None,
  93. query=util.EMPTY_DICT,
  94. ):
  95. """Create a new :class:`_engine.URL` object.
  96. :param drivername: the name of the database backend. This name will
  97. correspond to a module in sqlalchemy/databases or a third party
  98. plug-in.
  99. :param username: The user name.
  100. :param password: database password. Is typically a string, but may
  101. also be an object that can be stringified with ``str()``.
  102. .. note:: A password-producing object will be stringified only
  103. **once** per :class:`_engine.Engine` object. For dynamic password
  104. generation per connect, see :ref:`engines_dynamic_tokens`.
  105. :param host: The name of the host.
  106. :param port: The port number.
  107. :param database: The database name.
  108. :param query: A dictionary of string keys to string values to be passed
  109. to the dialect and/or the DBAPI upon connect. To specify non-string
  110. parameters to a Python DBAPI directly, use the
  111. :paramref:`_sa.create_engine.connect_args` parameter to
  112. :func:`_sa.create_engine`. See also
  113. :attr:`_engine.URL.normalized_query` for a dictionary that is
  114. consistently string->list of string.
  115. :return: new :class:`_engine.URL` object.
  116. .. versionadded:: 1.4
  117. The :class:`_engine.URL` object is now an **immutable named
  118. tuple**. In addition, the ``query`` dictionary is also immutable.
  119. To create a URL, use the :func:`_engine.url.make_url` or
  120. :meth:`_engine.URL.create` function/ method. To modify a
  121. :class:`_engine.URL`, use the :meth:`_engine.URL.set` and
  122. :meth:`_engine.URL.update_query` methods.
  123. """
  124. return cls(
  125. cls._assert_str(drivername, "drivername"),
  126. cls._assert_none_str(username, "username"),
  127. password,
  128. cls._assert_none_str(host, "host"),
  129. cls._assert_port(port),
  130. cls._assert_none_str(database, "database"),
  131. cls._str_dict(query),
  132. _new_ok=True,
  133. )
  134. @classmethod
  135. def _assert_port(cls, port):
  136. if port is None:
  137. return None
  138. try:
  139. return int(port)
  140. except TypeError:
  141. raise TypeError("Port argument must be an integer or None")
  142. @classmethod
  143. def _assert_str(cls, v, paramname):
  144. if not isinstance(v, compat.string_types):
  145. raise TypeError("%s must be a string" % paramname)
  146. return v
  147. @classmethod
  148. def _assert_none_str(cls, v, paramname):
  149. if v is None:
  150. return v
  151. return cls._assert_str(v, paramname)
  152. @classmethod
  153. def _str_dict(cls, dict_):
  154. if dict_ is None:
  155. return util.EMPTY_DICT
  156. def _assert_value(val):
  157. if isinstance(val, compat.string_types):
  158. return val
  159. elif isinstance(val, collections_abc.Sequence):
  160. return tuple(_assert_value(elem) for elem in val)
  161. else:
  162. raise TypeError(
  163. "Query dictionary values must be strings or "
  164. "sequences of strings"
  165. )
  166. def _assert_str(v):
  167. if not isinstance(v, compat.string_types):
  168. raise TypeError("Query dictionary keys must be strings")
  169. return v
  170. if isinstance(dict_, collections_abc.Sequence):
  171. dict_items = dict_
  172. else:
  173. dict_items = dict_.items()
  174. return util.immutabledict(
  175. {
  176. _assert_str(key): _assert_value(
  177. value,
  178. )
  179. for key, value in dict_items
  180. }
  181. )
  182. def set(
  183. self,
  184. drivername=None,
  185. username=None,
  186. password=None,
  187. host=None,
  188. port=None,
  189. database=None,
  190. query=None,
  191. ):
  192. """return a new :class:`_engine.URL` object with modifications.
  193. Values are used if they are non-None. To set a value to ``None``
  194. explicitly, use the :meth:`_engine.URL._replace` method adapted
  195. from ``namedtuple``.
  196. :param drivername: new drivername
  197. :param username: new username
  198. :param password: new password
  199. :param host: new hostname
  200. :param port: new port
  201. :param query: new query parameters, passed a dict of string keys
  202. referring to string or sequence of string values. Fully
  203. replaces the previous list of arguments.
  204. :return: new :class:`_engine.URL` object.
  205. .. versionadded:: 1.4
  206. .. seealso::
  207. :meth:`_engine.URL.update_query_dict`
  208. """
  209. kw = {}
  210. if drivername is not None:
  211. kw["drivername"] = drivername
  212. if username is not None:
  213. kw["username"] = username
  214. if password is not None:
  215. kw["password"] = password
  216. if host is not None:
  217. kw["host"] = host
  218. if port is not None:
  219. kw["port"] = port
  220. if database is not None:
  221. kw["database"] = database
  222. if query is not None:
  223. kw["query"] = query
  224. return self._replace(**kw)
  225. def _replace(self, **kw):
  226. """Override ``namedtuple._replace()`` to provide argument checking."""
  227. if "drivername" in kw:
  228. self._assert_str(kw["drivername"], "drivername")
  229. for name in "username", "host", "database":
  230. if name in kw:
  231. self._assert_none_str(kw[name], name)
  232. if "port" in kw:
  233. self._assert_port(kw["port"])
  234. if "query" in kw:
  235. kw["query"] = self._str_dict(kw["query"])
  236. return super(URL, self)._replace(**kw)
  237. def update_query_string(self, query_string, append=False):
  238. """Return a new :class:`_engine.URL` object with the :attr:`_engine.URL.query`
  239. parameter dictionary updated by the given query string.
  240. E.g.::
  241. >>> from sqlalchemy.engine import make_url
  242. >>> url = make_url("postgresql://user:pass@host/dbname")
  243. >>> url = url.update_query_string("alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
  244. >>> str(url)
  245. 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
  246. :param query_string: a URL escaped query string, not including the
  247. question mark.
  248. :param append: if True, parameters in the existing query string will
  249. not be removed; new parameters will be in addition to those present.
  250. If left at its default of False, keys present in the given query
  251. parameters will replace those of the existing query string.
  252. .. versionadded:: 1.4
  253. .. seealso::
  254. :attr:`_engine.URL.query`
  255. :meth:`_engine.URL.update_query_dict`
  256. """ # noqa: E501
  257. return self.update_query_pairs(
  258. util.parse_qsl(query_string), append=append
  259. )
  260. def update_query_pairs(self, key_value_pairs, append=False):
  261. """Return a new :class:`_engine.URL` object with the
  262. :attr:`_engine.URL.query`
  263. parameter dictionary updated by the given sequence of key/value pairs
  264. E.g.::
  265. >>> from sqlalchemy.engine import make_url
  266. >>> url = make_url("postgresql://user:pass@host/dbname")
  267. >>> url = url.update_query_pairs([("alt_host", "host1"), ("alt_host", "host2"), ("ssl_cipher", "/path/to/crt")])
  268. >>> str(url)
  269. 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
  270. :param key_value_pairs: A sequence of tuples containing two strings
  271. each.
  272. :param append: if True, parameters in the existing query string will
  273. not be removed; new parameters will be in addition to those present.
  274. If left at its default of False, keys present in the given query
  275. parameters will replace those of the existing query string.
  276. .. versionadded:: 1.4
  277. .. seealso::
  278. :attr:`_engine.URL.query`
  279. :meth:`_engine.URL.difference_update_query`
  280. :meth:`_engine.URL.set`
  281. """ # noqa: E501
  282. existing_query = self.query
  283. new_keys = {}
  284. for key, value in key_value_pairs:
  285. if key in new_keys:
  286. new_keys[key] = util.to_list(new_keys[key])
  287. new_keys[key].append(value)
  288. else:
  289. new_keys[key] = value
  290. if append:
  291. new_query = {}
  292. for k in new_keys:
  293. if k in existing_query:
  294. new_query[k] = util.to_list(
  295. existing_query[k]
  296. ) + util.to_list(new_keys[k])
  297. else:
  298. new_query[k] = new_keys[k]
  299. new_query.update(
  300. {
  301. k: existing_query[k]
  302. for k in set(existing_query).difference(new_keys)
  303. }
  304. )
  305. else:
  306. new_query = self.query.union(new_keys)
  307. return self.set(query=new_query)
  308. def update_query_dict(self, query_parameters, append=False):
  309. """Return a new :class:`_engine.URL` object with the
  310. :attr:`_engine.URL.query` parameter dictionary updated by the given
  311. dictionary.
  312. The dictionary typically contains string keys and string values.
  313. In order to represent a query parameter that is expressed multiple
  314. times, pass a sequence of string values.
  315. E.g.::
  316. >>> from sqlalchemy.engine import make_url
  317. >>> url = make_url("postgresql://user:pass@host/dbname")
  318. >>> url = url.update_query_dict({"alt_host": ["host1", "host2"], "ssl_cipher": "/path/to/crt"})
  319. >>> str(url)
  320. 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
  321. :param query_parameters: A dictionary with string keys and values
  322. that are either strings, or sequences of strings.
  323. :param append: if True, parameters in the existing query string will
  324. not be removed; new parameters will be in addition to those present.
  325. If left at its default of False, keys present in the given query
  326. parameters will replace those of the existing query string.
  327. .. versionadded:: 1.4
  328. .. seealso::
  329. :attr:`_engine.URL.query`
  330. :meth:`_engine.URL.update_query_string`
  331. :meth:`_engine.URL.update_query_pairs`
  332. :meth:`_engine.URL.difference_update_query`
  333. :meth:`_engine.URL.set`
  334. """ # noqa: E501
  335. return self.update_query_pairs(query_parameters.items(), append=append)
  336. def difference_update_query(self, names):
  337. """
  338. Remove the given names from the :attr:`_engine.URL.query` dictionary,
  339. returning the new :class:`_engine.URL`.
  340. E.g.::
  341. url = url.difference_update_query(['foo', 'bar'])
  342. Equivalent to using :meth:`_engine.URL.set` as follows::
  343. url = url.set(
  344. query={
  345. key: url.query[key]
  346. for key in set(url.query).difference(['foo', 'bar'])
  347. }
  348. )
  349. .. versionadded:: 1.4
  350. .. seealso::
  351. :attr:`_engine.URL.query`
  352. :meth:`_engine.URL.update_query_dict`
  353. :meth:`_engine.URL.set`
  354. """
  355. if not set(names).intersection(self.query):
  356. return self
  357. return URL(
  358. self.drivername,
  359. self.username,
  360. self.password,
  361. self.host,
  362. self.port,
  363. self.database,
  364. util.immutabledict(
  365. {
  366. key: self.query[key]
  367. for key in set(self.query).difference(names)
  368. }
  369. ),
  370. _new_ok=True,
  371. )
  372. @util.memoized_property
  373. def normalized_query(self):
  374. """Return the :attr:`_engine.URL.query` dictionary with values normalized
  375. into sequences.
  376. As the :attr:`_engine.URL.query` dictionary may contain either
  377. string values or sequences of string values to differentiate between
  378. parameters that are specified multiple times in the query string,
  379. code that needs to handle multiple parameters generically will wish
  380. to use this attribute so that all parameters present are presented
  381. as sequences. Inspiration is from Python's ``urllib.parse.parse_qs``
  382. function. E.g.::
  383. >>> from sqlalchemy.engine import make_url
  384. >>> url = make_url("postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
  385. >>> url.query
  386. immutabledict({'alt_host': ('host1', 'host2'), 'ssl_cipher': '/path/to/crt'})
  387. >>> url.normalized_query
  388. immutabledict({'alt_host': ('host1', 'host2'), 'ssl_cipher': ('/path/to/crt',)})
  389. """ # noqa: E501
  390. return util.immutabledict(
  391. {
  392. k: (v,) if not isinstance(v, tuple) else v
  393. for k, v in self.query.items()
  394. }
  395. )
  396. @util.deprecated(
  397. "1.4",
  398. "The :meth:`_engine.URL.__to_string__ method is deprecated and will "
  399. "be removed in a future release. Please use the "
  400. ":meth:`_engine.URL.render_as_string` method.",
  401. )
  402. def __to_string__(self, hide_password=True):
  403. """Render this :class:`_engine.URL` object as a string.
  404. :param hide_password: Defaults to True. The password is not shown
  405. in the string unless this is set to False.
  406. """
  407. return self.render_as_string(hide_password=hide_password)
  408. def render_as_string(self, hide_password=True):
  409. """Render this :class:`_engine.URL` object as a string.
  410. This method is used when the ``__str__()`` or ``__repr__()``
  411. methods are used. The method directly includes additional options.
  412. :param hide_password: Defaults to True. The password is not shown
  413. in the string unless this is set to False.
  414. """
  415. s = self.drivername + "://"
  416. if self.username is not None:
  417. s += _sqla_url_quote(self.username)
  418. if self.password is not None:
  419. s += ":" + (
  420. "***"
  421. if hide_password
  422. else _sqla_url_quote(str(self.password))
  423. )
  424. s += "@"
  425. if self.host is not None:
  426. if ":" in self.host:
  427. s += "[%s]" % self.host
  428. else:
  429. s += self.host
  430. if self.port is not None:
  431. s += ":" + str(self.port)
  432. if self.database is not None:
  433. s += "/" + self.database
  434. if self.query:
  435. keys = list(self.query)
  436. keys.sort()
  437. s += "?" + "&".join(
  438. "%s=%s" % (util.quote_plus(k), util.quote_plus(element))
  439. for k in keys
  440. for element in util.to_list(self.query[k])
  441. )
  442. return s
  443. def __str__(self):
  444. return self.render_as_string(hide_password=False)
  445. def __repr__(self):
  446. return self.render_as_string()
  447. def __copy__(self):
  448. return self.__class__.create(
  449. self.drivername,
  450. self.username,
  451. self.password,
  452. self.host,
  453. self.port,
  454. self.database,
  455. # note this is an immutabledict of str-> str / tuple of str,
  456. # also fully immutable. does not require deepcopy
  457. self.query,
  458. )
  459. def __deepcopy__(self, memo):
  460. return self.__copy__()
  461. def __hash__(self):
  462. return hash(str(self))
  463. def __eq__(self, other):
  464. return (
  465. isinstance(other, URL)
  466. and self.drivername == other.drivername
  467. and self.username == other.username
  468. and self.password == other.password
  469. and self.host == other.host
  470. and self.database == other.database
  471. and self.query == other.query
  472. and self.port == other.port
  473. )
  474. def __ne__(self, other):
  475. return not self == other
  476. def get_backend_name(self):
  477. """Return the backend name.
  478. This is the name that corresponds to the database backend in
  479. use, and is the portion of the :attr:`_engine.URL.drivername`
  480. that is to the left of the plus sign.
  481. """
  482. if "+" not in self.drivername:
  483. return self.drivername
  484. else:
  485. return self.drivername.split("+")[0]
  486. def get_driver_name(self):
  487. """Return the backend name.
  488. This is the name that corresponds to the DBAPI driver in
  489. use, and is the portion of the :attr:`_engine.URL.drivername`
  490. that is to the right of the plus sign.
  491. If the :attr:`_engine.URL.drivername` does not include a plus sign,
  492. then the default :class:`_engine.Dialect` for this :class:`_engine.URL`
  493. is imported in order to get the driver name.
  494. """
  495. if "+" not in self.drivername:
  496. return self.get_dialect().driver
  497. else:
  498. return self.drivername.split("+")[1]
  499. def _instantiate_plugins(self, kwargs):
  500. plugin_names = util.to_list(self.query.get("plugin", ()))
  501. plugin_names += kwargs.get("plugins", [])
  502. kwargs = dict(kwargs)
  503. loaded_plugins = [
  504. plugins.load(plugin_name)(self, kwargs)
  505. for plugin_name in plugin_names
  506. ]
  507. u = self.difference_update_query(["plugin", "plugins"])
  508. for plugin in loaded_plugins:
  509. new_u = plugin.update_url(u)
  510. if new_u is not None:
  511. u = new_u
  512. kwargs.pop("plugins", None)
  513. return u, loaded_plugins, kwargs
  514. def _get_entrypoint(self):
  515. """Return the "entry point" dialect class.
  516. This is normally the dialect itself except in the case when the
  517. returned class implements the get_dialect_cls() method.
  518. """
  519. if "+" not in self.drivername:
  520. name = self.drivername
  521. else:
  522. name = self.drivername.replace("+", ".")
  523. cls = registry.load(name)
  524. # check for legacy dialects that
  525. # would return a module with 'dialect' as the
  526. # actual class
  527. if (
  528. hasattr(cls, "dialect")
  529. and isinstance(cls.dialect, type)
  530. and issubclass(cls.dialect, Dialect)
  531. ):
  532. return cls.dialect
  533. else:
  534. return cls
  535. def get_dialect(self):
  536. """Return the SQLAlchemy :class:`_engine.Dialect` class corresponding
  537. to this URL's driver name.
  538. """
  539. entrypoint = self._get_entrypoint()
  540. dialect_cls = entrypoint.get_dialect_cls(self)
  541. return dialect_cls
  542. def translate_connect_args(self, names=None, **kw):
  543. r"""Translate url attributes into a dictionary of connection arguments.
  544. Returns attributes of this url (`host`, `database`, `username`,
  545. `password`, `port`) as a plain dictionary. The attribute names are
  546. used as the keys by default. Unset or false attributes are omitted
  547. from the final dictionary.
  548. :param \**kw: Optional, alternate key names for url attributes.
  549. :param names: Deprecated. Same purpose as the keyword-based alternate
  550. names, but correlates the name to the original positionally.
  551. """
  552. if names is not None:
  553. util.warn_deprecated(
  554. "The `URL.translate_connect_args.name`s parameter is "
  555. "deprecated. Please pass the "
  556. "alternate names as kw arguments.",
  557. "1.4",
  558. )
  559. translated = {}
  560. attribute_names = ["host", "database", "username", "password", "port"]
  561. for sname in attribute_names:
  562. if names:
  563. name = names.pop(0)
  564. elif sname in kw:
  565. name = kw[sname]
  566. else:
  567. name = sname
  568. if name is not None and getattr(self, sname, False):
  569. if sname == "password":
  570. translated[name] = str(getattr(self, sname))
  571. else:
  572. translated[name] = getattr(self, sname)
  573. return translated
  574. def make_url(name_or_url):
  575. """Given a string or unicode instance, produce a new URL instance.
  576. The format of the URL generally follows `RFC-1738
  577. <https://www.ietf.org/rfc/rfc1738.txt>`_, with some exceptions, including
  578. that underscores, and not dashes or periods, are accepted within the
  579. "scheme" portion.
  580. If a :class:`.URL` object is passed, it is returned as is.
  581. """
  582. if isinstance(name_or_url, util.string_types):
  583. return _parse_url(name_or_url)
  584. else:
  585. return name_or_url
  586. def _parse_url(name):
  587. pattern = re.compile(
  588. r"""
  589. (?P<name>[\w\+]+)://
  590. (?:
  591. (?P<username>[^:/]*)
  592. (?::(?P<password>[^@]*))?
  593. @)?
  594. (?:
  595. (?:
  596. \[(?P<ipv6host>[^/\?]+)\] |
  597. (?P<ipv4host>[^/:\?]+)
  598. )?
  599. (?::(?P<port>[^/\?]*))?
  600. )?
  601. (?:/(?P<database>[^\?]*))?
  602. (?:\?(?P<query>.*))?
  603. """,
  604. re.X,
  605. )
  606. m = pattern.match(name)
  607. if m is not None:
  608. components = m.groupdict()
  609. if components["query"] is not None:
  610. query = {}
  611. for key, value in util.parse_qsl(components["query"]):
  612. if util.py2k:
  613. key = key.encode("ascii")
  614. if key in query:
  615. query[key] = util.to_list(query[key])
  616. query[key].append(value)
  617. else:
  618. query[key] = value
  619. else:
  620. query = None
  621. components["query"] = query
  622. if components["username"] is not None:
  623. components["username"] = _sqla_url_unquote(components["username"])
  624. if components["password"] is not None:
  625. components["password"] = _sqla_url_unquote(components["password"])
  626. ipv4host = components.pop("ipv4host")
  627. ipv6host = components.pop("ipv6host")
  628. components["host"] = ipv4host or ipv6host
  629. name = components.pop("name")
  630. if components["port"]:
  631. components["port"] = int(components["port"])
  632. return URL.create(name, **components)
  633. else:
  634. raise exc.ArgumentError(
  635. "Could not parse SQLAlchemy URL from string '%s'" % name
  636. )
  637. def _sqla_url_quote(text):
  638. return re.sub(r"[:@/]", lambda m: "%%%X" % ord(m.group(0)), text)
  639. def _sqla_url_unquote(text):
  640. return util.unquote(text)
  641. def _parse_keyvalue_args(name):
  642. m = re.match(r"(\w+)://(.*)", name)
  643. if m is not None:
  644. (name, args) = m.group(1, 2)
  645. opts = dict(util.parse_qsl(args))
  646. return URL(name, *opts)
  647. else:
  648. return None