"""Factory for generic CRUD routers over Beanie document models in FastAPI."""

from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar

from beanie import PydanticObjectId
from beanie.odm.enums import SortDirection
from fastapi import APIRouter, HTTPException
from fastapi_paginate import Page, Params, add_pagination
from fastapi_paginate.ext.motor import paginate

T = TypeVar('T')
U = TypeVar('U')
V = TypeVar('V')
W = TypeVar('W')


def parse_sort(sort_by: Optional[str]) -> List[Tuple[str, int]]:
    """Translate a ``sort_by`` string into a list of ``(column, direction)`` pairs.

    The expected format is a comma-separated list of ``dir(column)`` terms,
    e.g. ``"asc(name),desc(created_at)"``. A direction of ``asc`` (compared
    case-insensitively) maps to ``1``; any other direction maps to ``-1``.
    Blank terms, such as the one produced by a trailing comma, are skipped.

    Args:
        sort_by: The raw sort expression, or ``None``/empty for no sorting.

    Returns:
        A list of ``(column, 1 | -1)`` tuples in the order given.

    Raises:
        ValueError: If a term is not of the form ``dir(column)``.
    """
    if not sort_by:
        return []

    fields = []
    for raw_term in sort_by.split(','):
        term = raw_term.strip()
        if not term:
            continue
        direction, paren, remainder = term.partition('(')
        column = remainder[:-1].strip()
        # Reject terms without "(", without the closing ")", or with an empty
        # column instead of silently truncating the column name.
        if not paren or not remainder.endswith(')') or not column:
            raise ValueError(
                "Invalid sort term {!r}; expected 'asc(field)' or "
                "'desc(field)'".format(term)
            )
        order = 1 if direction.strip().lower() == 'asc' else -1
        fields.append((column, order))
    return fields


def parse_query(query) -> Dict[Any, Any]:
    """Return the filter document for ``query``.

    Currently a placeholder: the argument is ignored and an empty filter is
    always returned.
    """
    return {}


def get_crud_router(model, model_create, model_read, model_update):
    """Build an ``APIRouter`` exposing create/read/list/update/delete routes.

    Args:
        model: Document class; the code calls ``model.get``,
            ``model.get_motor_collection`` and ``model(**data).create()``.
        model_create: Request schema for POST; must provide ``dict()`` and an
            awaitable ``validate_foreign_key()``.
        model_read: Response schema built from ``item.dict()``.
        model_update: Request schema for PUT; fields left as ``None`` are
            treated as "not provided".

    Returns:
        The configured router, with pagination registered on it.
    """
    router = APIRouter()
    name = model.__name__
    not_found_detail = "{} record not found!".format(name)

    async def fetch_or_404(item_id):
        # Shared lookup so every single-record route answers 404 the same way.
        item = await model.get(item_id)
        if item is None:
            raise HTTPException(status_code=404, detail=not_found_detail)
        return item

    # NOTE: handlers are documented with comments rather than docstrings,
    # because FastAPI publishes handler docstrings in the OpenAPI schema.

    @router.post("/", response_description="{} added to the database".format(name))
    async def create(item: model_create) -> dict:
        # Validate references first, then persist a new document.
        await item.validate_foreign_key()
        created = await model(**item.dict()).create()
        return {
            "message": "{} added successfully".format(name),
            "id": created.id,
        }

    @router.get("/{id}", response_description="{} record retrieved".format(name))
    async def read_id(id: PydanticObjectId) -> model_read:
        # A missing record now yields 404 instead of an AttributeError (500).
        item = await fetch_or_404(id)
        return model_read(**item.dict())

    @router.get(
        "/",
        response_model=Page[model_read],
        response_description="{} records retrieved".format(name),
    )
    async def read_list(
        size: int = 50,
        page: int = 1,
        sort_by: Optional[str] = None,
        query: Optional[str] = None,
    ) -> Page[model_read]:
        # A malformed sort expression is a client error, not a server error.
        try:
            sort = parse_sort(sort_by)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc))
        query_filter = parse_query(query)
        collection = model.get_motor_collection()
        return await paginate(
            collection,
            query_filter,
            Params(size=size, page=page),
            sort=sort,
        )

    @router.put("/{id}", response_description="{} record updated".format(name))
    async def update(id: PydanticObjectId, req: model_update) -> model_read:
        item = await fetch_or_404(id)
        # Only fields explicitly provided (non-None) are written.
        changes = {
            field: value
            for field, value in req.dict().items()
            if value is not None
        }
        # Skip the write when there is nothing to set; an empty "$set" is
        # pointless and may be rejected by the database.
        if changes:
            await item.update({"$set": changes})
        return model_read(**item.dict())

    @router.delete(
        "/{id}",
        response_description="{} record deleted from the database".format(name),
    )
    async def delete(id: PydanticObjectId) -> dict:
        item = await fetch_or_404(id)
        await item.delete()
        return {"message": "{} deleted successfully".format(name)}

    add_pagination(router)
    return router