from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from db import PostScheduler # ========================= # Flask app (будет установлен из app.py) # ========================= flask_app = None # ========================= # Scheduler # ========================= scheduler: BackgroundScheduler | None = None JOB_ID = "vk_publish_job" def clearLog(): from collections import deque from config import Config with open(Config.LOG_FILE, "r", encoding="utf-8") as f: last_lines = deque(f, maxlen=500) with open(Config.LOG_FILE, "w", encoding="utf-8") as f: f.writelines(last_lines) def init_scheduler(app): """ Инициализация планировщика с Flask-приложением Вызывать ОДИН раз при старте приложения """ global flask_app, scheduler flask_app = app if scheduler is None: scheduler = BackgroundScheduler() trigger = CronTrigger(hour=0, minute=0) scheduler.add_job( clearLog, trigger=trigger, id="clear_log_job", replace_existing=True ) scheduler.start() # ========================= # JOB wrapper # ========================= def vk_publish_job(): """ Обёртка для APScheduler """ if flask_app is None: raise RuntimeError("Scheduler is not initialized with Flask app") from vk_handler import handle_vk_post with flask_app.app_context(): handle_vk_post() # ========================= # Enable job # ========================= def enable_publish_job(): """ Включает выполнение публикации постов """ if not scheduler: return scheduleData = PostScheduler.query.first() if not scheduleData or not scheduleData.enabled: disable_publish_job() return trigger = CronTrigger( hour=f"{scheduleData.hour}", minute=f"{scheduleData.minute}", day="*", ) scheduler.add_job( vk_publish_job, trigger=trigger, id=JOB_ID, replace_existing=True, ) # ========================= # Disable job # ========================= def disable_publish_job(): if scheduler and scheduler.get_job(JOB_ID): scheduler.remove_job(JOB_ID) # ========================= # Status # ========================= def get_scheduler_status() -> dict: scheduler_running = bool(scheduler and scheduler.running) job = scheduler.get_job(JOB_ID) if scheduler_running else None return { "scheduler": scheduler_running, "vk_publish_job": job is not None, "next_run_time": ( job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if job and job.next_run_time else None ), }