Files
cht-lawfirm/back/app/user/manager.py

114 lines
3.3 KiB
Python

import uuid
from typing import Any, Dict, Generic, Optional
from bson import ObjectId
from fastapi import Depends
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models, exceptions, schemas
from fastapi_users.authentication import BearerTransport, AuthenticationBackend
from fastapi_users.authentication.strategy.db import AccessTokenDatabase, DatabaseStrategy
from .models import User, get_user_db, AccessToken, get_access_token_db
SECRET = "SECRET"
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def get_by_email(self, user_email: str) -> models.UP:
return await self.get_by_login(user_email)
async def get_by_login(self, user_login: str) -> models.UP:
"""
Get a user by user_login.
:param user_login: E-mail of the user to retrieve.
:raises UserNotExists: The user does not exist.
:return: A user.
"""
user = await self.user_db.get_by_login(user_login)
if user is None:
raise exceptions.UserNotExists()
return user
async def create(
self,
user_create: schemas.UC,
safe: bool = False
) -> models.UP:
"""
Create a user in database.
Triggers the on_after_register handler on success.
:param user_create: The UserCreate model to create.
:param safe: If True, sensitive values like is_superuser or is_verified
will be ignored during the creation, defaults to False.
:raises UserAlreadyExists: A user already exists with the same e-mail.
:return: A new user.
"""
await self.validate_password(user_create.password, user_create)
existing_user = await self.user_db.get_by_login(user_create.login)
if existing_user is not None:
raise exceptions.UserAlreadyExists()
user_dict = (
user_create.create_update_dict()
if safe
else user_create.create_update_dict_superuser()
)
password = user_dict.pop("password")
user_dict["hashed_password"] = self.password_helper.hash(password)
created_user = await self.user_db.create(user_dict)
await self.on_after_register(created_user)
return created_user
def parse_id(self, value: Any) -> uuid.UUID:
if isinstance(value, ObjectId):
return value
if isinstance(value, uuid.UUID):
return value
try:
return uuid.UUID(value)
except ValueError as e:
raise exceptions.InvalidID() from e
async def get_user_manager(user_db=Depends(get_user_db)):
yield UserManager(user_db)
def get_database_strategy(
access_token_db: AccessTokenDatabase[AccessToken] = Depends(get_access_token_db),
) -> DatabaseStrategy:
return DatabaseStrategy(access_token_db, lifetime_seconds=3600)
bearer_transport = BearerTransport(tokenUrl="auth/login")
auth_backend = AuthenticationBackend(
name="db",
transport=bearer_transport,
get_strategy=get_database_strategy,
)
fastapi_users = FastAPIUsers[User, uuid.UUID](
get_user_manager,
[auth_backend],
)
get_current_user = fastapi_users.current_user(active=True)
def get_auth_router():
return fastapi_users.get_auth_router(auth_backend)