import uuid
from typing import Any, Dict, Generic, Optional

from bson import ObjectId
from fastapi import Depends
from fastapi_users import (
    BaseUserManager,
    FastAPIUsers,
    UUIDIDMixin,
    models,
    exceptions,
    schemas,
)
from fastapi_users.authentication import BearerTransport, AuthenticationBackend
from fastapi_users.authentication.strategy.db import (
    AccessTokenDatabase,
    DatabaseStrategy,
)

from .models import User, get_user_db, AccessToken, get_access_token_db

# NOTE(review): this literal is used to sign the reset-password and
# verification tokens below. It looks like a placeholder -- confirm it is
# replaced by a real secret (e.g. loaded from configuration) before deploying.
SECRET = "SECRET"


class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
    """User manager that identifies users by ``login`` rather than by e-mail."""

    reset_password_token_secret = SECRET
    verification_token_secret = SECRET

    async def get_by_email(self, user_email: str) -> models.UP:
        """
        Get a user by treating ``user_email`` as a login.

        Delegates to :meth:`get_by_login`.

        :param user_email: Value looked up as the user's login.
        :raises UserNotExists: The user does not exist.
        :return: A user.
        """
        return await self.get_by_login(user_email)

    async def get_by_login(self, user_login: str) -> models.UP:
        """
        Get a user by login.

        :param user_login: Login of the user to retrieve.
        :raises UserNotExists: The user does not exist.
        :return: A user.
        """
        user = await self.user_db.get_by_login(user_login)
        if user is None:
            raise exceptions.UserNotExists()
        return user

    async def create(
        self,
        user_create: schemas.UC,
        safe: bool = False,
        request: Optional[Any] = None,
    ) -> models.UP:
        """
        Create a user in database.

        Triggers the on_after_register handler on success.

        :param user_create: The UserCreate model to create. It must expose
            ``login`` and ``password`` attributes.
        :param safe: If True, the dict is built with ``create_update_dict()``
            instead of ``create_update_dict_superuser()`` (presumably dropping
            sensitive values like is_superuser or is_verified), defaults to
            False.
        :param request: Optional request that triggered the operation. It is
            only forwarded to on_after_register, defaults to None.
        :raises UserAlreadyExists: A user already exists with the same login.
        :return: A new user.
        """
        await self.validate_password(user_create.password, user_create)

        existing_user = await self.user_db.get_by_login(user_create.login)
        if existing_user is not None:
            raise exceptions.UserAlreadyExists()

        user_dict = (
            user_create.create_update_dict()
            if safe
            else user_create.create_update_dict_superuser()
        )
        # Never persist the clear-text password; store only its hash.
        password = user_dict.pop("password")
        user_dict["hashed_password"] = self.password_helper.hash(password)

        created_user = await self.user_db.create(user_dict)

        await self.on_after_register(created_user, request)

        return created_user

    def parse_id(self, value: Any) -> uuid.UUID:
        """
        Convert a raw identifier into the ID type used by this manager.

        ``ObjectId`` and ``uuid.UUID`` instances are returned unchanged (so,
        despite the annotation, an ``ObjectId`` may be returned); anything
        else is passed to ``uuid.UUID``.

        :param value: Raw identifier.
        :raises InvalidID: ``value`` cannot be converted to a UUID.
        :return: The parsed identifier.
        """
        if isinstance(value, (ObjectId, uuid.UUID)):
            return value
        try:
            return uuid.UUID(value)
        except (ValueError, TypeError, AttributeError) as e:
            # uuid.UUID raises TypeError for None and AttributeError for
            # non-string values such as int; report them all as InvalidID.
            raise exceptions.InvalidID() from e


async def get_user_manager(user_db=Depends(get_user_db)):
    """Dependency that yields a UserManager bound to the injected user db."""
    yield UserManager(user_db)


def get_database_strategy(
    access_token_db: AccessTokenDatabase[AccessToken] = Depends(get_access_token_db),
) -> DatabaseStrategy:
    """Dependency that builds the database-backed token strategy."""
    # NOTE(review): lifetime_seconds=None configures no token lifetime;
    # presumably tokens never expire -- confirm this is intended.
    return DatabaseStrategy(access_token_db, lifetime_seconds=None)


bearer_transport = BearerTransport(tokenUrl="auth/login")

auth_backend = AuthenticationBackend(
    name="db",
    transport=bearer_transport,
    get_strategy=get_database_strategy,
)

fastapi_users = FastAPIUsers[User, uuid.UUID](
    get_user_manager,
    [auth_backend],
)

# Dependencies resolving the current active user / active superuser.
get_current_user = fastapi_users.current_user(active=True)
get_current_superuser = fastapi_users.current_user(active=True, superuser=True)


def get_auth_router():
    """Return the auth router for the database-backed backend."""
    return fastapi_users.get_auth_router(auth_backend)