from beanie import PydanticObjectId from beanie.odm.operators.find.comparison import In from beanie.operators import And, RegEx, Eq from fastapi import APIRouter, HTTPException, Depends from fastapi_filter import FilterDepends from fastapi_pagination import Page, add_pagination from fastapi_pagination.ext.motor import paginate from .models import CrudDocument from .schemas import Writer, Reader from ..db import get_db_client def parse_sort(sort_by): if not sort_by: return [] fields = [] for field in sort_by.split(','): direction, column = field.split('(') fields.append((column[:-1], 1 if direction == 'asc' else -1)) return fields def Or(filters): return {'$or': filters} def parse_query(query: str, model): if query is None: return {} and_array = [] for criterion in query.split(' AND '): [column, operator, value] = criterion.split(' ', 2) column = column.lower() operand = None if column == 'fulltext': if not model.Settings.fulltext_search: continue or_array = [] for field in model.Settings.fulltext_search: words_and_array = [] for word in value.split(' '): words_and_array.append(RegEx(field, word, 'i')) or_array.append(And(*words_and_array) if len(words_and_array) > 1 else words_and_array[0]) operand = Or(or_array) if len(or_array) > 1 else or_array[0] elif operator == 'eq': operand = Eq(column, value) elif operator == 'in': operand = In(column, value.split(',')) if operand: and_array.append(operand) if and_array: return And(*and_array) if len(and_array) > 1 else and_array[0] else: return {} #instance: str="westside", firm: str="cht", def get_tenant_db_cursor(db_client=Depends(get_db_client)): instance = "westside" firm = "cht" return db_client[f"tenant_{instance}_{firm}"] #instance: str="westside", firm: str="cht", #user=Depends(get_current_user) def get_logged_tenant_db_cursor(db_client=Depends(get_db_client), user=None): instance = "westside" firm = "cht" return db_client[f"tenant_{instance}_{firm}"] def get_crud_router(model: CrudDocument, model_create: Writer, model_read: Reader, model_update: Writer, model_filter): model_name = model.__name__ router = APIRouter() @router.get("/", response_model=Page[model_read], response_description=f"{model_name} records retrieved") async def read_list(filters: model_filter=FilterDepends(model_filter), db=Depends(get_logged_tenant_db_cursor)) -> Page[model_read]: return await paginate(model.list(db, filters)) @router.post("/", response_description=f"{model_name} added to the database") async def create(schema: model_create, db=Depends(get_logged_tenant_db_cursor)) -> model_read: await schema.validate_foreign_key(db) record = await model.create(db, schema) return model_read.validate_model(record) @router.get("/{record_id}", response_description=f"{model_name} record retrieved") async def read_one(record_id: PydanticObjectId, db=Depends(get_logged_tenant_db_cursor)) -> model_read: record = await model.get(db, record_id) if not record: raise HTTPException( status_code=404, detail=f"{model_name} record not found!" ) return model_read.from_model(record) @router.put("/{record_id}", response_description=f"{model_name} record updated") async def update(record_id: PydanticObjectId, schema: model_update, db=Depends(get_logged_tenant_db_cursor)) -> model_read: record = await model.get(db, record_id) if not record: raise HTTPException( status_code=404, detail=f"{model_name} record not found!" ) record = await model.update(db, record, schema) return model_read.from_model(record) @router.delete("/{record_id}", response_description=f"{model_name} record deleted from the database") async def delete(record_id: PydanticObjectId, db=Depends(get_logged_tenant_db_cursor)) -> dict: record = await model.get(db, record_id) if not record: raise HTTPException( status_code=404, detail=f"{model_name} record not found!" ) await model.delete(db, record) return { "message": f"{model_name} deleted successfully" } add_pagination(router) return router