from sqlalchemy import select

from db.schemas.stock import Placement, Stock
from utils import logger


async def filterQuantity(stocksData, filtered):
    """Convert stock record(s) to dict(s) and attach their placement.

    Args:
        stocksData: Either a list of stock records or a single stock record.
            Records are read via ``quantity``, ``created_at`` and ``toDict()``.
        filtered: When truthy, records whose ``quantity`` is not above zero
            are dropped.

    Returns:
        For a list input: a list of dicts ordered by ``created_at``, each with
        a ``"placement"`` key (``None`` when no placement is known).
        For a single record: its dict with a ``"placement"`` key.
        An empty list when nothing survives the filter.
    """

    def filterStock(stock):
        # Returns the record itself (not True) so it works both as a
        # ``filter`` predicate and as a "value or False" helper.
        return stock if stock.quantity > 0 else False

    def convertPlacementData(placementList: list) -> dict:
        """Index placements as ``{toolbox_id: {toolkit_id: placement}}``."""
        placementDict = {}
        for placementData in placementList:
            # setdefault keeps the first placement seen for a given pair.
            placementDict.setdefault(placementData.toolbox_id, {}).setdefault(
                placementData.toolkit_id, placementData.placement
            )
        return placementDict

    async def combinePlacementAndStock(stockData):
        """Add the ``"placement"`` key to a stock dict or a list of them."""
        if not stockData:
            return stockData

        if isinstance(stockData, list):
            # One query for all placements instead of one per stock row.
            placementList = await PlacementHandler.getAll()
            placementDict = convertPlacementData(placementList)
            for stock in stockData:
                toolboxId = stock.get("toolbox_id")
                toolkitId = stock.get("toolkit_id")
                stock["placement"] = placementDict.get(toolboxId, {}).get(
                    toolkitId, None
                )
        else:
            toolboxId = stockData.get("toolbox_id")
            toolkitId = stockData.get("toolkit_id")
            placement = await PlacementHandler.get(toolboxId, toolkitId)
            # A stock may have no placement row; mirror the list branch and
            # store None instead of failing on attribute access.
            stockData["placement"] = placement.placement if placement else None
        return stockData

    exitData = []
    if isinstance(stocksData, list):
        if stocksData:
            # sorted() leaves the caller's list untouched.
            orderedStocks = sorted(stocksData, key=lambda stock: stock.created_at)
            filteredStocks = (
                list(filter(filterStock, orderedStocks))
                if filtered
                else orderedStocks
            )
            if filteredStocks:
                exitData = [stock.toDict() for stock in filteredStocks]
    else:
        stock = filterStock(stocksData) if filtered else stocksData
        if stock:
            exitData = stock.toDict()

    return await combinePlacementAndStock(exitData)


class PlacementHandler:
    """Data-access helpers for ``Placement`` rows.

    All methods are static and are called on the class itself.
    """

    @staticmethod
    async def add(**kwargs):
        """Create a placement, or update it when the pair already has one.

        Reads ``placement``, ``toolbox_id`` and ``toolkit_id`` from
        ``kwargs``. Does nothing (returns ``None``) when ``placement`` is
        empty, when either id is missing, or when the stored placement
        already equals the requested one.

        Returns:
            The result of ``PlacementHandler.edit`` on update, the result of
            ``Placement(...).save()`` on creation, otherwise ``None``.
        """
        placement = kwargs.get("placement", None)
        if not placement:
            return

        toolboxId = kwargs.get("toolbox_id", None)
        toolkitId = kwargs.get("toolkit_id", None)
        if not (toolkitId and toolboxId):
            logger.error("toolkit_id or toolbox_id not found")
            return

        existing = await PlacementHandler.get(toolboxId, toolkitId)
        if existing:
            if placement != existing.placement:
                logger.info(
                    f"Обновление места хранения инструмента {toolkitId} в ящике {toolboxId} с {existing.placement} на {placement}"
                )
                # edit() expects the ids, not the loaded record.
                return await PlacementHandler.edit(toolboxId, toolkitId, placement)
        else:
            logger.info(
                f"Новое место хранения инструмента {toolkitId} в ящике {toolboxId} успешно создано {placement}"
            )
            # NOTE(review): every kwarg is forwarded to Placement; callers
            # passing extra (e.g. Stock-only) fields rely on the model
            # accepting them - confirm against the Placement schema.
            return await Placement(**kwargs).save()

    @staticmethod
    async def get(toolboxId: int, toolkitId: int) -> Placement:
        """Read the placement for a toolbox/toolkit pair.

        Callers in this module treat a falsy result as "not found".
        """
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import - TODO confirm).
        from db import CRUD

        query = (
            select(Placement)
            .where(Placement.toolbox_id == toolboxId)
            .where(Placement.toolkit_id == toolkitId)
        )
        return await CRUD.read(query)

    @staticmethod
    async def getAll() -> list[Placement]:
        """Read every placement row."""
        from db import CRUD

        return await CRUD.read(select(Placement), all=True)

    @staticmethod
    async def edit(toolboxId: int, toolkitId: int, placement: str):
        """Update the stored placement for a pair when it differs.

        Returns:
            The result of the record's ``edit`` call, or ``None`` when the
            record is missing or already holds ``placement``.
        """
        placementDB = await PlacementHandler.get(toolboxId, toolkitId)
        if placementDB and placementDB.placement != placement:
            return await placementDB.edit(placement)


