111 lines
3.5 KiB
Python
111 lines
3.5 KiB
Python
from beanie import PydanticObjectId
|
|
from beanie.operators import And, RegEx, Eq
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from fastapi_paginate import Page, Params, add_pagination
|
|
from fastapi_paginate.ext.motor import paginate
|
|
|
|
|
|
def parse_sort(sort_by):
|
|
if not sort_by:
|
|
return []
|
|
|
|
fields = []
|
|
for field in sort_by.split(','):
|
|
dir, col = field.split('(')
|
|
fields.append((col[:-1], 1 if dir == 'asc' else -1))
|
|
|
|
return fields
|
|
|
|
|
|
def Or(filters):
|
|
return {'$or': filters}
|
|
|
|
|
|
def parse_query(query: str, model):
|
|
if query is None:
|
|
return {}
|
|
|
|
and_array = []
|
|
for criterion in query.split(' AND '):
|
|
[column, operator, value] = criterion.split(' ', 2)
|
|
column = column.lower()
|
|
if column == 'fulltext':
|
|
if not model.Settings.fulltext_search:
|
|
continue
|
|
|
|
or_array = []
|
|
for field in model.Settings.fulltext_search:
|
|
or_array.append(RegEx(field, value, 'i'))
|
|
operand = Or(or_array) if len(or_array) > 1 else or_array[0]
|
|
|
|
elif operator == 'eq':
|
|
operand = Eq(column, value)
|
|
|
|
and_array.append(operand)
|
|
|
|
if and_array:
|
|
return And(and_array) if len(and_array) > 1 else and_array[0]
|
|
else:
|
|
return {}
|
|
|
|
|
|
def get_crud_router(model, model_create, model_read, model_update):
|
|
|
|
router = APIRouter()
|
|
|
|
@router.post("/", response_description="{} added to the database".format(model.__name__))
|
|
async def create(item: model_create) -> dict:
|
|
await item.validate_foreign_key()
|
|
o = await model(**item.dict()).create()
|
|
return {"message": "{} added successfully".format(model.__name__), "id": o.id}
|
|
|
|
@router.get("/{id}", response_description="{} record retrieved".format(model.__name__))
|
|
async def read_id(id: PydanticObjectId) -> model_read:
|
|
item = await model.get(id)
|
|
return model_read(**item.dict())
|
|
|
|
@router.get("/", response_model=Page[model_read], response_description="{} records retrieved".format(model.__name__))
|
|
async def read_list(size: int = 50, page: int = 1, sort_by: str = None, query: str = None) -> Page[model_read]:
|
|
sort = parse_sort(sort_by)
|
|
query = parse_query(query, model_read)
|
|
|
|
collection = model.get_motor_collection()
|
|
items = paginate(collection, query, Params(**{'size': size, 'page': page}), sort=sort)
|
|
return await items
|
|
|
|
@router.put("/{id}", response_description="{} record updated".format(model.__name__))
|
|
async def update(id: PydanticObjectId, req: model_update) -> model_read:
|
|
req = {k: v for k, v in req.dict().items() if v is not None}
|
|
update_query = {"$set": {
|
|
field: value for field, value in req.items()
|
|
}}
|
|
|
|
item = await model.get(id)
|
|
if not item:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail="{} record not found!".format(model.__name__)
|
|
)
|
|
|
|
await item.update(update_query)
|
|
return model_read(**item.dict())
|
|
|
|
@router.delete("/{id}", response_description="{} record deleted from the database".format(model.__name__))
|
|
async def delete(id: PydanticObjectId) -> dict:
|
|
item = await model.get(id)
|
|
|
|
if not item:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail="{} record not found!".format(model.__name__)
|
|
)
|
|
|
|
await item.delete()
|
|
return {
|
|
"message": "{} deleted successfully".format(model.__name__)
|
|
}
|
|
|
|
add_pagination(router)
|
|
return router
|