Initial commit

This commit is contained in:
2025-01-16 00:24:42 +01:00
parent c804af2a92
commit a2a42437a5
58 changed files with 10919 additions and 0 deletions

0
api/app/__init__.py Normal file
View File

View File

81
api/app/account/models.py Normal file
View File

@@ -0,0 +1,81 @@
from uuid import UUID, uuid4
from enum import Enum
from sqlmodel import Field, SQLModel, select
from category.models import CategoryRead
class AccountType(Enum):
Asset = "Asset" # < Denotes a generic asset account.
Checkings = "Checkings" # < Standard checking account
Savings = "Savings" # < Typical savings account
Cash = "Cash" # < Denotes a shoe-box or pillowcase stuffed with cash
Liability = "Liability" # < Denotes a generic liability account.
CreditCard = "CreditCard" # < Credit card accounts
Loan = "Loan" # < Loan and mortgage accounts (liability)
CertificateDep = "CertificateDep" # < Certificates of Deposit
Investment = "Investment" # < Investment account
MoneyMarket = "MoneyMarket" # < Money Market Account
Currency = "Currency" # < Denotes a currency trading account.
Income = "Income" # < Denotes an income account
Expense = "Expense" # < Denotes an expense account
AssetLoan = "AssetLoan" # < Denotes a loan (asset of the owner of this object)
Stock = "Stock" # < Denotes an security account as sub-account for an investment
Equity = "Equity" # < Denotes an equity account e.g. opening/closing balance
Payee = "Payee"
class AccountBase(SQLModel):
name: str = Field(index=True)
type: AccountType = Field(index=True)
default_category_id: UUID | None = Field(default=None, foreign_key="category.id")
class AccountBaseId(AccountBase):
id: UUID | None = Field(default_factory=uuid4, primary_key=True)
class Account(AccountBaseId, table=True):
@classmethod
def create(cls, account, session):
account_db = cls.model_validate(account)
session.add(account_db)
session.commit()
session.refresh(account_db)
return account_db
@classmethod
def list(cls):
return select(Account)
@classmethod
def get(cls, session, account_id):
return session.get(Account, account_id)
@classmethod
def update(cls, session, account_db, account_data):
account_db.sqlmodel_update(account_data)
session.add(account_db)
session.commit()
session.refresh(account_db)
return account_db
@classmethod
def delete(cls, session, account):
session.delete(account)
session.commit()
class AccountRead(AccountBaseId):
default_category: CategoryRead
class AccountWrite(AccountBase):
pass
class AccountCreate(AccountWrite):
pass
class AccountUpdate(AccountWrite):
pass

48
api/app/account/routes.py Normal file
View File

@@ -0,0 +1,48 @@
from uuid import UUID
from fastapi import APIRouter, HTTPException, Depends, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
from fastapi_pagination import Page
from fastapi_pagination.ext.sqlmodel import paginate
from .models import Account, AccountCreate, AccountRead, AccountUpdate
from db import SessionDep
from user.manager import get_current_user
router = APIRouter()
@router.post("")
def create_account(account: AccountCreate, session: SessionDep, current_user=Depends(get_current_user)) -> AccountRead:
Account.create(account, session)
return account
@router.get("")
def read_accounts(session: SessionDep, current_user=Depends(get_current_user)) -> Page[AccountRead]:
return paginate(session, Account.list())
@router.get("/{account_id}")
def read_account(account_id: UUID, session: SessionDep, current_user=Depends(get_current_user)) -> AccountRead:
account = Account.get(session, account_id)
if not account:
raise HTTPException(status_code=404, detail="Account not found")
return account
@router.put("/{account_id}")
def update_account(account_id: UUID, account: AccountUpdate, session: SessionDep, current_user=Depends(get_current_user)) -> AccountRead:
db_account = Account.get(session, account_id)
if not db_account:
raise HTTPException(status_code=404, detail="Account not found")
account_data = account.model_dump(exclude_unset=True)
account = Account.update(session, db_account, account_data)
return account
@router.delete("/{account_id}")
def delete_account(account_id: UUID, session: SessionDep, current_user=Depends(get_current_user)):
account = Account.get(session, account_id)
if not account:
raise HTTPException(status_code=404, detail="Account not found")
Account.delete(session, account)
return {"ok": True}

117
api/app/alembic.ini Normal file
View File

@@ -0,0 +1,117 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:///database.db
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

View File

