121 lines
4.2 KiB
Python
121 lines
4.2 KiB
Python
from beanie import PydanticObjectId
|
|
from beanie.odm.operators.find.comparison import In
|
|
from beanie.operators import And, RegEx, Eq
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
from fastapi_filter import FilterDepends
|
|
from fastapi_pagination import Page, add_pagination
|
|
from fastapi_pagination.ext.motor import paginate
|
|
|
|
from ..db import get_db_client
|
|
from ..user.manager import get_current_user
|
|
|
|
|
|
def parse_sort(sort_by):
    """Parse a comma-separated sort specification into ``(column, direction)`` pairs.

    Each term has the form ``asc(column)`` or ``desc(column)``; for example
    ``"asc(name), desc(created)"`` yields ``[("name", 1), ("created", -1)]``.
    Whitespace around terms, directions and column names is ignored, the
    direction keyword is matched case-insensitively, and empty terms (such as
    the one produced by a trailing comma) are skipped.

    Args:
        sort_by: The raw sort string, or a falsy value (``None`` / ``""``)
            meaning "no sorting".

    Returns:
        A list of ``(column, direction)`` tuples, where direction is ``1`` for
        ``asc`` and ``-1`` for any other direction keyword.

    Raises:
        ValueError: If a non-empty term does not contain exactly one ``(``.
    """
    if not sort_by:
        return []

    fields = []
    for term in sort_by.split(','):
        term = term.strip()
        if not term:
            # Tolerate stray separators such as a trailing comma.
            continue

        parts = term.split('(')
        if len(parts) != 2:
            raise ValueError(
                f"Invalid sort term {term!r}: expected 'asc(column)' or 'desc(column)'"
            )

        direction, column = parts
        column = column.strip()
        # Drop the closing parenthesis only when it is really there, so a
        # missing ")" does not eat the last character of the column name.
        if column.endswith(')'):
            column = column[:-1].strip()

        ascending = direction.strip().lower() == 'asc'
        fields.append((column, 1 if ascending else -1))

    return fields
|
|
|
|
|
|
def Or(filters):
    """Return a one-key mapping that places *filters* under ``'$or'``.

    *filters* is stored as given (not copied). The result is presumably meant
    to be used as a MongoDB-style ``$or`` clause -- confirm against callers.
    """
    clause = {"$or": filters}
    return clause
|
|
|
|
|
|
def _fulltext_operand(fields, value):
    """Build a filter requiring every word of *value* to match in at least one of *fields*."""
    # Repeated spaces yield empty words whose empty pattern adds nothing to the
    # conjunction, so they are dropped. If nothing but empty words remains, a
    # single empty pattern is kept so the resulting filter has the same shape
    # it always had for an empty search value.
    words = [word for word in value.split(' ') if word] or ['']

    per_field = []
    for field in fields:
        # NOTE(review): each word is passed to RegEx as a pattern without
        # escaping. If *value* can come from an untrusted client, consider
        # re.escape() -- left unchanged here because callers may rely on
        # regex syntax. The 'i' option presumably requests case-insensitive
        # matching.
        word_filters = [RegEx(field, word, 'i') for word in words]
        per_field.append(And(*word_filters) if len(word_filters) > 1 else word_filters[0])

    return Or(per_field) if len(per_field) > 1 else per_field[0]


def parse_query(query: str, model):
    """Translate a textual filter expression into a query filter.

    The expression is a list of criteria joined by ``" AND "``. Each criterion
    is ``"<column> <operator> <value>"`` (the value may itself contain
    spaces). The column name is lower-cased before use.

    Supported criteria:

    * column ``fulltext`` (any operator): every space-separated word of the
      value must match, via ``RegEx``, in at least one of the fields listed in
      ``model.Settings.fulltext_search``. The criterion is skipped when that
      setting is missing or empty.
    * operator ``eq``: ``Eq(column, value)``.
    * operator ``in``: ``In(column, value.split(','))``.

    Criteria with any other operator are ignored.

    Args:
        query: The filter expression, or ``None`` / ``""`` for "no filter".
        model: Object whose ``Settings.fulltext_search`` (if present) lists the
            fields searched by ``fulltext`` criteria.

    Returns:
        ``{}`` when no criterion produced a filter, the single operand when
        exactly one did, otherwise ``And(*operands)``.

    Raises:
        ValueError: If a criterion does not have the three space-separated
            parts ``<column> <operator> <value>``.
    """
    # An empty string used to fall through and fail on unpacking; treat it
    # like None, i.e. "no filter".
    if not query:
        return {}

    settings = getattr(model, 'Settings', None)
    fulltext_fields = getattr(settings, 'fulltext_search', None)

    and_array = []
    for criterion in query.split(' AND '):
        parts = criterion.split(' ', 2)
        if len(parts) != 3:
            raise ValueError(
                f"Invalid filter criterion {criterion!r}: "
                "expected '<column> <operator> <value>'"
            )
        column, operator, value = parts
        column = column.lower()

        operand = None
        if column == 'fulltext':
            if not fulltext_fields:
                continue
            operand = _fulltext_operand(fulltext_fields, value)
        elif operator == 'eq':
            operand = Eq(column, value)
        elif operator == 'in':
            operand = In(column, value.split(','))

        # Compare against None explicitly instead of relying on the
        # truthiness of operator objects.
        if operand is not None:
            and_array.append(operand)

    if not and_array:
        return {}
    return And(*and_array) if len(and_array) > 1 else and_array[0]
|
|
|
|
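# NOTE(review): the authentication dependency below is currently switched off; the disabled default was: user=Depends(get_current_user)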
|
|
def get_tenant_db_cursor(instance: str="westside", firm: str="cht", db_client=Depends(get_db_client), user=None):
    """Return the ``tenant_<instance>_<firm>`` entry of *db_client*.

    Args:
        instance: First component of the tenant key (defaults to ``"westside"``).
        firm: Second component of the tenant key (defaults to ``"cht"``).
        db_client: Client object indexed by the tenant key; supplied through
            ``Depends(get_db_client)`` by default.
        user: Accepted but not used by this function.

    Returns:
        Whatever ``db_client[...]`` yields for the tenant key -- presumably a
        database handle rather than a cursor, despite the function name
        (confirm against ``get_db_client``).

    NOTE(review): ``instance`` and ``firm`` are interpolated into the key
    without validation and ``user`` is not checked here, so tenant isolation
    depends entirely on the callers.
    """
    tenant_key = "tenant_{}_{}".format(instance, firm)
    return db_client[tenant_key]
|
|
|
|
def get_crud_router(model, model_create, model_read, model_update, model_filter):
    """Assemble an ``APIRouter`` exposing CRUD endpoints for *model*.

    Routes registered, in this order:

    * ``POST /`` -- awaits ``schema.validate_foreign_key(db)``, then
      ``model.create(db, schema)``.
    * ``GET /{record_id}`` -- ``model.get(db, record_id)``; 404 if falsy.
    * ``GET /`` -- paginates ``model.list(db, filters)``.
    * ``PUT /{record_id}`` -- fetches the record (404 if falsy), then
      ``model.update(db, record, schema)``.
    * ``DELETE /{record_id}`` -- fetches the record (404 if falsy), then
      ``model.delete(db, record)``.

    Single-record responses are built with ``model_read.from_model``. Every
    handler obtains ``db`` through ``Depends(get_tenant_db_cursor)``.

    Args:
        model: Class providing awaitable ``create``/``get``/``update``/``delete``
            and a ``list`` callable, all taking ``db`` as first argument; its
            ``__name__`` is used in response descriptions and messages.
        model_create: Request-body type for the create route.
        model_read: Response type; must offer ``from_model(record)``.
        model_update: Request-body type for the update route.
        model_filter: Filter type injected via ``FilterDepends`` on the list route.

    Returns:
        The configured router, after ``add_pagination(router)`` was applied.
    """
    router = APIRouter()
    model_name = model.__name__
    missing_detail = f"{model_name} record not found!"

    async def _load_or_404(db, record_id):
        # Shared lookup for the routes addressing a single record.
        found = await model.get(db, record_id)
        if found:
            return found
        raise HTTPException(status_code=404, detail=missing_detail)

    @router.post("/", response_description=f"{model_name} added to the database")
    async def create(schema: model_create, db=Depends(get_tenant_db_cursor)) -> model_read:
        await schema.validate_foreign_key(db)
        created = await model.create(db, schema)
        return model_read.from_model(created)

    @router.get("/{record_id}", response_description=f"{model_name} record retrieved")
    async def read_one(record_id: PydanticObjectId, db=Depends(get_tenant_db_cursor)) -> model_read:
        existing = await _load_or_404(db, record_id)
        return model_read.from_model(existing)

    @router.get("/", response_model=Page[model_read], response_description=f"{model_name} records retrieved")
    async def read_list(filters: model_filter=FilterDepends(model_filter), db=Depends(get_tenant_db_cursor)) -> Page[model_read]:
        listing = model.list(db, filters)
        return await paginate(listing)

    @router.put("/{record_id}", response_description=f"{model_name} record updated")
    async def update(record_id: PydanticObjectId, schema: model_update, db=Depends(get_tenant_db_cursor)) -> model_read:
        existing = await _load_or_404(db, record_id)
        updated = await model.update(db, existing, schema)
        return model_read.from_model(updated)

    @router.delete("/{record_id}", response_description=f"{model_name} record deleted from the database")
    async def delete(record_id: PydanticObjectId, db=Depends(get_tenant_db_cursor)) -> dict:
        existing = await _load_or_404(db, record_id)
        await model.delete(db, existing)
        return {"message": f"{model_name} deleted successfully"}

    add_pagination(router)
    return router
|