295 lines
14 KiB
Python
295 lines
14 KiB
Python
from sqlalchemy import or_, select
|
||
from db import CRUD
|
||
from db.handlers.access import AccessLevelHandler
|
||
from db.handlers.toolbox import ToolboxHandler
|
||
from utils import logger, pwd_hash, saveImage, safeFilename, deleteImage, pwd_verify
|
||
from db.schemas.user import User
|
||
from db.handlers.records import ServiceRecordsHandler, StocksRecordsHandler
|
||
|
||
|
||
def handleUserPhoto(imageData, login: str):
|
||
import base64
|
||
|
||
login = safeFilename(login)
|
||
fileName = f"static/images/users/{login}.png"
|
||
if imageData.startswith("data:image"):
|
||
header, encoded = imageData.split(",", 1)
|
||
else:
|
||
encoded = imageData
|
||
file_bytes = base64.b64decode(encoded)
|
||
if not saveImage(file_bytes, fileName):
|
||
return None
|
||
return fileName
|
||
|
||
|
||
class UserHandler:
|
||
async def add(userData: dict, user_id: int = None) -> dict:
|
||
login = userData.get("login", None)
|
||
if not login:
|
||
logger.error("Не указан логин")
|
||
return {}
|
||
userName = userData.get("username", None)
|
||
if not userName:
|
||
logger.error("Не указано имя пользователя")
|
||
return {}
|
||
query = select(User).where(or_(User.login == login, User.username == userName))
|
||
user = await CRUD.read(query)
|
||
if user:
|
||
logger.error("Пользователь с таким логином или именем уже существует")
|
||
return {}
|
||
if "access_level_id" not in userData:
|
||
logger.error("Не указан уровень доступа")
|
||
return {}
|
||
if "password" not in userData:
|
||
logger.error("Не указан пароль")
|
||
return {}
|
||
userData["hashed_password"] = pwd_hash(userData.pop("password"))
|
||
if "photo" in userData:
|
||
imageData = userData.pop("photo")
|
||
imageFileName = handleUserPhoto(imageData, login)
|
||
if imageFileName:
|
||
userData["photo"] = imageFileName
|
||
try:
|
||
newUser = await User(**userData).save()
|
||
except Exception as e:
|
||
logger.error(f"Ошибка сохранения пользователя: {str(e)}")
|
||
return {}
|
||
if not newUser:
|
||
logger.error("Пользователь не сохранен")
|
||
return {}
|
||
logger.info(
|
||
f"Пользователь {newUser.username} успешно добавлен, id: {newUser.id}"
|
||
)
|
||
userAccessLevel = await AccessLevelHandler.get(newUser.access_level_id)
|
||
if not userAccessLevel:
|
||
logger.error("Уровень доступа не найден")
|
||
return {}
|
||
if userAccessLevel.get("available_own_toolbox"):
|
||
newToolboxData = {
|
||
"title": newUser.username,
|
||
"description": f"Оборудование, полученное сотрудником {newUser.username}, под личную материальную ответственность",
|
||
"owner_id": newUser.id,
|
||
}
|
||
newToolbox = await ToolboxHandler.add(newToolboxData)
|
||
logger.info(
|
||
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {newUser.username}"
|
||
)
|
||
newUserData = newUser.toDict()
|
||
newUserData.pop("hashed_password")
|
||
await ServiceRecordsHandler.add(user_id, {"Добавлен пользователь": newUserData})
|
||
newUserData["access_level_data"] = userAccessLevel
|
||
return newUserData
|
||
|
||
async def edit(userData: dict, user_id: int = None) -> dict:
|
||
id = userData.get("id", None)
|
||
if not id:
|
||
logger.error("Не указан id пользователя")
|
||
return {"error": "Не указан id пользователя"}
|
||
query = select(User).where(User.id == id)
|
||
user = await CRUD.read(query)
|
||
if not user:
|
||
logger.error("Пользователь с таким id не найден")
|
||
return {"error": "Пользователь не найден"}
|
||
if len(userData.keys()) == 0:
|
||
logger.error("Не указаны изменяемые данные")
|
||
return {"error": "Не указаны изменяемые данные"}
|
||
if "password" in userData:
|
||
userData["hashed_password"] = pwd_hash(userData.pop("password"))
|
||
if "photo" in userData:
|
||
imageData = userData.pop("photo")
|
||
if imageData != "":
|
||
login = user.login if "login" not in userData else userData["login"]
|
||
photoFile = handleUserPhoto(imageData, login)
|
||
if photoFile:
|
||
userData["photo"] = photoFile
|
||
deleteImage(user.photo)
|
||
else:
|
||
logger.error("Ошибка обновления фото пользователя")
|
||
return {"error": "Ошибка обновления фото пользователя"}
|
||
else:
|
||
userData["photo"] = "static/images/users/default.png"
|
||
deleteImage(user.photo)
|
||
if "login" in userData:
|
||
uniqueLogin = await CRUD.read(
|
||
select(User).where(User.login == userData["login"])
|
||
)
|
||
if uniqueLogin and uniqueLogin.id != user.id:
|
||
logger.error("Пользователь с таким логином уже существует")
|
||
return {"error": "Пользователь с таким логином уже существует"}
|
||
if "username" in userData:
|
||
uniqueUserName = await CRUD.read(
|
||
select(User).where(User.username == userData["username"])
|
||
)
|
||
if uniqueUserName and uniqueUserName.id != user.id:
|
||
logger.error("Пользователь с таким именем уже существует")
|
||
return {"error": "Пользователь с таким именем уже существует"}
|
||
try:
|
||
userData.pop("id")
|
||
editedUser = await user.edit(**userData)
|
||
except Exception as e:
|
||
logger.error(f"Ошибка обновления пользователя: {str(e)}")
|
||
return {"error": "Ошибка обновления пользователя"}
|
||
if not editedUser:
|
||
logger.error("Ошибка обновления пользователя")
|
||
return {"error": "Ошибка обновления пользователя"}
|
||
logger.info(
|
||
f"Пользователь {editedUser.username} успешно обновлен, изменены данные: {userData.keys()}"
|
||
)
|
||
if user.access_level_id != editedUser.access_level_id:
|
||
userAccessLevel = await AccessLevelHandler.get(user.access_level_id)
|
||
userAccessLevelNew = await AccessLevelHandler.get(
|
||
editedUser.access_level_id
|
||
)
|
||
if not userAccessLevel or not userAccessLevelNew:
|
||
logger.error("Уровень доступа не найден")
|
||
return {"error": "Уровень доступа не найден"}
|
||
if not userAccessLevel.get("available_own_toolbox"):
|
||
if userAccessLevelNew.get("available_own_toolbox"):
|
||
newToolboxData = {
|
||
"title": editedUser.username,
|
||
"description": f"Оборудование, полученное сотрудником '{editedUser.username}' под личную материальную ответственность",
|
||
"owner_id": editedUser.id,
|
||
}
|
||
newToolbox = await ToolboxHandler.addNewToolbox(newToolboxData)
|
||
logger.info(
|
||
f"Тулбокс {newToolbox['title']} успешно создан и закреплен за пользователем {editedUser.username}"
|
||
)
|
||
editUserData = editedUser.toDict()
|
||
editUserData.pop("hashed_password")
|
||
await ServiceRecordsHandler.add(user_id, {"Изменен пользователь": editUserData})
|
||
return editUserData
|
||
|
||
async def getAll() -> list[dict]:
|
||
query = select(User)
|
||
users = await CRUD.read(query, True)
|
||
return [user.toDict() for user in users]
|
||
|
||
async def get(id: int) -> dict:
|
||
query = select(User).where(User.id == id)
|
||
user = await CRUD.read(query)
|
||
if not user:
|
||
logger.error("Пользователь с таким id не найден")
|
||
return {}
|
||
userdata = user.toDict()
|
||
userdata.pop("hashed_password")
|
||
return userdata
|
||
|
||
async def delete(id: int, user_id: int = None) -> dict:
|
||
userRecordsCount = await StocksRecordsHandler.getUserRecords(id)
|
||
if userRecordsCount > 0:
|
||
logger.error(f"У пользователя {id} есть записи: {userRecordsCount}")
|
||
return {"error": "У пользователя есть записи"}
|
||
query = select(User).where(User.id == id)
|
||
user = await CRUD.read(query)
|
||
if not user:
|
||
logger.error("Пользователь с таким id не найден")
|
||
return {"error": "Пользователь не найден"}
|
||
try:
|
||
userName = user.username
|
||
photoFile = user.photo
|
||
result = await CRUD.delete(user)
|
||
except Exception as e:
|
||
logger.error(f"Ошибка удаления пользователя: {str(e)}")
|
||
return {"error": "Ошибка удаления пользователя"}
|
||
if result:
|
||
deleteImage(photoFile)
|
||
logger.info(
|
||
f"Пользователь {userName} {'успешно удален' if result else 'не удален'}"
|
||
)
|
||
await ServiceRecordsHandler.add(user_id, {"Удален пользователь": userName})
|
||
return {"error": "Ошибка удаления пользователя"} if not result else {}
|
||
|
||
async def deletePhoto(id: int, user_id: int = None) -> bool:
|
||
query = select(User).where(User.id == id)
|
||
user = await CRUD.read(query)
|
||
if not user:
|
||
logger.error("Пользователь с таким id не найден")
|
||
return False
|
||
try:
|
||
deleteImage(user.photo)
|
||
user.photo = "images/users/default.png"
|
||
await user.save()
|
||
except Exception as e:
|
||
logger.error(f"Ошибка удаления фото пользователя: {str(e)}")
|
||
return False
|
||
logger.info(f"Фото пользователя {user.username} успешно удалено")
|
||
await ServiceRecordsHandler.add(
|
||
user_id, {"Удалено фото пользователя": user.username}
|
||
)
|
||
return True
|
||
|
||
async def checkActive(id: int) -> bool:
|
||
query = select(User).where(User.id == id)
|
||
user = await CRUD.read(query)
|
||
if not user:
|
||
logger.error("Пользователь с таким id не найден")
|
||
return False
|
||
return user.is_active
|
||
|
||
async def auth(login: str, password: str) -> dict:
|
||
query = select(User).where(User.login == login)
|
||
user = await CRUD.read(query)
|
||
if not user:
|
||
logger.error(f"Пользователь с логином {login} не найден")
|
||
return {}
|
||
if not user.is_active:
|
||
logger.error(f"Пользователь {user.username} не активен")
|
||
await ServiceRecordsHandler.add(
|
||
user.id, {"Пользователь не активен": user.username}
|
||
)
|
||
return {}
|
||
if not pwd_verify(password, user.hashed_password):
|
||
logger.error(f"Неверный пароль пользователя {user.username}")
|
||
await ServiceRecordsHandler.add(
|
||
user.id, {"Неверный пароль пользователя": user.username}
|
||
)
|
||
return {}
|
||
userData = user.toDict()
|
||
userData.pop("hashed_password")
|
||
await ServiceRecordsHandler.add(
|
||
user.id, {"Авторизован пользователь": user.username}
|
||
)
|
||
logger.info(f"Пользователь {user.username} успешно авторизован")
|
||
return userData
|
||
|
||
async def initialize():
|
||
from .access import AccessLevelHandler
|
||
|
||
logger.info("Инициализация пользователей")
|
||
accessLevelsList = await AccessLevelHandler.getAll()
|
||
acessLevels = {
|
||
accessLevel["title"]: accessLevel["id"] for accessLevel in accessLevelsList
|
||
}
|
||
password = "Alex0172"
|
||
baseUsers = {
|
||
"admin": {
|
||
"login": "admin",
|
||
"username": "Администратор",
|
||
"password": password,
|
||
"access_level_id": acessLevels["Администратор"],
|
||
},
|
||
"manager": {
|
||
"login": "manager",
|
||
"username": "Менеджер",
|
||
"password": password,
|
||
"access_level_id": acessLevels["Менеджер"],
|
||
},
|
||
"storekeeper": {
|
||
"login": "storekeeper",
|
||
"username": "Кладовщик",
|
||
"password": password,
|
||
"access_level_id": acessLevels["Кладовщик"],
|
||
},
|
||
"employee": {
|
||
"login": "employee",
|
||
"username": "Сотрудник",
|
||
"password": password,
|
||
"access_level_id": acessLevels["Сотрудник"],
|
||
},
|
||
}
|
||
for user in baseUsers.values():
|
||
await UserHandler.add(user)
|
||
|
||
logger.info("Инициализация пользователей завершена")
|
||
return
|