Files
medods_crz/asterisk_ami.py
T

105 lines
3.5 KiB
Python

import asyncio
from datetime import datetime
import socket
import aiofiles
import call_handler
import config
from main import logger
def send_action(s, action):
    """Send an AMI action and parse the text that follows the first blank line.

    The action is written to the socket, then data is read until at least one
    ``\\r\\n\\r\\n`` terminator has arrived. Everything received *after* the first
    terminator is split into lines and turned into a dict of
    ``"Key: value"`` pairs.

    Args:
        s: Connected socket-like object providing ``send`` and ``recv``.
        action: Text to send; an empty string sends nothing and only reads.

    Returns:
        dict mapping stripped field names to stripped values. Lines without a
        colon are ignored; when a key repeats, the last occurrence wins.

    Raises:
        ConnectionResetError: The peer closed the connection before a blank
            line was received.
    """
    s.send(action.encode())

    # Accumulate raw bytes and decode once at the end, so a multi-byte
    # character split across two recv() calls is not lost.
    received = bytearray()
    while b"\r\n\r\n" not in received:
        chunk = s.recv(4096)
        if not chunk:
            # recv() returns b"" once the peer has closed the connection;
            # without this check the loop would spin forever.
            raise ConnectionResetError("AMI connection closed by peer")
        received += chunk

    text = received.decode("utf-8", errors="replace")

    # NOTE(review): only the text after the first blank line is parsed, so
    # the message that precedes it is discarded. Kept as-is because callers
    # may rely on it; confirm this is intended.
    _, _, body = text.partition("\r\n\r\n")

    body_dict = {}
    for line in body.split("\n"):
        # Split on the first colon only, so values such as "sip:100@host"
        # or "12:30:00" are kept whole.
        key, colon, value = line.partition(":")
        if colon:
            body_dict[key.strip()] = value.strip()
    return body_dict
async def full_log(event):
    """Append one event to the current day's debug log file.

    The entry is written to ``log/YYYY-MM-DD.log`` (local date) and consists
    of an ``HH:MM:SS`` header surrounded by blank lines, followed by one
    ``key: value`` line per item of *event*.

    Args:
        event: Mapping whose ``items()`` yield the fields to record.
    """
    import os  # local import: the module header does not import os

    # Take the time once so the file name and the entry header cannot
    # disagree when the call straddles midnight.
    now = datetime.now()
    file_name = f"log/{now.strftime('%Y-%m-%d')}.log"

    # Opening in append mode fails if the directory is missing.
    os.makedirs("log", exist_ok=True)

    # Build the whole entry first and write it in one call instead of one
    # awaited write per field.
    entry = "\n\n" + now.strftime("%H:%M:%S") + "\n\n"
    entry += "".join(f"{key}: {value}\n" for key, value in event.items())

    # Explicit UTF-8 so non-ASCII field values do not depend on the locale.
    async with aiofiles.open(file_name, "a", encoding="utf-8") as f:
        await f.write(entry)
async def ami_listening():
    """Connect to the Asterisk AMI, log in, and dispatch events until stopped.

    One ``CallHandler`` and one daily ``check_pending`` task live for the
    whole call. Each connection gets its own keep-alive task. When the
    connection is reset, or a keep-alive ping can no longer be sent, the
    socket is closed and a new connection is opened in a loop rather than by
    calling this coroutine recursively.

    Non-empty events are passed to ``callhandler.handle_event`` and, when
    ``config.DEBUG`` is set, appended to the debug log via ``full_log``.

    Returns when ``KeyboardInterrupt`` or ``SystemExit`` is caught; both
    background tasks are cancelled and the socket is closed on every exit
    path.

    NOTE(review): ``socket.connect`` and ``send_action`` are blocking calls,
    so the background tasks only make progress while this coroutine is
    awaiting (handler, log write, sleeps). Consider moving the socket I/O to
    an executor or to asyncio streams.
    """
    callhandler = call_handler.CallHandler()

    async def check_connection(sock):
        """Ping the server once an hour; return True when sending fails."""
        while True:
            await asyncio.sleep(3600)
            try:
                sock.send(b"Action: Ping\r\n\r\n")
            except OSError:
                # Any send failure means this connection is unusable; the
                # event loop below sees the finished task and reconnects.
                return True

    async def daily_check_pending():
        """Run ``callhandler.check_pending()`` daily at 23:00 local time."""
        while True:
            now = datetime.now()
            if now.hour == 23 and now.minute == 0 and now.second < 5:
                await callhandler.check_pending()
                # Sleep past the 23:00:00-23:00:04 window so the check runs
                # only once per day.
                await asyncio.sleep(60)
            else:
                await asyncio.sleep(1)

    async def open_connection():
        """Retry once a second until the TCP connection is established."""
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            logger.info("Connecting to Asterisk...")
            try:
                sock.connect((config.AMI_HOST, config.AMI_PORT))
            except ConnectionRefusedError:
                sock.close()  # do not leak one descriptor per failed attempt
                logger.warning("Connection refused. Retrying...")
                await asyncio.sleep(1)
            except BaseException:
                sock.close()
                raise
            else:
                return sock

    daily_check_pending_task = asyncio.create_task(daily_check_pending())
    try:
        while True:  # one iteration per connection
            s = await open_connection()
            conn_check_task = asyncio.create_task(check_connection(s))
            try:
                logger.info("Connected to Asterisk")
                logger.info("Logging in...")
                login_action = (
                    "Action: Login\r\n"
                    f"Username: {config.AMI_USER}\r\n"
                    f"Secret: {config.AMI_PASSWORD}\r\n\r\n"
                )
                send_action(s, login_action)
                logger.info("Logged in")
                logger.info("Listening for events...")
                events_action = "Action: Events\r\nEventMask: on\r\n\r\n"
                send_action(s, events_action)

                # The keep-alive task finishes only when a ping could not be
                # sent, so a finished task means "reconnect".
                while not conn_check_task.done():
                    event = send_action(s, "")
                    if event:
                        await callhandler.handle_event(event)
                        if config.DEBUG:
                            await full_log(event)
            except ConnectionResetError:
                # Handled below, exactly like a failed keep-alive ping.
                pass
            finally:
                conn_check_task.cancel()
                s.close()

            logger.warning("Connection reset. Restarting...")
            # Short pause so a server that keeps resetting is not hammered.
            await asyncio.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        logger.info("Exiting...")
    finally:
        daily_check_pending_task.cancel()