mirror of
https://github.com/wowlikon/LiB.git
synced 2026-02-04 04:31:09 +00:00
Улучшение документации и KDF с шифрованием totp
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
"""Модуль основного функционала авторизации и аутентификации"""
|
||||
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Annotated
|
||||
|
||||
from uuid import uuid4
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
from argon2.low_level import hash_secret_raw, Type
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from jose import jwt, JWTError, ExpiredSignatureError
|
||||
@@ -17,7 +20,6 @@ from library_service.settings import get_session, get_logger
|
||||
|
||||
|
||||
# Конфигурация JWT из переменных окружения
|
||||
SECRET_KEY = os.getenv("SECRET_KEY")
|
||||
ALGORITHM = os.getenv("ALGORITHM", "HS256")
|
||||
PARTIAL_TOKEN_EXPIRE_MINUTES = int(os.getenv("PARTIAL_TOKEN_EXPIRE_MINUTES", "5"))
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "15"))
|
||||
@@ -38,16 +40,76 @@ RECOVERY_CODE_SEGMENT_BYTES = int(os.getenv("RECOVERY_CODE_SEGMENT_BYTES", "2"))
|
||||
RECOVERY_MIN_REMAINING_WARNING = int(os.getenv("RECOVERY_MIN_REMAINING_WARNING", "3"))
|
||||
RECOVERY_MAX_AGE_DAYS = int(os.getenv("RECOVERY_MAX_AGE_DAYS", "365"))
|
||||
|
||||
SECRET_KEY = os.getenv("SECRET_KEY")
|
||||
|
||||
# Получение логгера
|
||||
logger = get_logger()
|
||||
|
||||
# OAuth2 схема
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
|
||||
|
||||
|
||||
class KeyDeriver:
|
||||
def __init__(self, master_key: bytes):
|
||||
self.master_key = master_key
|
||||
|
||||
def derive(
|
||||
self,
|
||||
context: str,
|
||||
key_len: int = 32,
|
||||
time_cost: int = 12,
|
||||
memory_cost: int = 512 * 1024,
|
||||
parallelism: int = 4,
|
||||
) -> bytes:
|
||||
"""
|
||||
Формирование разных ключей из одного.
|
||||
context: любая строка, например "aes", "hmac", "totp"
|
||||
"""
|
||||
salt = hashlib.sha256(context.encode("utf-8")).digest()
|
||||
key = hash_secret_raw(
|
||||
secret=self.master_key,
|
||||
salt=salt,
|
||||
time_cost=time_cost,
|
||||
memory_cost=memory_cost,
|
||||
parallelism=parallelism,
|
||||
hash_len=key_len,
|
||||
type=Type.ID,
|
||||
)
|
||||
return key
|
||||
|
||||
|
||||
class AES256Cipher:
|
||||
def __init__(self, key: bytes):
|
||||
if len(key) != 32:
|
||||
raise ValueError("AES-256 требует ключ длиной 32 байта")
|
||||
self.key = key
|
||||
self.aesgcm = AESGCM(key)
|
||||
|
||||
def encrypt(self, plaintext: bytes, nonce_len: int = 12) -> bytes:
|
||||
"""Зашифровывает данные с помощью AES256-GCM"""
|
||||
nonce = os.urandom(nonce_len)
|
||||
ct = self.aesgcm.encrypt(nonce, plaintext, associated_data=None)
|
||||
return nonce + ct
|
||||
|
||||
def decrypt(self, data: bytes, nonce_len: int = 12) -> bytes:
|
||||
"""Расшифровывает данные с помощью AES256-GCM"""
|
||||
nonce = data[:nonce_len]
|
||||
ct = data[nonce_len:]
|
||||
return self.aesgcm.decrypt(nonce, ct, associated_data=None)
|
||||
|
||||
|
||||
# Проверка секретного ключа
|
||||
if not SECRET_KEY:
|
||||
raise RuntimeError("SECRET_KEY environment variable is required")
|
||||
|
||||
deriver = KeyDeriver(SECRET_KEY.encode())
|
||||
|
||||
jwt_key = deriver.derive("jwt", key_len=32)
|
||||
|
||||
aes_key = deriver.derive("totp", key_len=32)
|
||||
cipher = AES256Cipher(aes_key)
|
||||
|
||||
|
||||
# Хэширование паролей
|
||||
pwd_context = CryptContext(
|
||||
schemes=["argon2"],
|
||||
@@ -88,7 +150,7 @@ def _create_token(
|
||||
}
|
||||
if token_type == "refresh":
|
||||
to_encode.update({"jti": str(uuid4())})
|
||||
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
return jwt.encode(to_encode, jwt_key, algorithm=ALGORITHM)
|
||||
|
||||
|
||||
def create_partial_token(data: dict) -> str:
|
||||
@@ -119,7 +181,7 @@ def decode_token(
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
payload = jwt.decode(token, jwt_key, algorithms=[ALGORITHM])
|
||||
username: str | None = payload.get("sub")
|
||||
user_id: int | None = payload.get("user_id")
|
||||
token_type: str | None = payload.get("type")
|
||||
|
||||
Reference in New Issue
Block a user