Files
toolbox/db/__init__.py
T
2025-11-29 14:51:45 +03:00

116 lines
4.4 KiB
Python

from sqlalchemy import delete, update
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy.pool import NullPool
import config
from utils import loggerDB
# Connection URL for the asyncpg driver, assembled from the project config.
# NOTE(review): the credentials are interpolated verbatim, so a user name or
# password containing characters such as "@" or "/" would have to be
# URL-escaped in the config itself - confirm that this is the case.
DATABASE_URL = (
    f"postgresql+asyncpg://{config.DB_USER}:{config.DB_PASS}"
    f"@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}"
)

# NullPool: the engine does not keep connections around between uses.
engine = create_async_engine(DATABASE_URL, poolclass=NullPool)

# Factory for the AsyncSession objects used by the helpers in this module.
SessionLocal = async_sessionmaker(engine)

# Declarative base class that the project's ORM models are expected to extend.
Base = declarative_base()
class CRUD:
    """Namespace of async helpers that each run one unit of work.

    Every method opens its own session from ``SessionLocal``, performs the
    operation and closes the session again.  Failures are logged through
    ``loggerDB`` and reported through the return value instead of being
    raised to the caller.

    All methods are static; call them on the class, e.g.
    ``await CRUD.create(obj)``.

    NOTE: the method names ``delete`` and ``update`` coincide with the
    SQLAlchemy constructs imported at module level.  Inside a method body a
    bare ``delete(...)`` / ``update(...)`` still resolves to the module-level
    SQLAlchemy function, because class attributes are not part of a
    function's enclosing scope.
    """

    @staticmethod
    async def create(db_data, refresh: bool = False):
        """Persist one mapped instance or a list of mapped instances.

        Args:
            db_data: A mapped instance, or a ``list`` of mapped instances.
            refresh: When true, reload every stored instance from the
                database after the commit and return the result.

        Returns:
            With ``refresh=True``: the stored instance(s).  For a list that
            had to go through the ``merge()`` fallback these are the copies
            owned by this session, not the objects that were passed in.
            ``None`` when ``refresh`` is false, and also when the operation
            failed (the error is logged).
        """
        try:
            is_lst = isinstance(db_data, list)
            async with SessionLocal() as db:
                if is_lst:
                    loggerDB.info(f"Создаю {len(db_data)} записей")
                    try:
                        db.add_all(db_data)
                    except InvalidRequestError:
                        # add_all() adds item by item, so part of the list
                        # may already be pending.  Drop those first so the
                        # merge below does not schedule them a second time.
                        db.expunge_all()
                        merged = []
                        for data in db_data:
                            merged.append(await db.merge(data))
                        # Only the merged copies belong to this session, so
                        # they are what can be refreshed and returned.
                        db_data = merged
                else:
                    loggerDB.info("Создаю запись")
                    db.add(db_data)
                await db.commit()
                if refresh:
                    if is_lst:
                        loggerDB.info(f"Обновляю {len(db_data)} записей")
                        for data in db_data:
                            await db.refresh(data)
                    else:
                        loggerDB.info("Обновляю запись")
                        await db.refresh(db_data)
                loggerDB.info("Запись создана")
                return db_data if refresh else None
        except Exception as e:
            # Leaving the ``async with`` block has already closed the session.
            loggerDB.error(f"Ошибка создания: {e}", exc_info=True)
            return None

    @staticmethod
    async def read(query, all: bool = False):
        """Execute ``query`` and return the de-duplicated scalar result(s).

        Args:
            query: Statement handed to ``session.execute()``.
            all: When true return every result as a list, otherwise only
                the first one.  (The name shadows the builtin but is kept
                because callers may pass it by keyword.)

        Returns:
            A list of scalars when ``all`` is true; otherwise the first
            scalar, or ``None`` if there is no row.  ``None`` is also
            returned when the query failed (the error is logged).
        """
        try:
            async with SessionLocal() as db:
                loggerDB.info(f"Чтение записей. Все: {all}")
                results = await db.execute(query)
                loggerDB.info("Чтение завершено")
                scalars = results.unique().scalars()
                return scalars.all() if all else scalars.first()
        except Exception as e:
            loggerDB.error(f"Ошибка чтения: {e}", exc_info=True)
            return None

    @staticmethod
    def _identity_filters(instance):
        """Return ``(model, criteria)`` matching exactly the row of ``instance``.

        One equality criterion is produced per primary-key column, so
        composite keys are matched in full.  The values come from the
        identity recorded by the ORM, not from the instance attributes.

        Raises:
            ValueError: If the instance has no database identity, i.e. it
                was never persisted.
        """
        from sqlalchemy import inspect

        state = inspect(instance)
        if state.identity is None:
            raise ValueError(
                f"Объект {type(instance).__name__} не сохранён в БД: "
                "первичный ключ отсутствует"
            )
        mapper = state.mapper
        model = instance.__class__
        criteria = [
            # Resolve the mapped attribute for the column: the attribute
            # name is not necessarily the same as the column name.
            getattr(model, mapper.get_property_by_column(column).key) == value
            for column, value in zip(mapper.primary_key, state.identity)
        ]
        return model, criteria

    @staticmethod
    async def delete(db_data) -> bool:
        """Delete the row(s) behind one mapped instance or a list of them.

        All deletions run in a single transaction: either every row is
        removed or, on any error, the transaction is rolled back.

        Args:
            db_data: A persisted mapped instance, or a ``list`` of them.

        Returns:
            ``True`` after a successful commit, ``False`` if anything failed
            (the error is logged).
        """
        async with SessionLocal() as db:
            try:
                if isinstance(db_data, list):
                    loggerDB.info(f"Удаляю записей: {len(db_data)}")
                    targets = db_data
                else:
                    loggerDB.info("Удаляю запись")
                    targets = [db_data]
                for target in targets:
                    model, criteria = CRUD._identity_filters(target)
                    # ``delete`` here is sqlalchemy.delete, not this method.
                    await db.execute(delete(model).where(*criteria))
                await db.commit()
                loggerDB.info("Запись удалена")
                return True
            except Exception as e:
                await db.rollback()
                loggerDB.error(f"Ошибка удаления: {e}", exc_info=True)
                return False

    @staticmethod
    async def update(db_data, id, **kwargs):
        """Update the row(s) of a model whose ``id`` attribute equals ``id``.

        Args:
            db_data: The mapped model class; it must expose an ``id``
                attribute.
            id: Value compared against ``db_data.id``.  (The name shadows
                the builtin but is kept because callers may pass it by
                keyword.)
            **kwargs: Attribute names and their new values.  Attributes
                called ``db_data`` or ``id`` cannot be set through this
                helper because those names are taken by the parameters.

        Returns:
            The result object of the executed statement, or ``None`` if the
            update failed (the error is logged).
        """
        async with SessionLocal() as db:
            try:
                # ``update`` here is sqlalchemy.update, not this method.
                query = update(db_data).where(db_data.id == id).values(**kwargs)
                item = await db.execute(query)
                await db.commit()
                loggerDB.info("Запись обновлена")
                return item
            except Exception as e:
                await db.rollback()
                loggerDB.error(f"Ошибка обновления: {e}", exc_info=True)
                return None