from typing import Optional from sqlalchemy import inspect, text from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from sqlalchemy.schema import CreateTable from sqlalchemy.dialects.postgresql import dialect as pg_dialect from utils import logger from db.handlers.access import AccessLevelHandler from db.handlers.user import UserHandler from db.handlers.toolbox import ToolboxHandler from db.handlers.categories import CategoryHandler from db.handlers.toolkit import ToolkitHandler from db.handlers.actions import StocksActions class DatabaseInitializer: existing_tables: Optional[list[str]] = None def __init__(self, database_url: str): from db import Base self.database_url = database_url self.engine: Optional[AsyncEngine] = None self.metadata = Base.metadata # ========================================================== # PUBLIC # ========================================================== async def initialize(self, force: bool = False, reNewDB: bool = False): """Main database initialization method""" try: self.engine = create_async_engine(self.database_url) async with self.engine.begin() as conn: if force: logger.warning("Принудительное удаление и создание БД...") await self._drop_all() await self._create_tables_directly() await self._initialize_data(reNewDB) return tables_exist = await self._check_tables_exist(conn) if not tables_exist: logger.warning("Не все таблицы существуют. Создаем недостающие...") await self._create_tables_directly() # 🔥 СИНХРОНИЗАЦИЯ СХЕМЫ logger.info("Синхронизация схемы БД...") await self._sync_schema(conn) if reNewDB: logger.warning("Принудительная загрузка данных...") await self._initialize_data(reNewDB) logger.info("Инициализация БД завершена успешно") except Exception as e: logger.exception("Инициализация базы завершилась ошибкой") raise finally: if self.engine: await self.engine.dispose() # ========================================================== # CHECK # ========================================================== async def _check_tables_exist(self, conn) -> bool: """Check if all tables from metadata exist""" try: DatabaseInitializer.existing_tables = await conn.run_sync( lambda sync_conn: inspect(sync_conn).get_table_names() ) required_tables = set(self.metadata.tables.keys()) if not required_tables: logger.error("Нет данных о таблицах в метаданных") return False missing_tables = required_tables - set(DatabaseInitializer.existing_tables) if missing_tables: logger.warning("Отсутствующие таблицы:") logger.warning(missing_tables) return False return True except Exception as e: logger.warning(f"Проверка таблиц завершилась ошибкой: {str(e)}") return False # ========================================================== # CREATE / DROP # ========================================================== async def _create_tables_directly(self): """Create tables directly using SQLAlchemy (bypass Alembic)""" async with self.engine.begin() as conn: for table in self.metadata.sorted_tables: if ( DatabaseInitializer.existing_tables and table.name in DatabaseInitializer.existing_tables ): logger.debug(f"Таблица {table.name} уже существует") continue logger.info(f"Создаем таблицу: {table.name}") await conn.execute(CreateTable(table)) async def _drop_all(self): """Drop all tables""" async with self.engine.begin() as conn: existing_tables = await conn.run_sync( lambda sync_conn: inspect(sync_conn).get_table_names() ) for table in existing_tables: logger.warning(f"Удаляем таблицу: {table}") await conn.execute(text(f'DROP TABLE "{table}" CASCADE')) # ========================================================== # SCHEMA SYNC # ========================================================== async def _sync_schema(self, conn): await self._add_missing_columns(conn) await self._add_missing_foreign_keys(conn) async def _add_missing_columns(self, conn): for table_name, table in self.metadata.tables.items(): db_columns = await conn.run_sync( lambda sync_conn: { col["name"] for col in inspect(sync_conn).get_columns(table_name) } ) for column in table.columns: if column.name in db_columns: continue ddl = ( f"ALTER TABLE {table_name} " f"ADD COLUMN {self._compile_column(column)}" ) logger.warning(f"[ADD COLUMN] {table_name}.{column.name}") await conn.execute(text(ddl)) async def _add_missing_foreign_keys(self, conn): for table_name, table in self.metadata.tables.items(): db_fks = await conn.run_sync( lambda sync_conn: inspect(sync_conn).get_foreign_keys(table_name) ) db_fk_pairs = { ( tuple(fk["constrained_columns"]), fk["referred_table"], ) for fk in db_fks } for fk in table.foreign_keys: col = fk.parent.name ref_table = fk.column.table.name ref_col = fk.column.name key = ((col,), ref_table) if key in db_fk_pairs: continue constraint_name = f"fk_{table_name}_{col}" ddl = f""" ALTER TABLE {table_name} ADD CONSTRAINT {constraint_name} FOREIGN KEY ({col}) REFERENCES {ref_table}({ref_col}) """ if fk.ondelete: ddl += f" ON DELETE {fk.ondelete}" logger.warning(f"[ADD FK] {table_name}.{col} → {ref_table}.{ref_col}") await conn.execute(text(ddl)) # ========================================================== # HELPERS # ========================================================== def _compile_column(self, column) -> str: """Compile SQLAlchemy Column to SQL""" ddl = f"{column.name} {column.type.compile(dialect=pg_dialect())}" if not column.nullable: ddl += " NULL" # безопасно, NOT NULL позже вручную if column.server_default is not None: ddl += f" DEFAULT {column.server_default.arg}" return ddl # ========================================================== # DATA INIT # ========================================================== async def _initialize_data(self, waiting: bool = False): try: logger.info("Инициализация данных...") await AccessLevelHandler.initialize() await UserHandler.initialize() await ToolboxHandler.initialize() await CategoryHandler.initialize() await ToolkitHandler.initialize() await StocksActions.initialize() logger.info("Данные успешно инициализированы") except Exception: logger.exception("Ошибка инициализации данных") raise