class StockHandler:
    """Data-access helpers for ``Stock`` rows.

    All methods are static and are called on the class itself. Read methods
    return plain dicts produced by ``filterQuantity`` unless noted.
    """

    @staticmethod
    async def add(**kwargs) -> dict:
        """Save a new stock record and, if given, its placement.

        Returns:
            The saved record as a dict (``toDict()``).
        """
        newStock = await Stock(**kwargs).save()
        if kwargs.get("placement"):
            await PlacementHandler.add(**kwargs)
        logger.info(
            f"Новая запись об инструменте {newStock.toolkit_data.title} на складе {newStock.toolbox_data.title} успешно создана"
        )
        return newStock.toDict()

    @staticmethod
    async def getAll(filtered: bool = True):
        """Return all stock records, optionally dropping empty ones."""
        from db import CRUD

        stocks = await CRUD.read(select(Stock), all=True)
        return await filterQuantity(stocks, filtered)

    @staticmethod
    async def get(stockId: int, filtered: bool = True, record: bool = False):
        """Return one stock by id.

        Args:
            stockId: Primary key of the stock record.
            filtered: Passed to ``filterQuantity`` when ``record`` is false.
            record: When true, return the raw record instead of a dict.

        Returns:
            ``{}`` when the record is not found; otherwise the raw record or
            the ``filterQuantity`` result.
        """
        from db import CRUD

        stock = await CRUD.read(select(Stock).where(Stock.id == stockId))
        if not stock:
            logger.error(f"Запись {stockId} об остатках не найдена")
            return {}
        if record:
            return stock
        return await filterQuantity(stock, filtered)

    @staticmethod
    async def getByToolboxId(toolboxId: int, filtered: bool = True):
        """Return the stock records of one toolbox."""
        from db import CRUD

        query = select(Stock).where(Stock.toolbox_id == toolboxId)
        stocks = await CRUD.read(query, True)
        return await filterQuantity(stocks, filtered)

    @staticmethod
    async def getByToolkitId(toolkitId: int, filtered: bool = True):
        """Return the stock records of one toolkit."""
        from db import CRUD

        query = select(Stock).where(Stock.toolkit_id == toolkitId)
        stocks = await CRUD.read(query, True)
        return await filterQuantity(stocks, filtered)

    @staticmethod
    async def checkToolkitExists(toolkitId: int) -> bool:
        """Tell whether any stock record references the toolkit."""
        from db import CRUD

        query = select(Stock).where(Stock.toolkit_id == toolkitId)
        stocks = await CRUD.read(query)
        return bool(stocks)

    @staticmethod
    async def getByToolboxIdAndToolkitId(
        toolboxId: int, toolkitId: int, filtered: bool = True
    ) -> list[dict]:
        """Return the stock records matching a toolbox/toolkit pair."""
        from db import CRUD

        query = (
            select(Stock)
            .where(Stock.toolbox_id == toolboxId)
            .where(Stock.toolkit_id == toolkitId)
        )
        stocks = await CRUD.read(query, True)
        return await filterQuantity(stocks, filtered)

    @staticmethod
    async def getByToolboxIdAndToolkitIdAndQPrice(
        toolboxId: int, toolkitId: int, price: float, filtered: bool = True
    ) -> list[dict]:
        """Return the stock records matching toolbox, toolkit and price."""
        from db import CRUD

        query = (
            select(Stock)
            .where(Stock.toolbox_id == toolboxId)
            .where(Stock.toolkit_id == toolkitId)
            .where(Stock.price == price)
        )
        stocks = await CRUD.read(query, True)
        return await filterQuantity(stocks, filtered)

    @staticmethod
    async def edit(stockId: int, **kwargs) -> dict:
        """Apply ``kwargs`` to a stock record.

        Returns:
            The edited record as a dict, or ``{}`` when the record is
            missing, the update raises, or the update returns a falsy value.
            Failures are logged, not raised.
        """
        from db import CRUD

        stock = await CRUD.read(select(Stock).where(Stock.id == stockId))
        if not stock:
            logger.error("Запись об остатках не найдена")
            return {}

        try:
            # Built before the edit so the log names the record as it was
            # when the update started.
            stockInfo = f"инструмента {stock.toolkit_data.title} на складе {stock.toolbox_data.title}"
            editedStock = await stock.edit(**kwargs)
        except Exception as e:
            # Deliberate best-effort boundary: report and return empty.
            logger.error(f"Ошибка обновления записи об остатках: {str(e)}")
            return {}

        if not editedStock:
            logger.error("Запись об остатках не обновлена")
            return {}

        logger.info(
            f"Запись об остатках {stockInfo} успешно обновлена, изменены данные: {kwargs.keys()}"
        )
        return editedStock.toDict()