склади и инструмент готовы

This commit is contained in:
2025-12-07 19:36:28 +03:00
parent 54bf21d52d
commit 65a3bc1671
65 changed files with 3485 additions and 115 deletions
+4
View File
@@ -3,3 +3,7 @@ from .for_DB import *
from .password import *
from .image import *
from .safe_filename import *
from .web import *
from .request_parser import RequestParser
requestDict = RequestParser()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+34 -37
View File
@@ -1,56 +1,53 @@
from utils.loggers import logger
# def saveImage(imageData, fileName: str):
# try:
# imageFormat = imageData.split(';')[0].split('/')[1]
# if imageFormat != 'png':
# logger.error(f"Неподдерживаемый формат изображения: {imageFormat}")
# return False
# imageData = imageData.split(';base64,')[1]
# with open(f"static/images/{fileName}", "wb") as f:
# f.write(base64.b64decode(imageData))
# logger.info(f"Изображение {fileName} успешно сохранено")
# return True
# except Exception as e:
# logger.error(f"Ошибка сохранения изображения: {str(e)}")
# return False
# UPLOAD_DIR = "uploads"
# os.makedirs(UPLOAD_DIR, exist_ok=True)
def saveImage(file_bytes, fileName: str):
def saveImage(file_bytes: bytes, file_name: str) -> bool:
import os
from PIL import Image
import io
# Загружаем изображение через Pillow
# Убедимся, что путь существует
file_name = f"api/{file_name}"
os.makedirs(os.path.dirname(file_name), exist_ok=True)
# Загружаем изображение
try:
img = Image.open(io.BytesIO(file_bytes))
except Exception:
logger.error("Неподдерживаемый формат изображения")
return False
# Конвертация (если нужно)
if img.mode not in ("RGB", "RGBA", "P"):
img = img.convert("RGB")
# Сохранение в выбранный формат
try:
logger.info(f"Сохраняем изображение {fileName}")
img.save(fileName, "PNG")
img.load()
except Exception as e:
logger.error(f"Ошибка сохранения изображения: {str(e)}")
logger.error(f"[ImageSave] Unsupported image format: {e}")
return False
logger.info(f"Изображение {fileName} успешно сохранено")
return True
# Конвертация в нормальный режим (PNG поддерживает RGBA/RGB)
try:
if img.mode not in ("RGB", "RGBA"):
img = img.convert("RGB")
except Exception as e:
logger.error(f"[ImageSave] Mode conversion error: {e}")
return False
# Сохраняем как PNG
try:
target_path = file_name
if not target_path.lower().endswith(".png"):
target_path += ".png"
logger.info(f"[ImageSave] Saving image to {target_path}")
img.save(target_path, "PNG")
return True
except Exception as e:
logger.error(f"[ImageSave] Error saving image: {e}")
return False
def deleteImage(fileName: str):
if fileName.endswith("default.png"):
return True
try:
import os
file_name = f"api/{file_name}"
logger.info(f"Удаляем изображение {fileName}")
os.remove(f"static/images/{fileName}")
logger.info(f"Изображение {fileName} успешно удалено")
+17 -4
View File
@@ -1,10 +1,20 @@
import logging
import logging.config
import json
from pathlib import Path
class SmartLogger(logging.Logger):
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, stacklevel=2):
def _log(
self,
level,
msg,
args,
exc_info=None,
extra=None,
stack_info=False,
stacklevel=2,
):
# Увеличиваем stacklevel до 2, чтобы показать реального вызывающего
if isinstance(msg, (dict, list)):
msg = json.dumps(msg, indent=4, ensure_ascii=False)
@@ -12,9 +22,12 @@ class SmartLogger(logging.Logger):
logging.setLoggerClass(SmartLogger)
logging.config.fileConfig("config/log.ini")
# logging.config.fileConfig("config/log.ini")
logger = logging.getLogger("toolbox")
log_config_path = Path("config/log.ini")
logging.config.fileConfig(log_config_path, disable_existing_loggers=False)
def setLogLevel(level: str = ""):
match level:
@@ -26,10 +39,10 @@ def setLogLevel(level: str = ""):
loggerLevel = logging.ERROR
case _:
loggerLevel = logging.INFO
root_logger = logging.getLogger()
for handler in root_logger.handlers:
handler.setLevel(loggerLevel)
# Также меняем уровень самого логгера
logger.setLevel(loggerLevel)
logger.setLevel(loggerLevel)
-3
View File
@@ -5,8 +5,6 @@ from argon2.exceptions import (
InvalidHash,
)
from utils.loggers import logger
pwd_hasher = PasswordHasher(
time_cost=3,
@@ -26,7 +24,6 @@ def pwd_verify(pwd_plain: str, stored_hash: str) -> bool:
try:
valid = pwd_hasher.verify(stored_hash, pwd_plain)
except (VerifyMismatchError, VerificationError, InvalidHash) as e:
logger.warning(f"Password verification failed: {e.__class__.__name__}")
return False
return valid
+58
View File
@@ -0,0 +1,58 @@
# async def upload(requestData: dict = Depends(RequestParser())):
import json
from fastapi import Request, UploadFile
class RequestParser:
def __init__(self, include_files=True):
self.include_files = include_files
async def __call__(self, request: Request) -> dict:
parsed = {
"method": request.method,
"url": str(request.url),
"query": dict(request.query_params),
"headers": dict(request.headers),
"cookies": request.cookies,
"body": None,
"files": {},
}
# ----- BODY (JSON, Text) -----
raw_body = await request.body()
if raw_body:
try:
parsed["body"] = json.loads(raw_body.decode("utf-8"))
except Exception:
parsed["body"] = raw_body.decode("utf-8", errors="ignore")
# ----- FORM / FILES -----
try:
form = await request.form()
for key, value in form.items():
# Файл
if isinstance(value, UploadFile):
if self.include_files:
file_bytes = await value.read()
parsed["files"][key] = {
"filename": value.filename,
"content_type": value.content_type,
"content": file_bytes,
}
else:
# Добавляем как текстовое поле
if parsed["body"] is None:
parsed["body"] = {}
if isinstance(parsed["body"], dict):
parsed["body"][key] = value
except Exception:
pass
parsed["headers"].pop("cookie", None)
# logger.info(f"[RequestParser] Parsed request:")
# logger.info(parsed)
return parsed
+39
View File
@@ -0,0 +1,39 @@
from fastapi import Request
from fastapi.templating import Jinja2Templates
from typing import Any
import config
templates = Jinja2Templates(directory=config.TEMPLATES_DIR)
def getUrl(name: str, **path_params: Any):
from api import app
try:
url = app.url_path_for(name, **path_params)
except Exception:
url = app.url_path_for(name)
return url
async def render(
request: Request,
):
context = {
"request": request,
"content": {"app_secret": config.APP_SECRET},
}
fileName = f"{request.scope['path']}/index.html"
response = templates.TemplateResponse(fileName, context)
return response
def_list = [
getUrl,
]
for def_ in def_list:
templates.env.globals[def_.__name__] = def_