Files
toolbox/db/handlers/user.py
T

236 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy import or_, select
from db import CRUD
from db.handlers.access import AccessLevelHandler
from db.handlers.toolbox import ToolboxHandler
from utils import logger, pwd_hash, saveImage, safeFilename, deleteImage, pwd_verify
from db.schemas.user import User
from db.handlers.records import ServiceRecordsHandler
def handleUserPhoto(imageData, login: str):
login = safeFilename(login)
fileName = f"users/{login}.png"
if not saveImage(imageData, fileName):
return None
return fileName
class UserHandler:
async def add(userData: dict, user_id: int = None) -> dict:
login = userData.get("login", None)
if not login:
logger.error("Не указан логин")
return {}
userName = userData.get("username", None)
if not userName:
logger.error("Не указано имя пользователя")
return {}
query = select(User).where(or_(User.login == login, User.username == userName))
user = await CRUD.read(query)
if user:
logger.error("Пользователь с таким логином или именем уже существует")
return {}
if "access_level_id" not in userData:
logger.error("Не указан уровень доступа")
return {}
if "password" not in userData:
logger.error("Не указан пароль")
return {}
userData["hashed_password"] = pwd_hash(userData.pop("password"))
if "photo" in userData:
imageData = userData.pop("photo")
photoFile = handleUserPhoto(imageData, login)
if photoFile:
userData["photo"] = photoFile
try:
newUser = await User(**userData).save()
except Exception as e:
logger.error(f"Ошибка сохранения пользователя: {str(e)}")
return {}
if not newUser:
logger.error("Пользователь не сохранен")
return {}
logger.info(
f"Пользователь {newUser.username} успешно добавлен, id: {newUser.id}"
)
userAccessLevel = await AccessLevelHandler.get(newUser.access_level_id)
if not userAccessLevel:
logger.error("Уровень доступа не найден")
return {}
if userAccessLevel.get("available_own_toolbox"):
newToolboxData = {
"title": f"Т{newUser.username}",
"description": f"Оборудование, полученное сотрудником {newUser.username}, под личную материальную ответственность",
"owner_id": newUser.id,
}
newToolbox = await ToolboxHandler.add(newToolboxData)
logger.info(
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {newUser.username}"
)
await ServiceRecordsHandler.add(
user_id, {"Добавлен пользователь": newUser.toDict()}
)
newUserData = newUser.toDict()
newUserData["access_level_data"] = userAccessLevel
return newUserData
async def edit(userData: dict, user_id: int = None) -> dict:
id = userData.get("id", None)
if not id:
logger.error("Не указан id пользователя")
return {}
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return {}
changedUserData = userData.get("changedUserData", {})
if len(changedUserData.keys()) == 0:
logger.error("Не указаны изменяемые данные")
return {}
if "password" in changedUserData:
userData["hashed_password"] = pwd_hash(changedUserData.pop("password"))
if "photo" in changedUserData:
imageData = changedUserData.pop("photo")
photoFile = handleUserPhoto(imageData, user.login)
if photoFile:
changedUserData["photo"] = photoFile
deleteImage(user.photo)
try:
editedUser = await user.edit(**changedUserData)
except Exception as e:
logger.error(f"Ошибка обновления пользователя: {str(e)}")
return {}
if not editedUser:
logger.error("Ошибка обновления пользователя")
return {}
logger.info(
f"Пользователь {editedUser.username} успешно обновлен, изменены данные: {changedUserData.keys()}"
)
if not user.available_own_toolbox:
if editedUser.available_own_toolbox:
newToolboxData = {
"title": f"Тулбокс {editedUser.username}",
"description": f"Оборудование, полученное сотрудником '{editedUser.username}' под личную материальную ответственность",
"owner_id": editedUser.id,
}
newToolbox = await ToolboxHandler.addNewToolbox(newToolboxData)
logger.info(
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {editedUser.username}"
)
await ServiceRecordsHandler.add(
user_id, {"Изменен пользователь": editedUser.toDict()}
)
return editedUser.toDict()
async def getAll() -> list[dict]:
query = select(User)
users = await CRUD.read(query, True)
return [user.toDict() for user in users]
async def get(id: int) -> dict:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return {}
return user.toDict()
async def delete(id: int, user_id: int = None) -> bool:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return False
try:
userName = user.username
result = await CRUD.delete(user)
except Exception as e:
logger.error(f"Ошибка удаления пользователя: {str(e)}")
return False
logger.info(
f"Пользователь {userName} {'успешно удален' if result else 'не удален'}"
)
await ServiceRecordsHandler.add(user_id, {"Удален пользователь": userName})
return result
async def deletePhoto(id: int, user_id: int = None) -> bool:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return False
try:
deleteImage(user.photo)
user.photo = "images/users/default.png"
await user.save()
except Exception as e:
logger.error(f"Ошибка удаления фото пользователя: {str(e)}")
return False
logger.info(f"Фото пользователя {user.username} успешно удалено")
await ServiceRecordsHandler.add(
user_id, {"Удалено фото пользователя": user.username}
)
return True
async def auth(login: str, password: str) -> dict:
query = select(User).where(User.login == login)
user = await CRUD.read(query)
if not user:
logger.error(f"Пользователь с логином {login} не найден")
return {}
if not pwd_verify(password, user.hashed_password):
logger.error(f"Неверный пароль пользователя {user.username}")
await ServiceRecordsHandler.add(
user.id, {"Неверный пароль пользователя": user.username}
)
return {}
userData = user.toDict()
userData.pop("hashed_password")
await ServiceRecordsHandler.add(
user.id, {"Авторизован пользователь": user.username}
)
logger.info(f"Пользователь {user.username} успешно авторизован")
return userData
async def initialize():
from .access import AccessLevelHandler
logger.info("Инициализация пользователей")
accessLevelsList = await AccessLevelHandler.getAll()
acessLevels = {
accessLevel["title"]: accessLevel["id"] for accessLevel in accessLevelsList
}
password = "Alex0172"
baseUsers = {
"admin": {
"login": "admin",
"username": "Администратор - Demo",
"password": password,
"access_level_id": acessLevels["Администратор"],
},
"manager": {
"login": "manager",
"username": "Менеджер - Demo",
"password": password,
"access_level_id": acessLevels["Менеджер"],
},
"storekeeper": {
"login": "storekeeper",
"username": "Кладовщик - Demo",
"password": password,
"access_level_id": acessLevels["Кладовщик"],
},
"employee": {
"login": "employee",
"username": "Сотрудник - Demo",
"password": password,
"access_level_id": acessLevels["Сотрудник"],
},
}
for user in baseUsers.values():
await UserHandler.add(user)
logger.info("Инициализация пользователей завершена")
return