from sqlalchemy import select from db.schemas import Stock from utils import logger def filterQuantity(stocksData, filtered): def filterStock(stock): if stock.quantity > 0: return stock else: return False if isinstance(stocksData, list): if len(stocksData) == 0: return [] stocksData.sort(key=lambda stock: stock.created_at) filteredStocks = ( list(filter(filterStock, stocksData)) if filtered else stocksData ) return [stock.toDict() for stock in filteredStocks] if filteredStocks else [] else: stock = filterStock(stocksData) if filtered else stocksData if stock: return stock.toDict() else: return {} class StockHandler: async def add(**kwargs) -> dict: newStock = await Stock(**kwargs).save() logger.info( f"Новая запись об инструменте {newStock.toolkit_data.title} на складе {newStock.toolbox_data.title} успешно создана" ) return newStock.toDict() async def getAll(filtered: bool = True): from db import CRUD stocks = await CRUD.read(select(Stock), all=True) return filterQuantity(stocks, filtered) async def get(stockId: int, filtered: bool = True, record: bool = False): from db import CRUD stock = await CRUD.read(select(Stock).where(Stock.id == stockId)) if not stock: logger.error("Запись об остатках не найдена") return {} return filterQuantity(stock, filtered) if not record else stock async def getByToolboxId(toolboxId: int, filtered: bool = True): from db import CRUD query = select(Stock).where(Stock.toolbox_id == toolboxId) stocks = await CRUD.read(query, True) return filterQuantity(stocks, filtered) async def getByToolkitId(toolkitId: int, filtered: bool = True): from db import CRUD query = select(Stock).where(Stock.toolkit_id == toolkitId) stocks = await CRUD.read(query, True) return filterQuantity(stocks, filtered) async def getByToolboxIdAndToolkitId( toolboxId: int, toolkitId: int, filtered: bool = True ) -> list[dict]: from db import CRUD query = ( select(Stock) .where(Stock.toolbox_id == toolboxId) .where(Stock.toolkit_id == toolkitId) ) stocks = await CRUD.read(query, True) return filterQuantity(stocks, filtered) async def getByToolboxIdAndToolkitIdAndQPrice( toolboxId: int, toolkitId: int, price: float, filtered: bool = True ) -> list[dict]: from db import CRUD query = ( select(Stock) .where(Stock.toolbox_id == toolboxId) .where(Stock.toolkit_id == toolkitId) .where(Stock.price == price) ) stocks = await CRUD.read(query, True) return filterQuantity(stocks, filtered) async def edit(stockId: int, **kwargs) -> dict: from db import CRUD stock = await CRUD.read(select(Stock).where(Stock.id == stockId)) if not stock: logger.error("Запись об остатках не найдена") return {} try: stockInfo = f"инструмента {stock.toolkit_data.title} на складе {stock.toolbox_data.title}" editedStock = await stock.edit(**kwargs) except Exception as e: logger.error(f"Ошибка обновления записи об остатках: {str(e)}") return {} if not editedStock: logger.error("Запись об остатках не обновлена") return {} logger.info( f"Запись об остатках {stockInfo} успешно обновлена, изменены данные: {kwargs.keys()}" ) return editedStock.toDict()