Files
toolbox/db/handlers/user.py
T

236 lines
10 KiB
Python

from sqlalchemy import or_, select
from db import CRUD
from db.handlers.access import AccessLevelHandler
from db.handlers.toolbox import ToolboxHandler
from utils import logger, pwd_hash, saveImage, safeFilename, deleteImage, pwd_verify
from db.schemas.user import User
from db.handlers.records import ServiceRecordsHandler
def handleUserPhoto(imageData, login: str):
login = safeFilename(login)
fileName = f"users/{login}.png"
if not saveImage(imageData, fileName):
return None
return fileName
class UserHandler:
async def add(userData: dict, user_id: int = None) -> dict:
login = userData.get("login", None)
if not login:
logger.error("Не указан логин")
return {}
userName = userData.get("username", None)
if not userName:
logger.error("Не указано имя пользователя")
return {}
query = select(User).where(or_(User.login == login, User.username == userName))
user = await CRUD.read(query)
if user:
logger.error("Пользователь с таким логином или именем уже существует")
return {}
if "access_level_id" not in userData:
logger.error("Не указан уровень доступа")
return {}
if "password" not in userData:
logger.error("Не указан пароль")
return {}
userData["hashed_password"] = pwd_hash(userData.pop("password"))
if "photo" in userData:
imageData = userData.pop("photo")
photoFile = handleUserPhoto(imageData, login)
if photoFile:
userData["photo"] = photoFile
try:
newUser = await User(**userData).save()
except Exception as e:
logger.error(f"Ошибка сохранения пользователя: {str(e)}")
return {}
if not newUser:
logger.error("Пользователь не сохранен")
return {}
logger.info(
f"Пользователь {newUser.username} успешно добавлен, id: {newUser.id}"
)
userAccessLevel = await AccessLevelHandler.get(newUser.access_level_id)
if not userAccessLevel:
logger.error("Уровень доступа не найден")
return {}
if userAccessLevel.get("available_own_toolbox"):
newToolboxData = {
"title": newUser.username,
"description": f"Оборудование, полученное сотрудником {newUser.username}, под личную материальную ответственность",
"owner_id": newUser.id,
}
newToolbox = await ToolboxHandler.add(newToolboxData)
logger.info(
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {newUser.username}"
)
await ServiceRecordsHandler.add(
user_id, {"Добавлен пользователь": newUser.toDict()}
)
newUserData = newUser.toDict()
newUserData["access_level_data"] = userAccessLevel
return newUserData
async def edit(userData: dict, user_id: int = None) -> dict:
id = userData.get("id", None)
if not id:
logger.error("Не указан id пользователя")
return {}
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return {}
changedUserData = userData.get("changedUserData", {})
if len(changedUserData.keys()) == 0:
logger.error("Не указаны изменяемые данные")
return {}
if "password" in changedUserData:
userData["hashed_password"] = pwd_hash(changedUserData.pop("password"))
if "photo" in changedUserData:
imageData = changedUserData.pop("photo")
photoFile = handleUserPhoto(imageData, user.login)
if photoFile:
changedUserData["photo"] = photoFile
deleteImage(user.photo)
try:
editedUser = await user.edit(**changedUserData)
except Exception as e:
logger.error(f"Ошибка обновления пользователя: {str(e)}")
return {}
if not editedUser:
logger.error("Ошибка обновления пользователя")
return {}
logger.info(
f"Пользователь {editedUser.username} успешно обновлен, изменены данные: {changedUserData.keys()}"
)
if not user.available_own_toolbox:
if editedUser.available_own_toolbox:
newToolboxData = {
"title": editedUser.username,
"description": f"Оборудование, полученное сотрудником '{editedUser.username}' под личную материальную ответственность",
"owner_id": editedUser.id,
}
newToolbox = await ToolboxHandler.addNewToolbox(newToolboxData)
logger.info(
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {editedUser.username}"
)
await ServiceRecordsHandler.add(
user_id, {"Изменен пользователь": editedUser.toDict()}
)
return editedUser.toDict()
async def getAll() -> list[dict]:
query = select(User)
users = await CRUD.read(query, True)
return [user.toDict() for user in users]
async def get(id: int) -> dict:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return {}
return user.toDict()
async def delete(id: int, user_id: int = None) -> bool:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return False
try:
userName = user.username
result = await CRUD.delete(user)
except Exception as e:
logger.error(f"Ошибка удаления пользователя: {str(e)}")
return False
logger.info(
f"Пользователь {userName} {'успешно удален' if result else 'не удален'}"
)
await ServiceRecordsHandler.add(user_id, {"Удален пользователь": userName})
return result
async def deletePhoto(id: int, user_id: int = None) -> bool:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return False
try:
deleteImage(user.photo)
user.photo = "images/users/default.png"
await user.save()
except Exception as e:
logger.error(f"Ошибка удаления фото пользователя: {str(e)}")
return False
logger.info(f"Фото пользователя {user.username} успешно удалено")
await ServiceRecordsHandler.add(
user_id, {"Удалено фото пользователя": user.username}
)
return True
async def auth(login: str, password: str) -> dict:
query = select(User).where(User.login == login)
user = await CRUD.read(query)
if not user:
logger.error(f"Пользователь с логином {login} не найден")
return {}
if not pwd_verify(password, user.hashed_password):
logger.error(f"Неверный пароль пользователя {user.username}")
await ServiceRecordsHandler.add(
user.id, {"Неверный пароль пользователя": user.username}
)
return {}
userData = user.toDict()
userData.pop("hashed_password")
await ServiceRecordsHandler.add(
user.id, {"Авторизован пользователь": user.username}
)
logger.info(f"Пользователь {user.username} успешно авторизован")
return userData
async def initialize():
from .access import AccessLevelHandler
logger.info("Инициализация пользователей")
accessLevelsList = await AccessLevelHandler.getAll()
acessLevels = {
accessLevel["title"]: accessLevel["id"] for accessLevel in accessLevelsList
}
password = "Alex0172"
baseUsers = {
"admin": {
"login": "admin",
"username": "Администратор - Demo",
"password": password,
"access_level_id": acessLevels["Администратор"],
},
"manager": {
"login": "manager",
"username": "Менеджер - Demo",
"password": password,
"access_level_id": acessLevels["Менеджер"],
},
"storekeeper": {
"login": "storekeeper",
"username": "Кладовщик - Demo",
"password": password,
"access_level_id": acessLevels["Кладовщик"],
},
"employee": {
"login": "employee",
"username": "Сотрудник - Demo",
"password": password,
"access_level_id": acessLevels["Сотрудник"],
},
}
for user in baseUsers.values():
await UserHandler.add(user)
logger.info("Инициализация пользователей завершена")
return