import json from flask import Flask, redirect, request, jsonify, render_template, url_for from config import Config from db import ( BirthdateScheduler, PostScheduler, Protection, UsersBirthdate, UsersMedods, VkAPI, VkPost, db, MedodsAPI, ApiEndpoint, ) from medods_handler import updateMedodsUsers, updateUsersBirthdate from scheduler import ( enable_birthdate_job, get_birthdate_scheduler_status, get_scheduler_status, init_scheduler, enable_publish_job, ) from http_client import send_request import logging import os app = Flask(__name__) app.config.from_object(Config) db.init_app(app) init_scheduler(app) os.makedirs("logs", exist_ok=True) class SmartLogger(logging.Logger): def _log( self, level, msg, args, exc_info=None, extra=None, stack_info=False, stacklevel=2, ): if isinstance(msg, (dict, list)): msg = json.dumps(msg, indent=4, ensure_ascii=False) super()._log(level, msg, args, exc_info, extra, stack_info, stacklevel) logging.setLoggerClass(SmartLogger) logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[ logging.FileHandler(Config.LOG_FILE, encoding="utf-8"), logging.StreamHandler(), ], ) logger = logging.getLogger(__name__) def getLogs(): from collections import deque with open(Config.LOG_FILE, "r", encoding="utf-8") as f: last_lines = deque(f, maxlen=500) return "".join(reversed(last_lines)) @app.route("/", methods=["GET", "POST"]) def index(): match request.method: case "GET": exitData = {} medodsApi = MedodsAPI.query.first() if medodsApi: exitData["medodsApi"] = { "updated_at": medodsApi.updated_at.strftime("%Y-%m-%d %H:%M:%S") } if medodsApi.url: exitData["medodsApi"]["url"] = True else: exitData["medodsApi"]["url"] = False if medodsApi.identity and medodsApi.secretKey: exitData["medodsApi"]["apiKey"] = True else: exitData["medodsApi"]["apiKey"] = False apiEndpointsCount = ApiEndpoint.query.count() exitData["medodsApi"]["apiEndpointsCount"] = apiEndpointsCount vkApi = VkAPI.query.first() if vkApi: exitData["vkApi"] = { "updated_at": vkApi.updated_at.strftime("%Y-%m-%d %H:%M:%S") } if vkApi.group_id: exitData["vkApi"]["group_id"] = True else: exitData["vkApi"]["group_id"] = False if vkApi.access_token: exitData["vkApi"]["access_token"] = True else: exitData["vkApi"]["access_token"] = False if vkApi.base_photo_url: exitData["vkApi"]["base_photo_url"] = True else: exitData["vkApi"]["base_photo_url"] = False vkPost = VkPost.query.first() if vkPost: exitData["vkPost"] = { "updated_at": vkPost.updated_at.strftime("%Y-%m-%d %H:%M:%S") } if vkPost.static_text: exitData["vkPost"]["static_text"] = True else: exitData["vkPost"]["static_text"] = False if vkPost.selected_users: exitData["vkPost"]["selected_users"] = len(vkPost.selected_users) else: exitData["vkPost"]["selected_users"] = False if vkPost.full_name is not None: exitData["vkPost"]["full_name"] = vkPost.full_name if vkPost.post_id and vkApi and vkApi.group_id: exitData["vkPost"][ "post_link" ] = f"https://vk.com/wall-{vkApi.group_id}_{vkPost.post_id}" else: exitData["vkPost"]["post_link"] = False if vkPost.publish_at: exitData["vkPost"]["publish_at"] = vkPost.publish_at.strftime( "%Y-%m-%d %H:%M:%S" ) else: exitData["vkPost"]["publish_at"] = False usersMedods = UsersMedods.query.count() exitData["vkPost"]["usersMedods"] = usersMedods exitData["vkPost"]["scheduler"] = get_scheduler_status() return render_template("index.html", exitData=exitData) case "POST": return jsonify(logs=getLogs()) case _: return "Method not allowed", 405 @app.route("/medods", methods=["GET"]) def medods(): medods_api = MedodsAPI.query.first() data = {} if medods_api: apiKey = False if medods_api.identity and medods_api.secretKey: apiKey = True data = { "url": medods_api.url, "apiKey": apiKey, } return render_template("medods.html", data=data) @app.route("/vk", methods=["GET"]) def vk(): vkDB = VkAPI.query.first() data = {} if vkDB: data = {"vk_settings": vkDB.toDict()} return render_template("vk.html", data=data) @app.route("/posts", methods=["GET"]) def posts(): medodsUsers = UsersMedods.query.all() if medodsUsers: medodsUsers = [user.toDict() for user in medodsUsers] vkPost = VkPost.query.first() if vkPost: vkPost = vkPost.toDict() schedulerStatus = get_scheduler_status() schedulerSettings = PostScheduler.query.first() if schedulerSettings: schedulerSettings = schedulerSettings.toDict() return render_template( "posts.html", data={ "medodsUsers": medodsUsers, "vkPost": vkPost, "schedulerStatus": schedulerStatus, "schedulerSettings": schedulerSettings, }, ) @app.route("/birthdate", methods=["GET"]) def birthdate(): schedulerStatus = get_birthdate_scheduler_status() schedulerSettings = BirthdateScheduler.query.first() if schedulerSettings: schedulerSettings = schedulerSettings.toDict() return render_template( "birthdate.html", data={ "schedulerStatus": schedulerStatus, "schedulerSettings": schedulerSettings, }, ) @app.route("/api/medods", methods=["POST"]) def api_medods(): try: data = request.json apiKey = data.get("apiKey", None) url = data.get("url", None) if url is not None: logger.info("Получен url") try: medodsRecord = MedodsAPI.query.first() medodsRecord.url = url db.session.commit() logger.info("Обновлен url") except Exception: db.session.merge(MedodsAPI(url=url)) db.session.commit() logger.info("Добавлен url") if apiKey: logger.info("Получены ключи") try: medodsRecord = MedodsAPI.query.first() medodsRecord.identity = apiKey["identity"] medodsRecord.secretKey = apiKey["secretKey"] db.session.commit() logger.info("Обновлены ключи") except Exception: db.session.merge( MedodsAPI( identity=apiKey["identity"], secretKey=apiKey["secretKey"] ) ) db.session.commit() logger.info("Добавлены ключи") return jsonify({"ok": True}) except Exception as e: logger.error(f"Ошибка при обновлении ключей: {e}") return jsonify({"ok": False}), 500 @app.route("/api/requests", methods=["GET", "POST", "PATCH", "DELETE"]) def api_requests(): requestData = ( request.json if request.method in ["POST", "PATCH", "DELETE"] else None ) match request.method: case "DELETE": try: db.session.execute( db.delete(ApiEndpoint).where(ApiEndpoint.id == requestData["id"]) ) db.session.commit() except Exception as e: logger.error(f"Ошибка при удалении запроса {requestData['id']}: {e}") logger.info("Удален запрос") return jsonify({"status": "ok"}) case "POST": logger.info("Добавлен/обновлен запрос") if "id" in requestData: logger.info("Обновлен запрос") try: db.session.execute( db.update(ApiEndpoint) .where(ApiEndpoint.id == requestData["id"]) .values( method=requestData.get("method"), title=requestData.get("title"), url_path=requestData.get("url_path"), payload=requestData.get("payload", {}), query_params=requestData.get("query_params", {}), ) ) db.session.commit() except Exception as e: logger.error(f"Ошибка при обновлении запроса: {e}") return jsonify({"status": "ok"}) else: logger.info("Добавлен запрос") try: db.session.merge(ApiEndpoint(**requestData)) db.session.commit() except Exception as e: logger.error(f"Ошибка при добавлении запроса: {e}") return jsonify({"status": "ok"}) case "PATCH": logger.info("Выполнен запрос") try: requestParams = ApiEndpoint.query.filter_by( id=requestData["id"] ).first() medodsDB = MedodsAPI.query.first() baseUrl = medodsDB.url response = send_request( requestParams.method, f"{baseUrl}{requestParams.url_path}", requestParams.payload, requestParams.query_params, ) exitData = {} try: exitData = response.json() except: exitData = response.text return jsonify(exitData) except Exception as e: logger.error(f"Ошибка при выполнении запроса: {e}") return jsonify({"status": "error"}), 500 case "GET": logger.info("Получен список запросов") requestsDB = ApiEndpoint.query.all() if requestsDB: requestsList = [r.toDict() for r in requestsDB] return jsonify( { "status": "ok", "requests": requestsList, } ) case _: logger.error("Неверный метод запроса") return jsonify({"status": "error"}), 405 @app.route("/api/birthdate", methods=["GET", "POST", "PATCH"]) def api_birthdate(): match request.method: case "POST": reqData = request.json userUpdate = reqData.get("userUpdate", None) if userUpdate: logger.info("Обновлен пользователь") userId = userUpdate["userId"] userData = userUpdate["userData"] try: userDB = UsersBirthdate.query.filter_by(id=userId).first() userDB.photo_link = userData["photoLink"] userDB.congratulations = userData["congratulations"] db.session.commit() return jsonify({"status": "ok"}) except Exception as e: logger.error(f"Ошибка при обновлении пользователя: {e}") return jsonify({"status": "error"}), 500 scheduleSettings = reqData.get("scheduleSettings") if scheduleSettings: logger.info("Обновлены настройки расписания") try: scheduleDB = BirthdateScheduler.query.first() if scheduleDB: scheduleDB.hour = scheduleSettings["hour"] scheduleDB.minute = scheduleSettings["minute"] scheduleDB.enabled = scheduleSettings["enabled"] else: scheduleDB = BirthdateScheduler( hour=scheduleSettings["hour"], minute=scheduleSettings["minute"], enabled=scheduleSettings["enabled"], ) db.session.add(scheduleDB) db.session.commit() enable_birthdate_job() scheduleInfo = get_birthdate_scheduler_status() return jsonify( { "status": "ok", "next_run_time": scheduleInfo.get("next_run_time"), } ) except Exception as e: logger.error(f"Ошибка при обновлении настройки расписания: {e}") return jsonify({"status": "error"}), 500 case "GET": logger.info("Получен список пользователей") users = UsersBirthdate.query.all() if users: users = [u.toDict() for u in users] return jsonify( { "status": "ok", "users": users, } ) case "PATCH": success = updateUsersBirthdate() if success: return jsonify({"status": "ok"}) else: return jsonify({"status": "error"}), 500 case _: logger.error("Неверный метод запроса") return jsonify({"status": "error"}), 405 @app.route("/api/vk", methods=["POST"]) def api_vk(): requestData = request.json if "id" in requestData: logger.info("Обновлен запрос") try: db.session.execute( db.update(VkAPI) .where(VkAPI.id == requestData["id"]) .values( group_id=( int(requestData.get("group_id", 0)) if requestData.get("group_id") else 0 ), access_token=requestData.get("access_token"), base_photo_url=( int(requestData.get("base_photo_url", 0)) if requestData.get("base_photo_url") else 0 ), ) ) db.session.commit() except Exception as e: logger.error(f"Ошибка при обновлении запроса: {e}") return jsonify({"status": "ok"}) else: logger.info("Добавлен запрос") try: db.session.merge(VkAPI(**requestData)) db.session.commit() except Exception as e: logger.error(f"Ошибка при добавлении запроса: {e}") return jsonify({"status": "ok"}) @app.route("/api/posts", methods=["POST", "GET"]) def api_posts(): response = {"status": "ok"} match request.method: case "POST": requestData = request.json logger.info("Настройки публикации и расписания") vkPostData = requestData.get("vkPostData", None) if vkPostData: selectedUsers = vkPostData.get("selectedUsers", None) static_text = vkPostData.get("static_text", None) full_name = vkPostData.get("full_name", None) logger.info("Обновление настроек публикации") try: vkPost = VkPost.query.first() if vkPost: if selectedUsers: vkPost.selected_users = selectedUsers if static_text: vkPost.static_text = static_text if full_name is not None: vkPost.full_name = full_name else: vkPostData["selected_users"] = vkPostData.pop("selectedUsers") db.session.merge(VkPost(**vkPostData)) db.session.commit() except Exception as e: logger.error(f"Ошибка при обновлении настроек публикации: {e}") schedulerData = requestData.get("schedulerData", None) if schedulerData: logger.info("Обновление расписания публикации") try: scheduler = PostScheduler.query.first() hour = schedulerData.get("hour", None) minute = schedulerData.get("minute", None) enabled = schedulerData.get("enabled", None) if scheduler: if hour is not None: if scheduler.hour != hour: scheduler.hour = hour if minute is not None: if scheduler.minute != minute: scheduler.minute = minute if enabled is not None: if scheduler.enabled != enabled: scheduler.enabled = enabled else: db.session.merge( PostScheduler( hour=int(hour), minute=int(minute), enabled=enabled, ) ) db.session.commit() enable_publish_job() scheduleInfo = get_scheduler_status() response["next_run_time"] = scheduleInfo.get("next_run_time") except Exception as e: logger.error(f"Ошибка при обновлении расписания публикации: {e}") return jsonify(response) case "GET": queryParams = request.args.to_dict() action = queryParams.get("action", None) if action: match action: case "update_users": logger.info("Обновить список пользователей") result = updateMedodsUsers() return jsonify({"ok": result}) case "handle_posts": from vk_handler import handle_vk_post logger.info("Выполнить публикацию") handle_vk_post() return jsonify({"ok": True}) case _: logger.error("Неверный метод запроса") return jsonify({"status": "error"}), 405 return jsonify({"ok": False, "status": "error", "message": "no action"}) case _: logger.error("Неверный метод запроса") return jsonify({"status": "error"}), 405 @app.route("/login", methods=["GET", "POST", "PATCH"]) def login(): match request.method: case "GET": return render_template("login.html") case "POST": data = request.get_json() password = data.get("password") protection = Protection.query.first() if protection: if not protection.verify_password(password): return ( jsonify({"status": "error", "errorMessage": "Неверный пароль"}), 401, ) else: protection.generate_token() db.session.commit() else: protection = Protection() protection.set_password(password) protection.generate_token() db.session.add(protection) db.session.commit() return jsonify({"status": "ok", "token": protection.token}), 200 case "PATCH": data = request.get_json() new_password = data.get("password") if not new_password: return {"status": "error", "errorMessage": "Пароль не задан"}, 400 protection = Protection.query.first() if not protection: protection = Protection() protection.set_password(new_password) protection.generate_token() db.session.add(protection) db.session.commit() else: protection.set_password(new_password) db.session.commit() return {"status": "ok"} case _: logger.error("Неверный метод запроса") return jsonify({"status": "error"}), 405 def init_app(): with app.app_context(): db.create_all() enable_publish_job() enable_birthdate_job() logger.info("Приложение запущено") @app.before_request def check_auth_cookie(): endpoint = request.endpoint if endpoint is None: return if endpoint == "login" or endpoint.startswith("static"): return p = Protection.query.first() if not p or not p.verify_token(request.cookies.get("auth_token")): return redirect(url_for("login")) if __name__ == "__main__": init_app() app.run(host="0.0.0.0", port=80)