93 lines
3.9 KiB
Python
93 lines
3.9 KiB
Python
import os
from typing import Any, Optional

from beanie import PydanticObjectId
from fastapi import Depends, Response, status
from fastapi_users import BaseUserManager, FastAPIUsers, schemas, models
from fastapi_users.authentication import AuthenticationBackend, BearerTransport, CookieTransport, Strategy
from fastapi_users.authentication.strategy import AccessTokenDatabase, DatabaseStrategy
from fastapi_users.openapi import OpenAPIResponseType
from fastapi_users_db_beanie import ObjectIDIDMixin
from fastapi_users_db_beanie.access_token import BeanieBaseAccessTokenDocument, BeanieAccessTokenDatabase
from httpx_oauth.clients.google import GoogleOAuth2
from httpx_oauth.clients.discord import DiscordOAuth2
from starlette.responses import JSONResponse, RedirectResponse

from hub.user import User, get_user_db
from hub.user.schemas import UserSchema
|
|
|
|
|
|
# Secret used below for the reset-password / verification tokens and as the
# OAuth state secret. Read from the environment; None when the variable is unset.
SECRET = os.environ.get("FASTAPI_USERS_SECRET")

# OAuth2 provider clients; client id and secret are read from the environment.
google_oauth_client = GoogleOAuth2(
    os.environ.get("GOOGLE_CLIENT_ID"),
    os.environ.get("GOOGLE_CLIENT_SECRET"),
)
discord_oauth_client = DiscordOAuth2(
    os.environ.get("DISCORD_CLIENT_ID"),
    os.environ.get("DISCORD_CLIENT_SECRET"),
)

# Handed to DatabaseStrategy as ``lifetime_seconds``.
TOKEN_LIFETIME = 3600
|
|
|
|
|
|
class AccessToken(BeanieBaseAccessTokenDocument):
    """Access-token document; adds nothing to ``BeanieBaseAccessTokenDocument``."""
|
|
|
|
async def get_access_token_db():
    """Yield a ``BeanieAccessTokenDatabase`` bound to the ``AccessToken`` document.

    Written as an async generator; it is consumed through ``Depends`` by
    ``get_database_strategy``.
    """
    token_db = BeanieAccessTokenDatabase(AccessToken)
    yield token_db
|
|
|
|
|
|
def get_database_strategy(
    access_token_db: AccessTokenDatabase[AccessToken] = Depends(get_access_token_db),
) -> DatabaseStrategy:
    """Build the database-backed authentication strategy.

    Args:
        access_token_db: Access-token database, injected via ``Depends``.

    Returns:
        A ``DatabaseStrategy`` whose ``lifetime_seconds`` is ``TOKEN_LIFETIME``.
    """
    strategy = DatabaseStrategy(access_token_db, lifetime_seconds=TOKEN_LIFETIME)
    return strategy
|
|
|
|
|
|
class UserManager(ObjectIDIDMixin, BaseUserManager[User, PydanticObjectId]):
    """User manager for ``User`` documents identified by ``PydanticObjectId``.

    ``parse_id`` is inherited from ``ObjectIDIDMixin`` instead of being
    overridden with ``str(value)``: the manager is parameterised with
    ``PydanticObjectId`` as its ID type, and a plain ``str`` never compares
    equal to an ObjectId, so any equality check of a parsed ID against
    ``user.id`` would always fail. The mixin converts the value to a
    ``PydanticObjectId`` and reports malformed values as fastapi-users'
    ``InvalidID`` error rather than letting them through unvalidated.
    """

    # Both token kinds are signed with the same application secret.
    reset_password_token_secret = SECRET
    verification_token_secret = SECRET
|
|
|
|
|
|
async def get_user_manager(user_db=Depends(get_user_db)):
    """Yield a ``UserManager`` built on the injected user database.

    Args:
        user_db: User database, resolved through ``Depends(get_user_db)``.
    """
    manager = UserManager(user_db)
    yield manager
