Files
medods_vk/scheduler.py
T

124 lines
3.0 KiB
Python

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from db import PostScheduler
# =========================
# Flask app (будет установлен из app.py)
# =========================
flask_app = None
# =========================
# Scheduler
# =========================
scheduler: BackgroundScheduler | None = None
JOB_ID = "vk_publish_job"
def clearLog():
from collections import deque
from config import Config
with open(Config.LOG_FILE, "r", encoding="utf-8") as f:
last_lines = deque(f, maxlen=500)
with open(Config.LOG_FILE, "w", encoding="utf-8") as f:
f.writelines(last_lines)
def init_scheduler(app):
"""
Инициализация планировщика с Flask-приложением
Вызывать ОДИН раз при старте приложения
"""
global flask_app, scheduler
flask_app = app
if scheduler is None:
scheduler = BackgroundScheduler()
trigger = CronTrigger(hour=0, minute=0)
scheduler.add_job(
clearLog, trigger=trigger, id="clear_log_job", replace_existing=True
)
scheduler.start()
# =========================
# JOB wrapper
# =========================
def vk_publish_job():
"""
Обёртка для APScheduler
"""
if flask_app is None:
raise RuntimeError("Scheduler is not initialized with Flask app")
from vk_handler import handle_vk_post
with flask_app.app_context():
handle_vk_post()
# =========================
# Enable job
# =========================
def enable_publish_job():
"""
Включает выполнение публикации постов
"""
if not scheduler:
return
scheduleData = PostScheduler.query.first()
if not scheduleData or not scheduleData.enabled:
disable_publish_job()
return
start_hour = scheduleData.start_hour
end_hour = scheduleData.end_hour
interval_minutes = scheduleData.interval_minutes
trigger = CronTrigger(
hour="12",
minute="0",
day="*",
)
# trigger = CronTrigger(
# hour=f"{start_hour}-{end_hour - 1}",
# minute=f"*/{interval_minutes}",
# )
scheduler.add_job(
vk_publish_job,
trigger=trigger,
id=JOB_ID,
replace_existing=True,
)
# =========================
# Disable job
# =========================
def disable_publish_job():
if scheduler and scheduler.get_job(JOB_ID):
scheduler.remove_job(JOB_ID)
# =========================
# Status
# =========================
def get_scheduler_status() -> dict:
scheduler_running = bool(scheduler and scheduler.running)
job = scheduler.get_job(JOB_ID) if scheduler_running else None
return {
"scheduler": scheduler_running,
"vk_publish_job": job is not None,
"next_run_time": (
job.next_run_time.strftime("%Y-%m-%d %H:%M:%S")
if job and job.next_run_time
else None
),
}