Adding chtlawfirm to the api

This commit is contained in:
2025-04-10 22:43:00 +02:00
parent 72a8e7fb91
commit ef9ae99cb6
34 changed files with 1648 additions and 44 deletions

View File

View File

@@ -0,0 +1,135 @@
from collections.abc import Callable, Mapping
from typing import Any, Optional, Union
from pydantic import ValidationInfo, field_validator
from fastapi_filter.base.filter import BaseFilterModel
_odm_operator_transformer: dict[str, Callable[[Optional[str]], Optional[dict[str, Any]]]] = {
"neq": lambda value: {"$ne": value},
"gt": lambda value: {"$gt": value},
"gte": lambda value: {"$gte": value},
"in": lambda value: {"$in": value},
"isnull": lambda value: None if value else {"$ne": None},
"lt": lambda value: {"$lt": value},
"lte": lambda value: {"$lte": value},
"not": lambda value: {"$ne": value},
"ne": lambda value: {"$ne": value},
"not_in": lambda value: {"$nin": value},
"nin": lambda value: {"$nin": value},
"like": lambda value: {"$regex": f".*{value}.*"},
"ilike": lambda value: {"$regex": f".*{value}.*", "$options": "i"},
"exists": lambda value: {"$exists": value},
}
class Filter(BaseFilterModel):
"""Base filter for beanie related filters.
Example:
```python
class MyModel:
id: PrimaryKey()
name: StringField(null=True)
count: IntField()
created_at: DatetimeField()
class MyModelFilter(Filter):
id: Optional[int]
id__in: Optional[str]
count: Optional[int]
count__lte: Optional[int]
created_at__gt: Optional[datetime]
name__ne: Optional[str]
name__nin: Optional[list[str]]
name__isnull: Optional[bool]
```
"""
def sort(self):
if not self.ordering_values:
return None
sort = {}
for column in self.ordering_values:
direction = 1
if column[0] in ["+", "-"]:
if column[0] == "-":
direction = -1
column = column[1:]
sort[column] = direction
return sort
@field_validator("*", mode="before")
@classmethod
def split_str(
cls: type["BaseFilterModel"], value: Optional[str], field: ValidationInfo
) -> Optional[Union[list[str], str]]:
if (
field.field_name is not None
and (
field.field_name == cls.Constants.ordering_field_name
or field.field_name.endswith("__in")
or field.field_name.endswith("__nin")
)
and isinstance(value, str)
):
if not value:
# Empty string should return [] not ['']
return []
return list(value.split(","))
return value
def _get_filter_conditions(self, nesting_depth: int = 1) -> list[tuple[Mapping[str, Any], Mapping[str, Any]]]:
filter_conditions: list[tuple[Mapping[str, Any], Mapping[str, Any]]] = []
for field_name, value in self.filtering_fields:
field_value = getattr(self, field_name)
if isinstance(field_value, Filter):
if not field_value.model_dump(exclude_none=True, exclude_unset=True):
continue
filter_conditions.append(
(
{field_name: _odm_operator_transformer["neq"](None)},
{"fetch_links": True, "nesting_depth": nesting_depth},
)
)
for part, part_options in field_value._get_filter_conditions(nesting_depth=nesting_depth + 1): # noqa: SLF001
for sub_field_name, sub_value in part.items():
filter_conditions.append(
(
{f"{field_name}.{sub_field_name}": sub_value},
{"fetch_links": True, "nesting_depth": nesting_depth, **part_options},
)
)
elif "__" in field_name:
stripped_field_name, operator = field_name.split("__")
search_criteria = _odm_operator_transformer[operator](value)
filter_conditions.append(({stripped_field_name: search_criteria}, {}))
elif field_name == self.Constants.search_field_name and hasattr(self.Constants, "search_model_fields"):
search_conditions = [
{search_field: _odm_operator_transformer["ilike"](value)}
for search_field in self.Constants.search_model_fields
]
filter_conditions.append(({"$or": search_conditions}, {}))
else:
filter_conditions.append(({field_name: value}, {}))
return filter_conditions
def filter(self, query):
data = self._get_filter_conditions()
for filter_condition, filter_kwargs in data:
for field_name, value in filter_condition.items():
if field_name in query:
query[field_name] = query[field_name] | value
else:
query[field_name] = value
return query
class FilterSchema(Filter):
label__ilike: Optional[str] = None
order_by: Optional[list[str]] = None

View File

