165 lines
5.1 KiB
Python
165 lines
5.1 KiB
Python
from sqlalchemy import literal_column
|
|
from sqlalchemy.orm import aliased
|
|
from sqlmodel import select
|
|
|
|
from account.models import Account
|
|
from transaction.models import Split, Transaction
|
|
|
|
|
|
class AccountResource:
|
|
@classmethod
|
|
def get_by_path(cls, session, path):
|
|
if not path:
|
|
return None
|
|
|
|
return session.exec(select(Account).where(Account.path == path)).first()
|
|
|
|
@classmethod
|
|
def get_parent(cls, session, model):
|
|
if model.parent_account_id is None:
|
|
return None
|
|
|
|
model.parent_account = cls.get(session, model.parent_account_id)
|
|
return model.parent_account
|
|
|
|
@classmethod
|
|
def create_opening_transaction(cls, session, account, schema):
|
|
|
|
equity_account = cls.get_by_path(session, "/Equity/")
|
|
t = Transaction()
|
|
split_opening = Split()
|
|
split_opening.id = 0
|
|
split_opening.transaction = t
|
|
split_opening.account = account
|
|
split_opening.amount = schema.opening_balance
|
|
|
|
split_equity = Split()
|
|
split_equity.id = 1
|
|
split_equity.transaction = t
|
|
split_equity.account = equity_account
|
|
split_equity.amount = - schema.opening_balance
|
|
|
|
account.transaction_splits.append(split_opening)
|
|
|
|
@classmethod
|
|
def schema_to_model(cls, session, schema, model=None):
|
|
try:
|
|
if model:
|
|
model = Account.model_validate(model, update=schema)
|
|
else:
|
|
schema.path = ""
|
|
schema.family = ""
|
|
model = Account.model_validate(schema)
|
|
cls.create_opening_transaction(session, model, schema)
|
|
except Exception as e:
|
|
print(e)
|
|
raise
|
|
|
|
model.compute_family()
|
|
cls.validate_parent(session, model)
|
|
model.compute_path()
|
|
|
|
return model
|
|
|
|
@classmethod
|
|
def validate_parent(cls, session, model):
|
|
if model.parent_account_id is None:
|
|
return True
|
|
|
|
parent = cls.get_parent(session, model)
|
|
if not parent:
|
|
raise KeyError("Parent account not found.")
|
|
|
|
if parent.family != model.family:
|
|
raise ValueError("Account family mismatch with parent account..")
|
|
|
|
if model.path and parent.path.startswith(model.path):
|
|
raise ValueError("Parent Account is descendant")
|
|
|
|
model.parent_account = parent
|
|
return model
|
|
|
|
@classmethod
|
|
def create(cls, account, session):
|
|
account_db = cls.schema_to_model(session, account)
|
|
session.add(account_db)
|
|
session.flush()
|
|
session.refresh(account_db)
|
|
|
|
session.commit()
|
|
session.refresh(account_db)
|
|
|
|
return account_db
|
|
|
|
@classmethod
|
|
def create_equity_account(cls, session):
|
|
if cls.get_by_path(session, "/Equity/") is None:
|
|
account_db = Account(name="Equity", family="Equity", type="Equity", path="/Equity/")
|
|
session.add(account_db)
|
|
session.commit()
|
|
|
|
@classmethod
|
|
def select(cls):
|
|
split_filter = aliased(Split)
|
|
account_filter = aliased(Account)
|
|
columns = [Account.id, Account.name, Account.parent_account_id, Account.family, Account.type, Account.path,
|
|
Split.amount.label("opening_balance"), literal_column('"1970-01-01"').label("opening_date")]
|
|
return (select(*columns)
|
|
.join(Split)
|
|
.join(Transaction)
|
|
.join(split_filter)
|
|
.join(account_filter, account_filter.id == split_filter.account_id and Account.path == "/Equity/"))
|
|
|
|
@classmethod
|
|
def list(cls, filters):
|
|
return filters.sort(filters.filter(
|
|
cls.select()
|
|
))
|
|
|
|
@classmethod
|
|
def list_accounts(cls, filters):
|
|
return cls.list(filters).where(
|
|
Account.family.in_(["Asset", "Liability"])
|
|
)
|
|
|
|
@classmethod
|
|
def list_assets(cls, filters):
|
|
return cls.list(filters).where(Account.family == "Asset")
|
|
|
|
@classmethod
|
|
def list_liabilities(cls, filters):
|
|
return cls.list(filters).where(Account.family == "Liability")
|
|
|
|
@classmethod
|
|
def list_categories(cls, filters):
|
|
return cls.list(filters).where(
|
|
Account.type.in_(["Expense", "Income"])
|
|
)
|
|
|
|
@classmethod
|
|
def list_expenses(cls, filters):
|
|
return cls.list(filters).where(Account.family == "Expense")
|
|
|
|
@classmethod
|
|
def list_incomes(cls, filters):
|
|
return cls.list(filters).where(Account.family == "Income")
|
|
|
|
@classmethod
|
|
def get(cls, session, account_id):
|
|
return session.exec(cls.select().where(Account.id == account_id)).first()
|
|
|
|
@classmethod
|
|
def update(cls, session, account_db, account_data):
|
|
previous_path = account_db.path
|
|
account_db.sqlmodel_update(cls.schema_to_model(session, account_data, account_db))
|
|
if previous_path != account_db.path or account_data['name'] != account_db.name:
|
|
account_db.update_children_path(session, previous_path)
|
|
session.add(account_db)
|
|
session.commit()
|
|
session.refresh(account_db)
|
|
return account_db
|
|
|
|
@classmethod
|
|
def delete(cls, session, account):
|
|
session.delete(account)
|
|
session.commit() |