Files
toolbox/db/handlers/stock.py
T

196 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy import select
from db.schemas.stock import Placement, Stock
from utils import logger
async def filterQuantity(stocksData, filtered):
def filterStock(stock):
if stock.quantity > 0:
return stock
else:
return False
def convertPlacementData(placementList: list) -> dict:
placementDict = {}
for placementData in placementList:
toolboxId = placementData.toolbox_id
toolkitId = placementData.toolkit_id
placement = placementData.placement
if toolboxId not in placementDict:
placementDict[toolboxId] = {}
if toolkitId not in placementDict[toolboxId]:
placementDict[toolboxId][toolkitId] = placement
return placementDict
async def combinePlacementAndStock(stockData):
if stockData:
if isinstance(stockData, list):
placementList = await PlacementHandler.getAll()
placementDict = convertPlacementData(placementList)
for stock in stockData:
toolboxId = stock.get("toolbox_id")
toolkitId = stock.get("toolkit_id")
stock["placement"] = placementDict.get(toolboxId, {}).get(
toolkitId, None
)
else:
toolboxId = stockData.get("toolbox_id")
toolkitId = stockData.get("toolkit_id")
placement = await PlacementHandler.get(toolboxId, toolkitId)
stockData["placement"] = placement.placement
return stockData
exitData = []
if isinstance(stocksData, list):
if len(stocksData) > 0:
stocksData.sort(key=lambda stock: stock.created_at)
filteredStocks = (
list(filter(filterStock, stocksData)) if filtered else stocksData
)
if filteredStocks:
exitData = [stock.toDict() for stock in filteredStocks]
else:
stock = filterStock(stocksData) if filtered else stocksData
if stock:
exitData = stock.toDict()
return await combinePlacementAndStock(exitData)
class PlacementHandler:
async def add(**kwargs):
placement = kwargs.get("placement", None)
if not placement:
return
toolboxId = kwargs.get("toolbox_id", None)
toolkitId = kwargs.get("toolkit_id", None)
if not (toolkitId and toolboxId):
logger.error("toolkit_id or toolbox_id not found")
return
chechExists = await PlacementHandler.get(toolboxId, toolkitId)
if chechExists:
if placement != chechExists.placement:
logger.info(
f"Обновление места хранения инструмента {toolkitId} в ящике {toolboxId} с {chechExists.placement} на {placement}"
)
return await PlacementHandler.edit(chechExists, placement)
else:
logger.info(
f"Новое место хранения инструмента {toolkitId} в ящике {toolboxId} успешно создано {placement}"
)
return await Placement(**kwargs).save()
async def get(toolboxId: int, toolkitId: int) -> Placement:
from db import CRUD
return await CRUD.read(
select(Placement)
.where(Placement.toolbox_id == toolboxId)
.where(Placement.toolkit_id == toolkitId)
)
async def getAll() -> list[Placement]:
from db import CRUD
return await CRUD.read(select(Placement), all=True)
async def edit(toolboxId: int, toolkitId: int, placement: str):
placementDB = await PlacementHandler.get(toolboxId, toolkitId)
if placementDB and placementDB.placement != placement:
return await placementDB.edit(placement)
class StockHandler:
async def add(**kwargs) -> dict:
newStock = await Stock(**kwargs).save()
if "placement" in kwargs and kwargs["placement"]:
if kwargs["placement"] != "":
await PlacementHandler.add(**kwargs)
logger.info(
f"Новая запись об инструменте {newStock.toolkit_data.title} на складе {newStock.toolbox_data.title} успешно создана"
)
return newStock.toDict()
async def getAll(filtered: bool = True):
from db import CRUD
stocks = await CRUD.read(select(Stock), all=True)
return await filterQuantity(stocks, filtered)
async def get(stockId: int, filtered: bool = True, record: bool = False):
from db import CRUD
stock = await CRUD.read(select(Stock).where(Stock.id == stockId))
if not stock:
logger.error(f"Запись {stockId} об остатках не найдена")
return {}
return await filterQuantity(stock, filtered) if not record else stock
async def getByToolboxId(toolboxId: int, filtered: bool = True):
from db import CRUD
query = select(Stock).where(Stock.toolbox_id == toolboxId)
stocks = await CRUD.read(query, True)
return await filterQuantity(stocks, filtered)
async def getByToolkitId(toolkitId: int, filtered: bool = True):
from db import CRUD
query = select(Stock).where(Stock.toolkit_id == toolkitId)
stocks = await CRUD.read(query, True)
return await filterQuantity(stocks, filtered)
async def checkToolkitExists(toolkitId: int) -> bool:
from db import CRUD
query = select(Stock).where(Stock.toolkit_id == toolkitId)
stocks = await CRUD.read(query)
return True if stocks else False
async def getByToolboxIdAndToolkitId(
toolboxId: int, toolkitId: int, filtered: bool = True
) -> list[dict]:
from db import CRUD
query = (
select(Stock)
.where(Stock.toolbox_id == toolboxId)
.where(Stock.toolkit_id == toolkitId)
)
stocks = await CRUD.read(query, True)
return await filterQuantity(stocks, filtered)
async def getByToolboxIdAndToolkitIdAndQPrice(
toolboxId: int, toolkitId: int, price: float, filtered: bool = True
) -> list[dict]:
from db import CRUD
query = (
select(Stock)
.where(Stock.toolbox_id == toolboxId)
.where(Stock.toolkit_id == toolkitId)
.where(Stock.price == price)
)
stocks = await CRUD.read(query, True)
return await filterQuantity(stocks, filtered)
async def edit(stockId: int, **kwargs) -> dict:
from db import CRUD
stock = await CRUD.read(select(Stock).where(Stock.id == stockId))
if not stock:
logger.error("Запись об остатках не найдена")
return {}
try:
stockInfo = f"инструмента {stock.toolkit_data.title} на складе {stock.toolbox_data.title}"
editedStock = await stock.edit(**kwargs)
except Exception as e:
logger.error(f"Ошибка обновления записи об остатках: {str(e)}")
return {}
if not editedStock:
logger.error("Запись об остатках не обновлена")
return {}
logger.info(
f"Запись об остатках {stockInfo} успешно обновлена, изменены данные: {kwargs.keys()}"
)
return editedStock.toDict()