Files
toolbox/db/initialize.py
T

145 lines
6.6 KiB
Python

from db.handlers.access import AccessLevelHandler
from db.handlers.user import UserHandler
from db.handlers.toolbox import ToolboxHandler
from db.handlers.categories import CategoryHandler
from db.handlers.toolkit import ToolkitHandler
from db.handlers.actions import StocksActions
from typing import Optional
from sqlalchemy import inspect, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from sqlalchemy.schema import CreateTable
from utils import logger
class DatabaseInitializer:
existing_tables: Optional[list[str]] = None
def __init__(self, database_url: str):
from db import Base
self.database_url = database_url
self.engine: Optional[AsyncEngine] = None
self.metadata = Base.metadata
async def initialize(self, force: bool = False, reNewDB: bool = False):
"""Main database initialization method"""
try:
self.engine = create_async_engine(self.database_url)
async with self.engine.begin() as conn:
if force:
logger.info("Принудительное удаление и создание баз...")
await self._drop_all()
await self._create_tables_directly()
await self._initialize_data(reNewDB)
elif not await self._check_tables_exist(conn):
logger.info("Не все необходимые таблицы существуют. Создаем...")
await self._create_tables_directly()
# Проверяем после создания
async with self.engine.begin() as conn:
if not await self._check_tables_exist(conn):
raise RuntimeError("Не все необходимые таблицы существуют!")
if reNewDB:
logger.info("Принудительная загрузка данных...")
await self._initialize_data(reNewDB)
else:
logger.info("Все необходимые таблицы существуют. Пропускаем...")
except Exception as e:
logger.error(f"Инициализация базы завершилась ошибкой: {str(e)}")
raise
finally:
if self.engine:
await self.engine.dispose()
async def _check_tables_exist(self, conn) -> bool:
"""Check if all tables from metadata exist"""
try:
DatabaseInitializer.existing_tables = await conn.run_sync(
lambda sync_conn: inspect(sync_conn).get_table_names()
)
required_tables = set(self.metadata.tables.keys())
if not required_tables:
logger.error("Нет данных о таблицах в метаданных")
return False
missing_tables = required_tables - set(DatabaseInitializer.existing_tables)
if missing_tables:
logger.warning("Существующие таблицы:")
logger.info(DatabaseInitializer.existing_tables)
logger.warning("Необходимые таблицы:")
logger.info(required_tables)
logger.warning("Отсутствующие таблицы:")
logger.info(missing_tables)
return False
return True
except Exception as e:
logger.warning(f"Проверка таблиц завершилась ошибкой: {str(e)}")
return False
async def _create_tables_directly(self):
"""Create tables directly using SQLAlchemy (bypass Alembic)"""
async with self.engine.begin() as conn:
# Создаем все таблицы из метаданных
for table in self.metadata.sorted_tables:
try:
if (
DatabaseInitializer.existing_tables
and table.name in DatabaseInitializer.existing_tables
):
logger.debug(
f"Таблица {table.name} уже существует. Пропускаем..."
)
continue
create_stmt = CreateTable(table)
logger.info(f"Создаем таблицу: {table.name}")
await conn.execute(create_stmt)
except Exception as e:
logger.error(f"Ошибка создания таблицы: {str(e)}")
logger.warning("Все таблицы успешно созданы")
async def _drop_all(self):
"""Drop all tables"""
async with self.engine.begin() as conn:
# Получаем список всех таблиц
existing_tables = await conn.run_sync(
lambda sync_conn: inspect(sync_conn).get_table_names()
)
for table in existing_tables:
drop_stmt = text(
f'DROP TABLE "{table}" CASCADE'
) # Кавычки на случай спец. символов
logger.info(f"Удаляем таблицу: {table}")
await conn.execute(drop_stmt)
logger.warning("Все таблицы успешно удалены")
async def _initialize_data(self, waiting: bool = False):
"""Initialize required data"""
try:
logger.info("Инициализация данных...")
logger.warning("Инициализация Прав доступа...")
await AccessLevelHandler.initialize()
logger.warning("Инициализация Пользователей...")
await UserHandler.initialize()
logger.warning("Инициализация Туллбоксов...")
await ToolboxHandler.initialize()
logger.warning("Инициализация Категорий...")
await CategoryHandler.initialize()
logger.warning("Инициализация Инструментов...")
await ToolkitHandler.initialize()
logger.warning("Инициализация Складов...")
await StocksActions.initialize()
logger.info("Данные успешно инициализированы")
except Exception as e:
logger.error(f"Инициализация данных завершилась ошибкой: {str(e)}")
raise