[core] Added current_user() HTTP utility.

This commit is contained in:
Fabio Manganiello 2024-07-24 00:49:21 +02:00
parent 2033f9760a
commit 357d92b479
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
7 changed files with 71 additions and 60 deletions

View file

@ -85,7 +85,9 @@ def _session_auth():
return _dump_session(session, redirect_page)
if status:
return status.to_response() # type: ignore
auth_status = UserAuthStatus.by_status(status)
assert auth_status
return auth_status.to_response()
return UserAuthStatus.INVALID_CREDENTIALS.to_response()
@ -214,6 +216,4 @@ def auth_endpoint():
return UserAuthStatus.INVALID_METHOD.to_response()
# from flask import Blueprint, request, redirect, render_template, make_response, abort
# vim:sw=4:ts=4:et:

View file

@ -1,7 +1,9 @@
from .auth import (
UserAuthStatus,
authenticate,
authenticate_token,
authenticate_user_pass,
current_user,
get_auth_status,
)
from .bus import bus, send_message, send_request
@ -17,10 +19,12 @@ from .streaming import get_streaming_routes
from .ws import get_ws_routes
__all__ = [
'UserAuthStatus',
'authenticate',
'authenticate_token',
'authenticate_user_pass',
'bus',
'current_user',
'get_auth_status',
'get_http_port',
'get_ip_or_hostname',

View file

@ -1,12 +1,12 @@
import base64
from functools import wraps
from typing import Optional
from typing import Optional, Union
from flask import request, redirect
from flask.wrappers import Response
from platypush.config import Config
from platypush.user import UserManager
from platypush.user import User, UserManager
from ..logger import logger
from .status import UserAuthStatus
@ -41,8 +41,8 @@ def get_cookie(req, name: str) -> Optional[str]:
return cookie.value
def authenticate_token(req):
token = Config.get('token')
def authenticate_token(req) -> Optional[User]:
global_token = Config.get('user.global_token')
user_token = None
if 'X-Token' in req.headers:
@ -55,14 +55,18 @@ def authenticate_token(req):
user_token = get_arg(req, 'token')
if not user_token:
return False
return None
try:
user_manager.validate_jwt_token(user_token)
return True
return user_manager.validate_jwt_token(user_token)
except Exception as e:
logger().debug(str(e))
return bool(token and user_token == token)
# Legacy global token authentication.
# The global token should be specified in the configuration file,
# as a root parameter named `token`.
if bool(global_token and user_token == global_token):
return User(username='__token__', user_id=1)
logger().info('Invalid token: %s', e)
def authenticate_user_pass(req):
@ -91,7 +95,7 @@ def authenticate_user_pass(req):
return user_manager.authenticate_user(username, password)
def authenticate_session(req):
def authenticate_session(req) -> Optional[User]:
user = None
# Check the X-Session-Token header
@ -108,7 +112,7 @@ def authenticate_session(req):
if user_session_token:
user, _ = user_manager.authenticate_user_session(user_session_token)[:2]
return user is not None
return user
def authenticate(
@ -154,37 +158,40 @@ def authenticate(
# pylint: disable=too-many-return-statements
def get_auth_status(req, skip_auth_methods=None) -> UserAuthStatus:
def _get_current_user_or_auth_status(
req, skip_auth_methods=None
) -> Union[User, UserAuthStatus]:
"""
Check against the available authentication methods (except those listed in
``skip_auth_methods``) if the user is properly authenticated.
Returns the current user if authenticated, and the authentication status if
``with_status`` is True.
"""
n_users = user_manager.get_user_count()
skip_methods = skip_auth_methods or []
# User/pass HTTP authentication
http_auth_ok = True
if n_users > 0 and 'http' not in skip_methods:
http_auth_ok = authenticate_user_pass(req)
if http_auth_ok:
return UserAuthStatus.OK
response = authenticate_user_pass(req)
if response:
user = response[0] if isinstance(response, tuple) else response
if user:
return user
# Token-based authentication
token_auth_ok = True
if 'token' not in skip_methods:
token_auth_ok = authenticate_token(req)
if token_auth_ok:
return UserAuthStatus.OK
user = authenticate_token(req)
if user:
return user
# Session token based authentication
session_auth_ok = True
if n_users > 0 and 'session' not in skip_methods:
return (
UserAuthStatus.OK
if authenticate_session(req)
else UserAuthStatus.INVALID_CREDENTIALS
)
user = authenticate_session(req)
if user:
return user
return UserAuthStatus.INVALID_CREDENTIALS
# At least a user should be created before accessing an authenticated resource
if n_users == 0 and 'session' not in skip_methods:
@ -198,3 +205,20 @@ def get_auth_status(req, skip_auth_methods=None) -> UserAuthStatus:
return UserAuthStatus.OK
return UserAuthStatus.INVALID_CREDENTIALS
def get_auth_status(req, skip_auth_methods=None) -> UserAuthStatus:
"""
Check against the available authentication methods (except those listed in
``skip_auth_methods``) if the user is properly authenticated.
"""
ret = _get_current_user_or_auth_status(req, skip_auth_methods=skip_auth_methods)
return UserAuthStatus.OK if isinstance(ret, User) else ret
def current_user() -> Optional[User]:
"""
Returns the current user if authenticated.
"""
ret = _get_current_user_or_auth_status(request)
return ret if isinstance(ret, User) else None

View file

@ -7,6 +7,7 @@ import re
from platypush.config import Config
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import current_user
class HttpUtils:
@ -130,5 +131,9 @@ class HttpUtils:
path = path[0] if len(path) == 1 else os.path.join(*path)
return os.path.isfile(path)
@staticmethod
def current_user():
return current_user()
# vim:sw=4:ts=4:et:

View file

@ -41,6 +41,7 @@ module.exports = {
'^/logo.svg': httpProxy,
'^/logout': httpProxy,
'^/media/': httpProxy,
'^/otp': httpProxy,
'^/sound/': httpProxy,
'^/ws/events': wsProxy,
'^/ws/requests': wsProxy,

View file

@ -5,7 +5,7 @@ import json
import os
import random
import time
from typing import List, Optional, Dict, Tuple, Union
from typing import List, Optional, Tuple, Union
import rsa
@ -408,22 +408,16 @@ class UserManager:
pub_key,
)
def validate_jwt_token(self, token: str) -> Dict[str, str]:
def validate_jwt_token(self, token: str) -> User:
"""
Validate a JWT token.
:param token: Token to validate.
:return: On success, it returns the JWT payload with the following structure:
.. code-block:: json
{
"username": "user ID/name",
"created_at": "token creation timestamp",
"expires_at": "token expiration timestamp"
}
:raises: :class:`platypush.exceptions.user.InvalidJWTTokenException` in case of invalid token.
:return: On success, it returns the user associated to the token.
:raises: :class:`platypush.exceptions.user.InvalidJWTTokenException` in
case of invalid token.
:raises: :class:`platypush.exceptions.user.InvalidCredentialsException`
in case of invalid credentials stored in the token.
"""
_, priv_key = self._get_jwt_rsa_key_pair()
@ -443,7 +437,7 @@ class UserManager:
if not user:
raise InvalidCredentialsException()
return payload
return user
def _authenticate_user(
self,

View file

@ -1,7 +1,4 @@
from dataclasses import dataclass, field
import datetime
import enum
from typing import List, Optional
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
@ -82,18 +79,4 @@ class UserBackupCode(Base):
expires_at = Column(DateTime)
@dataclass
class UserResponse:
"""
Dataclass containing full information about a user (minus the password).
"""
user_id: int
username: str
otp_secret: Optional[str] = None
session_token: Optional[str] = None
created_at: Optional[datetime.datetime] = None
backup_codes: List[str] = field(default_factory=list)
# vim:sw=4:ts=4:et: