"""Generic CRUD router factory and the sort/query mini-language it accepts."""
from typing import Optional

from beanie import PydanticObjectId
from beanie.operators import And, RegEx, Eq
from fastapi import APIRouter, HTTPException, Depends
from fastapi_paginate import Page, Params, add_pagination
from fastapi_paginate.ext.motor import paginate

from ..user.manager import get_current_user, get_current_superuser


def parse_sort(sort_by):
    """Translate a ``sort_by`` string into a list of ``(field, direction)`` pairs.

    The expected syntax is a comma-separated list of ``direction(field)``
    terms, e.g. ``"asc(name),desc(created)"``.

    Args:
        sort_by: The raw sort expression, or ``None``/empty for "no sorting".

    Returns:
        A list of ``(field, direction)`` tuples where direction is ``1`` when
        the term's prefix is exactly ``asc`` and ``-1`` for any other prefix.
        An empty list when ``sort_by`` is falsy.

    Raises:
        ValueError: If a term is not of the form ``direction(field)``.
    """
    if not sort_by:
        return []

    fields = []
    for term in sort_by.split(','):
        # Tolerate "asc(a), desc(b)": without stripping, " asc" would not
        # compare equal to 'asc' and the term would silently sort descending.
        term = term.strip()
        direction, paren, column = term.partition('(')
        # Require exactly one "(" and a closing ")" so that a malformed term
        # is rejected instead of having its last character chopped off.
        if not paren or '(' in column or not column.endswith(')'):
            raise ValueError(
                "Invalid sort term {!r}; expected 'asc(field)' or 'desc(field)'".format(term)
            )
        column = column[:-1].strip()
        fields.append((column, 1 if direction.strip() == 'asc' else -1))
    return fields


def Or(filters):
    """Wrap ``filters`` in a MongoDB-style ``$or`` clause.

    Args:
        filters: The list of alternative filter expressions.

    Returns:
        The dict ``{'$or': filters}``.
    """
    return {'$or': filters}


def parse_query(query: Optional[str], model):
    """Build a filter expression from a ``query`` string.

    The query is a list of ``column operator value`` criteria joined by the
    literal separator ``' AND '``. Two kinds of criteria are understood:

    * ``fulltext <operator> <value>``: a case-insensitive ``RegEx`` match of
      ``value`` against every field named in ``model.Settings.fulltext_search``
      (the operator token is ignored). The criterion is skipped when the model
      declares no such fields.
    * ``<column> eq <value>``: an ``Eq`` match. ``value`` is passed through as
      the raw string taken from the query.

    Column names are lower-cased before use.

    Args:
        query: The raw query string, or ``None``/blank for "no filter".
        model: A class that may expose ``Settings.fulltext_search``, an
            iterable of field names.

    Returns:
        ``{}`` when there is nothing to filter on, the single criterion when
        there is exactly one, otherwise an ``And`` of all criteria.

    Raises:
        ValueError: If a criterion does not have three space-separated parts
            or uses an operator other than ``eq``.
    """
    if query is None or not query.strip():
        return {}

    # Looked up defensively: a model without Settings/fulltext_search simply
    # has no full-text support rather than crashing the request.
    fulltext_fields = getattr(getattr(model, 'Settings', None), 'fulltext_search', None)

    clauses = []
    for criterion in query.split(' AND '):
        parts = criterion.split(' ', 2)
        if len(parts) != 3:
            raise ValueError(
                "Invalid criterion {!r}; expected '<column> <operator> <value>'".format(criterion)
            )
        column, operator, value = parts
        column = column.lower()

        if column == 'fulltext':
            if not fulltext_fields:
                continue
            # NOTE(review): ``value`` is used as a regular expression exactly
            # as supplied by the caller; escape it if patterns from untrusted
            # clients are a concern.
            matches = [RegEx(field, value, 'i') for field in fulltext_fields]
            clause = Or(matches) if len(matches) > 1 else matches[0]
        elif operator == 'eq':
            clause = Eq(column, value)
        else:
            # Previously this fell through with ``operand`` unbound (or still
            # holding the previous criterion's value); fail loudly instead.
            raise ValueError("Unsupported operator {!r} in criterion {!r}".format(operator, criterion))
        clauses.append(clause)

    if not clauses:
        return {}
    if len(clauses) == 1:
        return clauses[0]
    # And() takes its operands as separate positional arguments; passing the
    # list itself would make the list the one and only "expression".
    return And(*clauses)


