#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ twitch-irl-docker Copyright (C) 2022 gpkvt This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . """ import logging from obswebsocket import obsws, requests class ObsRemote(): def __init__(self, conf): global stats self._error = False self._stats = False self._conf = conf stats = {} stats['streaming'] = False def connect(self): try: logging.debug('Connecting to OBS') self._ws = obsws(self._conf['OBS_URL'], self._conf['OBS_PORT'], self._conf['OBS_PASSWORD']) self._ws.connect() self._ws.register(stream_status) except: logging.error('Could not connect to OBS') self._error = "connect" self._ws = False pass return self._ws def is_streaming(self): logging.debug('is_streaming') try: logging.debug('Getting streaming status') status = self._ws.call(requests.GetStreamingStatus()) status = status.getStreaming() logging.debug('Streaming status: %s', status) self._is_streaming = status except: logging.warning('Could not get streaming status') self._is_streaming = False self._error = "is_streaming" return self._is_streaming def get_scene(self): logging.debug('get_scene') try: logging.debug('Getting current OBS scene') scene = self._ws.call(requests.GetCurrentScene()) current_scene = scene.getName() self._current_scene = current_scene except: logging.warning('Could not get current OBS scene') self._current_scene = False self._error = "get_scene" return self._current_scene def get_sources_and_filter(self): logging.debug('get_sources_and_filter') active_vlc_sources = "" active_filters = "" active_audio = "" muted_vlc_sources = "" muted_filters = "" muted_audio = "" volume_settings = "" logging.debug('Getting OBS scene elements') sources = self._ws.call(requests.GetSourcesList()) for s in sources.getSources(): if s['typeId'] == "vlc_source": active_vlc_sources += str(s['name'])+',' filters = self._ws.call(requests.GetSourceFilters(s['name'])) for f in filters.getFilters(): if f['enabled']: active_filters += str(s['name'])+'|'+str(f['name'])+',' else: muted_filters += str(s['name'])+'|'+str(f['name'])+',' else: muted_vlc_sources = str(s['name'])+',' if s['typeId'] == "vlc_source" or s['typeId'] == 'ffmpeg_source' or s['typeId'] == 'wasapi_input_capture' or s['typeId'] == 'wasapi_output_capture': audio_status = self._ws.call(requests.GetMute(s['name'])) if audio_status.getMuted(): muted_audio += str(s['name'])+',' volume_settings += '0'+',' else: active_audio += str(s['name'])+',' volume = self._ws.call(requests.GetVolume(s['name'])) volume_settings += str(round(float(volume.getVolume()) * 100,1))+',' active_vlc_sources = active_vlc_sources[:-1] active_filters = active_filters[:-1] active_audio = active_audio[:-1] muted_vlc_sources = muted_vlc_sources[:-1] muted_filters = muted_filters[:-1] muted_audio = muted_audio[:-1] volume_settings = volume_settings[:-1] sources_and_filter = { "active_vlc_sources": active_vlc_sources, "active_filters": active_filters, "active_audio": active_audio, "muted_vlc_sources": muted_vlc_sources, "muted_filters": muted_filters, "muted_audio": muted_audio, "volume_settings": volume_settings} return sources_and_filter def set_scene(self, scene): logging.debug('set_scene') try: logging.debug('Settings OBS scene') self._ws.call(requests.SetCurrentScene(scene)) sources = self._ws.call(requests.GetSourcesList()) # Reload all vlc_sources in scene, otherwise video might get stuck for s in sources.getSources(): if s['typeId'] == 'vlc_source': self._ws.call(requests.SetSceneItemRender(s['name'], False)) self._ws.call(requests.SetSceneItemRender(s['name'], True)) current_scene = self._ws.call(requests.GetCurrentScene()) except Exception as e: logging.warning('Could not set OBS scene') logging.warning(e) current_scene = False self._error = "set_scene" pass self._current_scene = current_scene return current_scene def error(self): logging.debug('OBS Error: %s', str(self._error)) error = self._error self._error = False return error def get(self): logging.debug('get') global stats return stats stats = {} def stream_status(message): logging.debug('stream_status') global stats # pyobs thread crashes when OBS gets closed # We need to know as we won't get any status infos afterwards # As we will receive many other events after Exit we need to make sure # that stats will not be overwritten, otherwise there is no way to # inform the main loop. if stats['streaming'] == "Exit": return if message.name == "Exiting": logging.debug('OBS was shut down') stats['streaming'] = "Exit" stats['error'] = False return # Collect stats, as only certain events will have the dict key below # we wrap everything in try/except. This may produce errors in debuglog. # TODO: Check event type try: stats = { 'num-dropped-frames': message.getNumDroppedFrames(), 'preview-only': message.getPreviewOnly(), 'num-total-frames': message.getNumTotalFrames(), 'stream-timecode': message.getTotalStreamTime(), 'output-total-frames': message.getOutputTotalFrames(), 'average-frame-time': message.getAverageFrameTime(), 'kbits-per-sec': message.getKbitsPerSec(), 'render-missed-frames': message.getRenderMissedFrames(), 'total-stream-time': message.getTotalStreamTime(), 'memory-usage': message.getMemoryUsage(), 'bytes-per-sec': message.getBytesPerSec(), 'strain': message.getStrain(), 'recording': message.getRecording(), 'cpu-usage': message.getCpuUsage(), 'replay-buffer-active': message.getReplayBufferActive(), 'output-skipped-frames': message.getOutputSkippedFrames(), 'fps': message.getFps(), 'render-total-frames': message.getRenderTotalFrames(), 'streaming': message.getStreaming(), 'free-disk-space': message.getFreeDiskSpace(), } except Exception as e: logging.error(e) stats = { 'streaming': False, 'error': True } logging.debug(stats) def main(): print("Nothing to see here") if __name__ == "__main__": main()