187 lines
6.9 KiB
Python
187 lines
6.9 KiB
Python
import datetime
|
|
from db import ApiEndpoint, MedodsAPI, UsersMedods, VkPost, db
|
|
from http_client import send_request
|
|
|
|
|
|
def updateMedodsUsers() -> bool:
|
|
from app import logger
|
|
|
|
try:
|
|
medodsApi = MedodsAPI.query.first()
|
|
if not medodsApi:
|
|
return False
|
|
requestParams = ApiEndpoint.query.filter_by(title="Список докторов").first()
|
|
if not requestParams:
|
|
return False
|
|
response = send_request(
|
|
requestParams.method,
|
|
f"{medodsApi.url}{requestParams.url_path}",
|
|
params=requestParams.query_params,
|
|
)
|
|
if not response:
|
|
logger.error("Ответ не получен")
|
|
return False
|
|
responseData = response.json()
|
|
usersFromDB = []
|
|
for user in responseData["data"]:
|
|
if user["availabilityForOnlineRecording"] != "available":
|
|
continue
|
|
userDict = {
|
|
"id": int(user["id"]),
|
|
"name": f"{user['surname']} {user['name']} {user['secondName']}",
|
|
"short_name": f"{user['surname']} {user['name'][:1]}. {user['secondName'][:1]}.",
|
|
"sex": user["sex"],
|
|
"step": int(user["appointmentDuration"]),
|
|
"specialties": [spec["title"] for spec in user["specialties"]],
|
|
}
|
|
usersFromDB.append(userDict)
|
|
actualUsersIds = [user["id"] for user in usersFromDB]
|
|
allExistingUsers = UsersMedods.query.all()
|
|
for user in allExistingUsers:
|
|
if user.id not in actualUsersIds:
|
|
logger.info(f"Удален доктор {user.name} {user.surname}")
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
for user in usersFromDB:
|
|
existingUser = UsersMedods.query.filter_by(id=user["id"]).first()
|
|
if existingUser:
|
|
changes = False
|
|
if existingUser.name != user["name"]:
|
|
existingUser.name = user["name"]
|
|
existingUser.short_name = user["short_name"]
|
|
changes = True
|
|
if existingUser.step != user["step"]:
|
|
existingUser.step = user["step"]
|
|
changes = True
|
|
if existingUser.specialties != user["specialties"]:
|
|
existingUser.specialties = user["specialties"]
|
|
changes = True
|
|
|
|
if changes:
|
|
db.session.commit()
|
|
else:
|
|
newUser = UsersMedods(
|
|
id=user["id"],
|
|
name=user["name"],
|
|
short_name=user["short_name"],
|
|
sex=user["sex"],
|
|
step=user["step"],
|
|
specialties=user["specialties"],
|
|
)
|
|
db.session.add(newUser)
|
|
db.session.commit()
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при обновлении списка докторов: {e}")
|
|
return False
|
|
|
|
|
|
def getFreeSlots(vkPost) -> dict:
|
|
from app import logger
|
|
|
|
try:
|
|
if not vkPost:
|
|
logger.error("Информация для размещения поста не найдена")
|
|
return {}
|
|
selectedUsersIds = vkPost.selected_users
|
|
selectedUsers = UsersMedods.query.filter(
|
|
UsersMedods.id.in_(selectedUsersIds)
|
|
).all()
|
|
requestParams = ApiEndpoint.query.filter_by(
|
|
title="Свободные записи на дату"
|
|
).first()
|
|
medodsApi = MedodsAPI.query.first()
|
|
|
|
if not requestParams or len(selectedUsers) == 0 or not medodsApi:
|
|
logger.error("Ошибка получения необходимых параметров")
|
|
return {}
|
|
|
|
userParams = [{str(user.id): {"step": user.step}} for user in selectedUsers]
|
|
|
|
tomorrow = datetime.date.today() + datetime.timedelta(days=1)
|
|
the_day_after_tomorrow = tomorrow + datetime.timedelta(days=1)
|
|
startDate = tomorrow.strftime("%Y-%m-%d")
|
|
endDate = the_day_after_tomorrow.strftime("%Y-%m-%d")
|
|
|
|
json = requestParams.payload
|
|
json["startDate"] = startDate
|
|
json["endDate"] = endDate
|
|
json["userParams"] = userParams
|
|
|
|
url = medodsApi.url + requestParams.url_path
|
|
|
|
logger.info(url)
|
|
logger.info(json)
|
|
response = send_request(
|
|
requestParams.method,
|
|
url,
|
|
json,
|
|
)
|
|
if not response:
|
|
logger.error("Ответ не получен")
|
|
return {}
|
|
slotsDataFull = response.json()
|
|
logger.info(slotsDataFull)
|
|
if len(slotsDataFull.keys()) == 0:
|
|
logger.error("Нет свободных приемов")
|
|
return {}
|
|
firstKey = list(slotsDataFull.keys())[0]
|
|
slotsData = {"date": firstKey, "slots": slotsDataFull.get(firstKey)}
|
|
return slotsData
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при получении свободных приемов: {e}")
|
|
return {}
|
|
|
|
|
|
def setDynamicText():
|
|
from app import logger
|
|
|
|
try:
|
|
logger.info("Получение информации о посте")
|
|
vkPost = VkPost.query.first()
|
|
if not vkPost:
|
|
logger.error("Информация для размещения поста не найдена")
|
|
return vkPost
|
|
logger.info("Получение информации о свободных приемах")
|
|
freeSlots = getFreeSlots(vkPost)
|
|
if len(freeSlots.keys()) == 0:
|
|
logger.error("Нет свободных приемов")
|
|
return vkPost
|
|
userIds = [int(key) for key in freeSlots["slots"].keys()]
|
|
usersMedods = UsersMedods.query.filter(UsersMedods.id.in_(userIds)).all()
|
|
if len(usersMedods) == 0:
|
|
logger.error("Не найдены доктора с свободными приемами")
|
|
return vkPost
|
|
|
|
usersMedods.sort(key=lambda x: x.name)
|
|
users = [user.toDict() for user in usersMedods]
|
|
|
|
dateText = (
|
|
freeSlots["date"][8:]
|
|
+ "."
|
|
+ freeSlots["date"][5:7]
|
|
+ "."
|
|
+ freeSlots["date"][:4]
|
|
)
|
|
|
|
dynamicText = f"📌 Свободная запись на 📅 {dateText}:\n\n"
|
|
|
|
for user in users:
|
|
sex_icon = "👨⚕️" if user["sex"] == "male" else "👩⚕️"
|
|
slots = freeSlots["slots"][str(user["id"])]
|
|
name = user["name"] if vkPost.full_name else user["shortName"]
|
|
|
|
dynamicText += f"{sex_icon} {name} ({', '.join(user['specialties'])}):\n"
|
|
dynamicText += f"🕒 {', '.join(slots)}\n\n"
|
|
|
|
vkPost.dynamic_text = dynamicText
|
|
db.session.commit()
|
|
|
|
return vkPost
|
|
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при обновлении списка докторов: {e}")
|
|
return
|