Add user credentials on the encrypted JWT token.

Adding the credentials ensures that tokens associated with non-existing
users, or with users whose password is no longer valid, won't be accepted,
even if they were correctly encrypted using the host's keypair.

This adds an additional layer of security in case the host's keypair
gets compromised.
Fabio Manganiello 2022-11-21 13:16:09 +01:00
parent 98d7c95aa7
commit d95baac74e


@ -198,6 +198,7 @@ class UserManager:
payload = json.dumps( payload = json.dumps(
{ {
'username': username, 'username': username,
'password': password,
'created_at': datetime.datetime.now().timestamp(), 'created_at': datetime.datetime.now().timestamp(),
'expires_at': expires_at.timestamp() if expires_at else None, 'expires_at': expires_at.timestamp() if expires_at else None,
}, },
@ -209,8 +210,7 @@ class UserManager:
rsa.encrypt(payload.encode('ascii'), pub_key) rsa.encrypt(payload.encode('ascii'), pub_key)
).decode() ).decode()
@staticmethod def validate_jwt_token(self, token: str) -> Dict[str, str]:
def validate_jwt_token(token: str) -> Dict[str, str]:
""" """
Validate a JWT token. Validate a JWT token.
@ -243,6 +243,14 @@ class UserManager:
if expires_at and time.time() > expires_at: if expires_at and time.time() > expires_at:
raise InvalidJWTTokenException('Expired JWT token') raise InvalidJWTTokenException('Expired JWT token')
user = self.authenticate_user(
payload.get('username', ''),
payload.get('password', '')
)
if not user:
raise InvalidCredentialsException()
return payload return payload
def _get_db_session(self): def _get_db_session(self):
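Review bot: The new `'password': password,` entry in the first hunk embeds the user's plaintext password in every issued token, so anyone holding the host's private key can decrypt a token and recover the password itself — which is precisely the keypair-compromise scenario the commit message names as the threat, and it turns every stored or logged token into a credential leak. A safer way to bind a token to the current credentials is to carry a non-secret value that changes with the password (for example a digest of the stored password hash, or a per-user token version) and, in validate_jwt_token, look the user up by username and compare that value; a password change then still invalidates old tokens without the token containing the secret. In the third hunk, `self.authenticate_user(...)` is now run on every validation; if it performs a slow password-hash verification, each token check pays that cost, which is worth stating in the method docstring. The new `InvalidCredentialsException` raised during validation should also be added to validate_jwt_token's documented exceptions; that docstring begins in the second hunk but continues outside it.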