379 lines
13 KiB
Python
379 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
'''
|
|
flask_login.utils
|
|
-----------------
|
|
General utilities.
|
|
'''
|
|
|
|
|
|
import hmac
|
|
from hashlib import sha512
|
|
from functools import wraps
|
|
from werkzeug.local import LocalProxy
|
|
from werkzeug.security import safe_str_cmp
|
|
from werkzeug.urls import url_decode, url_encode
|
|
|
|
from flask import (_request_ctx_stack, current_app, request, session, url_for,
|
|
has_request_context)
|
|
|
|
from ._compat import text_type, urlparse, urlunparse
|
|
from .config import COOKIE_NAME, EXEMPT_METHODS
|
|
from .signals import user_logged_in, user_logged_out, user_login_confirmed
|
|
|
|
|
|
#: A proxy for the current user. If no user is logged in, this will be an
|
|
#: anonymous user
|
|
current_user = LocalProxy(lambda: _get_user())
|
|
|
|
|
|
def encode_cookie(payload):
|
|
'''
|
|
This will encode a ``unicode`` value into a cookie, and sign that cookie
|
|
with the app's secret key.
|
|
|
|
:param payload: The value to encode, as `unicode`.
|
|
:type payload: unicode
|
|
'''
|
|
return u'{0}|{1}'.format(payload, _cookie_digest(payload))
|
|
|
|
|
|
def decode_cookie(cookie):
|
|
'''
|
|
This decodes a cookie given by `encode_cookie`. If verification of the
|
|
cookie fails, ``None`` will be implicitly returned.
|
|
|
|
:param cookie: An encoded cookie.
|
|
:type cookie: str
|
|
'''
|
|
try:
|
|
payload, digest = cookie.rsplit(u'|', 1)
|
|
if hasattr(digest, 'decode'):
|
|
digest = digest.decode('ascii') # pragma: no cover
|
|
except ValueError:
|
|
return
|
|
|
|
if safe_str_cmp(_cookie_digest(payload), digest):
|
|
return payload
|
|
|
|
|
|
def make_next_param(login_url, current_url):
|
|
'''
|
|
Reduces the scheme and host from a given URL so it can be passed to
|
|
the given `login` URL more efficiently.
|
|
|
|
:param login_url: The login URL being redirected to.
|
|
:type login_url: str
|
|
:param current_url: The URL to reduce.
|
|
:type current_url: str
|
|
'''
|
|
l = urlparse(login_url)
|
|
c = urlparse(current_url)
|
|
|
|
if (not l.scheme or l.scheme == c.scheme) and \
|
|
(not l.netloc or l.netloc == c.netloc):
|
|
return urlunparse(('', '', c.path, c.params, c.query, ''))
|
|
return current_url
|
|
|
|
|
|
def expand_login_view(login_view):
|
|
'''
|
|
Returns the url for the login view, expanding the view name to a url if
|
|
needed.
|
|
|
|
:param login_view: The name of the login view or a URL for the login view.
|
|
:type login_view: str
|
|
'''
|
|
if login_view.startswith(('https://', 'http://', '/')):
|
|
return login_view
|
|
else:
|
|
return url_for(login_view)
|
|
|
|
|
|
def login_url(login_view, next_url=None, next_field='next'):
|
|
'''
|
|
Creates a URL for redirecting to a login page. If only `login_view` is
|
|
provided, this will just return the URL for it. If `next_url` is provided,
|
|
however, this will append a ``next=URL`` parameter to the query string
|
|
so that the login view can redirect back to that URL. Flask-Login's default
|
|
unauthorized handler uses this function when redirecting to your login url.
|
|
To force the host name used, set `FORCE_HOST_FOR_REDIRECTS` to a host. This
|
|
prevents from redirecting to external sites if request headers Host or
|
|
X-Forwarded-For are present.
|
|
|
|
:param login_view: The name of the login view. (Alternately, the actual
|
|
URL to the login view.)
|
|
:type login_view: str
|
|
:param next_url: The URL to give the login view for redirection.
|
|
:type next_url: str
|
|
:param next_field: What field to store the next URL in. (It defaults to
|
|
``next``.)
|
|
:type next_field: str
|
|
'''
|
|
base = expand_login_view(login_view)
|
|
|
|
if next_url is None:
|
|
return base
|
|
|
|
parsed_result = urlparse(base)
|
|
md = url_decode(parsed_result.query)
|
|
md[next_field] = make_next_param(base, next_url)
|
|
netloc = current_app.config.get('FORCE_HOST_FOR_REDIRECTS') or \
|
|
parsed_result.netloc
|
|
parsed_result = parsed_result._replace(netloc=netloc,
|
|
query=url_encode(md, sort=True))
|
|
return urlunparse(parsed_result)
|
|
|
|
|
|
def login_fresh():
|
|
'''
|
|
This returns ``True`` if the current login is fresh.
|
|
'''
|
|
return session.get('_fresh', False)
|
|
|
|
|
|
def login_user(user, remember=False, duration=None, force=False, fresh=True):
|
|
'''
|
|
Logs a user in. You should pass the actual user object to this. If the
|
|
user's `is_active` property is ``False``, they will not be logged in
|
|
unless `force` is ``True``.
|
|
|
|
This will return ``True`` if the log in attempt succeeds, and ``False`` if
|
|
it fails (i.e. because the user is inactive).
|
|
|
|
:param user: The user object to log in.
|
|
:type user: object
|
|
:param remember: Whether to remember the user after their session expires.
|
|
Defaults to ``False``.
|
|
:type remember: bool
|
|
:param duration: The amount of time before the remember cookie expires. If
|
|
``None`` the value set in the settings is used. Defaults to ``None``.
|
|
:type duration: :class:`datetime.timedelta`
|
|
:param force: If the user is inactive, setting this to ``True`` will log
|
|
them in regardless. Defaults to ``False``.
|
|
:type force: bool
|
|
:param fresh: setting this to ``False`` will log in the user with a session
|
|
marked as not "fresh". Defaults to ``True``.
|
|
:type fresh: bool
|
|
'''
|
|
if not force and not user.is_active:
|
|
return False
|
|
|
|
user_id = getattr(user, current_app.login_manager.id_attribute)()
|
|
session['user_id'] = user_id
|
|
session['_fresh'] = fresh
|
|
session['_id'] = current_app.login_manager._session_identifier_generator()
|
|
|
|
if remember:
|
|
session['remember'] = 'set'
|
|
if duration is not None:
|
|
try:
|
|
# equal to timedelta.total_seconds() but works with Python 2.6
|
|
session['remember_seconds'] = (duration.microseconds +
|
|
(duration.seconds +
|
|
duration.days * 24 * 3600) *
|
|
10**6) / 10.0**6
|
|
except AttributeError:
|
|
raise Exception('duration must be a datetime.timedelta, '
|
|
'instead got: {0}'.format(duration))
|
|
|
|
_request_ctx_stack.top.user = user
|
|
user_logged_in.send(current_app._get_current_object(), user=_get_user())
|
|
return True
|
|
|
|
|
|
def logout_user():
|
|
'''
|
|
Logs a user out. (You do not need to pass the actual user.) This will
|
|
also clean up the remember me cookie if it exists.
|
|
'''
|
|
|
|
user = _get_user()
|
|
|
|
if 'user_id' in session:
|
|
session.pop('user_id')
|
|
|
|
if '_fresh' in session:
|
|
session.pop('_fresh')
|
|
|
|
cookie_name = current_app.config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
|
|
if cookie_name in request.cookies:
|
|
session['remember'] = 'clear'
|
|
if 'remember_seconds' in session:
|
|
session.pop('remember_seconds')
|
|
|
|
user_logged_out.send(current_app._get_current_object(), user=user)
|
|
|
|
current_app.login_manager.reload_user()
|
|
return True
|
|
|
|
|
|
def confirm_login():
|
|
'''
|
|
This sets the current session as fresh. Sessions become stale when they
|
|
are reloaded from a cookie.
|
|
'''
|
|
session['_fresh'] = True
|
|
session['_id'] = current_app.login_manager._session_identifier_generator()
|
|
user_login_confirmed.send(current_app._get_current_object())
|
|
|
|
|
|
def login_required(func):
|
|
'''
|
|
If you decorate a view with this, it will ensure that the current user is
|
|
logged in and authenticated before calling the actual view. (If they are
|
|
not, it calls the :attr:`LoginManager.unauthorized` callback.) For
|
|
example::
|
|
|
|
@app.route('/post')
|
|
@login_required
|
|
def post():
|
|
pass
|
|
|
|
If there are only certain times you need to require that your user is
|
|
logged in, you can do so with::
|
|
|
|
if not current_user.is_authenticated:
|
|
return current_app.login_manager.unauthorized()
|
|
|
|
...which is essentially the code that this function adds to your views.
|
|
|
|
It can be convenient to globally turn off authentication when unit testing.
|
|
To enable this, if the application configuration variable `LOGIN_DISABLED`
|
|
is set to `True`, this decorator will be ignored.
|
|
|
|
.. Note ::
|
|
|
|
Per `W3 guidelines for CORS preflight requests
|
|
<http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_,
|
|
HTTP ``OPTIONS`` requests are exempt from login checks.
|
|
|
|
:param func: The view function to decorate.
|
|
:type func: function
|
|
'''
|
|
@wraps(func)
|
|
def decorated_view(*args, **kwargs):
|
|
if request.method in EXEMPT_METHODS:
|
|
return func(*args, **kwargs)
|
|
elif current_app.login_manager._login_disabled:
|
|
return func(*args, **kwargs)
|
|
elif not current_user.is_authenticated:
|
|
return current_app.login_manager.unauthorized()
|
|
return func(*args, **kwargs)
|
|
return decorated_view
|
|
|
|
|
|
def fresh_login_required(func):
|
|
'''
|
|
If you decorate a view with this, it will ensure that the current user's
|
|
login is fresh - i.e. their session was not restored from a 'remember me'
|
|
cookie. Sensitive operations, like changing a password or e-mail, should
|
|
be protected with this, to impede the efforts of cookie thieves.
|
|
|
|
If the user is not authenticated, :meth:`LoginManager.unauthorized` is
|
|
called as normal. If they are authenticated, but their session is not
|
|
fresh, it will call :meth:`LoginManager.needs_refresh` instead. (In that
|
|
case, you will need to provide a :attr:`LoginManager.refresh_view`.)
|
|
|
|
Behaves identically to the :func:`login_required` decorator with respect
|
|
to configutation variables.
|
|
|
|
.. Note ::
|
|
|
|
Per `W3 guidelines for CORS preflight requests
|
|
<http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_,
|
|
HTTP ``OPTIONS`` requests are exempt from login checks.
|
|
|
|
:param func: The view function to decorate.
|
|
:type func: function
|
|
'''
|
|
@wraps(func)
|
|
def decorated_view(*args, **kwargs):
|
|
if request.method in EXEMPT_METHODS:
|
|
return func(*args, **kwargs)
|
|
elif current_app.login_manager._login_disabled:
|
|
return func(*args, **kwargs)
|
|
elif not current_user.is_authenticated:
|
|
return current_app.login_manager.unauthorized()
|
|
elif not login_fresh():
|
|
return current_app.login_manager.needs_refresh()
|
|
return func(*args, **kwargs)
|
|
return decorated_view
|
|
|
|
|
|
def set_login_view(login_view, blueprint=None):
|
|
'''
|
|
Sets the login view for the app or blueprint. If a blueprint is passed,
|
|
the login view is set for this blueprint on ``blueprint_login_views``.
|
|
|
|
:param login_view: The user object to log in.
|
|
:type login_view: str
|
|
:param blueprint: The blueprint which this login view should be set on.
|
|
Defaults to ``None``.
|
|
:type blueprint: object
|
|
'''
|
|
|
|
num_login_views = len(current_app.login_manager.blueprint_login_views)
|
|
if blueprint is not None or num_login_views != 0:
|
|
|
|
(current_app.login_manager
|
|
.blueprint_login_views[blueprint.name]) = login_view
|
|
|
|
if (current_app.login_manager.login_view is not None and
|
|
None not in current_app.login_manager.blueprint_login_views):
|
|
|
|
(current_app.login_manager
|
|
.blueprint_login_views[None]) = (current_app.login_manager
|
|
.login_view)
|
|
|
|
current_app.login_manager.login_view = None
|
|
else:
|
|
current_app.login_manager.login_view = login_view
|
|
|
|
|
|
def _get_user():
|
|
if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'):
|
|
current_app.login_manager._load_user()
|
|
|
|
return getattr(_request_ctx_stack.top, 'user', None)
|
|
|
|
|
|
def _cookie_digest(payload, key=None):
|
|
key = _secret_key(key)
|
|
|
|
return hmac.new(key, payload.encode('utf-8'), sha512).hexdigest()
|
|
|
|
|
|
def _get_remote_addr():
|
|
address = request.headers.get('X-Forwarded-For', request.remote_addr)
|
|
if address is not None:
|
|
# An 'X-Forwarded-For' header includes a comma separated list of the
|
|
# addresses, the first address being the actual remote address.
|
|
address = address.encode('utf-8').split(b',')[0].strip()
|
|
return address
|
|
|
|
|
|
def _create_identifier():
|
|
user_agent = request.headers.get('User-Agent')
|
|
if user_agent is not None:
|
|
user_agent = user_agent.encode('utf-8')
|
|
base = '{0}|{1}'.format(_get_remote_addr(), user_agent)
|
|
if str is bytes:
|
|
base = text_type(base, 'utf-8', errors='replace') # pragma: no cover
|
|
h = sha512()
|
|
h.update(base.encode('utf8'))
|
|
return h.hexdigest()
|
|
|
|
|
|
def _user_context_processor():
|
|
return dict(current_user=_get_user())
|
|
|
|
|
|
def _secret_key(key=None):
|
|
if key is None:
|
|
key = current_app.config['SECRET_KEY']
|
|
|
|
if isinstance(key, text_type): # pragma: no cover
|
|
key = key.encode('latin1') # ensure bytes
|
|
|
|
return key
|