#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ twitch-irl-docker Copyright (C) 2022 gpkvt This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . """ import time import json import requests import logging from bs4 import BeautifulSoup from pprint import pprint class RtmpStats(): def __init__(self, conf): self._error = False self._conf = conf self._stream = conf['RTMP_STREAM_NAME'] self._error = self.error() self._xml_timestamp = 0 self._xml = False if self._conf['DEBUG']: logging.warning('Debug mode active: Using http://localhost/debug.xml as RTMP status source') self._conf['RTMP_STAT_URL'] = 'http://localhost/debug.xml' def get_tag(self, xml, tag): for value in xml.find_all(tag): return str(value.text) return 0 def load(self): ''' Load stats from RTMP server ''' logging.debug('Getting RTMP data from: ' + str(self._conf['RTMP_STAT_URL'])) try: logging.debug('Loading RTMP stats') xml = requests.get(self._conf['RTMP_STAT_URL']) if xml.status_code != 200: logging.error('Could not get RTMP stats from %s', str(self._conf['RTMP_STAT_URL'])) self._error = True xml = xml.text logging.debug('Parsing RTMP stats') xml = BeautifulSoup(xml, 'lxml') self._xml_timestamp = time.time() self._xml = xml return xml except Exception as e: logging.error('Could not get/parse RTMP stats from %s', str(self._conf['RTMP_STAT_URL'])) logging.error(e) self._error = True return False def get(self): ''' Returns RTMP stat values ''' xml = self.load() rtmp_stats = {} rtmp_stats['uptime'] = self.get_tag(xml, 'uptime') rtmp_stats['accepted'] = self.get_tag(xml, 'naccepted') rtmp_stats['bw_in_sum'] = self.get_tag(xml, 'bw_in') rtmp_stats['bw_out_sum'] = self.get_tag(xml, 'bw_out') rtmp_stats['bytes_in_sum'] = self.get_tag(xml, 'bytes_in') rtmp_stats['bytes_out_sum'] = self.get_tag(xml, 'bytes_out') for stream in xml.findAll('stream'): stream_name = self.get_tag(stream, 'name') if stream_name not in rtmp_stats: rtmp_stats[stream_name] = {} rtmp_stats[stream_name]['video'] = {} rtmp_stats[stream_name]['audio'] = {} rtmp_stats[stream_name]['audio'] = {} rtmp_stats[stream_name]['client'] = {} rtmp_stats[stream_name]['bw_audio'] = self.get_tag(stream, 'bw_audio') rtmp_stats[stream_name]['bw_video'] = self.get_tag(stream, 'bw_video') rtmp_stats[stream_name]['bw_in'] = self.get_tag(stream, 'bw_in') rtmp_stats[stream_name]['bw_out'] = self.get_tag(stream, 'bw_out') rtmp_stats[stream_name]['bytes_in'] = self.get_tag(stream, 'bytes_in') rtmp_stats[stream_name]['bytes_out'] = self.get_tag(stream, 'bytes_out') rtmp_stats[stream_name]['clients'] = self.get_tag(stream, 'nclients') for video in stream.find_all('video'): rtmp_stats[stream_name]['video']['width'] = self.get_tag(video, 'width') rtmp_stats[stream_name]['video']['height'] = self.get_tag(video, 'height') rtmp_stats[stream_name]['video']['frame_rate'] = self.get_tag(video, 'frame_rate') rtmp_stats[stream_name]['video']['codec'] = self.get_tag(video, 'codec') rtmp_stats[stream_name]['video']['profile'] = self.get_tag(video, 'profile') for audio in stream.find_all('audio'): rtmp_stats[stream_name]['audio']['channels'] = self.get_tag(audio, 'channels') rtmp_stats[stream_name]['audio']['sample_rate'] = self.get_tag(audio, 'sample_rate') rtmp_stats[stream_name]['audio']['codec'] = self.get_tag(audio, 'codec') rtmp_stats[stream_name]['audio']['profile'] = self.get_tag(audio, 'profile') client_id = 0 for client in stream.find_all('client'): if id not in rtmp_stats[stream_name]['client']: rtmp_stats[stream_name]['client'][client_id] = {} rtmp_stats[stream_name]['client'][client_id]['address'] = self.get_tag(client, 'address') rtmp_stats[stream_name]['client'][client_id]['time'] = self.get_tag(client, 'time') rtmp_stats[stream_name]['client'][client_id]['flashver'] = self.get_tag(client, 'flashver') rtmp_stats[stream_name]['client'][client_id]['dropped'] = self.get_tag(client, 'dropped') rtmp_stats[stream_name]['client'][client_id]['avsync'] = self.get_tag(client, 'avsync') rtmp_stats[stream_name]['client'][client_id]['timestamp'] = self.get_tag(client, 'timestamp') if self.get_tag(client, 'publishing'): rtmp_stats[stream_name]['client'][client_id]['publishing'] = True else: rtmp_stats[stream_name]['client'][client_id]['publishing'] = False client_id = client_id + 1 self._stats = rtmp_stats return rtmp_stats def check_low_bitrate(self): ''' Check if bitrate is below threshold ''' bitrate = self.get_bitrate() if int(bitrate) <= int(self._conf['LOW_BITRATE']): logging.info('Bitrate is low') return True else: logging.debug('Bitrate is good') return False def get_bitrate(self): ''' Get current bitrate for given stream ''' stream = self._conf['RTMP_STREAM_NAME'] streams = 0 xml = self.load() try: #bitrate = xml.select_one('rtmp server application ' + stream + ' stream bw_in') bitrate = xml.select_one('rtmp bw_in') streams = len(xml.find_all('publishing')) if not streams: streams = 1 bitrate = bitrate.text bitrate = int(bitrate) / 1024 / int(streams) except Exception as e: bitrate = 0 logging.warning('Could not get bitrate') logging.warning(e) self._error = True pass logging.debug('Current Bitrate: %s', str(bitrate)) self._bitrate = bitrate return bitrate def get_publishing_status(self): ''' Get publishing status for given stream ''' logging.debug('Getting publishing status for: ' + str(self._conf['RTMP_STREAM_NAME'])) stream = self._conf['RTMP_STREAM_NAME'] xml = self.load() try: publishing = xml.select_one('rtmp server application ' + str(stream) + ' stream publishing') except Exception as e: publishing = False logging.warning('Could not get publishing status') logging.warning(e) self._error = True pass if publishing: publishing = True else: publishing = False return publishing def error(self): logging.debug('RTMP Error: %s', str(self._error)) return self._error def get_json(self): json_stats = json.loads(json.dumps(self._stats), parse_int=str) return json_stats def main(): print("Nothing to see here") if __name__ == "__main__": main()