utils.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. import hmac
  2. from functools import wraps
  3. from hashlib import sha512
  4. from urllib.parse import parse_qs
  5. from urllib.parse import urlencode
  6. from urllib.parse import urlsplit
  7. from urllib.parse import urlunsplit
  8. from flask import current_app
  9. from flask import g
  10. from flask import has_request_context
  11. from flask import request
  12. from flask import session
  13. from flask import url_for
  14. from werkzeug.local import LocalProxy
  15. from .config import COOKIE_NAME
  16. from .config import EXEMPT_METHODS
  17. from .signals import user_logged_in
  18. from .signals import user_logged_out
  19. from .signals import user_login_confirmed
  20. #: A proxy for the current user. If no user is logged in, this will be an
  21. #: anonymous user
  22. current_user = LocalProxy(lambda: _get_user())
  23. def encode_cookie(payload, key=None):
  24. """
  25. This will encode a ``str`` value into a cookie, and sign that cookie
  26. with the app's secret key.
  27. :param payload: The value to encode, as `str`.
  28. :type payload: str
  29. :param key: The key to use when creating the cookie digest. If not
  30. specified, the SECRET_KEY value from app config will be used.
  31. :type key: str
  32. """
  33. return f"{payload}|{_cookie_digest(payload, key=key)}"
  34. def decode_cookie(cookie, key=None):
  35. """
  36. This decodes a cookie given by `encode_cookie`. If verification of the
  37. cookie fails, ``None`` will be implicitly returned.
  38. :param cookie: An encoded cookie.
  39. :type cookie: str
  40. :param key: The key to use when creating the cookie digest. If not
  41. specified, the SECRET_KEY value from app config will be used.
  42. :type key: str
  43. """
  44. try:
  45. payload, digest = cookie.rsplit("|", 1)
  46. if hasattr(digest, "decode"):
  47. digest = digest.decode("ascii") # pragma: no cover
  48. except ValueError:
  49. return
  50. if hmac.compare_digest(_cookie_digest(payload, key=key), digest):
  51. return payload
  52. def make_next_param(login_url, current_url):
  53. """
  54. Reduces the scheme and host from a given URL so it can be passed to
  55. the given `login` URL more efficiently.
  56. :param login_url: The login URL being redirected to.
  57. :type login_url: str
  58. :param current_url: The URL to reduce.
  59. :type current_url: str
  60. """
  61. l_url = urlsplit(login_url)
  62. c_url = urlsplit(current_url)
  63. if (not l_url.scheme or l_url.scheme == c_url.scheme) and (
  64. not l_url.netloc or l_url.netloc == c_url.netloc
  65. ):
  66. return urlunsplit(("", "", c_url.path, c_url.query, ""))
  67. return current_url
  68. def expand_login_view(login_view):
  69. """
  70. Returns the url for the login view, expanding the view name to a url if
  71. needed.
  72. :param login_view: The name of the login view or a URL for the login view.
  73. :type login_view: str
  74. """
  75. if login_view.startswith(("https://", "http://", "/")):
  76. return login_view
  77. return url_for(login_view)
  78. def login_url(login_view, next_url=None, next_field="next"):
  79. """
  80. Creates a URL for redirecting to a login page. If only `login_view` is
  81. provided, this will just return the URL for it. If `next_url` is provided,
  82. however, this will append a ``next=URL`` parameter to the query string
  83. so that the login view can redirect back to that URL. Flask-Login's default
  84. unauthorized handler uses this function when redirecting to your login url.
  85. To force the host name used, set `FORCE_HOST_FOR_REDIRECTS` to a host. This
  86. prevents from redirecting to external sites if request headers Host or
  87. X-Forwarded-For are present.
  88. :param login_view: The name of the login view. (Alternately, the actual
  89. URL to the login view.)
  90. :type login_view: str
  91. :param next_url: The URL to give the login view for redirection.
  92. :type next_url: str
  93. :param next_field: What field to store the next URL in. (It defaults to
  94. ``next``.)
  95. :type next_field: str
  96. """
  97. base = expand_login_view(login_view)
  98. if next_url is None:
  99. return base
  100. parsed_result = urlsplit(base)
  101. md = parse_qs(parsed_result.query, keep_blank_values=True)
  102. md[next_field] = make_next_param(base, next_url)
  103. netloc = current_app.config.get("FORCE_HOST_FOR_REDIRECTS") or parsed_result.netloc
  104. parsed_result = parsed_result._replace(
  105. netloc=netloc, query=urlencode(md, doseq=True)
  106. )
  107. return urlunsplit(parsed_result)
  108. def login_fresh():
  109. """
  110. This returns ``True`` if the current login is fresh.
  111. """
  112. return session.get("_fresh", False)
  113. def login_remembered():
  114. """
  115. This returns ``True`` if the current login is remembered across sessions.
  116. """
  117. config = current_app.config
  118. cookie_name = config.get("REMEMBER_COOKIE_NAME", COOKIE_NAME)
  119. has_cookie = cookie_name in request.cookies and session.get("_remember") != "clear"
  120. if has_cookie:
  121. cookie = request.cookies[cookie_name]
  122. user_id = decode_cookie(cookie)
  123. return user_id is not None
  124. return False
  125. def login_user(user, remember=False, duration=None, force=False, fresh=True):
  126. """
  127. Logs a user in. You should pass the actual user object to this. If the
  128. user's `is_active` property is ``False``, they will not be logged in
  129. unless `force` is ``True``.
  130. This will return ``True`` if the log in attempt succeeds, and ``False`` if
  131. it fails (i.e. because the user is inactive).
  132. :param user: The user object to log in.
  133. :type user: object
  134. :param remember: Whether to remember the user after their session expires.
  135. Defaults to ``False``.
  136. :type remember: bool
  137. :param duration: The amount of time before the remember cookie expires. If
  138. ``None`` the value set in the settings is used. Defaults to ``None``.
  139. :type duration: :class:`datetime.timedelta`
  140. :param force: If the user is inactive, setting this to ``True`` will log
  141. them in regardless. Defaults to ``False``.
  142. :type force: bool
  143. :param fresh: setting this to ``False`` will log in the user with a session
  144. marked as not "fresh". Defaults to ``True``.
  145. :type fresh: bool
  146. """
  147. if not force and not user.is_active:
  148. return False
  149. user_id = getattr(user, current_app.login_manager.id_attribute)()
  150. session["_user_id"] = user_id
  151. session["_fresh"] = fresh
  152. session["_id"] = current_app.login_manager._session_identifier_generator()
  153. if remember:
  154. session["_remember"] = "set"
  155. if duration is not None:
  156. try:
  157. # equal to timedelta.total_seconds() but works with Python 2.6
  158. session["_remember_seconds"] = (
  159. duration.microseconds
  160. + (duration.seconds + duration.days * 24 * 3600) * 10**6
  161. ) / 10.0**6
  162. except AttributeError as e:
  163. raise Exception(
  164. f"duration must be a datetime.timedelta, instead got: {duration}"
  165. ) from e
  166. current_app.login_manager._update_request_context_with_user(user)
  167. user_logged_in.send(current_app._get_current_object(), user=_get_user())
  168. return True
  169. def logout_user():
  170. """
  171. Logs a user out. (You do not need to pass the actual user.) This will
  172. also clean up the remember me cookie if it exists.
  173. """
  174. user = _get_user()
  175. if "_user_id" in session:
  176. session.pop("_user_id")
  177. if "_fresh" in session:
  178. session.pop("_fresh")
  179. if "_id" in session:
  180. session.pop("_id")
  181. cookie_name = current_app.config.get("REMEMBER_COOKIE_NAME", COOKIE_NAME)
  182. if cookie_name in request.cookies:
  183. session["_remember"] = "clear"
  184. if "_remember_seconds" in session:
  185. session.pop("_remember_seconds")
  186. user_logged_out.send(current_app._get_current_object(), user=user)
  187. current_app.login_manager._update_request_context_with_user()
  188. return True
  189. def confirm_login():
  190. """
  191. This sets the current session as fresh. Sessions become stale when they
  192. are reloaded from a cookie.
  193. """
  194. session["_fresh"] = True
  195. session["_id"] = current_app.login_manager._session_identifier_generator()
  196. user_login_confirmed.send(current_app._get_current_object())
  197. def login_required(func):
  198. """
  199. If you decorate a view with this, it will ensure that the current user is
  200. logged in and authenticated before calling the actual view. (If they are
  201. not, it calls the :attr:`LoginManager.unauthorized` callback.) For
  202. example::
  203. @app.route('/post')
  204. @login_required
  205. def post():
  206. pass
  207. If there are only certain times you need to require that your user is
  208. logged in, you can do so with::
  209. if not current_user.is_authenticated:
  210. return current_app.login_manager.unauthorized()
  211. ...which is essentially the code that this function adds to your views.
  212. It can be convenient to globally turn off authentication when unit testing.
  213. To enable this, if the application configuration variable `LOGIN_DISABLED`
  214. is set to `True`, this decorator will be ignored.
  215. .. Note ::
  216. Per `W3 guidelines for CORS preflight requests
  217. <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_,
  218. HTTP ``OPTIONS`` requests are exempt from login checks.
  219. :param func: The view function to decorate.
  220. :type func: function
  221. """
  222. @wraps(func)
  223. def decorated_view(*args, **kwargs):
  224. if request.method in EXEMPT_METHODS or current_app.config.get("LOGIN_DISABLED"):
  225. pass
  226. elif not current_user.is_authenticated:
  227. return current_app.login_manager.unauthorized()
  228. # flask 1.x compatibility
  229. # current_app.ensure_sync is only available in Flask >= 2.0
  230. if callable(getattr(current_app, "ensure_sync", None)):
  231. return current_app.ensure_sync(func)(*args, **kwargs)
  232. return func(*args, **kwargs)
  233. return decorated_view
  234. def fresh_login_required(func):
  235. """
  236. If you decorate a view with this, it will ensure that the current user's
  237. login is fresh - i.e. their session was not restored from a 'remember me'
  238. cookie. Sensitive operations, like changing a password or e-mail, should
  239. be protected with this, to impede the efforts of cookie thieves.
  240. If the user is not authenticated, :meth:`LoginManager.unauthorized` is
  241. called as normal. If they are authenticated, but their session is not
  242. fresh, it will call :meth:`LoginManager.needs_refresh` instead. (In that
  243. case, you will need to provide a :attr:`LoginManager.refresh_view`.)
  244. Behaves identically to the :func:`login_required` decorator with respect
  245. to configuration variables.
  246. .. Note ::
  247. Per `W3 guidelines for CORS preflight requests
  248. <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_,
  249. HTTP ``OPTIONS`` requests are exempt from login checks.
  250. :param func: The view function to decorate.
  251. :type func: function
  252. """
  253. @wraps(func)
  254. def decorated_view(*args, **kwargs):
  255. if request.method in EXEMPT_METHODS or current_app.config.get("LOGIN_DISABLED"):
  256. pass
  257. elif not current_user.is_authenticated:
  258. return current_app.login_manager.unauthorized()
  259. elif not login_fresh():
  260. return current_app.login_manager.needs_refresh()
  261. try:
  262. # current_app.ensure_sync available in Flask >= 2.0
  263. return current_app.ensure_sync(func)(*args, **kwargs)
  264. except AttributeError: # pragma: no cover
  265. return func(*args, **kwargs)
  266. return decorated_view
  267. def set_login_view(login_view, blueprint=None):
  268. """
  269. Sets the login view for the app or blueprint. If a blueprint is passed,
  270. the login view is set for this blueprint on ``blueprint_login_views``.
  271. :param login_view: The user object to log in.
  272. :type login_view: str
  273. :param blueprint: The blueprint which this login view should be set on.
  274. Defaults to ``None``.
  275. :type blueprint: object
  276. """
  277. num_login_views = len(current_app.login_manager.blueprint_login_views)
  278. if blueprint is not None or num_login_views != 0:
  279. (current_app.login_manager.blueprint_login_views[blueprint.name]) = login_view
  280. if (
  281. current_app.login_manager.login_view is not None
  282. and None not in current_app.login_manager.blueprint_login_views
  283. ):
  284. (
  285. current_app.login_manager.blueprint_login_views[None]
  286. ) = current_app.login_manager.login_view
  287. current_app.login_manager.login_view = None
  288. else:
  289. current_app.login_manager.login_view = login_view
  290. def _get_user():
  291. if has_request_context():
  292. if "_login_user" not in g:
  293. current_app.login_manager._load_user()
  294. return g._login_user
  295. return None
  296. def _cookie_digest(payload, key=None):
  297. key = _secret_key(key)
  298. return hmac.new(key, payload.encode("utf-8"), sha512).hexdigest()
  299. def _get_remote_addr():
  300. address = request.headers.get("X-Forwarded-For", request.remote_addr)
  301. if address is not None:
  302. # An 'X-Forwarded-For' header includes a comma separated list of the
  303. # addresses, the first address being the actual remote address.
  304. address = address.encode("utf-8").split(b",")[0].strip()
  305. return address
  306. def _create_identifier():
  307. user_agent = request.headers.get("User-Agent")
  308. if user_agent is not None:
  309. user_agent = user_agent.encode("utf-8")
  310. base = f"{_get_remote_addr()}|{user_agent}"
  311. if str is bytes:
  312. base = str(base, "utf-8", errors="replace") # pragma: no cover
  313. h = sha512()
  314. h.update(base.encode("utf8"))
  315. return h.hexdigest()
  316. def _user_context_processor():
  317. return dict(current_user=_get_user())
  318. def _secret_key(key=None):
  319. if key is None:
  320. key = current_app.config["SECRET_KEY"]
  321. if isinstance(key, str): # pragma: no cover
  322. key = key.encode("latin1") # ensure bytes
  323. return key