@@ -0,0 +1,117 @@
from datetime import datetime, UTC
from typing import Optional
from beanie import PydanticObjectId
from motor.motor_asyncio import AsyncIOMotorCollection
from pydantic import BaseModel, Field, computed_field
class CrudDocument(BaseModel):
id: Optional[PydanticObjectId] = Field(default=None)
created_at: datetime = Field(default=datetime.now(UTC), nullable=False, title="Créé le")
# created_by: str
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False, title="Modifié le")
# updated_by: str
@property
def _id(self):
return self.id
@computed_field
def label(self) -> str:
return self.compute_label()
def compute_label(self) -> str:
return ""
class Settings:
fulltext_search = []
@classmethod
def _collection_name(cls):
return cls.__name__
@classmethod
def _get_collection(cls, db) -> AsyncIOMotorCollection:
return db.get_collection(cls._collection_name())
@classmethod
async def create(cls, db, create_schema):
values = cls.model_validate(create_schema.model_dump()).model_dump(mode="json")
result = await cls._get_collection(db).insert_one(values)
return await cls.get(db, result.inserted_id)
@classmethod
def find(cls, db, filters):
return {
"collection": cls._get_collection(db),
"query_filter": filters.filter({}),
"sort": filters.sort(),
}
@classmethod
def list(cls, db):
return cls._get_collection(db).find({})
@classmethod
async def get(cls, db, model_id):
value = await cls._get_collection(db).find_one({"_id": model_id})
if not value:
return None
value["id"] = value.pop("_id")
return cls.model_validate(value)
@classmethod
async def update(cls, db, model, update_schema):
update_query = {
"$set": {field: value for field, value in update_schema.model_dump(mode="json").items() if field!= "id" }
}
await cls._get_collection(db).update_one({"_id": model.id}, update_query)
return await cls.get(db, model.id)
@classmethod
async def delete(cls, db, model):
await cls._get_collection(db).delete_one({"_id": model.id})
def text_area(*args, **kwargs):
kwargs['widget'] = {
"formlyConfig": {
"type": "textarea",
"props": {
"placeholder": "Leaving this field empty will cause formData property to be `null`",
"rows": kwargs['size'] if 'size' in kwargs else 10
}
}
}
return Field(*args, **kwargs)
def RichtextMultiline(*args, **kwargs):
if 'props' not in kwargs:
kwargs['props'] = {}
kwargs['props']['richtext'] = True
kwargs['props']['multiline'] = True
return Field(*args, **kwargs)
def RichtextSingleline(*args, **kwargs):
if 'props' not in kwargs:
kwargs['props'] = {}
kwargs['props']['richtext'] = True
kwargs['props']['multiline'] = False
return Field(*args, **kwargs)
class DictionaryEntry(BaseModel):
key: str
value: str = ""

View File

@@ -0,0 +1,81 @@
from beanie import PydanticObjectId
from fastapi import APIRouter, HTTPException, Depends
from fastapi_filter import FilterDepends
from fastapi_pagination import Page, add_pagination
from fastapi_pagination.ext.motor import paginate
from hub.auth import get_current_user
from firm.core.models import CrudDocument
from firm.core.schemas import Writer, Reader
from firm.db import get_db_client
#instance: str="westside", firm: str="cht",
def get_tenant_db_cursor(db_client=Depends(get_db_client)):
instance = "westside"
firm = "cht"
return db_client[f"tenant_{instance}_{firm}"]
#instance: str="westside", firm: str="cht",
def get_logged_tenant_db_cursor(db_client=Depends(get_db_client), user=Depends(get_current_user)):
instance = "westside"
firm = "cht"
db_cursor = db_client[f"tenant_{instance}_{firm}"]
db_cursor.user = user
return db_cursor
def get_crud_router(model: CrudDocument, model_create: Writer, model_read: Reader, model_update: Writer, model_filter):
model_name = model.__name__
router = APIRouter()
@router.get("/", response_model=Page[model_read], response_description=f"{model_name} records retrieved")
async def read_list(filters: model_filter=FilterDepends(model_filter), db=Depends(get_logged_tenant_db_cursor)) -> Page[model_read]:
return await paginate(**model.find(db, filters))
@router.post("/", response_description=f"{model_name} added to the database")
async def create(schema: model_create, db=Depends(get_logged_tenant_db_cursor)) -> model_read:
await schema.validate_foreign_key(db)
record = await model.create(db, schema)
return model_read.validate_model(record)
@router.get("/{record_id}", response_description=f"{model_name} record retrieved")
async def read_one(record_id: PydanticObjectId, db=Depends(get_logged_tenant_db_cursor)) -> model_read:
record = await model.get(db, record_id)
if not record:
raise HTTPException(
status_code=404,
detail=f"{model_name} record not found!"
)
return model_read.from_model(record)
@router.put("/{record_id}", response_description=f"{model_name} record updated")
async def update(record_id: PydanticObjectId, schema: model_update, db=Depends(get_logged_tenant_db_cursor)) -> model_read:
record = await model.get(db, record_id)
if not record:
raise HTTPException(
status_code=404,
detail=f"{model_name} record not found!"
)
record = await model.update(db, record, schema)
return model_read.from_model(record)
@router.delete("/{record_id}", response_description=f"{model_name} record deleted from the database")
async def delete(record_id: PydanticObjectId, db=Depends(get_logged_tenant_db_cursor)) -> dict:
record = await model.get(db, record_id)
if not record:
raise HTTPException(
status_code=404,
detail=f"{model_name} record not found!"
)
await model.delete(db, record)
return {
"message": f"{model_name} deleted successfully"
}
add_pagination(router)
return router

View File

@@ -0,0 +1,18 @@
from typing import Optional
from beanie import PydanticObjectId
from pydantic import BaseModel, Field
class Reader(BaseModel):
id: Optional[PydanticObjectId] = Field(default=None, validation_alias="_id")
@classmethod
def from_model(cls, model):
schema = cls.model_validate(model, from_attributes=True)
return schema
class Writer(BaseModel):
async def validate_foreign_key(self, db):
pass