from .user import * from .access import * from .toolkit import * from .categories import * from .toolbox import * from .stock import * from typing import Optional from sqlalchemy import inspect, text from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from sqlalchemy.schema import CreateTable from utils import logger # Импортируем все модели from db import Base from db.handlers import InitializeDatabase class DatabaseInitializer: existing_tables: Optional[list[str]] = None def __init__(self, database_url: str): self.database_url = database_url self.engine: Optional[AsyncEngine] = None self.metadata = Base.metadata async def initialize(self, force: bool = False, reNewDB: bool = False): """Main database initialization method""" try: self.engine = create_async_engine(self.database_url) async with self.engine.begin() as conn: if force: logger.info("Принудительное удаление и создание баз...") await self._drop_all() await self._create_tables_directly() await self._initialize_data() elif not await self._check_tables_exist(conn): logger.info("Не все необходимые таблицы существуют. Создаем...") await self._create_tables_directly() # Проверяем после создания async with self.engine.begin() as conn: if not await self._check_tables_exist(conn): raise RuntimeError("Не все необходимые таблицы существуют!") if reNewDB: logger.info("Принудительная загрузка данных...") await self._initialize_data() else: logger.info("Все необходимые таблицы существуют. Пропускаем...") except Exception as e: logger.error(f"Инициализация базы завершилась ошибкой: {str(e)}") raise finally: if self.engine: await self.engine.dispose() async def _check_tables_exist(self, conn) -> bool: """Check if all tables from metadata exist""" try: DatabaseInitializer.existing_tables = await conn.run_sync( lambda sync_conn: inspect(sync_conn).get_table_names() ) required_tables = set(self.metadata.tables.keys()) if not required_tables: logger.error("Нет данных о таблицах в метаданных") return False missing_tables = required_tables - set(DatabaseInitializer.existing_tables) if missing_tables: logger.warning("Существующие таблицы:") logger.info(DatabaseInitializer.existing_tables) logger.warning("Необходимые таблицы:") logger.info(required_tables) logger.warning("Отсутствующие таблицы:") logger.info(missing_tables) return False return True except Exception as e: logger.warning(f"Проверка таблиц завершилась ошибкой: {str(e)}") return False async def _create_tables_directly(self): """Create tables directly using SQLAlchemy (bypass Alembic)""" async with self.engine.begin() as conn: # Создаем все таблицы из метаданных for table in self.metadata.sorted_tables: try: if ( DatabaseInitializer.existing_tables and table.name in DatabaseInitializer.existing_tables ): logger.debug( f"Таблица {table.name} уже существует. Пропускаем..." ) continue create_stmt = CreateTable(table) logger.info(f"Создаем таблицу: {table.name}") await conn.execute(create_stmt) except Exception as e: logger.error(f"Ошибка создания таблицы: {str(e)}") logger.warning("Все таблицы успешно созданы") async def _drop_all(self): """Drop all tables""" async with self.engine.begin() as conn: # Получаем список всех таблиц existing_tables = await conn.run_sync( lambda sync_conn: inspect(sync_conn).get_table_names() ) for table in existing_tables: drop_stmt = text( f'DROP TABLE "{table}" CASCADE' ) # Кавычки на случай спец. символов logger.info(f"Удаляем таблицу: {table}") await conn.execute(drop_stmt) logger.warning("Все таблицы успешно удалены") async def _initialize_data(self): """Initialize required data""" try: logger.info("Инициализация данных...") await InitializeDatabase.initialize() logger.info("Данные успешно инициализированы") except Exception as e: logger.error(f"Инициализация данных завершилась ошибкой: {str(e)}") raise __all__ = [ "User", "Access", "Toolbox", "Category", "Stock", "Toolkit", ]