"""Track calls from AMI-style events and forward their state to medods."""

from datetime import datetime
import logging
import os
import wave

import aiofiles

import config
import medods


async def calls_to_log(in_dict):
    """Append a timestamped dump of one event to its per-call debug log.

    The target file is ``log/<YYYY-MM-DD>/<Linkedid>.log``; the dated
    directory is created on demand.

    Args:
        in_dict: Event mapping. Its ``Linkedid`` value names the log file and
            every key/value pair is written on its own ``key: value`` line.
    """
    now = datetime.now()
    date_dir_path = os.path.join("log", now.strftime("%Y-%m-%d"))
    # exist_ok avoids the check-then-create race between concurrent events.
    os.makedirs(date_dir_path, exist_ok=True)
    file_name = os.path.join(date_dir_path, f"{in_dict.get('Linkedid')}.log")

    chunks = [f"\n\n{now.strftime('%Y-%m-%d %H:%M:%S')}\n\n"]
    chunks.extend(f"{key}: {value}\n" for key, value in in_dict.items())
    async with aiofiles.open(file_name, "a") as f:
        await f.write("".join(chunks))


class CallHandler:
    """Follow calls through a stream of events and report them to medods.

    Attributes:
        date: Day the in-memory state belongs to; state is reset when the
            day changes.
        calls: Maps a linkedid to ``{"started", "last_activity", "client"}``
            for every call currently tracked.
        pending: Linkedids whose end was detected and that wait to be
            finalised by ``check_pending``.
        finished: Linkedids whose further events are ignored.
    """

    # Minutes without events after which a tracked call is considered dead.
    STALE_MINUTES = 60
    # Minutes a pending call must stay silent before it is finalised.
    PENDING_GRACE_MINUTES = 5
    # A recording is only matched if created within this many seconds of the
    # call's reference time.
    MAX_RECORD_OFFSET_SECONDS = 999

    def __init__(self):
        self.date = datetime.now().date()
        self.calls = {}
        self.pending = []
        self.finished = []

    async def handle_event(self, event):
        """Process one event and advance the state of the call it belongs to.

        Events with an unusable Linkedid/Uniqueid pair, or belonging to a
        call already in ``finished``, are ignored. Any failure while handling
        the event is logged and suppressed so that one bad event cannot stop
        the caller's event loop.

        Args:
            event: Mapping-like event exposing ``get`` and ``keys``.
        """
        try:
            if not self._is_valid_linkedid(event):
                return
            if config.DEBUG:
                await calls_to_log(event)
            linkedid = event.get("Linkedid")
            if linkedid in self.finished:
                return
            if linkedid not in self.calls:
                await self._handle_untracked(event, linkedid)
            else:
                await self._handle_tracked(event, linkedid)
        except Exception:
            # Previously a bare ``except: pass``: that hid real bugs and also
            # swallowed task cancellation. Keep the best-effort behaviour but
            # leave a trace.
            logging.exception("Failed to handle event: %s", event)

    @staticmethod
    def _is_valid_linkedid(event):
        """Return True when Linkedid/Uniqueid parse and are consistently ordered."""
        try:
            linked_part = event.get("Linkedid").split(".")[1]
            unique_part = event.get("Uniqueid").split(".")[1]
            if int(linked_part) > int(unique_part):
                return False
        except (AttributeError, IndexError, ValueError):
            # Missing ids, ids without a "." part, or non-numeric parts.
            return False
        # Reject pairs whose Uniqueid counter has more digits than the
        # Linkedid counter.
        return len(unique_part) <= len(linked_part)

    def _reset_on_date_change(self):
        """Drop all in-memory call state when the calendar day has changed."""
        today = datetime.now().date()
        if self.date == today:
            return
        self.date = today
        logging.info("Date changed to %s and reset calls database", self.date)
        if config.DEBUG and self.calls:
            logging.warning("Calls unhandled:")
            logging.warning(self.calls)
        self.finished = []
        self.pending = []
        self.calls = {}

    async def _handle_untracked(self, event, linkedid):
        """Decide whether the first event seen for *linkedid* opens a call."""
        self._reset_on_date_change()

        context = event.get("Context")
        if context != "from-trunk":
            if context == "from-internal":
                self.finished.append(linkedid)
            return

        # A missing channel is treated like a too-short one (ignored) instead
        # of raising on len(None).
        event_channel = event.get("Channel") or ""
        if len(event_channel) < 7 or (
            "LOCAL" in event.get("Variable", "")
            and "LocalAddress" not in event.keys()
            and "CID-CallingPres" not in event.keys()
        ):
            return

        for channel_filter in config.AMI_CHANNEL_FILTER:
            # The split is evaluated lazily, only when the prefix test fails,
            # exactly as before.
            if event_channel.startswith(channel_filter) or (
                event.get("Value") == channel_filter.split("/")[1]
                or event.get("Extension") == channel_filter.split("/")[1]
                or event.get("Exten") == channel_filter.split("/")[1]
            ):
                await self.incoming_call(event, linkedid)
                return

        # No filter matched: never look at this call again.
        self.finished.append(linkedid)

    async def _handle_tracked(self, event, linkedid):
        """Advance an already tracked call: detect answer, loss, or end."""
        event_now = datetime.now()
        call = self.calls[linkedid]
        call["last_activity"] = event_now

        if call["started"] is None:
            if self._is_answer_event(event):
                for var in config.ID_VARS:
                    answered = event.get(var)
                    if answered in config.OPERATORS:
                        await self.call_started(linkedid, answered, event_now)
                        break
            # NOTE(review): the call stays tracked after being reported lost,
            # so further "NO ANSWER" events report it again - confirm intent.
            if event.get("Disposition") == "NO ANSWER":
                await self.call_lost(linkedid)
            return

        await self.check_pending()
        if linkedid in self.pending:
            return

        duration = int((event_now - call["started"]).total_seconds())
        if duration == 1:
            talk_time = 0
            for var in ("TalkTime", "BillableSeconds"):
                if var in event.keys():
                    talk_time += int(event.get(var))
                    break
            duration = max(duration, talk_time)

        if (
            self._is_call_end(event, linkedid, duration)
            and event.get("Context") != "from-internal-xfer"
            and not event.get("Event").startswith("RTC")
            and "internal" not in event.get("Channel", "")
        ):
            await self.call_pending(linkedid)

    @staticmethod
    def _is_answer_event(event):
        """Return True when the event matches one of the answer patterns."""
        return (
            (
                event.get("DialStatus") == "ANSWER"
                and event.get("ChannelStateDesc") == "Up"
            )
            or (
                event.get("Event") == "AgentCalled"
                and event.get("ChannelStateDesc") == "Up"
            )
            or event.get("Variable") in ("BRIDGEPVTCALLID", "BRIDGEPEER")
            or event.get("BridgeTechnology") == "simple_bridge"
        )

    @staticmethod
    def _is_call_end(event, linkedid, duration):
        """Return True when the event matches one of the end-of-call patterns.

        The patterns are checked in a fixed order and the first match wins.

        Args:
            event: Event being examined.
            linkedid: Linkedid of the call the event belongs to.
            duration: Seconds since the call started, as computed by the
                caller.
        """
        if (
            "BillableSeconds" in event.keys()
            and (
                event.get("Disposition") == "ANSWERED"
                or (
                    (
                        event.get("Event") == "DeviceStateChange"
                        or event.get("Event") == "Hangup"
                    )
                    and event.get("ChannelStateDesc") != "Ring"
                )
                or event.get("Variable") == "ANSWEREDTIME_MS"
            )
            and (
                event.get("Uniqueid") != linkedid
                or (
                    event.get("Cause-txt") == "Normal Clearing"
                    or event.get("Context") == "macro-hangupcall"
                    or event.get("Application") == "Hangup"
                )
            )
            and (
                event.get("DestinationContext") != "ext-local"
                or event.get("Event") == "AgentComplete"
                or event.get("Reason") == "agent"
                or event.get("AppData") == "hangupcall,"
                or (
                    "ConnectedLineNum" in event.keys()
                    and event.get("ConnectedLineNum") in event.get("LastData")
                    and (
                        event.get("Variable") == "RTPAUDIOQOS"
                        or event.get("Variable") == "MACRO_PRIORITY"
                        or event.get("Variable") == "MACRO_DEPTH"
                    )
                )
            )
        ):
            return True
        if (
            event.get("Event") == "AgentComplete"
            and event.get("ConnectedLineNum") in config.OPERATORS
        ):
            return True
        if (
            event.get("Event") == "BridgeLeave"
            and event.get("CallerIDNum") in config.OPERATORS
        ):
            return True
        if "TalkTime" in event.keys() and event.get("Event") == "VarSet":
            return True
        if (
            event.get("Value") == "novm"
            and event.get("Event") == "VarSet"
            and event.get("Context") == "macro-dial-one"
            and event.get("BridgeTechnology") == "simple_bridge"
            and event.get("ChannelStateDesc") != "Ring"
        ):
            return True
        if "Value" in event.keys() and (
            event.get("Value").startswith("ANSWER")
            and event.get("Event") == "VarSet"
            and event.get("Context") == "macro-dial-one"
            and event.get("BridgeTechnology") == "simple_bridge"
            and event.get("ChannelStateDesc") != "Ring"
        ):
            return True
        if (
            event.get("Application") == "Hangup"
            and (
                event.get("Disposition") != "NO ANSWER"
                or event.get("Event") == "QueueMemberStatus"
            )
            and event.get("ChannelStateDesc") != "Ring"
            and (
                event.get("Uniqueid") != linkedid
                or event.get("Cause-txt") == "Normal Clearing"
            )
        ):
            return True
        if (
            event.get("AppData") == "hangupcall,"
            and event.get("ChannelStateDesc") == "Up"
            and (
                event.get("Uniqueid") != linkedid
                or event.get("Context") == "ext-queues"
            )
            and (duration > 1)
        ):
            return True
        if (
            (
                "macro-hang" in event.get("Context", "")
                and event.get("ChannelStateDesc") == "Up"
            )
            and (
                event.get("ConnectedLineNum") in config.OPERATORS
                or (
                    event.get("CallerIDNum") in config.OPERATORS
                    and event.get("Event") == "BridgeLeave"
                )
            )
            and (
                event.get("Uniqueid") != linkedid
                or event.get("Event") == "Newexten"
            )
        ):
            return True
        if (
            event.get("Event") == "Cdr"
            and event.get("ChannelStateDesc") != "Ring"
        ):
            return True
        if (
            event.get("ChannelStateDesc") == "Up"
            and event.get("Application") == "GosubIf"
            and event.get("Variable") == "RTPAUDIOQOSJITTER"
        ):
            return True
        return False

    @staticmethod
    def _normalize_phone(number: str):
        """Rewrite a caller number into the form stored as the call's client.

        Six-character numbers get the ``78162`` prefix. Otherwise a leading
        ``810`` is stripped and a leading ``8`` is replaced by ``7``.
        """
        if len(number) == 6:
            return f"78162{number}"
        if number.startswith("810"):
            number = number[3:]
        if number.startswith("8"):
            number = f"7{number[1:]}"
        return number

    async def incoming_call(self, event, linkedid):
        """Start tracking a new call and report it to medods.

        Args:
            event: Event that opened the call. The client number comes from
                ``CallerIDNum`` (or ``CallerIDName`` when the former is
                shorter than two characters), the dialled number from
                ``Exten`` or ``Extension``.
            linkedid: Identifier under which the call is tracked.
        """
        if len(event.get("CallerIDNum", "")) > 1:
            raw_number = event.get("CallerIDNum")
        else:
            raw_number = event.get("CallerIDName")
        client = self._normalize_phone(raw_number)

        self.calls[linkedid] = {
            "started": None,
            "last_activity": datetime.now(),
            "client": client,
        }
        exten = event.get("Exten") or event.get("Extension")
        logging.info(
            "New incoming call: ID=%s, Client=%s, Phone=%s",
            linkedid,
            client,
            exten,
        )
        await medods.incoming_call(linkedid, client, exten)

    async def call_started(self, linkedid, responsible, started=None):
        """Mark a tracked call as answered and report it to medods.

        Args:
            linkedid: Identifier of the tracked call.
            responsible: Operator that answered; only used for logging.
            started: Moment the call was answered. Defaults to the current
                time. ``handle_event`` passes the event time here; before
                this parameter existed that call raised ``TypeError`` and no
                call was ever marked as started.
        """
        self.calls[linkedid]["started"] = (
            datetime.now() if started is None else started
        )
        logging.info("Call started: ID=%s, Responsible=%s", linkedid, responsible)
        await medods.call_started(linkedid)

    async def call_pending(self, linkedid):
        """Queue a call for finalisation by ``check_pending``."""
        # call_finished removes a single entry, so never queue an id twice.
        if linkedid not in self.pending:
            self.pending.append(linkedid)

    async def check_pending(self):
        """Expire silent calls and finalise pending ones.

        Non-pending calls without events for ``STALE_MINUTES`` are moved to
        pending when they were answered, otherwise finished as lost. Pending
        calls silent for ``PENDING_GRACE_MINUTES`` are finished.
        """
        check_time = datetime.now()

        def idle_minutes(call_data):
            return (check_time - call_data["last_activity"]).total_seconds() / 60

        # Snapshot: call_finished mutates self.calls while we iterate.
        actual_calls = {
            call_id: call_data
            for call_id, call_data in self.calls.items()
            if call_id not in self.pending
        }
        for call_id, call_data in actual_calls.items():
            if idle_minutes(call_data) < self.STALE_MINUTES:
                continue
            if call_data["started"] is not None:
                await self.call_pending(call_id)
            else:
                # call_finished reports the loss itself; calling call_lost
                # here as well notified medods twice.
                await self.call_finished(call_id, True)

        for linkedid in list(self.pending):
            if idle_minutes(self.calls[linkedid]) >= self.PENDING_GRACE_MINUTES:
                await self.call_finished(linkedid)

    @staticmethod
    def _wav_duration(filename):
        """Return the length of a WAV file in seconds."""
        with wave.open(filename, "rb") as wav_file:
            return wav_file.getnframes() / wav_file.getframerate()

    @classmethod
    def _find_recordings(cls, client):
        """Collect today's ``external*.wav`` recordings that mention *client*.

        Returns:
            Dict mapping file path to ``{"duration", "created"}`` for every
            readable recording with a positive duration.
        """
        now = datetime.now()  # read once so all path parts agree at midnight
        base = "/var/spool/asterisk/monitor"
        # The original path used unpadded month/day. NOTE(review): layouts
        # that zero-pad (2024/03/05) would never match it, so both spellings
        # are searched; a missing directory simply yields nothing.
        directories = dict.fromkeys(
            (
                f"{base}/{now.year}/{now.month}/{now.day}",
                f"{base}/{now:%Y/%m/%d}",
            )
        )

        tree = {}
        for directory in directories:
            for root, _, files in os.walk(directory):
                for file in files:
                    if not (
                        file.endswith(".wav")
                        and file.startswith("external")
                        and client in file
                    ):
                        continue
                    file_path = os.path.join(root, file)
                    try:
                        file_duration = cls._wav_duration(file_path)
                        if file_duration > 0:
                            tree[file_path] = {
                                "duration": file_duration,
                                "created": datetime.fromtimestamp(
                                    os.path.getctime(file_path)
                                ),
                            }
                    except Exception:
                        # Best-effort scan: an unreadable file must not
                        # prevent the call from being reported.
                        logging.debug(
                            "Skipping unreadable recording %s",
                            file_path,
                            exc_info=True,
                        )
        return tree

    def _get_record_data(self, linkedid):
        """Pick the recording created closest to the call's reference time.

        The reference time is the call's start, or its last activity when it
        never started. The client number is matched without its first
        character.

        Returns:
            ``{"id", "duration"}``; ``id`` is ``"File not found"`` and
            ``duration`` is 0 when no recording is close enough.
        """
        call = self.calls[linkedid]
        started = (
            call["started"] if call["started"] is not None else call["last_activity"]
        )
        files = self._find_recordings(call["client"][1:])

        result = {"id": "File not found", "duration": 0}
        control_difference = self.MAX_RECORD_OFFSET_SECONDS
        for file, file_data in files.items():
            difference = abs(file_data["created"] - started).total_seconds()
            if difference < control_difference:
                control_difference = difference
                # The record id is the last "-"-separated part of the name.
                result["id"] = (
                    file.split("/")[-1].split("-")[-1].replace(".wav", "")
                )
                result["duration"] = file_data["duration"]
        return result

    async def call_finished(self, linkedid, call_lost: bool = False):
        """Finalise a call, report the outcome to medods and stop tracking it.

        Args:
            linkedid: Identifier of the tracked call.
            call_lost: When True the call is reported as lost. Otherwise it
                must be in ``pending``; its duration and recording id are
                reported.
        """
        self.finished.append(linkedid)
        try:
            if call_lost:
                await self.call_lost(linkedid)
                return

            self.pending.remove(linkedid)
            call = self.calls[linkedid]
            record_data = self._get_record_data(linkedid)
            uniqueid = record_data.get("id")
            if record_data.get("duration") > 0:
                duration = int(record_data.get("duration"))
            else:
                # No usable recording: fall back to the tracked time span.
                duration = int(
                    (call["last_activity"] - call["started"]).total_seconds()
                )
            logging.info(
                "Call finished: ID=%s, Duration=%s, Record ID=%s",
                linkedid,
                duration,
                uniqueid,
            )
            await medods.call_finished(linkedid, duration)
            await medods.call_record_file(linkedid, uniqueid)
        finally:
            # Always stop tracking, even when reporting failed; otherwise the
            # entry lingers and is re-examined by check_pending later.
            self.calls.pop(linkedid, None)

    async def call_lost(self, linkedid):
        """Report a call as lost to medods."""
        logging.info("Call lost: ID=%s", linkedid)
        await medods.call_lost(linkedid)