# medods_to_n3_health/src/db/schemas/signings.py
# Last modified: 2026-02-15 17:02:40 +03:00 (375 lines, 13 KiB, Python)

from datetime import datetime, timedelta
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, func, select
from sqlalchemy.orm import relationship
from db import Base, CRUD
from utils import logger, answer, toDict
class Signing(Base):
    """ORM model for the ``signings`` table together with its query helpers.

    A signing links a practitioner (``userIdLpu``) to a patient
    (``idPatientMis``) and owns collections of ``Document`` and ``Statuses``
    rows. The static helpers wrap their results with the project's
    ``answer(...)`` helper.
    """

    __tablename__ = "signings"

    id = Column(Integer, primary_key=True, index=True)
    # Sender of the signing; references practitioners.userIdLpu.
    userIdLpu = Column(
        String, ForeignKey("practitioners.userIdLpu", ondelete="CASCADE")
    )
    # Patient the signing belongs to; references patients.idPatientMis.
    idPatientMis = Column(
        String, ForeignKey("patients.idPatientMis", ondelete="CASCADE")
    )
    # getFilteredSingings treats a non-null storagePath as "completed",
    # a null storagePath with a trackingId as "processing", and both being
    # null as "error".
    storagePath = Column(String, nullable=True)
    trackingId = Column(String, nullable=True)
    # Both collections are eagerly loaded (lazy="joined") and are deleted
    # together with the signing (cascade="all, delete-orphan").
    documents = relationship(
        "Document",
        cascade="all, delete-orphan",
        lazy="joined",
        uselist=True,
        single_parent=True,
    )
    statuses = relationship(
        "Statuses",
        cascade="all, delete-orphan",
        lazy="joined",
        uselist=True,
        single_parent=True,
    )
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __init__(self, **kwargs):
        """Set every keyword argument as an attribute of the new instance."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def toDict(self):
        """Return this row converted by the imported ``toDict`` helper."""
        # Inside a method body the bare name resolves to the module-level
        # helper imported from ``utils``, not to this method.
        return toDict(self)

    async def save(self) -> "Signing":
        """Persist this instance through ``CRUD.create`` with ``refresh=True``."""
        return await CRUD.create(self, refresh=True)

    async def edit(self, **kwargs) -> "Signing":
        """Update this row (looked up by ``self.id``) with the given fields."""
        return await CRUD.update(Signing, self.id, **kwargs)

    async def delete(self) -> bool:
        """Delete this row through ``CRUD.delete``."""
        return await CRUD.delete(self)

    @staticmethod
    def _asIdString(value):
        """Return ``value`` as ``str``; ``None`` and ``str`` pass through as-is."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    @staticmethod
    def _listAnswer(signings, asDict: bool, notFoundLog: str):
        """Wrap a list of signings in ``answer``; log and fail when it is empty.

        Args:
            signings: Rows returned by ``CRUD.read`` (may be empty or falsy).
            asDict: When true each row is converted with ``toDict()``.
            notFoundLog: Message logged when nothing was found.
        """
        if not signings:
            logger.error(notFoundLog)
            return answer(success=False, message="Подписания не найдены")
        data = [signing.toDict() for signing in signings] if asDict else signings
        return answer(data=data)

    @staticmethod
    async def addSigning(userIdLpu: str, idPatientMis: str):
        """Create and save a signing for the given practitioner and patient.

        Both identifiers are stored as strings. ``enable_job()`` is called
        before the row is saved.

        Returns:
            Whatever ``save()`` returns for the new ``Signing``.
        """
        from utils.background import enable_job

        # NOTE(review): as in the original, None is stringified to "None"
        # here -- confirm whether callers can ever pass None.
        userIdLpu = str(userIdLpu)
        idPatientMis = str(idPatientMis)
        enable_job()
        return await Signing(userIdLpu=userIdLpu, idPatientMis=idPatientMis).save()

    @staticmethod
    async def getSigningById(id: int, toDict: bool = True):
        """Look up one signing by primary key.

        Args:
            id: Primary key of the signing.
            toDict: Return the row converted with ``toDict()`` instead of the
                ORM object.

        Returns:
            ``answer(data=...)`` when found, otherwise
            ``answer(success=False, message=...)`` after logging an error.
        """
        signing = await CRUD.read(select(Signing).where(Signing.id == id))
        if not signing:
            logger.error(f"Подписание не найдено, id: {id}")
            return answer(success=False, message="Подписание не найдено")
        return answer(data=signing.toDict() if toDict else signing)

    @staticmethod
    async def getSigningsByIdPatientMis(idPatientMis: str, toDict: bool = True):
        """Return all signings of one patient (see ``_listAnswer`` for the shape)."""
        # NOTE(review): None becomes "None" here, as in the original.
        idPatientMis = str(idPatientMis)
        signings = await CRUD.read(
            select(Signing).where(Signing.idPatientMis == idPatientMis), True
        )
        return Signing._listAnswer(
            signings, toDict, f"Подписания не найдены, idPatientMis: {idPatientMis}"
        )

    @staticmethod
    async def getSigningsByUserIdLpu(userIdLpu: str, toDict: bool = True):
        """Return all signings sent by one practitioner."""
        # The column is a String; coerce non-string ids like the sibling
        # helpers do so that an int id does not reach the database as-is.
        userIdLpu = Signing._asIdString(userIdLpu)
        signings = await CRUD.read(
            select(Signing).where(Signing.userIdLpu == userIdLpu), True
        )
        return Signing._listAnswer(
            signings, toDict, f"Подписания не найдены, userIdLpu: {userIdLpu}"
        )

    @staticmethod
    async def getSigningsByUserIdLpuAndIdPatientMis(
        userIdLpu: str, idPatientMis: str, toDict: bool = True
    ):
        """Return the signings one practitioner sent for one patient."""
        userIdLpu = Signing._asIdString(userIdLpu)
        idPatientMis = Signing._asIdString(idPatientMis)
        signings = await CRUD.read(
            select(Signing)
            .where(Signing.userIdLpu == userIdLpu)
            .where(Signing.idPatientMis == idPatientMis),
            True,
        )
        return Signing._listAnswer(
            signings,
            toDict,
            f"Подписания не найдены, userIdLpu: {userIdLpu}, idPatientMis: {idPatientMis}",
        )

    @staticmethod
    async def updateStatuses():
        """Refresh statuses of signings that have no ``storagePath`` yet.

        Signings whose highest recorded status code is 204 or above are
        skipped. ``Statuses.updateStatus`` is awaited for each remaining
        signing id; when nothing is left to check, ``disable_job()`` is called.
        """
        pending = await CRUD.read(
            select(Signing).where(Signing.storagePath.is_(None)), True
        )
        checkList = []
        for signing in pending or []:
            codes = [status.status for status in signing.statuses]
            # NOTE(review): 204 and above presumably mean a final state --
            # confirm against the Statuses model.
            if codes and max(codes) >= 204:
                continue
            checkList.append(signing.id)
        logger.info(f"Проверка статусов для {len(checkList)} подписаний")
        if checkList:
            from db.schemas.statuses import Statuses

            for signingId in checkList:
                await Statuses.updateStatus(signingId)
        else:
            from utils.background import disable_job

            logger.info("Нет подписаний для проверки")
            disable_job()

    @staticmethod
    def _parseDay(value):
        """Parse a ``YYYY-MM-DD`` string; return ``None`` if missing or invalid."""
        if not value:
            return None
        try:
            return datetime.strptime(value, "%Y-%m-%d")
        except (TypeError, ValueError):
            # Same policy as the documentNumber filter: a value that cannot
            # be parsed disables that part of the filter instead of failing.
            logger.error(f"Некорректная дата в фильтре периода: {value}")
            return None

    @staticmethod
    def _periodBounds(filters: dict):
        """Translate ``filters["period"]`` into ``(start, stopExclusive)``.

        Either element is ``None`` when that side is unbounded. For the
        ``"custom"`` period the bounds come from ``dateFrom`` / ``dateTo``
        (swapped if given in reverse order), and the upper bound is moved to
        the start of the following day so the whole ``dateTo`` day is included.
        """
        period = filters.get("period")
        # NOTE(review): "3days"/"7days" include today (2 and 6 days back)
        # while "30days"/"90days" go back a full 30/90 days; kept as in the
        # original -- confirm which convention is intended.
        daysBack = {"today": 0, "3days": 2, "7days": 6, "30days": 30, "90days": 90}
        if isinstance(period, str) and period in daysBack:
            start = (datetime.now() - timedelta(days=daysBack[period])).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            return start, None
        if period != "custom":
            return None, None

        start = Signing._parseDay(filters.get("dateFrom"))
        stop = Signing._parseDay(filters.get("dateTo"))
        if start is not None and stop is not None and stop < start:
            start, stop = stop, start
        if stop is not None:
            # dateTo is parsed as midnight; an inclusive "<=" on it would drop
            # everything created later that day.
            stop += timedelta(days=1)
        return start, stop

    @staticmethod
    async def getFilteredSingings(userIdLpu: str, filters: dict):
        """Return signings matching ``filters``, enriched with display names.

        Recognised filter keys (all optional):
            sender: ``"my"`` (only ``userIdLpu``) or ``"not_my"`` (everyone else).
            period: ``"today"``, ``"3days"``, ``"7days"``, ``"30days"``,
                ``"90days"`` or ``"custom"`` (with ``dateFrom`` / ``dateTo``
                as ``YYYY-MM-DD``).
            status: ``"processing"``, ``"completed"`` or ``"error"``.
            esia (or ``signatureType``): ``"esia"`` or ``"not_esia"``.
            senderName: substring of the practitioner's ``familyName``.
            patientName: substring of the patient's ``name``.
            documentNumber: integer number one of the documents must have.

        Returns:
            ``answer(data=[...])`` with signing dicts ordered by ``created_at``
            descending. Each dict carries the extra keys ``userName``,
            ``esiaAuth`` and ``patientName``. Signings without a matching
            practitioner or patient row are left out.
        """
        userIdLpu = Signing._asIdString(userIdLpu)

        query = select(Signing)
        sender = filters.get("sender")
        if sender == "my":  # only the caller's own signings
            query = query.where(Signing.userIdLpu == userIdLpu)
        elif sender == "not_my":  # everything except the caller's own
            query = query.where(Signing.userIdLpu != userIdLpu)

        start, stopExclusive = Signing._periodBounds(filters)
        if start is not None:
            query = query.where(Signing.created_at >= start)
        if stopExclusive is not None:
            query = query.where(Signing.created_at < stopExclusive)

        query = query.order_by(Signing.created_at.desc())
        signings = await CRUD.read(query, True)
        if not signings:
            return answer(data=[])

        status = filters.get("status")
        if status == "processing":
            signings = [
                s
                for s in signings
                if s.storagePath is None and s.trackingId is not None
            ]
        elif status == "completed":
            signings = [s for s in signings if s.storagePath is not None]
        elif status == "error":
            signings = [
                s for s in signings if s.trackingId is None and s.storagePath is None
            ]
        if not signings:
            return answer(data=[])

        esia = filters.get("esia", filters.get("signatureType"))
        wantEsia = None
        if esia == "esia":
            wantEsia = True
        elif esia == "not_esia":
            wantEsia = False

        from db.schemas.practitioners import Practitioner

        practitionerQuery = select(Practitioner).where(
            Practitioner.userIdLpu.in_({s.userIdLpu for s in signings})
        )
        senderName = filters.get("senderName")
        if senderName:
            practitionerQuery = practitionerQuery.where(
                func.lower(Practitioner.familyName).like(
                    "%" + senderName.lower() + "%"
                )
            )
        practitioners = await CRUD.read(practitionerQuery, True)
        if not practitioners:
            return answer(data=[])
        senders = {p.userIdLpu: p for p in practitioners}

        # (ORM row, enriched dict) pairs; the ORM row is kept for the patient
        # lookup below.
        kept = []
        for signing in signings:
            practitioner = senders.get(signing.userIdLpu)
            if practitioner is None:
                # Excluded by the senderName filter (or has no practitioner
                # row); indexing the dict directly used to raise KeyError.
                continue
            if wantEsia is not None and bool(practitioner.esiaAuth) != wantEsia:
                continue
            kept.append(
                (
                    signing,
                    {
                        **signing.toDict(),
                        "userName": practitioner.name,
                        "esiaAuth": practitioner.esiaAuth,
                    },
                )
            )
        if not kept:
            return answer(data=[])

        from db.schemas.patients import Patient

        patientQuery = select(Patient).where(
            Patient.idPatientMis.in_({signing.idPatientMis for signing, _ in kept})
        )
        patientName = filters.get("patientName")
        if patientName:
            patientQuery = patientQuery.where(
                func.lower(Patient.name).like("%" + patientName.lower() + "%")
            )
        patients = await CRUD.read(patientQuery, True)
        if not patients:
            return answer(data=[])
        patientNames = {patient.idPatientMis: patient.name for patient in patients}

        # Signings whose patient was filtered out by patientName are dropped
        # here rather than raising KeyError.
        result = [
            {**row, "patientName": patientNames[signing.idPatientMis]}
            for signing, row in kept
            if signing.idPatientMis in patientNames
        ]

        documentNumber = filters.get("documentNumber")
        if documentNumber is not None and documentNumber != "":
            try:
                searchNumber = int(documentNumber)
            except (TypeError, ValueError):
                # Not a number: skip this filter.
                pass
            else:
                result = [
                    row
                    for row in result
                    if any(
                        doc.get("number") == searchNumber
                        for doc in (row.get("documents") or [])
                    )
                ]
        return answer(data=result)