"""Track inbound Asterisk calls from AMI-style event dictionaries.

The module has two roles:

* ``CallHandler`` consumes event dictionaries one at a time and works out
  when a trunk call starts, is transferred, is lost or finishes, and which
  ``Uniqueid`` identifies the leg of the operator who handled it.
* ``main`` replays events parsed from a log file through a ``CallHandler``
  and compares the outcome with rows read from a CDR CSV export.
"""

import asyncio
from datetime import datetime
import json
import logging
import logging.config
import os

try:
    import aiohttp
    from aiohttp import ClientSession
except ImportError:
    # The offline replay in main() never sends HTTP requests while FIXRECORDS
    # is off, so a missing aiohttp must not prevent the module from loading.
    aiohttp = None
    ClientSession = None

# Channel prefixes of the trunks that are tracked; an empty list tracks all.
# AMI_CHANNEL_FILTER = []
AMI_CHANNEL_FILTER = ["PJSIP/Megafon_3", "PJSIP/rt_769402"]
# Event fields that may carry the number of the operator who answered.
ID_VARS = ("ConnectedLineNum", "CallerIDNum", "DestCallerIDNum", "Source")
# Verbose tracing of every decision; also switches the default replay log.
CALL_DEBUG = False
# When True, finished calls and their record links are POSTed to the API.
FIXRECORDS = False
# When True, main() replays every log/CSV pair found in log/data/.
MULTICHECK = False
# Extensions that count as operators.
OPERATORS = ("10", "12", "13")

# SECURITY NOTE: the fallback values keep the previous behaviour, but a bearer
# token does not belong in source control; prefer the environment variables.
API_URL = os.environ.get(
    "TELEPHONY_API_URL", "http://188.64.134.62:3000/api/v2/telephony/common"
)
API_TOKEN = os.environ.get("TELEPHONY_API_TOKEN", "NjlmMmYzNThlOWNjYTI5ZGNlYTYzNz")

# linkedid -> {"duration", "uniqueid", "time"} for every finished call.
records = {}


def _clock_text(m_time):
    """Return the time-of-day part of *m_time*.

    *m_time* is either ``"HH:MM:SS"`` or two or more space-separated tokens
    whose second token is the time.  Raises ``AttributeError`` when *m_time*
    is not a string, which callers treat as "event carries no timestamp".
    """
    tokens = m_time.split(" ")
    return tokens[1] if len(tokens) > 1 else m_time


def _clock_to_datetime(clock):
    """Convert ``"HH:MM:SS"`` to a datetime on today's date.

    Microseconds are zeroed so that the difference of two converted values
    depends only on the logged times, not on the wall-clock instant at which
    each event happened to be processed.
    NOTE(review): the date is always "today", so a call spanning midnight
    yields a negative difference.
    """
    fields = clock.split(":")
    return datetime.now().replace(
        hour=int(fields[0]),
        minute=int(fields[1]),
        second=int(fields[2]),
        microsecond=0,
    )


async def send_post_request(body: dict):
    """POST ``{"call": body}`` to the telephony API.

    Transport errors and timeouts are logged and otherwise ignored; the
    function always returns ``None``.
    """
    if aiohttp is None:
        logging.error("aiohttp is not installed; telephony callback skipped")
        return

    async def post_request(url, headers=None, payload=None, **kwargs):
        try:
            async with ClientSession() as session:
                async with session.post(
                    url, json=payload, headers=headers, **kwargs
                ) as response:
                    response.raise_for_status()
                    return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            logging.warning("Telephony callback to %s failed: %s", url, exc)
            return None

    data = {"call": body}
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_TOKEN}",
    }
    res = await post_request(url=API_URL, headers=headers, payload=data)
    if CALL_DEBUG:
        loggingDict("Server data", {"body": body, "response": res})


def redirect_ids(responsibles):
    """Map operator extensions to external ids (currently the identity)."""
    return responsibles


async def incoming_call(linkedid, client, exten):
    """Hook for a new inbound call; currently does nothing."""
    return None


async def call_started(linkedid, responsible):
    """Hook for an answered call; currently does nothing."""
    return None


async def call_lost(linkedid, responsible):
    """Hook for an unanswered call; currently does nothing."""
    return None


async def call_finished(linkedid, duration):
    """Report a finished call to the API when FIXRECORDS is enabled."""
    body = {
        "status": "call_finished",
        "call_session_id": linkedid,
        "duration": duration,
    }
    if FIXRECORDS:
        await send_post_request(body)
    return None


async def call_record_file(linkedid, uniqueid):
    """Report the recording link of a call to the API when FIXRECORDS is on."""
    if CALL_DEBUG:
        logging.info(f"Record ID: {linkedid} -> {uniqueid}")
    body = {
        "status": "call_record_file",
        "call_session_id": linkedid,
        "file_link": f"http://192.168.75.10:3050/{uniqueid}",
    }
    if FIXRECORDS:
        await send_post_request(body)
    return None


def loggingDict(title: str, data: dict) -> None:
    """Log *data* as indented JSON under *title* at INFO level."""
    # The title is passed as an argument rather than spliced into the format
    # string, so a "%" inside it cannot break the formatting.
    logging.info("%s: %s", title, json.dumps(data, indent=4, ensure_ascii=False))


async def log_to_list(filename: str = None):
    """Parse an event log into a list of event dictionaries.

    A line whose first character is a decimal digit is remembered as the
    current timestamp.  Any other non-empty line is split on ``": "`` into a
    key and a value.  Every third empty line closes the current event and
    starts a new one seeded with ``m_time`` and ``line_num``.

    When *filename* is empty, ``log/call.log`` (CALL_DEBUG) or
    ``log/file.log`` is read.  Raises ``OSError`` if the file is unreadable.
    """
    if not filename:
        filename = "log/call.log" if CALL_DEBUG else "log/file.log"
    events = []
    with open(filename, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()
    blank_count = 0
    event = {}
    m_time = ""
    for line_num, line in enumerate(lines):
        if line[:1].isdecimal():
            m_time = line
        elif line == "":
            blank_count += 1
        else:
            parts = line.split(": ")
            if len(parts) > 1:
                # NOTE(review): only the text up to a second ": " is kept as
                # the value; the event heuristics were tuned with this input.
                event[parts[0]] = parts[1]
            else:
                logging.error("Unparsable log line %d: %r", line_num, line)
        if blank_count == 3:
            blank_count = 0
            if event:
                events.append(event)
            event = {"m_time": m_time, "line_num": line_num}
    events.append(event)
    return events


async def csv_to_dict(filename: str = None):
    """Read answered external operator calls from a CDR CSV export.

    Returns ``{sequence: {"id", "time", "responsible", "duration"}}``.

    The file is split on plain commas on purpose: the column names used below
    match the data only under that naive split, so switching to the ``csv``
    module would shift the columns.  Rows shorter than the header are
    tolerated, and an empty file yields an empty dict.
    """
    if not filename:
        filename = "log/cdr.csv"
    asterisk_records = {}
    with open(filename, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()
    if not lines:
        return asterisk_records
    titles = lines[0].strip().split(",")
    for line in lines[1:]:
        # zip() stops at the shorter sequence, so a truncated or blank row
        # simply lacks the trailing columns instead of raising IndexError.
        event = dict(zip(titles, line.strip().split(",")))
        if (
            event.get("peeraccount", "").startswith("external")
            and event.get("accountcode") == "ANSWERED"
            and event.get("dst") in OPERATORS
        ):
            asterisk_records[event.get("sequence")] = {
                "id": event.get("did"),
                "time": event.get("calldate"),
                "responsible": event.get("dst"),
                "duration": event.get("amaflags"),
            }
    return asterisk_records


class CallHandler:
    """State machine that follows inbound calls through a stream of events.

    Attributes:
        calls: linkedid -> per-call state for calls still in progress
            (``responsibles``, ``started``, ``transfered``, ``records``,
            ``records_duble``).
        date: the day the state belongs to; state is reset when it changes.
        finished: linkedids that must be ignored from now on.
    """

    def __init__(self):
        self.calls = {}
        self.date = datetime.now().date()
        self.finished = []

    @staticmethod
    def _ids_are_ordered(event):
        """Return True when Linkedid's sequence number <= Uniqueid's."""
        try:
            linked_seq = int(event.get("Linkedid").split(".")[1])
            unique_seq = int(event.get("Uniqueid").split(".")[1])
        except (AttributeError, IndexError, TypeError, ValueError):
            return False
        return linked_seq <= unique_seq

    @staticmethod
    def _channel_to_responsible(channel):
        """Extract the numeric extension from a ``Local/<ext>@...`` channel.

        Returns ``None`` for any other channel or a non-numeric extension.
        """
        try:
            if channel.startswith("Local"):
                candidate = channel.split("@")[0].split("/")[-1]
                int(candidate)
                return candidate
        except (AttributeError, TypeError, ValueError):
            pass
        return None

    def _roll_date(self):
        """Drop all state when the calendar day has changed."""
        today = datetime.now().date()
        if self.date != today:
            self.date = today
            logging.info(f"Date changed to {self.date} and reset calls database")
            self.finished = []
            self.calls = {}

    def _lookup_record(self, linkedid, responsible):
        """Return the leg id stored for *responsible*.

        ``records`` is consulted first, then ``records_duble``; a miss in
        both raises ``KeyError``, which aborts handling of the event.
        """
        call = self.calls[linkedid]
        try:
            return call["records"][responsible]
        except KeyError:
            return call["records_duble"][responsible]

    async def handle_event(self, event):
        """Feed one event dictionary into the state machine.

        Events whose ids are malformed, that belong to a finished call, or
        that trigger an error while being inspected are ignored.
        """
        try:
            if not self._ids_are_ordered(event):
                return
            linkedid = event.get("Linkedid")
            if linkedid in self.finished:
                return
            if linkedid not in self.calls:
                await self._register_call(event, linkedid)
                return
            if CALL_DEBUG:
                logging.info(
                    f"Time: {event.get('m_time')}, Line: {event.get('line_num')}"
                )
            if event.get("ChannelStateDesc") == "Ring":
                if self._track_ringing(event, linkedid):
                    return
            if self.calls[linkedid]["started"] is None:
                await self._detect_answer(event, linkedid)
            else:
                await self._detect_finish(event, linkedid)
        except Exception:
            # Best effort by design: many events lack the fields probed above
            # (for example "m_time"), and the resulting error just means
            # "not relevant".  Keep the details available for debugging.
            logging.debug("Event ignored: %r", event, exc_info=True)

    async def _register_call(self, event, linkedid):
        """Decide whether a not-yet-seen linkedid is a call worth tracking."""
        self._roll_date()
        context = event.get("Context")
        if context == "from-internal":
            self.finished.append(linkedid)
            if CALL_DEBUG:
                loggingDict("INTERNAL", event)
            return
        if context != "from-trunk":
            return
        if AMI_CHANNEL_FILTER:
            event_channel = event.get("Channel")
            if not event_channel or len(event_channel) < 7:
                return
            if event_channel.startswith(tuple(AMI_CHANNEL_FILTER)):
                await self.incoming_call(event, linkedid)
                if CALL_DEBUG:
                    loggingDict("NEW", event)
                return
            # A trunk call on a channel outside the filter is never tracked.
            self.finished.append(linkedid)
            if CALL_DEBUG:
                loggingDict("FILTER", event)
            return
        await self.incoming_call(event, linkedid)
        if CALL_DEBUG:
            loggingDict(f"NEW: {linkedid}", event)

    def _track_ringing(self, event, linkedid):
        """Record which operator leg is ringing.

        Returns True when the event has been fully consumed.
        """
        dial_attempt = (
            event.get("Context") == "macro-user-callerid"
            and (
                event.get("Event") == "VarSet"
                or (
                    event.get("Event") != "Newexten"
                    and event.get("Variable") == "MACRO_DEPTH"
                )
            )
            and event.get("Application") not in ("ExecIf", "Goto", "Return")
        ) or (
            event.get("Variable") == "DIALEDPEERNUMBER"
            and event.get("Exten") in OPERATORS
        )
        if dial_attempt:
            try:
                if self._remember_leg(event, linkedid):
                    return True
            except Exception:
                # Malformed ids: give up on this event, as before.
                return True
        # NOTE(review): a missing "AppData" raises TypeError here, which makes
        # handle_event drop the event; kept as is.
        if (
            event.get("Context") == "sub-record-check"
            and (
                ".wav" in event.get("AppData")
                or "external" in event.get("AppData")
            )
            and event.get("Exten") == event.get("Extension") == "recordcheck"
            and event.get("Uniqueid") != linkedid
        ):
            call = self.calls[linkedid]
            responsible = self._channel_to_responsible(event.get("Channel"))
            if responsible in OPERATORS and responsible not in call["responsibles"]:
                call["records"][responsible] = event.get("Uniqueid")
                call["responsibles"].append(responsible)
        return False

    def _remember_leg(self, event, linkedid):
        """Store the Uniqueid of an operator leg; True if the event is done."""
        call = self.calls[linkedid]
        primary = call["records"]
        duplicates = call["records_duble"]
        uniq = event.get("Uniqueid")
        if uniq == linkedid:
            return False
        target = "records" if uniq not in primary.values() else "records_duble"
        if CALL_DEBUG:
            logging.warning(f"{uniq} -> {event.get('Channel')}")
        uniq_data = uniq.split(".")
        if int(uniq_data[1]) <= int(linkedid.split(".")[1]):
            return False
        responsible = self._channel_to_responsible(event.get("Channel"))
        if responsible not in OPERATORS:
            return False
        if responsible in primary:
            # An older leg must not replace a newer one already stored.
            if int(uniq_data[-1]) < int(primary[responsible].split(".")[-1]):
                return True
        if responsible not in call["responsibles"]:
            call["responsibles"].append(responsible)
        if target == "records":
            if responsible in primary:
                free_uniq = primary[responsible]
                primary[responsible] = uniq
                # Operators parked in records_duble with the id that just
                # became free are promoted.  Iterate over a snapshot: popping
                # from the dict while iterating it raises RuntimeError.
                for dub_id, dub_uniq in list(duplicates.items()):
                    if dub_uniq == free_uniq and dub_id not in primary:
                        primary[dub_id] = dub_uniq
                        duplicates.pop(dub_id)
            else:
                primary[responsible] = uniq
        elif uniq not in duplicates.values():
            if CALL_DEBUG:
                logging.warning(f"{uniq} -> {responsible}")
            if responsible not in primary or primary[responsible] != uniq:
                if CALL_DEBUG:
                    logging.warning(
                        f"Add record ID: {responsible} -> {uniq}, type: {target}"
                    )
                duplicates[responsible] = uniq
            elif CALL_DEBUG:
                loggingDict(f"{responsible} -> {uniq}", primary)
        if CALL_DEBUG:
            loggingDict("records", primary)
            loggingDict("records_duble", duplicates)
            loggingDict(
                f"Add record ID: {responsible} -> {uniq}, type: {target}", event
            )
        return True

    async def _detect_answer(self, event, linkedid):
        """Look for the answer (or loss) of a call that has not started."""
        call = self.calls[linkedid]
        if (
            (
                event.get("DialStatus") == "ANSWER"
                and event.get("DestChannelStateDesc") == "Up"
            )
            or event.get("Variable") in ("BRIDGEPVTCALLID", "BRIDGEPEER")
            or event.get("BridgeTechnology") == "simple_bridge"
        ):
            for var in ID_VARS:
                answered = event.get(var)
                if CALL_DEBUG:
                    logging.info(
                        f"{var} -> {answered} <= {call['responsibles']} == {answered in call['responsibles']}"
                    )
                if answered in call["responsibles"]:
                    if CALL_DEBUG:
                        logging.info(f"Call started: {linkedid}")
                    await self.call_started(linkedid, answered, event.get("m_time"))
                    if CALL_DEBUG:
                        loggingDict("Start", event)
                    break
        if event.get("Disposition") == "NO ANSWER":
            await self.call_lost(linkedid)
            if CALL_DEBUG:
                loggingDict("LOST", event)

    def _is_hangup_signal(self, event, linkedid, duration):
        """Return True when *event* matches one of the end-of-call patterns."""
        responsibles = self.calls[linkedid]["responsibles"]
        own_leg = event.get("Uniqueid") == linkedid
        return (
            (
                "BillableSeconds" in event.keys()
                and event.get("Disposition") == "ANSWERED"
                and (
                    not own_leg
                    or event.get("Cause-txt") == "Normal Clearing"
                    or event.get("Context") == "macro-hangupcall"
                    or event.get("Application") == "Hangup"
                )
            )
            or ("TalkTime" in event.keys() and event.get("Event") == "VarSet")
            or (
                event.get("Application") == "Hangup"
                and event.get("Disposition") != "NO ANSWER"
                and event.get("ChannelStateDesc") != "Ring"
                and (not own_leg or event.get("Cause-txt") == "Normal Clearing")
                and duration > 1
            )
            or (
                event.get("AppData") == "hangupcall,"
                and event.get("ChannelStateDesc") == "Up"
                and (not own_leg or event.get("Context") == "ext-queues")
            )
            or (
                event.get("Context") == "macro-hangupcall"
                and event.get("ChannelStateDesc") == "Up"
                and (
                    event.get("ConnectedLineNum") in OPERATORS
                    or event.get("ConnectedLineNum") in responsibles
                    or (
                        (
                            event.get("CallerIDNum") in OPERATORS
                            or event.get("CallerIDNum") in responsibles
                        )
                        and event.get("Event") == "BridgeLeave"
                    )
                )
                and (not own_leg or duration > 1)
            )
            or event.get("Event") == "Cdr"
        )

    async def _detect_finish(self, event, linkedid):
        """Handle transfers and the end of a call that has started."""
        call = self.calls[linkedid]

        transfered = False
        if (
            event.get("BridgeTechnology") == "simple_bridge"
            and event.get("Context") == "from-internal-xfer"
            and event.get("ChannelStateDesc") == "Up"
        ):
            new_responsible = event.get("Exten")
            old_responsible = event.get("CallerIDNum")
            transfered = True
        if (
            event.get("BridgeTechnology")
            == event.get("ToBridgeTechnology")
            == event.get("FromBridgeTechnology")
            == "simple_bridge"
            and event.get("CallerIDNum") not in OPERATORS
            and event.get("ChannelStateDesc") == "Up"
        ):
            new_responsible = event.get("CallerIDNum")
            old_responsible = event.get("ConnectedLineNum")
            transfered = True
        if transfered:
            self.call_transfered(
                linkedid, new_responsible, old_responsible, event.get("m_time")
            )

        m_time = _clock_text(event.get("m_time"))
        event_time = _clock_to_datetime(m_time)
        duration = int((event_time - call["started"]).total_seconds())

        if not (
            self._is_hangup_signal(event, linkedid, duration)
            and event.get("Context") != "from-internal-xfer"
            and not event.get("Event").startswith("RTC")
        ):
            return

        if CALL_DEBUG:
            loggingDict("Call ENDED?", event)
        tolk_time = 0
        for var in ("BillableSeconds", "TalkTime"):
            if var in event.keys():
                if CALL_DEBUG:
                    logging.info(f"{var} -> {event.get(var)}")
                tolk_time += int(event.get(var))
                break
        if tolk_time > duration:
            duration = tolk_time
        if CALL_DEBUG:
            logging.info(
                f"{duration=}, {tolk_time=}, {m_time=}, {self.calls[linkedid]['started']=}"
            )
        transfer_duration = None
        if call["transfered"] is not None:
            transfer_duration = int((event_time - call["transfered"]).total_seconds())

        if duration >= 1 and (transfer_duration is None or transfer_duration > 1):
            record_id = None
            if (
                event.get("AppData") == "hangupcall,"
                and event.get("Cause") == "16"
                and event.get("Context") == "ext-local"
                and event.get("ConnectedLineNum") in OPERATORS
                and event.get("Uniqueid") != linkedid
            ):
                record_id = event.get("Uniqueid")
            if record_id is None and duration < 2:
                return
            if event.get("Event") == "AttendedTransfer":
                record_id = event.get("TransfereeUniqueid")
            if (
                event.get("Context") == "macro-hangupcall"
                and event.get("Uniqueid") != linkedid
                and event.get("ConnectedLineNum") in OPERATORS
                and "BillableSeconds" not in event.keys()
            ):
                record_id = event.get("Uniqueid")
            if (
                event.get("Application") == "Hangup"
                and event.get("Membership") == "static"
                and event.get("ConnectedLineNum") in OPERATORS
                and event.get("Uniqueid") != linkedid
            ):
                record_id = event.get("Uniqueid")
            if record_id is None:
                for var in ID_VARS:
                    answered = event.get(var)
                    if CALL_DEBUG:
                        logging.info(f"{answered}, {var}, {call['responsibles']}")
                    if answered in call["responsibles"]:
                        record_id = self._lookup_record(linkedid, answered)
                        break
            if record_id is None:
                answered = self._channel_to_responsible(event.get("Channel"))
                if answered in call["responsibles"]:
                    record_id = self._lookup_record(linkedid, answered)
            if record_id is None:
                if CALL_DEBUG:
                    logging.warning(f"Call not finished: {linkedid}")
                return
            await self.call_finished(linkedid, duration, record_id, m_time)
            if CALL_DEBUG:
                logging.info(
                    f"Call finished: {linkedid}: {duration} -> {record_id} <- {m_time}"
                )
            return
        if CALL_DEBUG:
            logging.warning(f"Call not finished: {linkedid}")

    async def incoming_call(self, event, linkedid):
        """Create the state for a new call and fire the module-level hook."""
        self.calls[linkedid] = {
            "responsibles": [],
            "started": None,
            "transfered": None,
            "records": {},
            "records_duble": {},
        }
        exten = event.get("Exten") or event.get("Extension")
        await incoming_call(linkedid, event.get("CallerIDNum"), exten)

    def call_transfered(self, linkedid, new_responsible, old_responsible, m_time):
        """Hand the call over from *old_responsible* to *new_responsible*.

        The new operator inherits the leg id of the old one (looked up in
        ``records`` first, then in ``records_duble``).  Nothing changes when
        the new operator is already known or the old one has no leg id.
        """
        clock = _clock_text(m_time)
        try:
            int(new_responsible)  # only numeric extensions can take a call
            call = self.calls[linkedid]
            if new_responsible in call["responsibles"]:
                return
            if old_responsible in call["records"]:
                inherited = call["records"][old_responsible]
            elif old_responsible in call["records_duble"]:
                inherited = call["records_duble"][old_responsible]
            else:
                return
            call["records"][new_responsible] = inherited
            call["responsibles"].append(new_responsible)
            call["transfered"] = _clock_to_datetime(clock)
            if CALL_DEBUG:
                logging.warning(
                    f"Transfered: from {old_responsible} to {new_responsible}"
                )
        except Exception:
            logging.info(f"Call not transfered: {new_responsible}")

    async def call_started(self, linkedid, responsible, m_time):
        """Mark the call as answered at *m_time* and fire the hook."""
        self.calls[linkedid]["started"] = _clock_to_datetime(_clock_text(m_time))
        await call_started(linkedid, redirect_ids(responsible))

    async def call_finished(self, linkedid, duration, uniqueid, m_time):
        """Close the call, remember its result and fire the hooks."""
        self.finished.append(linkedid)
        self.calls.pop(linkedid)
        records[linkedid] = {"duration": duration, "uniqueid": uniqueid, "time": m_time}
        await call_finished(linkedid, duration)
        await call_record_file(linkedid, uniqueid)

    async def call_lost(self, linkedid):
        """Fire the lost-call hook with the operators that were rung."""
        await call_lost(linkedid, redirect_ids(self.calls[linkedid]["responsibles"]))


async def multiCheck():
    """Return the base names of all ``*.log`` files in ``log/data/``, sorted."""
    path = "log/data/"
    # splitext keeps dots inside the name ("2024.01.01.log" -> "2024.01.01").
    return sorted(
        os.path.splitext(name)[0] for name in os.listdir(path) if name.endswith(".log")
    )


async def main():
    """Replay logged events and compare the outcome with the CDR export."""
    multi = MULTICHECK and not CALL_DEBUG
    if multi:
        check_list = await multiCheck()
        check_total = {"OK": 0, "Wrong ID": 0, "Wrong Duration": 0, "Error": 0}
    else:
        check_list = [None]

    for filename in check_list:
        # Decide on the file name actually at hand: with MULTICHECK and
        # CALL_DEBUG both on, check_list is [None] and the default files apply.
        if filename is not None:
            events = await log_to_list(f"log/data/{filename}.log")
        else:
            events = await log_to_list()
        handler = CallHandler()
        for event in events:
            await handler.handle_event(event)
        if filename is not None:
            asterisk = await csv_to_dict(f"log/data/{filename}.csv")
        else:
            asterisk = await csv_to_dict()

        report_date = filename if filename else datetime.now().strftime("%Y-%m-%d")
        summary = {
            "OK": 0,
            "Wrong ID": 0,
            "Wrong Duration": 0,
            "Error": 0,
            "date": report_date,
        }
        check_dict = {"summary": summary, "details": {}}

        for linkedid, call_data in asterisk.items():
            record = records.get(linkedid)
            if record is not None and record["uniqueid"] == call_data["id"]:
                check_dict["details"][linkedid] = True
                summary["OK"] += 1
                if CALL_DEBUG:
                    continue
                if int(record["duration"]) - int(call_data["duration"]) <= -2:
                    logging.warning(
                        f"{call_data['time']}: {linkedid} -> {call_data['id']} <{call_data['duration']}> (Asterisk) == {record['uniqueid']} <{record['duration']}> -{record['time']}- (Medods)"
                    )
                    summary["Wrong Duration"] += 1
                continue

            check_dict["details"][linkedid] = False
            if CALL_DEBUG:
                continue
            if record is not None:
                logging.warning(
                    f"{call_data['time']}: {linkedid} -> {call_data['id']} [{call_data['responsible']}] (Asterisk) != {record['uniqueid']} (Medods)"
                )
                summary["Wrong ID"] += 1
            elif linkedid not in handler.finished:
                logging.error(
                    f"{call_data['time']}: {linkedid} -> {call_data['id']} [{call_data['responsible']}] (Asterisk) != {record} (Medods)"
                )
                summary["Error"] += 1
            # else: the handler ignored this call on purpose (internal or
            # filtered trunk), so there is no record to compare against.

        if multi:
            for key in check_total:
                check_total[key] += summary[key]
        if not CALL_DEBUG:
            loggingDict("Day result", summary)

    if multi:
        loggingDict("Total result", check_total)


if __name__ == "__main__":
    logging.config.fileConfig("log.ini", disable_existing_loggers=False)
    asyncio.run(main())