[core] Encrypt users 2FA backup codes with bcrypt.

Instead of RSA - decrypting is unnecessary.
This commit is contained in:
Fabio Manganiello 2024-07-25 02:12:39 +02:00
parent 67d8d0a515
commit a11f17aa8f
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 39 additions and 30 deletions

View file

@ -90,11 +90,9 @@ def _get_otp():
username = _get_username() username = _get_username()
user_manager = UserManager() user_manager = UserManager()
otp_secret = user_manager.get_otp_secret(username) otp_secret = user_manager.get_otp_secret(username)
backup_codes = user_manager.get_user_backup_codes(username) if otp_secret else []
return _dump_response( return _dump_response(
username=username, username=username,
otp_secret=otp_secret, otp_secret=otp_secret,
backup_codes=[str(c.code) for c in backup_codes],
) )
@ -204,4 +202,19 @@ def otp_route():
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@otp.route('/otp/refresh-codes', methods=['POST'])
def refresh_codes():
"""
:return: A new set of backup codes for the user.
"""
username = _get_username()
user_manager = UserManager()
otp_secret = user_manager.get_otp_secret(username)
if not otp_secret:
return jsonify({'error': 'OTP not configured for the user'}), 400
backup_codes = user_manager.refresh_user_backup_codes(username)
return jsonify({'backup_codes': backup_codes})
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -337,7 +337,10 @@ class UserManager:
@classmethod @classmethod
def _encrypt_password( def _encrypt_password(
cls, pwd: str, salt: Optional[bytes] = None, iterations: Optional[int] = None cls,
pwd: str,
salt: Optional[Union[str, bytes]] = None,
iterations: Optional[int] = None,
) -> str: ) -> str:
# Legacy password check that uses bcrypt if no salt and iterations are provided # Legacy password check that uses bcrypt if no salt and iterations are provided
# See https://git.platypush.tech/platypush/platypush/issues/397 # See https://git.platypush.tech/platypush/platypush/issues/397
@ -346,6 +349,9 @@ class UserManager:
return bcrypt.hashpw(pwd.encode(), bcrypt.gensalt(12)).decode() return bcrypt.hashpw(pwd.encode(), bcrypt.gensalt(12)).decode()
if isinstance(salt, str):
salt = bytes.fromhex(salt)
return hashlib.pbkdf2_hmac('sha256', pwd.encode(), salt, iterations).hex() return hashlib.pbkdf2_hmac('sha256', pwd.encode(), salt, iterations).hex()
@classmethod @classmethod
@ -510,9 +516,11 @@ class UserManager:
else (None, AuthenticationStatus.MISSING_OTP_CODE) else (None, AuthenticationStatus.MISSING_OTP_CODE)
) )
# The user has 2FA enabled and a TOTP code is provided
if self.validate_otp_code(username, code): if self.validate_otp_code(username, code):
return user if not with_status else (user, AuthenticationStatus.OK) return user if not with_status else (user, AuthenticationStatus.OK)
# The user has 2FA enabled and a backup code is provided
if not self.validate_backup_code(username, code): if not self.validate_backup_code(username, code):
return ( return (
None None
@ -539,13 +547,18 @@ class UserManager:
return [] return []
session.query(UserBackupCode).filter_by(user_id=user.user_id).delete() session.query(UserBackupCode).filter_by(user_id=user.user_id).delete()
pub_key, _ = self._get_or_generate_otp_rsa_key_pair()
stored_codes = [] stored_codes = []
for backup_code in backup_codes: for backup_code in backup_codes:
encrypted_code = self._encrypt_password(
backup_code,
salt=user.password_salt,
iterations=user.hmac_iterations,
)
user_backup_code = UserBackupCode( user_backup_code = UserBackupCode(
user_id=user.user_id, user_id=user.user_id,
code=self._encrypt(backup_code, pub_key), code=encrypted_code,
created_at=utcnow(), created_at=utcnow(),
expires_at=utcnow() + datetime.timedelta(days=30), expires_at=utcnow() + datetime.timedelta(days=30),
) )
@ -556,35 +569,21 @@ class UserManager:
session.commit() session.commit()
return stored_codes return stored_codes
def get_user_backup_codes(self, username: str) -> List['UserBackupCode']:
with self._get_session() as session:
user = self._get_user(session, username)
if not user:
return []
_, priv_key = self._get_or_generate_otp_rsa_key_pair()
return [
UserBackupCode(
user_id=code.user_id,
code=self._decrypt(code.code, priv_key),
created_at=code.created_at,
expires_at=code.expires_at,
)
for code in session.query(UserBackupCode)
.filter_by(user_id=user.user_id)
.all()
]
def validate_backup_code(self, username: str, code: str) -> bool: def validate_backup_code(self, username: str, code: str) -> bool:
with self._get_session() as session: with self._get_session() as session:
user = self._get_user(session, username) user = self._get_user(session, username)
if not user: if not user:
return False return False
pub_key, _ = self._get_or_generate_otp_rsa_key_pair() encrypted_code = self._encrypt_password(
code,
salt=user.password_salt,
iterations=user.hmac_iterations,
)
user_backup_code = ( user_backup_code = (
session.query(UserBackupCode) session.query(UserBackupCode)
.filter_by(user_id=user.user_id, code=self._encrypt(code, pub_key)) .filter_by(user_id=user.user_id, code=encrypted_code)
.first() .first()
) )
@ -609,10 +608,7 @@ class UserManager:
if not otp_secret: if not otp_secret:
return False return False
_, priv_key = self._get_or_generate_otp_rsa_key_pair() return otp.verify_time_otp(otp=code, secret=otp_secret).output
otp_secret = self._decrypt(otp_secret, priv_key)
return otp.verify_time_otp(otp=code, secret=otp_secret)
def disable_otp(self, username: str): def disable_otp(self, username: str):
with self._get_session(locked=True) as session: with self._get_session(locked=True) as session: