Files
toolbox/db/initialize.py
T

228 lines
8.3 KiB
Python

from typing import Optional
from sqlalchemy import inspect, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects.postgresql import dialect as pg_dialect
from utils import logger
from db.handlers.access import AccessLevelHandler
from db.handlers.user import UserHandler
from db.handlers.toolbox import ToolboxHandler
from db.handlers.categories import CategoryHandler
from db.handlers.toolkit import ToolkitHandler
from db.handlers.actions import StocksActions
class DatabaseInitializer:
existing_tables: Optional[list[str]] = None
def __init__(self, database_url: str):
from db import Base
self.database_url = database_url
self.engine: Optional[AsyncEngine] = None
self.metadata = Base.metadata
# ==========================================================
# PUBLIC
# ==========================================================
async def initialize(self, force: bool = False, reNewDB: bool = False):
"""Main database initialization method"""
try:
self.engine = create_async_engine(self.database_url)
async with self.engine.begin() as conn:
if force:
logger.warning("Принудительное удаление и создание БД...")
await self._drop_all()
await self._create_tables_directly()
await self._initialize_data(reNewDB)
return
tables_exist = await self._check_tables_exist(conn)
if not tables_exist:
logger.warning("Не все таблицы существуют. Создаем недостающие...")
await self._create_tables_directly()
# 🔥 СИНХРОНИЗАЦИЯ СХЕМЫ
logger.info("Синхронизация схемы БД...")
await self._sync_schema(conn)
if reNewDB:
logger.warning("Принудительная загрузка данных...")
await self._initialize_data(reNewDB)
logger.info("Инициализация БД завершена успешно")
except Exception as e:
logger.exception("Инициализация базы завершилась ошибкой")
raise
finally:
if self.engine:
await self.engine.dispose()
# ==========================================================
# CHECK
# ==========================================================
async def _check_tables_exist(self, conn) -> bool:
"""Check if all tables from metadata exist"""
try:
DatabaseInitializer.existing_tables = await conn.run_sync(
lambda sync_conn: inspect(sync_conn).get_table_names()
)
required_tables = set(self.metadata.tables.keys())
if not required_tables:
logger.error("Нет данных о таблицах в метаданных")
return False
missing_tables = required_tables - set(DatabaseInitializer.existing_tables)
if missing_tables:
logger.warning("Отсутствующие таблицы:")
logger.warning(missing_tables)
return False
return True
except Exception as e:
logger.warning(f"Проверка таблиц завершилась ошибкой: {str(e)}")
return False
# ==========================================================
# CREATE / DROP
# ==========================================================
async def _create_tables_directly(self):
"""Create tables directly using SQLAlchemy (bypass Alembic)"""
async with self.engine.begin() as conn:
for table in self.metadata.sorted_tables:
if (
DatabaseInitializer.existing_tables
and table.name in DatabaseInitializer.existing_tables
):
logger.debug(f"Таблица {table.name} уже существует")
continue
logger.info(f"Создаем таблицу: {table.name}")
await conn.execute(CreateTable(table))
async def _drop_all(self):
"""Drop all tables"""
async with self.engine.begin() as conn:
existing_tables = await conn.run_sync(
lambda sync_conn: inspect(sync_conn).get_table_names()
)
for table in existing_tables:
logger.warning(f"Удаляем таблицу: {table}")
await conn.execute(text(f'DROP TABLE "{table}" CASCADE'))
# ==========================================================
# SCHEMA SYNC
# ==========================================================
async def _sync_schema(self, conn):
await self._add_missing_columns(conn)
await self._add_missing_foreign_keys(conn)
async def _add_missing_columns(self, conn):
for table_name, table in self.metadata.tables.items():
db_columns = await conn.run_sync(
lambda sync_conn: {
col["name"] for col in inspect(sync_conn).get_columns(table_name)
}
)
for column in table.columns:
if column.name in db_columns:
continue
ddl = (
f"ALTER TABLE {table_name} "
f"ADD COLUMN {self._compile_column(column)}"
)
logger.warning(f"[ADD COLUMN] {table_name}.{column.name}")
await conn.execute(text(ddl))
async def _add_missing_foreign_keys(self, conn):
for table_name, table in self.metadata.tables.items():
db_fks = await conn.run_sync(
lambda sync_conn: inspect(sync_conn).get_foreign_keys(table_name)
)
db_fk_pairs = {
(
tuple(fk["constrained_columns"]),
fk["referred_table"],
)
for fk in db_fks
}
for fk in table.foreign_keys:
col = fk.parent.name
ref_table = fk.column.table.name
ref_col = fk.column.name
key = ((col,), ref_table)
if key in db_fk_pairs:
continue
constraint_name = f"fk_{table_name}_{col}"
ddl = f"""
ALTER TABLE {table_name}
ADD CONSTRAINT {constraint_name}
FOREIGN KEY ({col})
REFERENCES {ref_table}({ref_col})
"""
if fk.ondelete:
ddl += f" ON DELETE {fk.ondelete}"
logger.warning(f"[ADD FK] {table_name}.{col}{ref_table}.{ref_col}")
await conn.execute(text(ddl))
# ==========================================================
# HELPERS
# ==========================================================
def _compile_column(self, column) -> str:
"""Compile SQLAlchemy Column to SQL"""
ddl = f"{column.name} {column.type.compile(dialect=pg_dialect())}"
if not column.nullable:
ddl += " NULL" # безопасно, NOT NULL позже вручную
if column.server_default is not None:
ddl += f" DEFAULT {column.server_default.arg}"
return ddl
# ==========================================================
# DATA INIT
# ==========================================================
async def _initialize_data(self, waiting: bool = False):
try:
logger.info("Инициализация данных...")
await AccessLevelHandler.initialize()
await UserHandler.initialize()
await ToolboxHandler.initialize()
await CategoryHandler.initialize()
await ToolkitHandler.initialize()
await StocksActions.initialize()
logger.info("Данные успешно инициализированы")
except Exception:
logger.exception("Ошибка инициализации данных")
raise