Files
toolbox/db/handlers/user.py
T

295 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy import or_, select
from db import CRUD
from db.handlers.access import AccessLevelHandler
from db.handlers.toolbox import ToolboxHandler
from utils import logger, pwd_hash, saveImage, safeFilename, deleteImage, pwd_verify
from db.schemas.user import User
from db.handlers.records import ServiceRecordsHandler, StocksRecordsHandler
def handleUserPhoto(imageData, login: str):
import base64
login = safeFilename(login)
fileName = f"static/images/users/{login}.png"
if imageData.startswith("data:image"):
header, encoded = imageData.split(",", 1)
else:
encoded = imageData
file_bytes = base64.b64decode(encoded)
if not saveImage(file_bytes, fileName):
return None
return fileName
class UserHandler:
async def add(userData: dict, user_id: int = None) -> dict:
login = userData.get("login", None)
if not login:
logger.error("Не указан логин")
return {}
userName = userData.get("username", None)
if not userName:
logger.error("Не указано имя пользователя")
return {}
query = select(User).where(or_(User.login == login, User.username == userName))
user = await CRUD.read(query)
if user:
logger.error("Пользователь с таким логином или именем уже существует")
return {}
if "access_level_id" not in userData:
logger.error("Не указан уровень доступа")
return {}
if "password" not in userData:
logger.error("Не указан пароль")
return {}
userData["hashed_password"] = pwd_hash(userData.pop("password"))
if "photo" in userData:
imageData = userData.pop("photo")
imageFileName = handleUserPhoto(imageData, login)
if imageFileName:
userData["photo"] = imageFileName
try:
newUser = await User(**userData).save()
except Exception as e:
logger.error(f"Ошибка сохранения пользователя: {str(e)}")
return {}
if not newUser:
logger.error("Пользователь не сохранен")
return {}
logger.info(
f"Пользователь {newUser.username} успешно добавлен, id: {newUser.id}"
)
userAccessLevel = await AccessLevelHandler.get(newUser.access_level_id)
if not userAccessLevel:
logger.error("Уровень доступа не найден")
return {}
if userAccessLevel.get("available_own_toolbox"):
newToolboxData = {
"title": newUser.username,
"description": f"Оборудование, полученное сотрудником {newUser.username}, под личную материальную ответственность",
"owner_id": newUser.id,
}
newToolbox = await ToolboxHandler.add(newToolboxData)
logger.info(
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {newUser.username}"
)
newUserData = newUser.toDict()
newUserData.pop("hashed_password")
await ServiceRecordsHandler.add(user_id, {"Добавлен пользователь": newUserData})
newUserData["access_level_data"] = userAccessLevel
return newUserData
async def edit(userData: dict, user_id: int = None) -> dict:
id = userData.get("id", None)
if not id:
logger.error("Не указан id пользователя")
return {"error": "Не указан id пользователя"}
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return {"error": "Пользователь не найден"}
if len(userData.keys()) == 0:
logger.error("Не указаны изменяемые данные")
return {"error": "Не указаны изменяемые данные"}
if "password" in userData:
userData["hashed_password"] = pwd_hash(userData.pop("password"))
if "photo" in userData:
imageData = userData.pop("photo")
if imageData != "":
login = user.login if "login" not in userData else userData["login"]
photoFile = handleUserPhoto(imageData, login)
if photoFile:
userData["photo"] = photoFile
deleteImage(user.photo)
else:
logger.error("Ошибка обновления фото пользователя")
return {"error": "Ошибка обновления фото пользователя"}
else:
userData["photo"] = "static/images/users/default.png"
deleteImage(user.photo)
if "login" in userData:
uniqueLogin = await CRUD.read(
select(User).where(User.login == userData["login"])
)
if uniqueLogin and uniqueLogin.id != user.id:
logger.error("Пользователь с таким логином уже существует")
return {"error": "Пользователь с таким логином уже существует"}
if "username" in userData:
uniqueUserName = await CRUD.read(
select(User).where(User.username == userData["username"])
)
if uniqueUserName and uniqueUserName.id != user.id:
logger.error("Пользователь с таким именем уже существует")
return {"error": "Пользователь с таким именем уже существует"}
try:
userData.pop("id")
editedUser = await user.edit(**userData)
except Exception as e:
logger.error(f"Ошибка обновления пользователя: {str(e)}")
return {"error": "Ошибка обновления пользователя"}
if not editedUser:
logger.error("Ошибка обновления пользователя")
return {"error": "Ошибка обновления пользователя"}
logger.info(
f"Пользователь {editedUser.username} успешно обновлен, изменены данные: {userData.keys()}"
)
if user.access_level_id != editedUser.access_level_id:
userAccessLevel = await AccessLevelHandler.get(user.access_level_id)
userAccessLevelNew = await AccessLevelHandler.get(
editedUser.access_level_id
)
if not userAccessLevel or not userAccessLevelNew:
logger.error("Уровень доступа не найден")
return {"error": "Уровень доступа не найден"}
if not userAccessLevel.get("available_own_toolbox"):
if userAccessLevelNew.get("available_own_toolbox"):
newToolboxData = {
"title": editedUser.username,
"description": f"Оборудование, полученное сотрудником '{editedUser.username}' под личную материальную ответственность",
"owner_id": editedUser.id,
}
newToolbox = await ToolboxHandler.addNewToolbox(newToolboxData)
logger.info(
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {editedUser.username}"
)
editUserData = editedUser.toDict()
editUserData.pop("hashed_password")
await ServiceRecordsHandler.add(user_id, {"Изменен пользователь": editUserData})
return editUserData
async def getAll() -> list[dict]:
query = select(User)
users = await CRUD.read(query, True)
return [user.toDict() for user in users]
async def get(id: int) -> dict:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return {}
userdata = user.toDict()
userdata.pop("hashed_password")
return userdata
async def delete(id: int, user_id: int = None) -> dict:
userRecordsCount = await StocksRecordsHandler.getUserRecords(id)
if userRecordsCount > 0:
logger.error(f"У пользователя {id} есть записи: {userRecordsCount}")
return {"error": "У пользователя есть записи"}
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return {"error": "Пользователь не найден"}
try:
userName = user.username
photoFile = user.photo
result = await CRUD.delete(user)
except Exception as e:
logger.error(f"Ошибка удаления пользователя: {str(e)}")
return {"error": "Ошибка удаления пользователя"}
if result:
deleteImage(photoFile)
logger.info(
f"Пользователь {userName} {'успешно удален' if result else 'не удален'}"
)
await ServiceRecordsHandler.add(user_id, {"Удален пользователь": userName})
return {"error": "Ошибка удаления пользователя"} if not result else {}
async def deletePhoto(id: int, user_id: int = None) -> bool:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return False
try:
deleteImage(user.photo)
user.photo = "images/users/default.png"
await user.save()
except Exception as e:
logger.error(f"Ошибка удаления фото пользователя: {str(e)}")
return False
logger.info(f"Фото пользователя {user.username} успешно удалено")
await ServiceRecordsHandler.add(
user_id, {"Удалено фото пользователя": user.username}
)
return True
async def checkActive(id: int) -> bool:
query = select(User).where(User.id == id)
user = await CRUD.read(query)
if not user:
logger.error("Пользователь с таким id не найден")
return False
return user.is_active
async def auth(login: str, password: str) -> dict:
query = select(User).where(User.login == login)
user = await CRUD.read(query)
if not user:
logger.error(f"Пользователь с логином {login} не найден")
return {}
if not user.is_active:
logger.error(f"Пользователь {user.username} не активен")
await ServiceRecordsHandler.add(
user.id, {"Пользователь не активен": user.username}
)
return {}
if not pwd_verify(password, user.hashed_password):
logger.error(f"Неверный пароль пользователя {user.username}")
await ServiceRecordsHandler.add(
user.id, {"Неверный пароль пользователя": user.username}
)
return {}
userData = user.toDict()
userData.pop("hashed_password")
await ServiceRecordsHandler.add(
user.id, {"Авторизован пользователь": user.username}
)
logger.info(f"Пользователь {user.username} успешно авторизован")
return userData
async def initialize():
from .access import AccessLevelHandler
logger.info("Инициализация пользователей")
accessLevelsList = await AccessLevelHandler.getAll()
acessLevels = {
accessLevel["title"]: accessLevel["id"] for accessLevel in accessLevelsList
}
password = "Alex0172"
baseUsers = {
"admin": {
"login": "admin",
"username": "Администратор",
"password": password,
"access_level_id": acessLevels["Администратор"],
},
"manager": {
"login": "manager",
"username": "Менеджер",
"password": password,
"access_level_id": acessLevels["Менеджер"],
},
"storekeeper": {
"login": "storekeeper",
"username": "Кладовщик",
"password": password,
"access_level_id": acessLevels["Кладовщик"],
},
"employee": {
"login": "employee",
"username": "Сотрудник",
"password": password,
"access_level_id": acessLevels["Сотрудник"],
},
}
for user in baseUsers.values():
await UserHandler.add(user)
logger.info("Инициализация пользователей завершена")
return