twitch-irl-docker/build/scripts/lib/obs.py

220 lines
7.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
twitch-irl-docker
Copyright (C) 2022 gpkvt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import logging
from obswebsocket import obsws, requests
class ObsRemote():
    """Small wrapper around an obs-websocket connection.

    Except for :meth:`get_sources_and_filter`, the methods do not raise on
    failure. They log the problem, return ``False`` and store a short marker
    string that can be fetched (and cleared) with :meth:`error`.
    """

    def __init__(self, conf):
        """Store the configuration and reset the shared stream statistics.

        Args:
            conf: Mapping providing the ``OBS_URL``, ``OBS_PORT`` and
                ``OBS_PASSWORD`` entries that :meth:`connect` reads.
        """
        global stats
        self._error = False
        self._stats = False
        self._conf = conf
        # Attributes assigned later by connect()/is_streaming()/get_scene();
        # give them a defined "nothing yet" value up front.
        self._ws = False
        self._is_streaming = False
        self._current_scene = False
        stats = {'streaming': False}

    def connect(self):
        """Open the websocket connection and register the event callback.

        Returns:
            The connected websocket client, or ``False`` when connecting
            failed (the error marker is then set to ``"connect"``).
        """
        try:
            logging.debug('Connecting to OBS')
            self._ws = obsws(
                self._conf['OBS_URL'],
                self._conf['OBS_PORT'],
                self._conf['OBS_PASSWORD'],
            )
            self._ws.connect()
            self._ws.register(stream_status)
        except Exception:
            # Deliberately not a bare except: KeyboardInterrupt and
            # SystemExit must still be able to stop the program.
            logging.error('Could not connect to OBS')
            self._error = "connect"
            self._ws = False
        return self._ws

    def is_streaming(self):
        """Ask OBS whether it is currently streaming.

        Returns:
            The value reported by OBS, or ``False`` when the request failed
            (the error marker is then set to ``"is_streaming"``).
        """
        logging.debug('is_streaming')
        try:
            logging.debug('Getting streaming status')
            response = self._ws.call(requests.GetStreamingStatus())
            status = response.getStreaming()
            logging.debug('Streaming status: %s', status)
            self._is_streaming = status
        except Exception:
            logging.warning('Could not get streaming status')
            self._is_streaming = False
            self._error = "is_streaming"
        return self._is_streaming

    def get_scene(self):
        """Return the name of the current OBS scene.

        Returns:
            The scene name, or ``False`` when the request failed (the error
            marker is then set to ``"get_scene"``).
        """
        logging.debug('get_scene')
        try:
            logging.debug('Getting current OBS scene')
            scene = self._ws.call(requests.GetCurrentScene())
            self._current_scene = scene.getName()
        except Exception:
            logging.warning('Could not get current OBS scene')
            self._current_scene = False
            self._error = "get_scene"
        return self._current_scene

    def get_sources_and_filter(self):
        """Collect source, filter, mute and volume information from OBS.

        Unlike the other methods this one has no error handling of its own:
        any exception raised while talking to OBS propagates to the caller.

        Returns:
            dict: Comma-separated strings under the keys
            ``active_vlc_sources``, ``active_filters``, ``active_audio``,
            ``muted_vlc_sources``, ``muted_filters``, ``muted_audio`` and
            ``volume_settings``. Filter entries have the form
            ``source|filter``. ``volume_settings`` holds one value per
            audio-capable source: ``0`` when muted, otherwise the reported
            volume multiplied by 100 and rounded to one decimal.
        """
        logging.debug('get_sources_and_filter')
        audio_type_ids = (
            'vlc_source',
            'ffmpeg_source',
            'wasapi_input_capture',
            'wasapi_output_capture',
        )
        active_vlc_sources = []
        active_filters = []
        active_audio = []
        muted_filters = []
        muted_audio = []
        volume_settings = []
        muted_vlc_sources = ""

        logging.debug('Getting OBS scene elements')
        sources = self._ws.call(requests.GetSourcesList())
        for s in sources.getSources():
            if s['typeId'] == "vlc_source":
                active_vlc_sources.append(str(s['name']))
                filters = self._ws.call(requests.GetSourceFilters(s['name']))
                for f in filters.getFilters():
                    entry = str(s['name']) + '|' + str(f['name'])
                    if f['enabled']:
                        active_filters.append(entry)
                    else:
                        muted_filters.append(entry)
            else:
                # NOTE(review): the original assigns (instead of appending)
                # here, so only the LAST source that is not a vlc_source is
                # reported under "muted_vlc_sources". This looks unintended,
                # but it is kept as-is because callers may depend on the
                # current output - confirm the intent before changing it.
                muted_vlc_sources = str(s['name'])

            if s['typeId'] in audio_type_ids:
                audio_status = self._ws.call(requests.GetMute(s['name']))
                if audio_status.getMuted():
                    muted_audio.append(str(s['name']))
                    volume_settings.append('0')
                else:
                    active_audio.append(str(s['name']))
                    volume = self._ws.call(requests.GetVolume(s['name']))
                    percent = round(float(volume.getVolume()) * 100, 1)
                    volume_settings.append(str(percent))

        return {
            "active_vlc_sources": ",".join(active_vlc_sources),
            "active_filters": ",".join(active_filters),
            "active_audio": ",".join(active_audio),
            "muted_vlc_sources": muted_vlc_sources,
            "muted_filters": ",".join(muted_filters),
            "muted_audio": ",".join(muted_audio),
            "volume_settings": ",".join(volume_settings),
        }

    def set_scene(self, scene):
        """Switch OBS to ``scene`` and re-render its VLC sources.

        Args:
            scene: Scene identifier passed to the ``SetCurrentScene`` request.

        Returns:
            The response of the ``GetCurrentScene`` request issued after the
            switch (the response object, not just the scene name), or
            ``False`` when anything failed (the error marker is then set to
            ``"set_scene"``).
        """
        logging.debug('set_scene')
        try:
            logging.debug('Settings OBS scene')
            self._ws.call(requests.SetCurrentScene(scene))
            sources = self._ws.call(requests.GetSourcesList())
            # Reload all vlc_sources in scene, otherwise video might get stuck
            for s in sources.getSources():
                if s['typeId'] == 'vlc_source':
                    self._ws.call(requests.SetSceneItemRender(s['name'], False))
                    self._ws.call(requests.SetSceneItemRender(s['name'], True))
            current_scene = self._ws.call(requests.GetCurrentScene())
        except Exception as e:
            logging.warning('Could not set OBS scene')
            logging.warning(e)
            current_scene = False
            self._error = "set_scene"
        self._current_scene = current_scene
        return current_scene

    def error(self):
        """Return the pending error marker and clear it.

        Returns:
            The marker string set by the last failing method, or ``False``
            when no error is pending.
        """
        logging.debug('OBS Error: %s', str(self._error))
        error = self._error
        self._error = False
        return error

    def get(self):
        """Return the module-level stream statistics dictionary."""
        logging.debug('get')
        global stats
        return stats
