from sqlalchemy import or_, select from db import CRUD from db.handlers.access import AccessLevelHandler from db.handlers.toolbox import ToolboxHandler from utils import logger, pwd_hash, saveImage, safeFilename, deleteImage, pwd_verify from db.schemas.user import User from db.handlers.records import ServiceRecordsHandler def handleUserPhoto(imageData, login: str): login = safeFilename(login) fileName = f"users/{login}.png" if not saveImage(imageData, fileName): return None return fileName class UserHandler: async def add(userData: dict, user_id: int = None) -> dict: login = userData.get("login", None) if not login: logger.error("Не указан логин") return {} userName = userData.get("username", None) if not userName: logger.error("Не указано имя пользователя") return {} query = select(User).where(or_(User.login == login, User.username == userName)) user = await CRUD.read(query) if user: logger.error("Пользователь с таким логином или именем уже существует") return {} if "access_level_id" not in userData: logger.error("Не указан уровень доступа") return {} if "password" not in userData: logger.error("Не указан пароль") return {} userData["hashed_password"] = pwd_hash(userData.pop("password")) if "photo" in userData: imageData = userData.pop("photo") photoFile = handleUserPhoto(imageData, login) if photoFile: userData["photo"] = photoFile try: newUser = await User(**userData).save() except Exception as e: logger.error(f"Ошибка сохранения пользователя: {str(e)}") return {} if not newUser: logger.error("Пользователь не сохранен") return {} logger.info( f"Пользователь {newUser.username} успешно добавлен, id: {newUser.id}" ) userAccessLevel = await AccessLevelHandler.get(newUser.access_level_id) if not userAccessLevel: logger.error("Уровень доступа не найден") return {} if userAccessLevel.get("available_own_toolbox"): newToolboxData = { "title": f"Тулбокс {newUser.username}", "description": f"Оборудование, полученное сотрудником {newUser.username}, под личную материальную ответственность", "owner_id": newUser.id, } newToolbox = await ToolboxHandler.add(newToolboxData) logger.info( f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {newUser.username}" ) await ServiceRecordsHandler.add( user_id, {"Добавлен пользователь": newUser.toDict()} ) newUserData = newUser.toDict() newUserData["access_level_data"] = userAccessLevel return newUserData async def edit(userData: dict, user_id: int = None) -> dict: id = userData.get("id", None) if not id: logger.error("Не указан id пользователя") return {} query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return {} changedUserData = userData.get("changedUserData", {}) if len(changedUserData.keys()) == 0: logger.error("Не указаны изменяемые данные") return {} if "password" in changedUserData: userData["hashed_password"] = pwd_hash(changedUserData.pop("password")) if "photo" in changedUserData: imageData = changedUserData.pop("photo") photoFile = handleUserPhoto(imageData, user.login) if photoFile: changedUserData["photo"] = photoFile deleteImage(user.photo) try: editedUser = await user.edit(**changedUserData) except Exception as e: logger.error(f"Ошибка обновления пользователя: {str(e)}") return {} if not editedUser: logger.error("Ошибка обновления пользователя") return {} logger.info( f"Пользователь {editedUser.username} успешно обновлен, изменены данные: {changedUserData.keys()}" ) if not user.available_own_toolbox: if editedUser.available_own_toolbox: newToolboxData = { "title": f"Тулбокс {editedUser.username}", "description": f"Оборудование, полученное сотрудником {editedUser.username}, под личную материальную ответственность", "owner_id": editedUser.id, } newToolbox = await ToolboxHandler.addNewToolbox(newToolboxData) logger.info( f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {editedUser.username}" ) await ServiceRecordsHandler.add( user_id, {"Изменен пользователь": editedUser.toDict()} ) return editedUser.toDict() async def getAll() -> list[dict]: query = select(User) users = await CRUD.read(query, True) return [user.toDict() for user in users] async def get(id: int) -> dict: query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return {} return user.toDict() async def delete(id: int, user_id: int = None) -> bool: query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return False try: userName = user.username result = await CRUD.delete(user) except Exception as e: logger.error(f"Ошибка удаления пользователя: {str(e)}") return False logger.info( f"Пользователь {userName} {'успешно удален' if result else 'не удален'}" ) await ServiceRecordsHandler.add(user_id, {"Удален пользователь": userName}) return result async def deletePhoto(id: int, user_id: int = None) -> bool: query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return False try: deleteImage(user.photo) user.photo = "images/users/default.png" await user.save() except Exception as e: logger.error(f"Ошибка удаления фото пользователя: {str(e)}") return False logger.info(f"Фото пользователя {user.username} успешно удалено") await ServiceRecordsHandler.add( user_id, {"Удалено фото пользователя": user.username} ) return True async def auth(login: str, password: str) -> dict: query = select(User).where(User.login == login) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким логином не найден") return {} if not pwd_verify(password, user.hashed_password): logger.error("Неверный пароль") return {} userData = user.toDict() userData.pop("hashed_password") await ServiceRecordsHandler.add( user.id, {"Авторизован пользователь": user.username} ) return userData async def initialize(): from .access import AccessLevelHandler logger.info("Инициализация пользователей") accessLevelsList = await AccessLevelHandler.getAll() acessLevels = { accessLevel["title"]: accessLevel["id"] for accessLevel in accessLevelsList } password = "Alex0172" baseUsers = { "admin": { "login": "admin", "username": "Администратор - Demo", "password": password, "access_level_id": acessLevels["Администратор"], }, "manager": { "login": "manager", "username": "Менеджер - Demo", "password": password, "access_level_id": acessLevels["Менеджер"], }, "storekeeper": { "login": "storekeeper", "username": "Кладовщик - Demo", "password": password, "access_level_id": acessLevels["Кладовщик"], }, "employee": { "login": "employee", "username": "Сотрудник - Demo", "password": password, "access_level_id": acessLevels["Сотрудник"], }, } for user in baseUsers.values(): await UserHandler.add(user) logger.info("Инициализация пользователей завершена") return