from datetime import datetime, date import vk_api from db import UsersBirthdate, VkAPI, db from medods_handler import setDynamicText def handle_vk_post(): from app import logger logger.info("Публикация поста") vkApi = VkAPI.query.first() if not vkApi: logger.error("Информация для работы не найдена") return vkPost = setDynamicText() if not vkPost: logger.error("Информация для размещения поста не найдена") return if not vkPost.dynamic_text: logger.info("Не требуется публикация поста") return vk_session = vk_api.VkApi(token=vkApi.access_token) vk = vk_session.get_api() new_post = vk.wall.post( owner_id=-vkApi.group_id, from_group=1, message=f"{vkPost.dynamic_text}\n{vkPost.static_text}".strip(), attachments=f"photo-{vkApi.group_id}_{vkApi.base_photo_url}", ) logger.info(f"Пост #{new_post.get('post_id')} создан") vkPost.dynamic_text = None vkPost.post_id = new_post.get("post_id") vkPost.publish_at = datetime.now() db.session.commit() def handle_vk_birthdate(): from app import logger from sqlalchemy import func, or_, and_ logger.info("Публикация поста с днем рождения") today = date.today() day = f"{today.day:02d}" month = f"{today.month:02d}" def is_leap_year(year: int) -> bool: return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0) # условия ТОЛЬКО по дате date_conditions = [ and_( func.strftime("%d", UsersBirthdate.birthdate) == day, func.strftime("%m", UsersBirthdate.birthdate) == month, ) ] # 29 февраля для невисокосного года if today.month == 2 and today.day == 28 and not is_leap_year(today.year): date_conditions.append( and_( func.strftime("%d", UsersBirthdate.birthdate) == "29", func.strftime("%m", UsersBirthdate.birthdate) == "02", ) ) birthdayUsers = UsersBirthdate.query.filter( UsersBirthdate.enabled.is_(True), or_(*date_conditions), ).all() if not birthdayUsers: logger.info(f"Нет пользователей с днем рождения {day}.{month}") return else: logger.info(f"Найдено {len(birthdayUsers)} пользователей с днем рождения") vkApi = VkAPI.query.first() if not vkApi: logger.error("Информация для работы не найдена") return vk_session = vk_api.VkApi(token=vkApi.access_token) vk = vk_session.get_api() for user in birthdayUsers: if not user.congratulations or not user.photo_link: logger.error(f"Пользователь {user.short_name} не настроен для публикации") continue try: new_post = vk.wall.post( owner_id=-vkApi.group_id, from_group=1, message=user.congratulations.strip(), attachments=user.photo_link.replace("https://vk.com/", ""), ) logger.info(f"Пост #{new_post.get('post_id')} создан") user.post_link = ( f"https://vk.com/wall-{vkApi.group_id}_{new_post.get('post_id')}" ) user.publish_at = datetime.now() db.session.commit() except Exception as e: logger.error(f"Ошибка при публикации поста: {e}")