Files
roleplay-contract/api/rpk-api/hub/auth/__init__.py

97 lines
4.1 KiB
Python

import os
from typing import Any
from beanie import PydanticObjectId, Document
from fastapi import Depends, Response, status, APIRouter
from fastapi_users import BaseUserManager, FastAPIUsers, schemas, models
from fastapi_users.authentication import AuthenticationBackend, CookieTransport, Strategy
from fastapi_users.authentication.strategy import AccessTokenDatabase, DatabaseStrategy
from fastapi_users_db_beanie.access_token import BeanieBaseAccessToken, BeanieAccessTokenDatabase
from fastapi_users.openapi import OpenAPIResponseType
from httpx_oauth.clients.google import GoogleOAuth2
from httpx_oauth.clients.discord import DiscordOAuth2
from starlette.responses import JSONResponse, RedirectResponse
from hub.user import User, get_user_db
from hub.user.schemas import UserSchema, UserUpdateSchema
# Secret shared by the user manager's token secrets and the OAuth routers below.
# The lookup yields None when the environment variable is unset.
SECRET = os.environ.get("FASTAPI_USERS_SECRET")

# OAuth2 clients; client id/secret pairs are read from the environment.
google_oauth_client = GoogleOAuth2(
    os.environ.get("GOOGLE_CLIENT_ID"),
    os.environ.get("GOOGLE_CLIENT_SECRET"),
)
discord_oauth_client = DiscordOAuth2(
    os.environ.get("DISCORD_CLIENT_ID"),
    os.environ.get("DISCORD_CLIENT_SECRET"),
)

# Passed as ``lifetime_seconds`` to the database token strategy (one hour).
TOKEN_LIFETIME = 60 * 60
class AccessToken(BeanieBaseAccessToken, Document):
    """Beanie document for access tokens; adds no fields beyond its bases."""
async def get_access_token_db():
    """Dependency that yields a token database adapter bound to ``AccessToken``."""
    token_db = BeanieAccessTokenDatabase(AccessToken)
    yield token_db
def get_database_strategy(
    access_token_db: AccessTokenDatabase[AccessToken] = Depends(get_access_token_db),
) -> DatabaseStrategy:
    """Build the database-backed token strategy.

    Args:
        access_token_db: Token database adapter, injected from
            ``get_access_token_db``.

    Returns:
        A ``DatabaseStrategy`` whose ``lifetime_seconds`` is ``TOKEN_LIFETIME``.
    """
    strategy = DatabaseStrategy(access_token_db, lifetime_seconds=TOKEN_LIFETIME)
    return strategy
class UserManager(BaseUserManager[User, PydanticObjectId]):
    """User manager for ``User`` documents identified by ``PydanticObjectId``."""

    # Both token kinds are signed with the module-level SECRET.
    reset_password_token_secret = SECRET
    verification_token_secret = SECRET

    def parse_id(self, value: Any) -> PydanticObjectId:
        """Convert a raw identifier into a ``PydanticObjectId``.

        Args:
            value: Identifier as received from a caller (for example a path
                parameter string or an id stored on an access token).

        Returns:
            The identifier as a ``PydanticObjectId``.

        Raises:
            fastapi_users.exceptions.InvalidID: If ``value`` is not a valid
                ObjectId representation.
        """
        # Imported locally so this block does not depend on a new file-level import.
        from fastapi_users.exceptions import InvalidID

        # The manager is parameterised with PydanticObjectId, so return that
        # type rather than a plain string, and report unparsable ids with the
        # InvalidID exception instead of passing them through.
        if not PydanticObjectId.is_valid(value):
            raise InvalidID()
        return PydanticObjectId(value)
async def get_user_manager(user_db=Depends(get_user_db)):
    """Dependency that yields a ``UserManager`` wrapping the injected user database."""
    manager = UserManager(user_db)
    yield manager