|
|
|
|
|
|
class CookieTransportMe(CookieTransport):
    """Cookie transport whose login response carries the user in its body."""

    async def get_login_me_response(self, token: str, user) -> Response:
        """Return a 200 JSON response with the serialized user and the login cookie.

        Args:
            token: Token value stored in the login cookie.
            user: User object; it is dumped and re-validated through ``UserSchema``.

        Returns:
            The JSON response after ``_set_login_cookie`` has been applied to it.
        """
        payload = UserSchema(**user.model_dump()).model_dump(mode="json")
        login_response = JSONResponse(content=payload, status_code=status.HTTP_200_OK)
        return self._set_login_cookie(login_response, token)

    @staticmethod
    def get_openapi_login_responses_success() -> OpenAPIResponseType:
        """Describe the successful login response (200 with a ``UserSchema`` body)."""
        success_spec = {"model": UserSchema}
        return {status.HTTP_200_OK: success_spec}
|
|
|
|
|
|
class AuthenticationBackendMe(AuthenticationBackend):
    """Authentication backend whose login response includes the user."""

    async def login(self, strategy: Strategy[models.UP, models.ID], user: models.UP) -> Response:
        """Write a token for ``user`` and build the login response.

        The transport must provide ``get_login_me_response(token, user)``
        (``CookieTransportMe`` in this module does).

        Args:
            strategy: Strategy used to write the token.
            user: The user being logged in.

        Returns:
            The response produced by the transport's ``get_login_me_response``.
        """
        issued_token = await strategy.write_token(user)
        transport = self.transport
        return await transport.get_login_me_response(issued_token, user)
|
|
|
|
|
|
class CookieTransportOauth(CookieTransport):
    """Cookie transport that answers a successful login with a redirect.

    The redirect is a 302 Found rather than a 301 Moved Permanently: a 301 is
    cacheable by default and tells clients the location changed for good,
    which is wrong for a per-login response that also sets a cookie.
    """

    async def get_login_response(self, token: str) -> Response:
        """Redirect to the login page with ``oauth=success`` and set the login cookie.

        Args:
            token: Token value stored in the login cookie.

        Returns:
            The redirect response after ``_set_login_cookie`` has been applied to it.
        """
        response = RedirectResponse("/login?oauth=success", status_code=status.HTTP_302_FOUND)
        return self._set_login_cookie(response, token)

    @staticmethod
    def get_openapi_login_responses_success() -> OpenAPIResponseType:
        """Describe the successful login response (a body-less 302 redirect)."""
        # Must stay in sync with the status code used in get_login_response.
        return {status.HTTP_302_FOUND: {"model": None}}
|
|
|
|
|
|
# Password-login backend: the cookie transport returns the user on login and
# tokens are handled by the database strategy.
cookie_transport = CookieTransportMe(cookie_name="rpkapiusersauth")
auth_backend = AuthenticationBackendMe(
    name="db",
    transport=cookie_transport,
    get_strategy=get_database_strategy,
)

fastapi_users = FastAPIUsers[User, PydanticObjectId](
    get_user_manager,
    [auth_backend],
)

# Routers generated by fastapi-users; the auth router is created with
# requires_verification=True.
auth_router = fastapi_users.get_auth_router(
    auth_backend,
    requires_verification=True,
)
register_router = fastapi_users.get_register_router(
    UserSchema,
    schemas.BaseUserCreate,
)
password_router = fastapi_users.get_reset_password_router()
verification_router = fastapi_users.get_verify_router(UserSchema)
users_router = fastapi_users.get_users_router(
    UserSchema,
    schemas.BaseUserUpdate,
)
|
|
|
|
# OAuth backend: same cookie name and strategy as the password-login backend,
# but the transport answers with a redirect.
# NOTE(review): this rebinds the module-level names ``cookie_transport`` and
# ``auth_backend``; after import they refer to the OAuth variants, while
# ``fastapi_users`` keeps the backend it was constructed with.
cookie_transport = CookieTransportOauth(cookie_name="rpkapiusersauth")
auth_backend = AuthenticationBackend(
    name="db",
    transport=cookie_transport,
    get_strategy=get_database_strategy,
)

# Users created through either provider are marked verified by default.
google_oauth_router = fastapi_users.get_oauth_router(
    google_oauth_client,
    auth_backend,
    SECRET,
    is_verified_by_default=True,
)
discord_oauth_router = fastapi_users.get_oauth_router(
    discord_oauth_client,
    auth_backend,
    SECRET,
    is_verified_by_default=True,
)
|