@@ -0,0 +1,51 @@
from uuid import UUID, uuid4
from enum import Enum
from sqlmodel import Field, SQLModel, select
class CategoryBase(SQLModel):
name: str = Field(index=True)
class CategoryRead(CategoryBase):
id: UUID | None = Field(default_factory=uuid4, primary_key=True)
class Category(CategoryRead, table=True):
@classmethod
def create(cls, category, session):
category_db = cls.model_validate(category)
session.add(category_db)
session.commit()
session.refresh(category_db)
return category_db
@classmethod
def list(cls):
return select(Category)
@classmethod
def get(cls, session, category_id):
return session.get(Category, category_id)
@classmethod
def update(cls, session, category_db, category_data):
category_db.sqlmodel_update(category_data)
session.add(category_db)
session.commit()
session.refresh(category_db)
return category_db
@classmethod
def delete(cls, session, category):
session.delete(category)
session.commit()
class CategoryWrite(CategoryBase):
pass
class CategoryCreate(CategoryWrite):
pass
class CategoryUpdate(CategoryWrite):
pass

View File

@@ -0,0 +1,47 @@
from uuid import UUID
from fastapi import APIRouter, HTTPException, Depends
from fastapi_pagination import Page
from fastapi_pagination.ext.sqlmodel import paginate
from category.models import Category, CategoryCreate, CategoryRead, CategoryUpdate
from db import SessionDep
from user.manager import get_current_user
router = APIRouter()
@router.post("")
def create_category(category: CategoryCreate, session: SessionDep, current_user=Depends(get_current_user)) -> CategoryRead:
Category.create(category, session)
return category
@router.get("")
def read_categories(session: SessionDep, current_user=Depends(get_current_user)) -> Page[CategoryRead]:
return paginate(session, Category.list())
@router.get("/{category_id}")
def read_category(category_id: UUID, session: SessionDep, current_user=Depends(get_current_user)) -> CategoryRead:
category = Category.get(session, category_id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
return category
@router.put("/{category_id}")
def update_category(category_id: UUID, category: CategoryUpdate, session: SessionDep, current_user=Depends(get_current_user)) -> CategoryRead:
db_category = Category.get(session, category_id)
if not db_category:
raise HTTPException(status_code=404, detail="Category not found")
category_data = category.model_dump(exclude_unset=True)
category = Category.update(session, db_category, category_data)
return category
@router.delete("/{category_id}")
def delete_category(category_id: UUID, session: SessionDep, current_user=Depends(get_current_user)):
category = Category.get(session, category_id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
Category.delete(session, category)
return {"ok": True}

21
api/app/db.py Normal file
View File

@@ -0,0 +1,21 @@
from typing import Annotated
from fastapi import Depends
from sqlmodel import Field, Session, SQLModel, create_engine, select
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session() -> Session:
with Session(engine) as session:
yield session
SessionDep = Annotated[Session, Depends(get_session)]

47
api/app/main.py Normal file
View File

@@ -0,0 +1,47 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.security import OAuth2PasswordBearer
from fastapi.middleware.cors import CORSMiddleware
from fastapi_pagination import add_pagination
from db import create_db_and_tables
from user import user_router, auth_router, create_admin_account
from account.routes import router as account_router
from category.routes import router as category_router
@asynccontextmanager
async def lifespan(app: FastAPI):
create_db_and_tables()
create_admin_account()
yield
#do something before end
app = FastAPI(lifespan=lifespan)
add_pagination(app)
origins = [
"http://localhost:8000",
"http://localhost:5173",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth_router, prefix="/auth", tags=["auth"], )
app.include_router(user_router, prefix="/users", tags=["users"])
app.include_router(account_router, prefix="/accounts", tags=["accounts"])
app.include_router(category_router, prefix="/categories", tags=["categories"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app", host='0.0.0.0', port=8000, reload=True)

View File

@@ -0,0 +1 @@
Generic single-database configuration.

83
api/app/migrations/env.py Normal file
View File

@@ -0,0 +1,83 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from sqlmodel import SQLModel
from account.models import Account
from user.models import User, AccessToken
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
#target_metadata = None
target_metadata = SQLModel.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,27 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import sqlmodel
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

4
api/app/user/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .manager import auth_router, reset_password_router, create_admin_account
from .routes import router as user_router
user_router.include_router(reset_password_router)

80
api/app/user/manager.py Normal file
View File

@@ -0,0 +1,80 @@
import uuid
from sqlmodel import select
from fastapi import Depends
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models, exceptions, schemas
from fastapi_users.authentication import BearerTransport, AuthenticationBackend
from fastapi_users.authentication.strategy.db import AccessTokenDatabase, DatabaseStrategy
from .models import User, get_user_db, AccessToken, get_access_token_db, UserRead, UserUpdate, UserCreate
from db import get_session
SECRET = "SECRET"
TOKEN_LIFETIME = 3600
bearer_transport = BearerTransport(tokenUrl="auth/login")
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
pass
async def get_user_manager(user_db=Depends(get_user_db)) -> UserManager:
yield UserManager(user_db)
def get_database_strategy(
access_token_db: AccessTokenDatabase[AccessToken] = Depends(get_access_token_db),
) -> DatabaseStrategy:
return DatabaseStrategy(access_token_db, lifetime_seconds=TOKEN_LIFETIME)
auth_backend = AuthenticationBackend(
name="db",
transport=bearer_transport,
get_strategy=get_database_strategy,
)
fastapi_users = FastAPIUsers[User, uuid.UUID](
get_user_manager,
[auth_backend],
)
get_current_user = fastapi_users.current_user(active=True)
get_current_superuser = fastapi_users.current_user(active=True, superuser=True)
#user_router = fastapi_users.get_users_router(UserRead, UserUpdate)
#user_router.include_router(fastapi_users.get_reset_password_router())
reset_password_router = fastapi_users.get_reset_password_router()
auth_router = fastapi_users.get_auth_router(auth_backend)
def create_admin_account():
session = get_session().__next__()
admin_email = 'root@root.fr'
statement = select(User).where(User.email == admin_email).limit(1)
admin_user = session.exec(statement).first()
if admin_user is not None:
return
import secrets
from fastapi_users.password import PasswordHelper
password_length = 16
password = secrets.token_urlsafe(password_length)
admin_user = User(
id=uuid.uuid4(),
email=admin_email,
hashed_password=PasswordHelper().hash(password),
is_active=True,
is_superuser=True,
is_verified=True
)
session.add(admin_user)
session.commit()
print(f"""Admin account created:
login: {admin_email}
password: {password}""")

40
api/app/user/models.py Normal file
View File

@@ -0,0 +1,40 @@
import uuid
from sqlmodel import select
from fastapi import Depends
from fastapi_users import schemas
from fastapi_users_db_sqlmodel import SQLModelBaseUserDB, SQLModelUserDatabase
from fastapi_users_db_sqlmodel.access_token import SQLModelBaseAccessToken, SQLModelAccessTokenDatabase
from db import get_session, SessionDep
class User(SQLModelBaseUserDB, table=True):
pass
class UserDatabase(SQLModelUserDatabase):
def list(self):
return select(self.user_model)
async def get_user_db(session: SessionDep):
yield UserDatabase(session, User)
class AccessToken(SQLModelBaseAccessToken, table=True):
pass
class AccessTokenDatabase(SQLModelAccessTokenDatabase):
pass
async def get_access_token_db(session = Depends(get_session)):
yield AccessTokenDatabase(session, AccessToken)
class UserRead(schemas.BaseUser[uuid.UUID]):
pass
class UserCreate(schemas.BaseUserCreate):
pass
class UserUpdate(schemas.BaseUserUpdate):
pass

68
api/app/user/routes.py Normal file
View File

@@ -0,0 +1,68 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException
from .models import User, UserCreate, UserRead, UserUpdate
from .manager import get_user_manager, get_current_user, get_current_superuser
from fastapi_pagination import Page
from fastapi_pagination.ext.sqlmodel import paginate
router = APIRouter()
@router.post("", response_description="User added to the database")
async def create(user_form: UserCreate, user_manager=Depends(get_user_manager), current_user=Depends(get_current_superuser)) -> dict:
await user_manager.create(user_form, safe=True)
return {"message": "User added successfully"}
@router.get("", response_model=Page[UserRead], response_description="User records retrieved")
async def read_list(current_user=Depends(get_current_superuser), user_manager=Depends(get_user_manager)) -> Page[UserRead]:
return paginate(user_manager.user_db.session, user_manager.user_db.list())
@router.get("/me", response_description="User record retrieved")
async def read_me(current_user=Depends(get_current_user), user_manager=Depends(get_user_manager)) -> UserRead:
user = await user_manager.get(current_user.id)
return user
@router.get("/{user_id}", response_description="User record retrieved")
async def read_id(user_id: uuid.UUID, current_user=Depends(get_current_superuser), user_manager=Depends(get_user_manager)) -> UserRead:
user = await user_manager.get(user_id)
if not user:
raise HTTPException(
status_code=404,
detail="User not found."
)
return UserRead(**user.dict())
@router.put("/{user_id}", response_description="User record updated")
async def update(user_id: uuid.UUID, user_data: UserUpdate, current_user=Depends(get_current_superuser), user_manager=Depends(get_user_manager)) -> UserRead:
user = await user_manager.get(user_id)
if not user:
raise HTTPException(
status_code=404,
detail="User not found."
)
await user_manager.update(user_data, user, safe=True)
return user
@router.delete("/{user_id}", response_description="User record deleted from the database")
async def delete(user_id: uuid.UUID, current_user=Depends(get_current_superuser), user_manager=Depends(get_user_manager)) -> dict:
user = await user_manager.get(user_id)
if not user:
raise HTTPException(
status_code=404,
detail="User not found."
)
await user_manager.delete(user)
return {
"message": "User deleted successfully."
}