From a11f17aa8f469ea3b44d3cadc04372ee35fd94e2 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 25 Jul 2024 02:12:39 +0200 Subject: [PATCH] [core] Encrypt users 2FA backup codes with bcrypt. Instead of RSA - decrypting is unnecessary. --- platypush/backend/http/app/routes/otp.py | 17 +++++++- platypush/user/__init__.py | 52 +++++++++++------------- 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/platypush/backend/http/app/routes/otp.py b/platypush/backend/http/app/routes/otp.py index d81e479fbc..98e37c443b 100644 --- a/platypush/backend/http/app/routes/otp.py +++ b/platypush/backend/http/app/routes/otp.py @@ -90,11 +90,9 @@ def _get_otp(): username = _get_username() user_manager = UserManager() otp_secret = user_manager.get_otp_secret(username) - backup_codes = user_manager.get_user_backup_codes(username) if otp_secret else [] return _dump_response( username=username, otp_secret=otp_secret, - backup_codes=[str(c.code) for c in backup_codes], ) @@ -204,4 +202,19 @@ def otp_route(): return jsonify({'error': str(e)}), 500 +@otp.route('/otp/refresh-codes', methods=['POST']) +def refresh_codes(): + """ + :return: A new set of backup codes for the user. + """ + username = _get_username() + user_manager = UserManager() + otp_secret = user_manager.get_otp_secret(username) + if not otp_secret: + return jsonify({'error': 'OTP not configured for the user'}), 400 + + backup_codes = user_manager.refresh_user_backup_codes(username) + return jsonify({'backup_codes': backup_codes}) + + # vim:sw=4:ts=4:et: diff --git a/platypush/user/__init__.py b/platypush/user/__init__.py index fcb5435fb6..27fdd06f9d 100644 --- a/platypush/user/__init__.py +++ b/platypush/user/__init__.py @@ -337,7 +337,10 @@ class UserManager: @classmethod def _encrypt_password( - cls, pwd: str, salt: Optional[bytes] = None, iterations: Optional[int] = None + cls, + pwd: str, + salt: Optional[Union[str, bytes]] = None, + iterations: Optional[int] = None, ) -> str: # Legacy password check that uses bcrypt if no salt and iterations are provided # See https://git.platypush.tech/platypush/platypush/issues/397 @@ -346,6 +349,9 @@ class UserManager: return bcrypt.hashpw(pwd.encode(), bcrypt.gensalt(12)).decode() + if isinstance(salt, str): + salt = bytes.fromhex(salt) + return hashlib.pbkdf2_hmac('sha256', pwd.encode(), salt, iterations).hex() @classmethod @@ -510,9 +516,11 @@ class UserManager: else (None, AuthenticationStatus.MISSING_OTP_CODE) ) + # The user has 2FA enabled and a TOTP code is provided if self.validate_otp_code(username, code): return user if not with_status else (user, AuthenticationStatus.OK) + # The user has 2FA enabled and a backup code is provided if not self.validate_backup_code(username, code): return ( None @@ -539,13 +547,18 @@ class UserManager: return [] session.query(UserBackupCode).filter_by(user_id=user.user_id).delete() - pub_key, _ = self._get_or_generate_otp_rsa_key_pair() stored_codes = [] for backup_code in backup_codes: + encrypted_code = self._encrypt_password( + backup_code, + salt=user.password_salt, + iterations=user.hmac_iterations, + ) + user_backup_code = UserBackupCode( user_id=user.user_id, - code=self._encrypt(backup_code, pub_key), + code=encrypted_code, created_at=utcnow(), expires_at=utcnow() + datetime.timedelta(days=30), ) @@ -556,35 +569,21 @@ class UserManager: session.commit() return stored_codes - def get_user_backup_codes(self, username: str) -> List['UserBackupCode']: - with self._get_session() as session: - user = self._get_user(session, username) - if not user: - return [] - - _, priv_key = self._get_or_generate_otp_rsa_key_pair() - return [ - UserBackupCode( - user_id=code.user_id, - code=self._decrypt(code.code, priv_key), - created_at=code.created_at, - expires_at=code.expires_at, - ) - for code in session.query(UserBackupCode) - .filter_by(user_id=user.user_id) - .all() - ] - def validate_backup_code(self, username: str, code: str) -> bool: with self._get_session() as session: user = self._get_user(session, username) if not user: return False - pub_key, _ = self._get_or_generate_otp_rsa_key_pair() + encrypted_code = self._encrypt_password( + code, + salt=user.password_salt, + iterations=user.hmac_iterations, + ) + user_backup_code = ( session.query(UserBackupCode) - .filter_by(user_id=user.user_id, code=self._encrypt(code, pub_key)) + .filter_by(user_id=user.user_id, code=encrypted_code) .first() ) @@ -609,10 +608,7 @@ class UserManager: if not otp_secret: return False - _, priv_key = self._get_or_generate_otp_rsa_key_pair() - otp_secret = self._decrypt(otp_secret, priv_key) - - return otp.verify_time_otp(otp=code, secret=otp_secret) + return otp.verify_time_otp(otp=code, secret=otp_secret).output def disable_otp(self, username: str): with self._get_session(locked=True) as session: