212 lines
7.0 KiB
Python
212 lines
7.0 KiB
Python
from datetime import date
|
|
|
|
from sqlalchemy import and_
|
|
from sqlalchemy.orm import aliased
|
|
from sqlmodel import select
|
|
|
|
from account.models import Account
|
|
from account.schemas import OpeningTransaction
|
|
from transaction.models import Split, Transaction
|
|
from transaction.resource import TransactionResource
|
|
|
|
|
|
class AccountResource:
    """Query and persistence helpers for ``Account`` rows.

    All methods are classmethods and receive the database session explicitly.
    The class also manages the "opening transaction" of an account: a
    ``Transaction`` with one split on the account and a counter-split on the
    account stored at ``EQUITY_PATH``.
    """

    # Path of the account that receives the counter-split of opening balances.
    EQUITY_PATH = "/Equity/"

    @classmethod
    def get_by_path(cls, session, path):
        """Return the first account whose ``path`` equals *path*, or ``None``.

        A falsy *path* (``None`` or ``""``) returns ``None`` without querying.
        """
        if not path:
            return None

        return session.exec(select(Account).where(Account.path == path)).first()

    @classmethod
    def get_parent(cls, session, model):
        """Look up the parent of *model*, store it on the model and return it.

        Returns ``None`` without querying when ``model.parent_account_id`` is
        ``None``. Otherwise ``model.parent_account`` is set to the lookup
        result, which is itself ``None`` when no such account exists.
        """
        if model.parent_account_id is None:
            return None

        model.parent_account = cls.get(session, model.parent_account_id)
        return model.parent_account

    @classmethod
    def create_opening_transaction(cls, session, account, schema):
        """Attach an opening transaction to *account*.

        Builds a ``Transaction`` dated ``schema.opening_date`` with two
        splits: ``schema.opening_balance`` on *account* and the negated
        amount on the equity account. Nothing is added to the session or
        committed here; only the account-side split is appended to
        ``account.transaction_splits``.
        """
        # NOTE(review): this is None when no account exists at EQUITY_PATH
        # (see create_equity_account); the equity split would then have no
        # account. Confirm callers guarantee the equity account exists.
        equity_account = cls.get_by_path(session, cls.EQUITY_PATH)

        t = Transaction()
        t.transaction_date = schema.opening_date
        t.sequence = TransactionResource.get_sequence_number(session)

        split_opening = Split()
        split_opening.id = 0
        split_opening.transaction = t
        split_opening.account = account
        split_opening.amount = schema.opening_balance

        # Counter-split: same amount with the opposite sign.
        split_equity = Split()
        split_equity.id = 1
        split_equity.transaction = t
        split_equity.account = equity_account
        split_equity.amount = - schema.opening_balance

        # The equity split is only reachable through ``t``; presumably it is
        # persisted by relationship cascades - TODO confirm.
        account.transaction_splits.append(split_opening)

    @classmethod
    def fetch_opening_transaction(cls, session, account_id):
        """Return the opening ``Transaction`` of *account_id*, or ``None``.

        The opening transaction is the first transaction that has a split on
        the given account and a split on the account at ``EQUITY_PATH``.
        """
        split_account = aliased(Split)
        split_equity = aliased(Split)
        account_equity = aliased(Account)

        stmt = (
            select(Transaction)
            .join(split_account, and_(split_account.transaction_id == Transaction.id, split_account.account_id == account_id))
            .join(split_equity, split_equity.transaction_id == Transaction.id)
            .join(account_equity, and_(account_equity.id == split_equity.account_id, account_equity.path == cls.EQUITY_PATH))
        )

        # ``scalars().first()`` yields None when nothing matches, instead of
        # failing on ``None[0]`` as indexing the first row did.
        return session.execute(stmt).scalars().first()

    @classmethod
    def get_opening_transaction(cls, session, account_id):
        """Return the opening date/balance of *account_id* as a schema.

        Returns ``None`` when the account has no opening transaction.
        """
        transaction = cls.fetch_opening_transaction(session, account_id)

        if transaction is None:
            return None

        # Assumes splits[0] is the account-side split (it is created with
        # id 0 in create_opening_transaction) - TODO confirm ordering.
        return OpeningTransaction(
            opening_date=transaction.transaction_date,
            opening_balance=transaction.splits[0].amount
        )

    @classmethod
    def update_opening_transaction(cls, session, account, schema):
        """Change the date and balance of the opening transaction of *account*.

        Commits the session and returns the refreshed values as an
        ``OpeningTransaction``.

        Raises:
            KeyError: the account has no opening transaction.
            ValueError: ``schema.opening_date`` is later than the date of the
                earliest other transaction that has a split on the account.
        """
        opening_transaction = cls.fetch_opening_transaction(session, account.id)
        if opening_transaction is None:
            raise KeyError("Opening transaction not found.")

        # Earliest transaction on the account other than the opening one.
        stmt = (
            select(Transaction).join(Split)
            .where(Transaction.id != opening_transaction.id)
            .where(Split.account_id == account.id)
            .order_by(Transaction.transaction_date.asc())
        )

        # ``session.exec`` on a single-entity select returns the entity
        # itself (as relied upon by get_by_path/get), not a row tuple.
        first_transaction = session.exec(stmt).first()
        if first_transaction is not None and schema.opening_date > first_transaction.transaction_date:
            raise ValueError("Account opening date is posterior to its first transaction date")

        opening_transaction.transaction_date = schema.opening_date
        # Assumes splits[0] is the account side and splits[1] the equity
        # side, mirroring create_opening_transaction - TODO confirm ordering.
        opening_transaction.splits[0].amount = schema.opening_balance
        opening_transaction.splits[1].amount = - schema.opening_balance

        session.commit()
        session.refresh(opening_transaction)

        return OpeningTransaction(
            opening_date=opening_transaction.transaction_date,
            opening_balance=opening_transaction.splits[0].amount
        )

    @classmethod
    def schema_to_model(cls, session, schema, model=None):
        """Build a validated ``Account`` from *schema*.

        With *model* given, the result is *model* validated with *schema*
        applied as an update. Without it, ``schema.path`` and
        ``schema.family`` are reset to ``""`` (the schema object is mutated),
        a new ``Account`` is validated from the schema and an opening
        transaction is attached to it.

        In both cases family, parent and path are then computed/validated.
        Exceptions raised while building the model are logged and re-raised;
        ``validate_parent`` may raise ``KeyError`` or ``ValueError``.
        """
        import logging

        try:
            if model is not None:
                model = Account.model_validate(model, update=schema)
            else:
                schema.path = ""
                schema.family = ""
                model = Account.model_validate(schema)
                cls.create_opening_transaction(session, model, schema)
        except Exception:
            logging.getLogger(__name__).exception("Could not build Account model from schema")
            raise

        # Order matters: the family is needed to validate the parent, and
        # validate_parent sets the parent before the path is computed.
        model.compute_family()
        cls.validate_parent(session, model)
        model.compute_path()

        return model

    @classmethod
    def validate_parent(cls, session, model):
        """Check that the parent referenced by *model* is acceptable.

        Returns ``True`` when the model has no parent; otherwise sets
        ``model.parent_account`` and returns *model*.

        Raises:
            KeyError: ``parent_account_id`` does not match any account.
            ValueError: the parent belongs to another family, or the parent's
                path starts with the model's own path.
        """
        if model.parent_account_id is None:
            return True

        parent = cls.get_parent(session, model)
        if not parent:
            raise KeyError("Parent account not found.")

        if parent.family != model.family:
            raise ValueError("Account family mismatch with parent account..")

        # A parent whose path starts with ours lives underneath this account.
        if model.path and parent.path.startswith(model.path):
            raise ValueError("Parent Account is descendant")

        model.parent_account = parent
        return model

    @classmethod
    def create(cls, account, session):
        """Persist a new account built from the *account* schema.

        Note the argument order (schema first, session second), which differs
        from the other methods. Commits the session and returns the refreshed
        ``Account``.
        """
        account_db = cls.schema_to_model(session, account)
        session.add(account_db)

        # commit() flushes pending changes itself, so a separate flush and
        # refresh beforehand only added round trips.
        session.commit()
        session.refresh(account_db)

        return account_db

    @classmethod
    def create_equity_account(cls, session):
        """Create and commit the equity account unless one already exists."""
        if cls.get_by_path(session, cls.EQUITY_PATH) is None:
            account_db = Account(name="Equity", family="Equity", type="Equity", path=cls.EQUITY_PATH)
            session.add(account_db)
            session.commit()

    @classmethod
    def select(cls):
        """Return the base ``select(Account)`` statement."""
        return select(Account)

    @classmethod
    def list(cls, filters):
        """Return the base statement passed through ``filters.filter`` then ``filters.sort``."""
        return filters.sort(filters.filter(
            cls.select()
        ))

    @classmethod
    def list_accounts(cls, filters):
        """Return the listing statement restricted to Asset and Liability families."""
        return cls.list(filters).where(
            Account.family.in_(["Asset", "Liability"])
        )

    @classmethod
    def list_assets(cls, filters):
        """Return the listing statement restricted to the Asset family."""
        return cls.list(filters).where(Account.family == "Asset")

    @classmethod
    def list_liabilities(cls, filters):
        """Return the listing statement restricted to the Liability family."""
        return cls.list(filters).where(Account.family == "Liability")

    @classmethod
    def list_categories(cls, filters):
        """Return the listing statement restricted to Expense and Income families."""
        # Filters on ``family`` like every sibling list_* helper, so this is
        # the union of list_expenses and list_incomes.
        return cls.list(filters).where(
            Account.family.in_(["Expense", "Income"])
        )

    @classmethod
    def list_expenses(cls, filters):
        """Return the listing statement restricted to the Expense family."""
        return cls.list(filters).where(Account.family == "Expense")

    @classmethod
    def list_incomes(cls, filters):
        """Return the listing statement restricted to the Income family."""
        return cls.list(filters).where(Account.family == "Income")

    @classmethod
    def get(cls, session, account_id):
        """Return the account with the given id, or ``None``."""
        return session.exec(cls.select().where(Account.id == account_id)).first()

    @classmethod
    def update(cls, session, account_db, account_data):
        """Apply *account_data* to *account_db*, commit and return it.

        When the update changes the account's path or name,
        ``account_db.update_children_path`` is called with the previous path
        before committing.
        """
        # Snapshot before the update so the change can actually be detected;
        # comparing the payload with the already-updated row never differs
        # and fails on payloads that do not carry a name.
        previous_path = account_db.path
        previous_name = account_db.name

        account_db.sqlmodel_update(cls.schema_to_model(session, account_data, account_db))

        if previous_path != account_db.path or previous_name != account_db.name:
            account_db.update_children_path(session, previous_path)

        session.add(account_db)
        session.commit()
        session.refresh(account_db)
        return account_db

    @classmethod
    def delete(cls, session, account):
        """Delete *account* and commit."""
        session.delete(account)
        session.commit()