# Latest stream statistics; written by stream_status() and ObsRemote.__init__,
# read through ObsRemote.get().
stats = {}


def stream_status(message):
    """Event callback that keeps the module-level ``stats`` up to date.

    ObsRemote.connect() registers this function on the websocket client, so
    it is called with the events delivered by that client.

    Args:
        message: Event object. Its ``name`` attribute is checked for
            ``"Exiting"``; otherwise the stream status getters
            (``getStreaming()``, ``getFps()``, ...) are called on it.

    Side effects:
        * On an ``"Exiting"`` event ``stats['streaming']`` is set to
          ``"Exit"`` and ``stats['error']`` to ``False``; every later event
          is ignored.
        * On an event that offers all stream status getters, ``stats`` is
          replaced by a fresh dictionary with those values.
        * On any other event ``stats`` is replaced by
          ``{'streaming': False, 'error': True}`` and the exception is
          logged.
    """
    global stats
    logging.debug('stream_status')

    # pyobs thread crashes when OBS gets closed
    # We need to know as we won't get any status infos afterwards
    # As we will receive many other events after Exit we need to make sure
    # that stats will not be overwritten, otherwise there is no way to
    # inform the main loop.
    # .get() instead of [...]: stats may still be the empty initial dict,
    # in which case a plain lookup would raise KeyError.
    if stats.get('streaming') == "Exit":
        return

    if message.name == "Exiting":
        logging.debug('OBS was shut down')
        stats['streaming'] = "Exit"
        stats['error'] = False
        return

    # Collect stats, as only certain events will have the dict key below
    # we wrap everything in try/except. This may produce errors in debuglog.
    # TODO: Check event type
    try:
        stats = {
            'num-dropped-frames': message.getNumDroppedFrames(),
            'preview-only': message.getPreviewOnly(),
            'num-total-frames': message.getNumTotalFrames(),
            # Filled from the total stream time, same source as
            # 'total-stream-time' below.
            'stream-timecode': message.getTotalStreamTime(),
            'output-total-frames': message.getOutputTotalFrames(),
            'average-frame-time': message.getAverageFrameTime(),
            'kbits-per-sec': message.getKbitsPerSec(),
            'render-missed-frames': message.getRenderMissedFrames(),
            'total-stream-time': message.getTotalStreamTime(),
            'memory-usage': message.getMemoryUsage(),
            'bytes-per-sec': message.getBytesPerSec(),
            'strain': message.getStrain(),
            'recording': message.getRecording(),
            'cpu-usage': message.getCpuUsage(),
            'replay-buffer-active': message.getReplayBufferActive(),
            'output-skipped-frames': message.getOutputSkippedFrames(),
            'fps': message.getFps(),
            'render-total-frames': message.getRenderTotalFrames(),
            'streaming': message.getStreaming(),
            'free-disk-space': message.getFreeDiskSpace(),
        }
    except Exception as e:
        logging.error(e)
        stats = {
            'streaming': False,
            'error': True
        }
    logging.debug(stats)
def main():
    """Print a placeholder notice; this module is meant to be imported."""
    notice = "Nothing to see here"
    print(notice)


# Only run the placeholder when the file is executed directly.
if __name__ == "__main__":
    main()