from datetime import date from sqlalchemy import and_ from sqlalchemy.orm import aliased from sqlmodel import select from account.models import Account from account.schemas import OpeningTransaction from transaction.models import Split, Transaction class AccountResource: @classmethod def get_by_path(cls, session, path): if not path: return None return session.exec(select(Account).where(Account.path == path)).first() @classmethod def get_parent(cls, session, model): if model.parent_account_id is None: return None model.parent_account = cls.get(session, model.parent_account_id) return model.parent_account @classmethod def create_opening_transaction(cls, session, account, schema): equity_account = cls.get_by_path(session, "/Equity/") t = Transaction() t.transaction_date = schema.opening_date split_opening = Split() split_opening.id = 0 split_opening.transaction = t split_opening.account = account split_opening.amount = schema.opening_balance split_equity = Split() split_equity.id = 1 split_equity.transaction = t split_equity.account = equity_account split_equity.amount = - schema.opening_balance account.transaction_splits.append(split_opening) @classmethod def fetch_opening_transaction(cls, session, account_id): split_account = aliased(Split) split_equity = aliased(Split) account_equity = aliased(Account) return session.execute(select(Transaction) .join(split_account, and_(split_account.transaction_id == Transaction.id, split_account.account_id == account_id)) .join(split_equity, split_equity.transaction_id == Transaction.id) .join(account_equity, and_(account_equity.id == split_equity.account_id, account_equity.path == "/Equity/")) ).first()[0] @classmethod def get_opening_transaction(cls, session, account_id): transaction = cls.fetch_opening_transaction(session, account_id) if transaction is None: return None return OpeningTransaction( opening_date=transaction.transaction_date, opening_balance=transaction.splits[0].amount ) @classmethod def update_opening_transaction(cls, session, account, schema): opening_transaction = cls.fetch_opening_transaction(session, account.id) stmt = select(Transaction).join(Split) \ .where(Transaction.id != opening_transaction.id) \ .where(Split.account_id == account.id) \ .order_by(Transaction.transaction_date.asc()) first_transaction = session.exec(stmt).first() if first_transaction and schema.opening_date > first_transaction[0].transaction_date: raise ValueError("Account opening date is posterior to its first transaction date") opening_transaction = cls.fetch_opening_transaction(session, account.id) opening_transaction.transaction_date = schema.opening_date opening_transaction.splits[0].amount = schema.opening_balance opening_transaction.splits[1].amount = - schema.opening_balance session.commit() session.refresh(opening_transaction) return OpeningTransaction( opening_date=opening_transaction.transaction_date, opening_balance=opening_transaction.splits[0].amount ) @classmethod def schema_to_model(cls, session, schema, model=None): try: if model: model = Account.model_validate(model, update=schema) else: schema.path = "" schema.family = "" model = Account.model_validate(schema) cls.create_opening_transaction(session, model, schema) except Exception as e: print(e) raise model.compute_family() cls.validate_parent(session, model) model.compute_path() return model @classmethod def validate_parent(cls, session, model): if model.parent_account_id is None: return True parent = cls.get_parent(session, model) if not parent: raise KeyError("Parent account not found.") if parent.family != model.family: raise ValueError("Account family mismatch with parent account..") if model.path and parent.path.startswith(model.path): raise ValueError("Parent Account is descendant") model.parent_account = parent return model @classmethod def create(cls, account, session): account_db = cls.schema_to_model(session, account) session.add(account_db) session.flush() session.refresh(account_db) session.commit() session.refresh(account_db) return account_db @classmethod def create_equity_account(cls, session): if cls.get_by_path(session, "/Equity/") is None: account_db = Account(name="Equity", family="Equity", type="Equity", path="/Equity/") session.add(account_db) session.commit() @classmethod def select(cls): return select(Account) @classmethod def list(cls, filters): return filters.sort(filters.filter( cls.select() )) @classmethod def list_accounts(cls, filters): return cls.list(filters).where( Account.family.in_(["Asset", "Liability"]) ) @classmethod def list_assets(cls, filters): return cls.list(filters).where(Account.family == "Asset") @classmethod def list_liabilities(cls, filters): return cls.list(filters).where(Account.family == "Liability") @classmethod def list_categories(cls, filters): return cls.list(filters).where( Account.type.in_(["Expense", "Income"]) ) @classmethod def list_expenses(cls, filters): return cls.list(filters).where(Account.family == "Expense") @classmethod def list_incomes(cls, filters): return cls.list(filters).where(Account.family == "Income") @classmethod def get(cls, session, account_id): return session.exec(cls.select().where(Account.id == account_id)).first() @classmethod def update(cls, session, account_db, account_data): previous_path = account_db.path account_db.sqlmodel_update(cls.schema_to_model(session, account_data, account_db)) if previous_path != account_db.path or account_data['name'] != account_db.name: account_db.update_children_path(session, previous_path) session.add(account_db) session.commit() session.refresh(account_db) return account_db @classmethod def delete(cls, session, account): session.delete(account) session.commit()