LINT/format fixes.

This commit is contained in:
Fabio Manganiello 2023-04-25 10:36:27 +02:00
parent 4cc88fcf5f
commit 440d70d9cf
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 8 additions and 12 deletions

View File

@ -3,11 +3,11 @@ import datetime
import hashlib import hashlib
import json import json
import random import random
import rsa
import time import time
from typing import Optional, Dict from typing import Optional, Dict
import bcrypt import bcrypt
import rsa
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import make_transient from sqlalchemy.orm import make_transient
@ -73,7 +73,7 @@ class UserManager:
username=username, username=username,
password=self._encrypt_password(password), password=self._encrypt_password(password),
created_at=datetime.datetime.utcnow(), created_at=datetime.datetime.utcnow(),
**kwargs **kwargs,
) )
session.add(record) session.add(record)
@ -238,9 +238,7 @@ class UserManager:
indent=None, indent=None,
) )
return base64.b64encode( return base64.b64encode(rsa.encrypt(payload.encode('ascii'), pub_key)).decode()
rsa.encrypt(payload.encode('ascii'), pub_key)
).decode()
def validate_jwt_token(self, token: str) -> Dict[str, str]: def validate_jwt_token(self, token: str) -> Dict[str, str]:
""" """
@ -263,21 +261,19 @@ class UserManager:
try: try:
payload = json.loads( payload = json.loads(
rsa.decrypt( rsa.decrypt(base64.b64decode(token.encode('ascii')), priv_key).decode(
base64.b64decode(token.encode('ascii')), 'ascii'
priv_key )
).decode('ascii')
) )
except (TypeError, ValueError) as e: except (TypeError, ValueError) as e:
raise InvalidJWTTokenException(f'Could not decode JWT token: {e}') raise InvalidJWTTokenException(f'Could not decode JWT token: {e}') from e
expires_at = payload.get('expires_at') expires_at = payload.get('expires_at')
if expires_at and time.time() > expires_at: if expires_at and time.time() > expires_at:
raise InvalidJWTTokenException('Expired JWT token') raise InvalidJWTTokenException('Expired JWT token')
user = self.authenticate_user( user = self.authenticate_user(
payload.get('username', ''), payload.get('username', ''), payload.get('password', '')
payload.get('password', '')
) )
if not user: if not user: