from sqlalchemy import or_, select from db import CRUD from db.handlers.access import AccessLevelHandler from db.handlers.toolbox import ToolboxHandler from utils import logger, pwd_hash, saveImage, safeFilename, deleteImage, pwd_verify from db.schemas.user import User from db.handlers.records import ServiceRecordsHandler, StocksRecordsHandler def handleUserPhoto(imageData, login: str): import base64 login = safeFilename(login) fileName = f"static/images/users/{login}.png" if imageData.startswith("data:image"): header, encoded = imageData.split(",", 1) else: encoded = imageData file_bytes = base64.b64decode(encoded) if not saveImage(file_bytes, fileName): return None return fileName class UserHandler: async def add(userData: dict, user_id: int = None) -> dict: login = userData.get("login", None) if not login: logger.error("Не указан логин") return {} userName = userData.get("username", None) if not userName: logger.error("Не указано имя пользователя") return {} query = select(User).where(or_(User.login == login, User.username == userName)) user = await CRUD.read(query) if user: logger.error("Пользователь с таким логином или именем уже существует") return {} if "access_level_id" not in userData: logger.error("Не указан уровень доступа") return {} if "password" not in userData: logger.error("Не указан пароль") return {} userData["hashed_password"] = pwd_hash(userData.pop("password")) if "photo" in userData: imageData = userData.pop("photo") imageFileName = handleUserPhoto(imageData, login) if imageFileName: userData["photo"] = imageFileName try: newUser = await User(**userData).save() except Exception as e: logger.error(f"Ошибка сохранения пользователя: {str(e)}") return {} if not newUser: logger.error("Пользователь не сохранен") return {} logger.info( f"Пользователь {newUser.username} успешно добавлен, id: {newUser.id}" ) userAccessLevel = await AccessLevelHandler.get(newUser.access_level_id) if not userAccessLevel: logger.error("Уровень доступа не найден") return {} if userAccessLevel.get("available_own_toolbox"): newToolboxData = { "title": newUser.username, "description": f"Оборудование, полученное сотрудником {newUser.username}, под личную материальную ответственность", "owner_id": newUser.id, } newToolbox = await ToolboxHandler.add(newToolboxData) logger.info( f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {newUser.username}" ) newUserData = newUser.toDict() newUserData.pop("hashed_password") await ServiceRecordsHandler.add(user_id, {"Добавлен пользователь": newUserData}) newUserData["access_level_data"] = userAccessLevel return newUserData async def edit(userData: dict, user_id: int = None) -> dict: id = userData.get("id", None) if not id: logger.error("Не указан id пользователя") return {"error": "Не указан id пользователя"} query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return {"error": "Пользователь не найден"} if len(userData.keys()) == 0: logger.error("Не указаны изменяемые данные") return {"error": "Не указаны изменяемые данные"} if "password" in userData: userData["hashed_password"] = pwd_hash(userData.pop("password")) if "photo" in userData: imageData = userData.pop("photo") if imageData != "": login = user.login if "login" not in userData else userData["login"] photoFile = handleUserPhoto(imageData, login) if photoFile: userData["photo"] = photoFile deleteImage(user.photo) else: logger.error("Ошибка обновления фото пользователя") return {"error": "Ошибка обновления фото пользователя"} else: userData["photo"] = "static/images/users/default.png" deleteImage(user.photo) if "login" in userData: uniqueLogin = await CRUD.read( select(User).where(User.login == userData["login"]) ) if uniqueLogin and uniqueLogin.id != user.id: logger.error("Пользователь с таким логином уже существует") return {"error": "Пользователь с таким логином уже существует"} if "username" in userData: uniqueUserName = await CRUD.read( select(User).where(User.username == userData["username"]) ) if uniqueUserName and uniqueUserName.id != user.id: logger.error("Пользователь с таким именем уже существует") return {"error": "Пользователь с таким именем уже существует"} try: userData.pop("id") editedUser = await user.edit(**userData) except Exception as e: logger.error(f"Ошибка обновления пользователя: {str(e)}") return {"error": "Ошибка обновления пользователя"} if not editedUser: logger.error("Ошибка обновления пользователя") return {"error": "Ошибка обновления пользователя"} logger.info( f"Пользователь {editedUser.username} успешно обновлен, изменены данные: {userData.keys()}" ) if user.access_level_id != editedUser.access_level_id: userAccessLevel = await AccessLevelHandler.get(user.access_level_id) userAccessLevelNew = await AccessLevelHandler.get( editedUser.access_level_id ) if not userAccessLevel or not userAccessLevelNew: logger.error("Уровень доступа не найден") return {"error": "Уровень доступа не найден"} if not userAccessLevel.get("available_own_toolbox"): if userAccessLevelNew.get("available_own_toolbox"): newToolboxData = { "title": editedUser.username, "description": f"Оборудование, полученное сотрудником '{editedUser.username}' под личную материальную ответственность", "owner_id": editedUser.id, } newToolbox = await ToolboxHandler.addNewToolbox(newToolboxData) logger.info( f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {editedUser.username}" ) editUserData = editedUser.toDict() editUserData.pop("hashed_password") await ServiceRecordsHandler.add(user_id, {"Изменен пользователь": editUserData}) return editUserData async def getAll() -> list[dict]: query = select(User) users = await CRUD.read(query, True) return [user.toDict() for user in users] async def get(id: int) -> dict: query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return {} userdata = user.toDict() userdata.pop("hashed_password") return userdata async def delete(id: int, user_id: int = None) -> dict: userRecordsCount = await StocksRecordsHandler.getUserRecords(id) if userRecordsCount > 0: logger.error(f"У пользователя {id} есть записи: {userRecordsCount}") return {"error": "У пользователя есть записи"} query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return {"error": "Пользователь не найден"} try: userName = user.username photoFile = user.photo result = await CRUD.delete(user) except Exception as e: logger.error(f"Ошибка удаления пользователя: {str(e)}") return {"error": "Ошибка удаления пользователя"} if result: deleteImage(photoFile) logger.info( f"Пользователь {userName} {'успешно удален' if result else 'не удален'}" ) await ServiceRecordsHandler.add(user_id, {"Удален пользователь": userName}) return {"error": "Ошибка удаления пользователя"} if not result else {} async def deletePhoto(id: int, user_id: int = None) -> bool: query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return False try: deleteImage(user.photo) user.photo = "images/users/default.png" await user.save() except Exception as e: logger.error(f"Ошибка удаления фото пользователя: {str(e)}") return False logger.info(f"Фото пользователя {user.username} успешно удалено") await ServiceRecordsHandler.add( user_id, {"Удалено фото пользователя": user.username} ) return True async def checkActive(id: int) -> bool: query = select(User).where(User.id == id) user = await CRUD.read(query) if not user: logger.error("Пользователь с таким id не найден") return False return user.is_active async def auth(login: str, password: str) -> dict: query = select(User).where(User.login == login) user = await CRUD.read(query) if not user: logger.error(f"Пользователь с логином {login} не найден") return {} if not user.is_active: logger.error(f"Пользователь {user.username} не активен") await ServiceRecordsHandler.add( user.id, {"Пользователь не активен": user.username} ) return {} if not pwd_verify(password, user.hashed_password): logger.error(f"Неверный пароль пользователя {user.username}") await ServiceRecordsHandler.add( user.id, {"Неверный пароль пользователя": user.username} ) return {} userData = user.toDict() userData.pop("hashed_password") await ServiceRecordsHandler.add( user.id, {"Авторизован пользователь": user.username} ) logger.info(f"Пользователь {user.username} успешно авторизован") return userData async def initialize(): from .access import AccessLevelHandler logger.info("Инициализация пользователей") accessLevelsList = await AccessLevelHandler.getAll() acessLevels = { accessLevel["title"]: accessLevel["id"] for accessLevel in accessLevelsList } password = "demoAccess" baseUsers = { "admin": { "login": "admin", "username": "Администратор", "password": password, "access_level_id": acessLevels["Администратор"], }, "manager": { "login": "manager", "username": "Менеджер", "password": password, "access_level_id": acessLevels["Менеджер"], }, "storekeeper": { "login": "storekeeper", "username": "Кладовщик", "password": password, "access_level_id": acessLevels["Кладовщик"], }, "employee": { "login": "employee", "username": "Сотрудник", "password": password, "access_level_id": acessLevels["Сотрудник"], }, } for user in baseUsers.values(): await UserHandler.add(user) logger.info("Инициализация пользователей завершена") return