91 lines
2.9 KiB
Python
91 lines
2.9 KiB
Python
from beanie import PydanticObjectId
|
|
from beanie.odm.enums import SortDirection
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from fastapi_paginate import Page, Params, add_pagination
|
|
from fastapi_paginate.ext.motor import paginate
|
|
|
|
from typing import TypeVar, List, Generic, Any, Dict
|
|
|
|
|
|
# Generic type placeholders. None of them is referenced in the visible part
# of this module.
T = TypeVar("T")
U = TypeVar("U")
V = TypeVar("V")
W = TypeVar("W")
|
|
|
|
|
|
def parse_sort(sort_by):
    """Convert a sort expression into a list of ``(field, direction)`` pairs.

    The expression is a comma-separated list of ``direction(field)`` terms,
    for example ``"asc(name),desc(age)"``. Whitespace around each term and
    around the direction keyword is ignored.

    Args:
        sort_by: The raw sort expression. ``None`` or an empty string means
            "no sorting requested".

    Returns:
        A list of ``(field_name, direction)`` tuples, where ``direction`` is
        ``1`` when the keyword is ``asc`` and ``-1`` for any other keyword.
        The list is empty when no sorting was requested.

    Raises:
        ValueError: If a term is not of the form ``direction(field)``.
    """
    # Callers pass None when the client did not supply a sort parameter.
    if not sort_by:
        return []

    fields = []
    for raw_term in sort_by.split(','):
        term = raw_term.strip()
        # Require exactly one "(" and a closing ")" so that malformed input
        # is rejected instead of silently losing the field's last character.
        if term.count('(') != 1 or not term.endswith(')'):
            raise ValueError(
                "invalid sort term {!r}; expected 'asc(field)' or "
                "'desc(field)'".format(raw_term)
            )
        direction, _, rest = term.partition('(')
        fields.append((rest[:-1], 1 if direction.strip() == 'asc' else -1))

    return fields
|
|
|
|
|
|
def parse_query(query) -> Dict[Any, Any]:
    """Translate a raw query string into a filter mapping.

    This is currently a placeholder: ``query`` is ignored and an empty
    mapping is always returned.

    Args:
        query: The raw query expression (unused).

    Returns:
        A new, empty dictionary.
    """
    empty_filter: Dict[Any, Any] = dict()
    return empty_filter
|
|
|
|
|
|
def get_crud_router(model, model_create, model_read, model_update):
    """Build an ``APIRouter`` exposing CRUD endpoints for ``model``.

    Registered routes:

    * ``POST /``        -- create a record from a ``model_create`` payload.
    * ``GET /{id}``     -- fetch one record as ``model_read``.
    * ``GET /``         -- paginated listing as ``Page[model_read]``.
    * ``PUT /{id}``     -- partially update a record from ``model_update``.
    * ``DELETE /{id}``  -- delete a record.

    Args:
        model: Document class. It must provide ``get``,
            ``get_motor_collection`` and instances with ``create``,
            ``update``, ``delete`` and ``dict`` (presumably a Beanie
            ``Document`` subclass -- confirm against callers).
        model_create: Request schema for creation. Instances must provide an
            awaitable ``validate_foreign_key()`` and ``dict()``.
        model_read: Response schema, constructed from the document's
            ``dict()``.
        model_update: Request schema for updates. Fields left as ``None``
            are treated as "not provided".

    Returns:
        The configured ``APIRouter`` with pagination support added.
    """
    router = APIRouter()
    name = model.__name__

    async def _get_or_404(id):
        """Return the document with ``id`` or raise a 404 ``HTTPException``."""
        item = await model.get(id)
        if not item:
            raise HTTPException(
                status_code=404,
                detail="{} record not found!".format(name)
            )
        return item

    @router.post("/", response_description="{} added to the database".format(name))
    async def create(item: model_create) -> dict:
        await item.validate_foreign_key()
        o = await model(**item.dict()).create()
        # NOTE(review): "id" carries the whole created document, not just its
        # identifier. Kept unchanged because clients may rely on this shape.
        return {"message": "{} added successfully".format(name), "id": o}

    @router.get("/{id}", response_description="{} record retrieved".format(name))
    async def read_id(id: PydanticObjectId) -> model_read:
        # A missing record yields a 404, consistent with update/delete,
        # rather than failing on ``None.dict()``.
        item = await _get_or_404(id)
        return model_read(**item.dict())

    @router.get("/", response_model=Page[model_read], response_description="{} records retrieved".format(name))
    async def read_list(size: int = 50, page: int = 1, sort_by: str = None, query: str = None) -> Page[model_read]:
        # Only parse a sort expression when one was supplied; ``None`` lets
        # the driver fall back to its default ordering.
        sort = parse_sort(sort_by) if sort_by else None
        filters = parse_query(query)

        collection = model.get_motor_collection()
        return await paginate(collection, filters, Params(size=size, page=page), sort=sort)

    @router.put("/{id}", response_description="{} record updated".format(name))
    async def update(id: PydanticObjectId, req: model_update) -> model_read:
        # Fields left as None are treated as "not provided" and skipped.
        changes = {k: v for k, v in req.dict().items() if v is not None}

        item = await _get_or_404(id)

        # Skip the write when nothing changed: an empty ``$set`` is rejected
        # by MongoDB servers older than 5.0.
        if changes:
            await item.update({"$set": changes})
        return model_read(**item.dict())

    @router.delete("/{id}", response_description="{} record deleted from the database".format(name))
    async def delete(id: PydanticObjectId) -> dict:
        item = await _get_or_404(id)
        await item.delete()
        return {
            "message": "{} deleted successfully".format(name)
        }

    add_pagination(router)
    return router
|
|
|
|
|