mirror of https://gitlab.com/gpvkt/twitchtts.git
792 lines
31 KiB
Python
792 lines
31 KiB
Python
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
# pylint: disable=line-too-long
|
|
|
|
"""
|
|
TwitchTTS
|
|
Copyright (C) 2022 gpkvt
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import socket
|
|
import random
|
|
import logging
|
|
import datetime
|
|
import webbrowser
|
|
import socketserver
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
from threading import Thread
|
|
from collections import Counter
|
|
from urllib.parse import parse_qs
|
|
from http.server import BaseHTTPRequestHandler
|
|
|
|
import yaml
|
|
|
|
class IRC:
|
|
"""IRC bot"""
|
|
irc = socket.socket()
|
|
|
|
def __init__(self):
|
|
self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.tts_denied = []
|
|
self.tts_allowed = []
|
|
self.tts_status = conf['TTS_STARTENABLED']
|
|
self.quickvote_status = False
|
|
self.votemsg = False
|
|
self.poll = {}
|
|
self.pollcount = 0
|
|
|
|
if 'WHITELIST_USER' in conf:
|
|
self.tts_allowed = conf['WHITELIST_USER']
|
|
|
|
def connect(self, server, port, channel, botnick, botpass):
|
|
"""Connect to Twitch IRC servers"""
|
|
logging.info("Connecting to: %s", server)
|
|
try:
|
|
self.irc.connect((server, port))
|
|
except ConnectionResetError:
|
|
logging.fatal('Twitch refused to connect, please check your settings and try again.')
|
|
sys.exit(252)
|
|
|
|
self.irc.settimeout(1)
|
|
|
|
self.irc.send(bytes("PASS " + botpass + "\r\n", "UTF-8"))
|
|
self.irc.send(bytes(
|
|
"USER " + botnick + " " + botnick +" " + botnick + " :python\r\n", "UTF-8")
|
|
)
|
|
self.irc.send(bytes("NICK " + botnick + "\r\n", "UTF-8"))
|
|
self.irc.send(bytes("CAP REQ :twitch.tv/commands twitch.tv/tags \r\n", "UTF-8"))
|
|
time.sleep(5)
|
|
|
|
try:
|
|
self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8"))
|
|
except ConnectionResetError:
|
|
logging.warning('JOIN was refused, will try again in 30 seconds.')
|
|
time.sleep(30)
|
|
logging.warning('Please check your credentials, if this error persists.')
|
|
self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8"))
|
|
|
|
def sendmsg(self, channel, user, msg):
|
|
"""
|
|
Send (a public) message to IRC channel
|
|
|
|
Parameters:
|
|
channel (str): Channel to post msg to
|
|
user (str): User to address message to
|
|
msg (str): Message to send
|
|
"""
|
|
self.irc.send(bytes("PRIVMSG "+channel+" :"+user+" "+msg+"\r\n", "UTF-8"))
|
|
|
|
def get_response(self):
|
|
"""Get and process response from IRC"""
|
|
try:
|
|
resp = self.irc.recv(2048).decode("UTF-8")
|
|
logging.debug('resp:')
|
|
logging.debug(resp)
|
|
except socket.timeout:
|
|
return False
|
|
except Exception: # pylint: disable=broad-except
|
|
logging.exception('An unknown error occured while getting a IRC response.')
|
|
sys.exit(255)
|
|
|
|
if resp.find('PING') != -1:
|
|
logging.debug('PING received')
|
|
self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8"))
|
|
|
|
if resp.find('CLEARMSG') != -1:
|
|
logging.info('CLEARMSG received')
|
|
msgid = False
|
|
|
|
global msg_queue_raw # pylint: disable=global-statement,invalid-name
|
|
filtered_msg_queue = []
|
|
|
|
tags = resp.split(';')
|
|
for tag in tags:
|
|
if "target-msg-id=" in tag:
|
|
msgid = tag.rsplit('target-msg-id=',1)[1]
|
|
logging.debug('Trying to suppress message')
|
|
logging.debug(msgid)
|
|
|
|
for msg in list(msg_queue_raw):
|
|
if msg['msgid'] == msgid:
|
|
logging.info('Suppressing message %s', msgid)
|
|
else:
|
|
filtered_msg_queue.append(msg)
|
|
|
|
msg_queue_raw = filtered_msg_queue
|
|
|
|
return True
|
|
|
|
if resp.find('NOTICE') != -1:
|
|
if 'Login authentication failed' in resp:
|
|
try:
|
|
raise RuntimeError()
|
|
except RuntimeError:
|
|
logging.exception('Login failed, please check your credentials and try again.')
|
|
sys.exit(251)
|
|
|
|
if resp.find('PRIVMSG') != -1:
|
|
logging.debug('PRIVMSG received')
|
|
badges = False
|
|
subscriber = False
|
|
msgid = False
|
|
msg = False
|
|
msglen = False
|
|
user = False
|
|
tts = False
|
|
|
|
tags = resp.split(';')
|
|
for tag in tags:
|
|
if tag.startswith('badges='):
|
|
badges = tag.rsplit('badges=',1)[1]
|
|
if tag.startswith('subscriber='):
|
|
subscriber = tag.rsplit('subscriber=',1)[1]
|
|
logging.debug('Subscriber: %s', subscriber)
|
|
if tag.startswith('id='):
|
|
msgid = tag.rsplit('id=',1)[1]
|
|
logging.debug('Message ID: %s', msgid)
|
|
if tag.startswith('display-name='):
|
|
user = tag.rsplit('display-name=',1)[1].lower()
|
|
logging.debug('Username: %s', user)
|
|
|
|
msg = resp.rsplit('PRIVMSG #',1)[1]
|
|
msg = msg.split(':',1)[1]
|
|
msg = msg.replace('\r\n','')
|
|
msglen = len(msg)
|
|
|
|
logging.debug('Msg: %s', msg)
|
|
logging.debug('Msg length: %s', msglen)
|
|
logging.debug('Deny List:')
|
|
logging.debug(self.tts_denied)
|
|
|
|
if msg.startswith('#') and self.quickvote_status is True:
|
|
logging.info('Quickvote: Cast detected')
|
|
self.pollcount += 1
|
|
self.poll[user] = msg.lower()
|
|
logging.debug(self.poll)
|
|
return True
|
|
|
|
if 'broadcaster' in badges or 'moderator' in badges:
|
|
if msg.startswith('!ping'):
|
|
logging.debug("Ping check received.")
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), "Pong!")
|
|
return True
|
|
|
|
if msg.startswith('!dtts'):
|
|
logging.debug("!dtts command detected")
|
|
self.Commands.dtts(self, msg)
|
|
return True
|
|
|
|
if msg.startswith('!random'):
|
|
logging.info('!random command detected')
|
|
self.Commands.random(self, msg)
|
|
return True
|
|
|
|
if msg.startswith('!quickvote'):
|
|
logging.info("!quickvote command detected")
|
|
self.Commands.quickvote(self, msg)
|
|
return True
|
|
|
|
if msg.startswith('!ptts'):
|
|
logging.debug("!ptts command detected")
|
|
self.Commands.ptts(self, msg)
|
|
return True
|
|
|
|
if msg.startswith('!toff'):
|
|
logging.info('TTS is now turned off')
|
|
msg_queue.clear()
|
|
msg_queue_raw.clear()
|
|
self.tts_status = False
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOFF'])
|
|
return True
|
|
|
|
if msg.startswith('!ton'):
|
|
logging.info('TTS is now turned on')
|
|
msg_queue.clear()
|
|
msg_queue_raw.clear()
|
|
self.tts_status = True
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TON'])
|
|
return True
|
|
|
|
if msg.startswith('!tts'):
|
|
logging.debug('!tts command detected')
|
|
logging.debug("tts status: %s", self.tts_status)
|
|
|
|
if not self.tts_status:
|
|
logging.info('TTS is disabled')
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DISABLED'])
|
|
return False
|
|
|
|
if msglen > conf['IRC_TTS_LEN']:
|
|
logging.info('TTS message is to long')
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOO_LONG'])
|
|
return False
|
|
|
|
if conf['IRC_SUBONLY']:
|
|
if subscriber != "0" or 'moderator' in badges or 'broadcaster' in badges:
|
|
logging.debug('TTS is sub-only and user has allowance')
|
|
else:
|
|
logging.info('TTS is sub-only')
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['SUBONLY'])
|
|
return False
|
|
|
|
if conf['IRC_MODONLY']:
|
|
if 'moderator' in badges or 'broadcaster' in badges:
|
|
logging.debug('TTS is mod-only and user has allowance')
|
|
else:
|
|
logging.info('TTS is sub-only')
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['MODONLY'])
|
|
return False
|
|
|
|
if user in self.tts_denied:
|
|
logging.info("%s is not allowed to use TTS", user)
|
|
self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DENIED'])
|
|
return False
|
|
|
|
if conf['WHITELIST']:
|
|
if user not in self.tts_allowed:
|
|
logging.info('User is not on whitelist')
|
|
logging.info(self.tts_allowed)
|
|
self.sendmsg(
|
|
conf['IRC_CHANNEL'],
|
|
"@"+str(user), conf['MESSAGE']['WHITELISTONLY']
|
|
)
|
|
return False
|
|
|
|
logging.warning('Nobody is on the whitelist.')
|
|
self.sendmsg(
|
|
conf['IRC_CHANNEL'],
|
|
"@"+str(user), conf['MESSAGE']['WHITELISTONLY']
|
|
)
|
|
return False
|
|
|
|
logging.info('Valid TTS message, adding to raw queue')
|
|
tts = True
|
|
now = datetime.datetime.now()
|
|
msg = msg.replace('!tts','',1)
|
|
msg = {
|
|
"TTS": tts,
|
|
"msg": msg,
|
|
"badges": badges,
|
|
"subscriber": subscriber,
|
|
"msgid": msgid,
|
|
"user": user,
|
|
"length": msglen,
|
|
"queuetime": now,
|
|
"timestamp": str(time.time_ns())
|
|
}
|
|
msg_queue_raw.append(msg)
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
class Commands():
|
|
""" Bot commands """
|
|
|
|
def __init__(self):
|
|
self.tts_denied = []
|
|
self.tts_allowed = []
|
|
self.quickvote_status = self.quickvote_status
|
|
self.votemsg = self.votemsg
|
|
self.poll = self.poll
|
|
self.pollcount = self.pollcount
|
|
|
|
def quickvote(self, msg):
|
|
""" !quickvote command
|
|
|
|
Starts or stops the !quickvote function. On stop calculates the 5 most casted
|
|
votes and send them to chat. The highest vote is send to msg_queue.
|
|
|
|
:param str msg: The IRC message triggering the command
|
|
"""
|
|
|
|
if self.quickvote_status:
|
|
logging.debug('Quickvote stopped')
|
|
|
|
if self.pollcount == 0:
|
|
logging.info("Nobody voted")
|
|
IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTEEND'])
|
|
IRC.sendmsg(self, conf['IRC_CHANNEL'], "*", conf['MESSAGE']['VOTENOBODY'])
|
|
|
|
raw_msg = {
|
|
"TTS": True,
|
|
"msg": conf['MESSAGE']['VOTENOBODY'],
|
|
"badges": True,
|
|
"subscriber": True,
|
|
"msgid": True,
|
|
"user": conf['IRC_USERNAME'],
|
|
"length": conf['IRC_TTS_LEN'],
|
|
"queuetime": datetime.datetime.now(),
|
|
"timestamp": str(time.time_ns())
|
|
}
|
|
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
|
|
|
|
logging.info('The result is: %s', conf['MESSAGE']['VOTENOBODY'])
|
|
logging.debug('Votemsg: %s', msg)
|
|
|
|
self.quickvote_status = False
|
|
self.poll = {}
|
|
return False
|
|
|
|
logging.info("Counting votes")
|
|
count = 0
|
|
count = Counter(self.poll.values()).most_common(5)
|
|
IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTEEND'])
|
|
|
|
logging.debug(count)
|
|
|
|
raw_msg = {
|
|
"TTS": True,
|
|
"msg": conf['MESSAGE']['VOTERESULT'] +" "+ str(count[0][0].replace('#','')),
|
|
"badges": True,
|
|
"subscriber": True,
|
|
"msgid": True,
|
|
"user": conf['IRC_USERNAME'],
|
|
"length": conf['IRC_TTS_LEN'],
|
|
"queuetime": datetime.datetime.now(),
|
|
"timestamp": str(time.time_ns())
|
|
}
|
|
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
|
|
|
|
logging.info('The result is: %s', conf['MESSAGE']['VOTERESULT'] +" "+ str(count[0]))
|
|
logging.debug('Votemsg: %s', msg)
|
|
|
|
for key, value in count:
|
|
IRC.sendmsg(
|
|
self,
|
|
conf['IRC_CHANNEL'], "*",
|
|
str(key)+" ("+str(value)+ " "+ conf['MESSAGE']['VOTES'] + ")"
|
|
)
|
|
|
|
self.quickvote_status = False
|
|
self.poll = {}
|
|
self.pollcount = 0
|
|
return
|
|
|
|
logging.debug('Quickvote started')
|
|
self.quickvote_status = True
|
|
self.votemsg = msg.split('!quickvote', 1)[1].strip()
|
|
if self.votemsg:
|
|
IRC.sendmsg(self,
|
|
conf['IRC_CHANNEL'], "@chat",
|
|
conf['MESSAGE']['VOTESTART'] + " (" + str(self.votemsg) + ")"
|
|
)
|
|
else:
|
|
IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'])
|
|
|
|
return
|
|
|
|
def random(self, msg):
|
|
""" !random command
|
|
|
|
Read a random line from randomfile and put it into msg_queue
|
|
If no file is given in msg a standard file will be used
|
|
|
|
:param str msg: The IRC message triggering the command
|
|
:raise: FileNotFoundError if randomfile does not exists
|
|
:return: True if line was successfully read and added to msg_queue
|
|
:rtype: bool
|
|
"""
|
|
|
|
randomfile = msg.replace('!random', '').strip().lower()
|
|
|
|
if randomfile:
|
|
randomfile = "random_"+str(os.path.basename(randomfile))+".txt"
|
|
else:
|
|
randomfile = "random.txt"
|
|
|
|
try:
|
|
with open(randomfile,"r", encoding="utf-8") as file:
|
|
lines = file.read().splitlines()
|
|
random_msg = random.choice(lines)
|
|
except FileNotFoundError:
|
|
logging.error('%s not found', randomfile)
|
|
return False
|
|
|
|
raw_msg = {
|
|
"TTS": True,
|
|
"msg": random_msg,
|
|
"badges": True,
|
|
"subscriber": True,
|
|
"msgid": True,
|
|
"user": conf['IRC_USERNAME'],
|
|
"length": conf['IRC_TTS_LEN'],
|
|
"queuetime": datetime.datetime.now(),
|
|
"timestamp": str(time.time_ns())
|
|
}
|
|
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
|
|
|
|
return True
|
|
|
|
def ptts(self, msg):
|
|
""" !ptts command
|
|
|
|
Add user to tts_allowed list and remove user from tts_denied list
|
|
|
|
:param str msg: The IRC message triggering the command
|
|
"""
|
|
|
|
user = msg.replace('!ptts', '').strip().lower()
|
|
|
|
if user.startswith('@'):
|
|
logging.debug('Removing "@" from username')
|
|
user = user.replace('@', '')
|
|
|
|
logging.info("Adding %s to whitelist", user)
|
|
self.tts_allowed.append(user)
|
|
|
|
if user in self.tts_denied:
|
|
logging.info("Removing %s from deny list", user)
|
|
self.tts_denied.remove(user)
|
|
|
|
return
|
|
|
|
def dtts(self, msg):
|
|
""" !dtts command
|
|
|
|
Add user to tts_denied list and remove user from tts_allowed list
|
|
|
|
:param str msg: The IRC message triggering the command
|
|
"""
|
|
|
|
user = msg.replace('!dtts', '').strip().lower()
|
|
|
|
if user.startswith('@'):
|
|
logging.debug('Removing "@" from username')
|
|
user = user.replace('@', '')
|
|
|
|
if user not in self.tts_denied:
|
|
logging.info("Adding %s to deny list", user)
|
|
self.tts_denied.append(user)
|
|
|
|
if user in self.tts_allowed:
|
|
logging.info("Removing %s from allowed list", user)
|
|
self.tts_allowed.remove(user)
|
|
|
|
return
|
|
|
|
class HTTPserv(BaseHTTPRequestHandler):
|
|
"""Simple HTTP Server"""
|
|
|
|
def log_message(self, format, *args): # pylint: disable=redefined-builtin
|
|
"""Suppress HTTP log messages"""
|
|
return
|
|
|
|
def do_GET(self): # pylint: disable=invalid-name
|
|
"""Process GET requests"""
|
|
if self.path == '/':
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/html')
|
|
self.end_headers()
|
|
with open("tts.html", "rb") as file:
|
|
html = file.read()
|
|
self.wfile.write(html)
|
|
|
|
elif self.path == '/favicon.ico':
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'image/x-icon')
|
|
self.end_headers()
|
|
with open("favicon.ico", "rb") as file:
|
|
icon = file.read()
|
|
self.wfile.write(icon)
|
|
|
|
elif self.path == '/tts.js':
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/javascript')
|
|
self.end_headers()
|
|
with open("tts.js", "rb") as file:
|
|
html = file.read()
|
|
self.wfile.write(html)
|
|
|
|
elif self.path == '/jquery.js':
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/javascript')
|
|
self.end_headers()
|
|
with open("jquery.js", "rb") as file:
|
|
html = file.read()
|
|
self.wfile.write(html)
|
|
|
|
elif self.path == '/bootstrap.min.css':
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/css')
|
|
self.end_headers()
|
|
with open("bootstrap.min.css", "rb") as file:
|
|
html = file.read()
|
|
self.wfile.write(html)
|
|
|
|
elif self.path.startswith('/tts_queue'):
|
|
tts_json = ""
|
|
tts = {}
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/json')
|
|
self.end_headers()
|
|
|
|
usermap = conf['USERMAP']
|
|
sorted_tts = {k: msg_queue[k] for k in sorted(msg_queue, reverse=False)}
|
|
|
|
for key in list(sorted_tts.keys()):
|
|
if key not in tts_done:
|
|
if msg_queue[key][0].lower() in usermap:
|
|
tts = {str(key): str(usermap[msg_queue[key][0].lower()]) + " " + str(conf['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])}
|
|
else:
|
|
tts = {str(key): str(msg_queue[key][0]) + " " + str(conf['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])}
|
|
|
|
tts_json = json.dumps(tts)
|
|
self.wfile.write(bytes(str(tts_json)+"\n", "utf-8"))
|
|
|
|
elif self.path.startswith('/tts_done'):
|
|
get_params = parse_qs(self.path)
|
|
if '/tts_done?id' in get_params:
|
|
logging.info("Removing message from queue")
|
|
tts_done.append(get_params['/tts_done?id'][0])
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(bytes("OK\n", "utf-8"))
|
|
else:
|
|
self.send_response(500)
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(bytes("Internal Server error\n", "utf-8"))
|
|
|
|
elif self.path.startswith('/token') and conf['IRC_OAUTH_TOKEN'] == "Invalid":
|
|
data = {}
|
|
data['client_id'] = "ebo548vs6tq54c9zlrgin2yfzzlrrs"
|
|
data['response_type'] = "token"
|
|
data['scope'] = "chat:edit chat:read"
|
|
if conf['HTTP_PORT'] == 80:
|
|
data['redirect_uri'] = "http://localhost/token"
|
|
elif conf['HTTP_PORT'] == 8080:
|
|
data['redirect_uri'] = "http://localhost:8080/token"
|
|
elif conf['HTTP_PORT'] == 3000:
|
|
data['redirect_uri'] = "http://localhost:3000/token"
|
|
else:
|
|
self.send_response(500)
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(bytes("You can only use this function if HTTP_PORT is 80, 8080 or 3000. Please change your port, or use https://www.21x9.org/twitch instead.\n", "utf-8"))
|
|
return False
|
|
|
|
try:
|
|
url_values = urllib.parse.urlencode(data)
|
|
url = "https://id.twitch.tv/oauth2/authorize"
|
|
full_url = url + "?" + url_values
|
|
data = urllib.request.urlopen(full_url)
|
|
if data:
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/html')
|
|
self.end_headers()
|
|
self.wfile.write(bytes("<html><head><title>OAuth Token Generator</title></head><body onload=\"displayCode();\"><div id=\"code\"><a href=\""+str(data.geturl())+"\">Click to start the OAuth process.</a></div><script>function displayCode() { var url = window.location.href; var test = url.indexOf(\"access_token\");if (test != -1) { token = url.substring(42,72); document.getElementById(\"code\").innerHTML = \"<p>oauth:\" + token + \"</p><p>Copy the token into your config.yml and restart the bot.</p>\";}}</script></body></html>\n", "utf-8"))
|
|
else:
|
|
self.send_response(500)
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(bytes("Could not get OAuth-URL from Twitch\n", "utf-8"))
|
|
except Exception: # pylint: disable=broad-except
|
|
logging.error('Could not fetch OAuth-URL from Twitch.')
|
|
else:
|
|
self.send_response(404)
|
|
self.send_header('Server', 'TTS')
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(bytes("File not found.\n", "utf-8"))
|
|
|
|
return
|
|
|
|
def http_serve_forever(httpd):
|
|
"""httpd loop"""
|
|
httpd.serve_forever()
|
|
|
|
def load_config():
|
|
"""Loading config variables"""
|
|
|
|
logging.info("Loading configfile")
|
|
|
|
try:
|
|
with open("config.yml", "r", encoding="UTF-8") as ymlfile:
|
|
cfg = yaml.load(ymlfile, Loader=yaml.Loader)
|
|
except FileNotFoundError:
|
|
logging.fatal('Your config file is missing, please copy config-dist.yml to config.yml and review your settings.')
|
|
sys.exit(253)
|
|
|
|
for section in cfg:
|
|
try:
|
|
logging.debug('Fetching config: %s', section)
|
|
conf['IRC_CHANNEL'] = cfg.get('irc', {}).get('channel', False)
|
|
conf['IRC_USERNAME'] = cfg.get('irc', {}).get('username', False)
|
|
conf['IRC_OAUTH_TOKEN'] = cfg.get('irc', {}).get('oauth_token', False)
|
|
conf['IRC_SERVER'] = cfg.get('irc', {}).get('server', "irc.chat.twitch.tv")
|
|
conf['IRC_CLEARMSG_TIMEOUT'] = cfg.get('irc', {}).get('clearmsg_timeout', 60)
|
|
|
|
conf['IRC_SUBONLY'] = cfg.get('bot', {}).get('subonly', False)
|
|
conf['IRC_MODONLY'] = cfg.get('bot', {}).get('modonly', False)
|
|
conf['IRC_TTS_LEN'] = cfg.get('bot', {}).get('message_length', 200)
|
|
conf['TTS_STARTENABLED'] = cfg.get('bot', {}).get('start_enabled', False)
|
|
|
|
conf['LOG_LEVEL'] = cfg.get('log', {}).get('level', "INFO")
|
|
conf['HTTP_PORT'] = cfg.get('http', {}).get('port', 80)
|
|
conf['HTTP_BIND'] = cfg.get('http', {}).get('bind', "localhost")
|
|
|
|
conf['MESSAGE'] = {}
|
|
conf['MESSAGE']['TOFF'] = cfg.get('messages', {}).get('toff', "TTS is now disabled.")
|
|
conf['MESSAGE']['TON'] = cfg.get('messages', {}).get('ton', "TTS is now active.")
|
|
conf['MESSAGE']['TOO_LONG'] = cfg.get('messages', {}).get('too_long', "Sorry, your message is too long.")
|
|
conf['MESSAGE']['DISABLED'] = cfg.get('messages', {}).get('disabled', "Sorry, TTS is disabled.")
|
|
conf['MESSAGE']['DENIED'] = cfg.get('messages', {}).get('denied', "Sorry, you're not allowed to use TTS.")
|
|
conf['MESSAGE']['SUBONLY'] = cfg.get('messages', {}).get('subonly', "Sorry, TTS is sub-only.")
|
|
conf['MESSAGE']['MODONLY'] = cfg.get('messages', {}).get('modonly', "Sorry, TTS is mod-only.")
|
|
conf['MESSAGE']['READY'] = cfg.get('messages', {}).get('ready', "TTS bot is ready.")
|
|
conf['MESSAGE']['WHITELISTONLY'] = cfg.get('messages', {}).get('whitelist', False)
|
|
conf['MESSAGE']['SAYS'] = cfg.get('messages', {}).get('says', "says")
|
|
|
|
conf['MESSAGE']['VOTESTART'] = cfg.get('messages', {}).get('votestart', "Quickvote started. Send #yourchoice to participate.")
|
|
conf['MESSAGE']['VOTEEND'] = cfg.get('messages', {}).get('voteend', "Quickvote ended. The results are:")
|
|
conf['MESSAGE']['VOTENOBODY'] = cfg.get('messages', {}).get('votenobody', "Nobody casted a vote. :(")
|
|
conf['MESSAGE']['VOTERESULT'] = cfg.get('messages', {}).get('voteresult', "Voting has ended. The result is:")
|
|
conf['MESSAGE']['VOTES'] = cfg.get('messages', {}).get('votes', "Stimmen")
|
|
|
|
conf['USERMAP'] = cfg.get('usermapping', [])
|
|
|
|
if 'whitelist' in cfg:
|
|
conf['WHITELIST'] = True
|
|
conf['WHITELIST_USER'] = cfg['whitelist']
|
|
else:
|
|
conf['WHITELIST'] = False
|
|
|
|
except KeyError:
|
|
logging.exception('Your config file is invalid, please check and try again.')
|
|
sys.exit(254)
|
|
|
|
if conf['WHITELIST']:
|
|
logging.info('Whitelist mode enabled')
|
|
logging.debug('Whitelist:')
|
|
logging.debug(conf['WHITELIST_USER'])
|
|
|
|
if not conf['IRC_CHANNEL']:
|
|
raise ValueError('Please add your twitch channel to config.yml.')
|
|
if not conf['IRC_USERNAME']:
|
|
raise ValueError('Please add the bots username to config.yml.')
|
|
if not conf['IRC_OAUTH_TOKEN']:
|
|
conf['IRC_OAUTH_TOKEN'] = "Invalid"
|
|
return conf
|
|
if not conf['IRC_OAUTH_TOKEN'].startswith('oauth:'):
|
|
raise ValueError('Your oauth-token is invalid, it has to start with: "oauth:"')
|
|
|
|
return conf
|
|
|
|
conf = {}
|
|
tts_done = []
|
|
msg_queue_raw = []
|
|
msg_queue = {}
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(module)s %(threadName)s %(levelname)s: %(message)s')
|
|
sys.tracebacklimit = 0
|
|
|
|
if sys.argv[1:]:
|
|
if sys.argv[1] == "--version":
|
|
print('Simple TTS Bot')
|
|
print('Version 1.2.1')
|
|
sys.exit(1)
|
|
|
|
def main():
|
|
"""Main loop"""
|
|
|
|
global conf # pylint: disable=global-statement,invalid-name
|
|
|
|
conf = load_config()
|
|
lastreload = datetime.datetime.now()
|
|
logging.getLogger().setLevel(conf['LOG_LEVEL'])
|
|
if conf['LOG_LEVEL'] == 'DEBUG':
|
|
sys.tracebacklimit = 5
|
|
|
|
logging.info("Starting Webserver")
|
|
httpd = socketserver.TCPServer((conf['HTTP_BIND'], conf['HTTP_PORT']), HTTPserv)
|
|
httpd.allow_reuse_port = True
|
|
httpd.allow_reuse_address = True
|
|
|
|
http_thread = Thread(target=http_serve_forever, daemon=True, args=(httpd, ))
|
|
http_thread.start()
|
|
|
|
if conf['IRC_OAUTH_TOKEN'] == "Invalid":
|
|
logging.error('No OAuth Token, skipping start of IRC bot.')
|
|
logging.error('Please open http://%s:%s/token to generate your OAuth-Token.', conf['HTTP_BIND'], conf['HTTP_PORT'])
|
|
url = 'http://'+str(conf['HTTP_BIND'])+':'+str(conf['HTTP_PORT'])+'/token'
|
|
webbrowser.open_new_tab(url)
|
|
logging.info('Please complete the OAuth process within the next 15 minutes.')
|
|
time.sleep(900)
|
|
sys.exit(250)
|
|
else:
|
|
logging.info("Starting IRC bot")
|
|
irc = IRC()
|
|
|
|
irc.connect(conf['IRC_SERVER'], 6667, conf['IRC_CHANNEL'], conf['IRC_USERNAME'], conf['IRC_OAUTH_TOKEN'])
|
|
irc.sendmsg(conf['IRC_CHANNEL'], 'MrDestructoid', conf['MESSAGE']['READY'])
|
|
|
|
logging.info("Please open your browser and visit: http://%s:%s/", conf['HTTP_BIND'], conf['HTTP_PORT'])
|
|
url = 'http://'+str(conf['HTTP_BIND'])+':'+str(conf['HTTP_PORT'])
|
|
webbrowser.open_new_tab(url)
|
|
|
|
while True:
|
|
if conf['LOG_LEVEL'] == "DEBUG":
|
|
time.sleep(1)
|
|
|
|
try:
|
|
irc.get_response()
|
|
|
|
confreload = datetime.datetime.now()
|
|
if confreload - lastreload > datetime.timedelta(seconds=60):
|
|
conf = load_config()
|
|
lastreload = datetime.datetime.now()
|
|
|
|
if irc.quickvote_status and irc.votemsg:
|
|
logging.info('Quickvote is active')
|
|
irc.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'] + " (" + str(irc.votemsg) + ")")
|
|
|
|
if not irc.tts_status:
|
|
logging.debug("TTS is disabled")
|
|
if conf['LOG_LEVEL'] == "DEBUG":
|
|
time.sleep(1)
|
|
continue
|
|
|
|
logging.debug('Raw message queue:')
|
|
logging.debug(msg_queue_raw)
|
|
|
|
for raw_msg in msg_queue_raw:
|
|
logging.debug('Raw msg:')
|
|
logging.debug(msg_queue_raw)
|
|
|
|
now = datetime.datetime.now()
|
|
if now - raw_msg['queuetime'] > datetime.timedelta(seconds=conf['IRC_CLEARMSG_TIMEOUT']):
|
|
logging.debug('clearmsg_timeout reached')
|
|
if raw_msg['timestamp'] not in msg_queue:
|
|
logging.info('Sending TTS message')
|
|
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
|
|
logging.debug(msg_queue)
|
|
else:
|
|
logging.debug('Msg is already in queue')
|
|
except KeyboardInterrupt:
|
|
logging.info('Exiting...')
|
|
sys.exit()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|