async def _get_or_404(model, item_id):
    """Fetch ``model.get(item_id)`` or raise a 404 ``HTTPException`` if it is ``None``."""
    item = await model.get(item_id)
    if item is None:
        raise HTTPException(
            status_code=404,
            detail="{} record not found!".format(model.__name__)
        )
    return item


def get_crud_router(model, model_create, model_read, model_update):
    """Create an ``APIRouter`` exposing create/read/list/update/delete for ``model``.

    Args:
        model: The document class; the code uses ``model(**data).create()``,
            ``model.get(id)`` and ``model.get_motor_collection()``.
        model_create: Request schema for ``POST /``; must provide ``dict()``
            and an awaitable ``validate_foreign_key()``.
        model_read: Response schema, built via ``model_read(**item.dict())``.
            It is also the class handed to ``parse_query`` for full-text
            field lookup.
        model_update: Request schema for ``PUT /{id}``; fields left as
            ``None`` are not written.

    Returns:
        The configured router, with ``add_pagination`` applied.
    """
    router = APIRouter()
    name = model.__name__

    # The handlers keep the parameter name ``id`` (despite shadowing the
    # builtin) because it must match the ``{id}`` path placeholder.

    @router.post("/", response_description="{} added to the database".format(name))
    async def create(item: model_create, user=Depends(get_current_user)) -> dict:
        """Validate foreign keys, insert the new document and return its id."""
        await item.validate_foreign_key()
        created = await model(**item.dict()).create()
        return {"message": "{} added successfully".format(name), "id": created.id}

    @router.get("/{id}", response_description="{} record retrieved".format(name))
    async def read_id(id: PydanticObjectId, user=Depends(get_current_user)) -> model_read:
        """Return one record, or 404 when ``model.get`` yields ``None``."""
        # Consistent with update/delete: a missing document is a 404, not an
        # AttributeError on ``None.dict()``.
        item = await _get_or_404(model, id)
        return model_read(**item.dict())

    @router.get("/", response_model=Page[model_read],
                response_description="{} records retrieved".format(name))
    async def read_list(size: int = 50, page: int = 1, sort_by: Optional[str] = None,
                        query: Optional[str] = None,
                        user=Depends(get_current_user)) -> Page[model_read]:
        """Return one page of records, optionally filtered and sorted.

        ``sort_by`` and ``query`` use the syntaxes accepted by ``parse_sort``
        and ``parse_query``; malformed values produce a 400 response.
        """
        try:
            sort = parse_sort(sort_by)
            filters = parse_query(query, model_read)
        except ValueError as exc:
            # Bad client input should not surface as an internal server error.
            raise HTTPException(status_code=400, detail=str(exc)) from exc

        collection = model.get_motor_collection()
        return await paginate(collection, filters, Params(size=size, page=page), sort=sort)

    @router.put("/{id}", response_description="{} record updated".format(name))
    async def update(id: PydanticObjectId, req: model_update,
                     user=Depends(get_current_user)) -> model_read:
        """Apply the non-``None`` fields of ``req`` to the record via ``$set``."""
        changes = {field: value for field, value in req.dict().items() if value is not None}
        item = await _get_or_404(model, id)
        # Nothing to write: skip the round trip rather than send an empty
        # ``$set`` document to the database.
        if changes:
            await item.update({"$set": changes})
        return model_read(**item.dict())

    @router.delete("/{id}", response_description="{} record deleted from the database".format(name))
    async def delete(id: PydanticObjectId, user=Depends(get_current_superuser)) -> dict:
        """Delete the record; guarded by the ``get_current_superuser`` dependency."""
        item = await _get_or_404(model, id)
        await item.delete()
        return {
            "message": "{} deleted successfully".format(name)
        }

    add_pagination(router)
    return router