Files
medods_vk/vk_handler.py
T
2025-12-23 03:09:27 +03:00

111 lines
3.6 KiB
Python

from datetime import datetime, date
import vk_api
from db import UsersBirthdate, VkAPI, db
from medods_handler import setDynamicText
def handle_vk_post():
from app import logger
logger.info("Публикация поста")
vkApi = VkAPI.query.first()
if not vkApi:
logger.error("Информация для работы не найдена")
return
vkPost = setDynamicText()
if not vkPost:
logger.error("Информация для размещения поста не найдена")
return
if not vkPost.dynamic_text:
logger.info("Не требуется публикация поста")
return
vk_session = vk_api.VkApi(token=vkApi.access_token)
vk = vk_session.get_api()
new_post = vk.wall.post(
owner_id=-vkApi.group_id,
from_group=1,
message=f"{vkPost.dynamic_text}\n{vkPost.static_text}".strip(),
attachments=f"photo-{vkApi.group_id}_{vkApi.base_photo_url}",
)
logger.info(f"Пост #{new_post.get('post_id')} создан")
vkPost.dynamic_text = None
vkPost.post_id = new_post.get("post_id")
vkPost.publish_at = datetime.now()
db.session.commit()
def handle_vk_birthdate():
from app import logger
from sqlalchemy import func, or_, and_
logger.info("Публикация поста с днем рождения")
today = date.today()
day = f"{today.day:02d}"
month = f"{today.month:02d}"
def is_leap_year(year: int) -> bool:
return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
# Основное условие: совпадение дня и месяца
conditions = [
and_(
func.strftime("%d", UsersBirthdate.birthdate) == day,
func.strftime("%m", UsersBirthdate.birthdate) == month,
)
]
# Если 28 февраля и год НЕ високосный — добавляем родившихся 29.02
if today.month == 2 and today.day == 28 and not is_leap_year(today.year):
conditions.append(
and_(
func.strftime("%d", UsersBirthdate.birthdate) == "29",
func.strftime("%m", UsersBirthdate.birthdate) == "02",
)
)
conditions.append(UsersBirthdate.enabled == True)
birthdayUsers = UsersBirthdate.query.filter(or_(*conditions)).all()
if not birthdayUsers:
logger.info("Нет пользователей с днем рождения")
return
vkApi = VkAPI.query.first()
if not vkApi:
logger.error("Информация для работы не найдена")
return
vk_session = vk_api.VkApi(token=vkApi.access_token)
vk = vk_session.get_api()
for user in birthdayUsers:
if not user.congratulations or not user.photo_link:
logger.error(f"Пользователь {user.short_name} не настроен для публикации")
continue
try:
new_post = vk.wall.post(
owner_id=-vkApi.group_id,
from_group=1,
message=user.congratulations.strip(),
attachments=user.photo_link,
)
logger.info(f"Пост #{new_post.get('post_id')} создан")
user.post_link = (
f"https://vk.com/wall-{vkApi.group_id}_{new_post.get('post_id')}"
)
user.publish_at = datetime.now()
db.session.commit()
except Exception as e:
logger.error(f"Ошибка при публикации поста: {e}")