class CookieTransportMe(CookieTransport):
    """Cookie transport whose login response body is the logged-in user."""

    async def get_login_me_response(self, token: str, user) -> Response:
        """Return a 200 JSON response describing ``user`` with the login cookie set.

        Args:
            token: Token passed on to ``_set_login_cookie``.
            user: User object; its ``model_dump()`` output is fed to ``UserSchema``.
        """
        # Round-trip through UserSchema, then dump in JSON mode for the response body.
        payload = UserSchema(**user.model_dump()).model_dump(mode="json")
        login_response = JSONResponse(content=payload, status_code=status.HTTP_200_OK)
        return self._set_login_cookie(login_response, token)

    @staticmethod
    def get_openapi_login_responses_success() -> OpenAPIResponseType:
        """Describe the successful login response (200 with a ``UserSchema`` body)."""
        success_doc = {"model": UserSchema}
        return {status.HTTP_200_OK: success_doc}
class AuthenticationBackendMe(AuthenticationBackend):
    """Authentication backend whose login response is built from the user as well as the token."""

    async def login(self, strategy: Strategy[models.UP, models.ID], user: models.UP) -> Response:
        """Write a token for ``user`` and return the transport's login response.

        The transport must provide ``get_login_me_response(token, user)``.
        """
        issued_token = await strategy.write_token(user)
        login_response = await self.transport.get_login_me_response(issued_token, user)
        return login_response
class CookieTransportOauth(CookieTransport):
    """Cookie transport that answers a login with a redirect."""

    async def get_login_response(self, token: str) -> Response:
        """Redirect to ``/login?oauth=success`` with the login cookie set."""
        # NOTE(review): 301 marks the redirect as permanent; confirm whether a
        # temporary redirect status was intended for a post-login redirect.
        redirect = RedirectResponse(
            "/login?oauth=success",
            status_code=status.HTTP_301_MOVED_PERMANENTLY,
        )
        return self._set_login_cookie(redirect, token)

    @staticmethod
    def get_openapi_login_responses_success() -> OpenAPIResponseType:
        """Describe the successful login response (301, no body model)."""
        redirect_doc = {"model": None}
        return {status.HTTP_301_MOVED_PERMANENTLY: redirect_doc}
# --- Cookie login: the login response body carries the user ---
cookie_transport = CookieTransportMe(cookie_name="rpkapiusersauth")
auth_backend = AuthenticationBackendMe(
    name="db",
    transport=cookie_transport,
    get_strategy=get_database_strategy,
)

fastapi_users = FastAPIUsers[User, PydanticObjectId](
    get_user_manager,
    [auth_backend],
)

# Dependencies resolving the current active user / active superuser.
get_current_user = fastapi_users.current_user(active=True)
get_current_superuser = fastapi_users.current_user(active=True, superuser=True)

# Routers built on the cookie login backend above.
auth_router = fastapi_users.get_auth_router(
    auth_backend,
    requires_verification=True,
)
register_router = fastapi_users.get_register_router(
    UserSchema,
    schemas.BaseUserCreate,
)
password_router = fastapi_users.get_reset_password_router()
verification_router = fastapi_users.get_verify_router(UserSchema)
users_router = fastapi_users.get_users_router(
    UserSchema,
    UserUpdateSchema,
)

# --- OAuth login: same cookie name, but the login response is a redirect ---
# NOTE: ``cookie_transport`` and ``auth_backend`` are rebound here, so after
# import these module-level names refer to the OAuth variants.
cookie_transport = CookieTransportOauth(cookie_name="rpkapiusersauth")
auth_backend = AuthenticationBackend(
    name="db",
    transport=cookie_transport,
    get_strategy=get_database_strategy,
)
google_oauth_router = fastapi_users.get_oauth_router(
    google_oauth_client,
    auth_backend,
    SECRET,
    is_verified_by_default=True,
)
discord_oauth_router = fastapi_users.get_oauth_router(
    discord_oauth_client,
    auth_backend,
    SECRET,
    is_verified_by_default=True,
)