Files
2025-04-07 02:33:27 +02:00

31 lines
1.0 KiB
Python

import asyncio
import contextlib
from fastapi_users.exceptions import UserAlreadyExists
from fastapi_users.schemas import BaseUserCreate
from hub import init_db as hub_init_db
from hub.auth import get_user_manager
from hub.user import get_user_db
# Wrap the imported dependency callables so this script can enter them with
# ``async with`` (see create_user below). asynccontextmanager expects async
# generator functions, so get_user_db / get_user_manager are presumably
# written that way -- confirm in hub.user / hub.auth.
_as_async_cm = contextlib.asynccontextmanager
get_user_db_context = _as_async_cm(get_user_db)
get_user_manager_context = _as_async_cm(get_user_manager)
async def create_user(email: str, password: str, is_superuser: bool = False) -> None:
    """Create a user account and report the outcome on stdout.

    Calls ``hub_init_db()`` first, then opens the user-db and user-manager
    contexts and asks the manager to create a user from a ``BaseUserCreate``
    payload.

    Args:
        email: Email address passed to ``BaseUserCreate``.
        password: Password passed to ``BaseUserCreate``.
        is_superuser: Superuser flag passed to ``BaseUserCreate``.

    Returns:
        None. The result is only printed: ``"User created ..."`` on success,
        or ``"User <email> already exists"`` when ``UserAlreadyExists`` is
        raised. Any other exception propagates to the caller.
    """
    try:
        await hub_init_db()
        # One multi-item ``async with`` is equivalent to nesting the two
        # contexts: the manager context is entered second and exited first.
        async with get_user_db_context() as db, get_user_manager_context(db) as manager:
            # Build the payload only after the contexts are open, so the
            # order of side effects matches on validation failure too.
            payload = BaseUserCreate(
                email=email,
                password=password,
                is_superuser=is_superuser,
            )
            created = await manager.create(payload)
            print(f"User created {created}")
    except UserAlreadyExists:
        print(f"User {email} already exists")
if __name__ == "__main__":
    # Script entry point: create the fixed test account (non-superuser,
    # since is_superuser keeps its default of False).
    asyncio.run(create_user(email="test@test.te", password="test"))