twitchtts/tts.py

1692 lines
56 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# pylint: disable=global-statement,broad-except,too-many-lines
"""
TwitchTTS
Copyright (C) 2022 gpkvt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import sys
import time
import json
import signal
import socket
import random
import logging
import datetime
import webbrowser
import urllib.request
from threading import Thread
from collections import Counter
from urllib.parse import parse_qs
from urllib.error import HTTPError
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from logging.handlers import RotatingFileHandler
import yaml
import requests
import wikipedia
from fuzzywuzzy import process
class IRC:
""" IRC bot """
irc = socket.socket()
def __init__(self):
    """ Create the socket and initialise the per-feature runtime state

    All defaults are read from the module-level CONF dictionary.
    """
    self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # TTS on/off switch plus the lists maintained by !ptts and !dtts
    self.tts = {
        "status": CONF['TTS_STARTENABLED'],
        "whitelist": [],
        "blacklist": []
    }

    # Whether !wiki results are read out loud as well
    self.wikitts = {
        "status": CONF['FEATURE']['WIKITTS']
    }

    # State of a running !quickvote (votes are stored per user in "data")
    self.quickvote = {
        "status": False,
        "message": False,
        "count": 0,
        "data": {}
    }

    # State of a running !pick ("pickme" collects the participants)
    self.pick = {
        "status": False,
        "number": 1,
        "count": 0,
        "pickme": []
    }

    # check_whitelist() reads tts_allowed whenever whitelist mode is on, so
    # the attribute has to exist even if no WHITELIST_USER is configured.
    self.tts_allowed = []
    if 'WHITELIST_USER' in CONF and CONF['WHITELIST_USER']:
        self.tts_allowed = CONF['WHITELIST_USER']
def connect(self, server, port, channel, botnick, botpass):
    """ Log in to the Twitch IRC server and join a channel

    :param str server: Servername or IP
    :param int port: Server Port
    :param str channel: Channel to connect to
    :param str botnick: Username
    :param str botpass: OAuth Token (with or without "oauth:" prefix)
    """
    logging.info("Connecting to: %s", server)
    logging.info('Waiting...')

    # Strip a possibly present prefix, it is added again for the PASS line
    token = botpass.replace('oauth:','')

    try:
        self.irc.connect((server, port))
    except ConnectionResetError:
        logging.fatal('Twitch refused to connect, please check your settings and try again.')
        sys.exit(252)

    self.irc.settimeout(1)

    login_sequence = (
        "PASS oauth:" + token + "\r\n",
        " ".join(("USER", botnick, botnick, botnick, ":python\r\n")),
        "NICK " + botnick + "\r\n",
        "CAP REQ :twitch.tv/commands twitch.tv/tags \r\n",
    )
    for line in login_sequence:
        self.irc.send(line.encode("UTF-8"))

    # Wait before joining the channel
    time.sleep(5)

    join_command = ("JOIN " + channel + "\r\n").encode("UTF-8")
    try:
        self.irc.send(join_command)
    except ConnectionResetError:
        logging.warning('JOIN was refused, will try again in 30 seconds.')
        time.sleep(30)
        logging.warning('Please check your credentials, if this error persists.')
        self.irc.send(join_command)
def sendmsg(self, channel, user, msg):
    """ Post a public message to an IRC channel

    :param str channel: Channel to post msg to
    :param str user: User to address message to (prefix of the message)
    :param str msg: Message to send
    """
    line = "".join(("PRIVMSG ", channel, " :", user, " ", msg, "\r\n"))
    self.irc.send(line.encode("UTF-8"))
def __resp_ping(self):
""" Respond to PING
Respond to PING messages, to keep the connection alive.
"""
logging.debug('PING received')
self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8"))
def __resp_notice(self, resp):
""" Respond to NOTICE
:param str resp: IRC Server message (in response to command)
"""
if 'Login authentication failed' in resp:
try:
raise RuntimeError()
except RuntimeError:
logging.exception('Login failed, please check your credentials and try again.')
sys.exit(251)
def __resp_clearmsg(self, resp):
    """ Respond to CLEARMSG

    Drops every entry of MSG_QUEUE_RAW whose message id equals the
    "target-msg-id" tag of the received message.

    :param str resp: IRC Server message
    """
    global MSG_QUEUE_RAW

    logging.info('CLEARMSG received')
    remaining = []

    for tag in resp.split(';'):
        if "target-msg-id=" not in tag:
            continue

        msgid = tag.rsplit('target-msg-id=',1)[1]
        logging.debug('Trying to suppress message')
        logging.debug(msgid)

        # Work on a snapshot of the queue while it is being filtered
        for queued in list(MSG_QUEUE_RAW):
            if queued['msgid'] != msgid:
                remaining.append(queued)
            else:
                logging.info('Suppressing message %s', msgid)

        MSG_QUEUE_RAW = remaining
def __resp_privmsg(self, resp):
""" Respond to PRIVMSG
:param str resp: IRC Server message
"""
logging.debug('PRIVMSG received')
tags = self.__get_tags(resp)
if not tags:
return False
message = self.__get_message(resp)
self.__priviledged_commands(message, tags)
self.__unpriviledged_commands(message, tags)
def __get_tags(self, resp):
""" Strip tags from response
:param str resp: IRC Server message
:return dict tags: Message metadata (tags)
"""
tags = resp.split(';')
for tag in tags:
if tag.startswith('badges='):
badges = tag.rsplit('badges=',1)[1]
if tag.startswith('subscriber='):
subscriber = tag.rsplit('subscriber=',1)[1]
logging.debug('Subscriber: %s', subscriber)
if tag.startswith('id='):
msgid = tag.rsplit('id=',1)[1]
logging.debug('Message ID: %s', msgid)
if tag.startswith('display-name='):
user = tag.rsplit('display-name=',1)[1].lower()
logging.debug('Username: %s', user)
tags = {}
try:
tags['badges'] = badges
tags['subscriber'] = subscriber
tags['msgid'] = msgid
tags['user'] = user
except: # pylint: disable=bare-except
logging.error('Received an invalid message')
return False
return tags
def __get_message(self, resp):
""" Transform IRC server message and determine length
:param str resp: IRC Server message
:return dict msg: Processed message
"""
msg = {}
msg['message'] = resp.rsplit('PRIVMSG #',1)[1].split(':',1)[1].replace('\r\n','')
msg['length'] = len(msg['message'])
return msg
def __unpriviledged_commands(self, message, tags):
""" Process unpriviledged commands
:param dict message: Message
:param dict tags: Message metadata (tags)
"""
msg = message['message']
user = tags['user']
if msg.startswith('#pickme') and self.pick['status'] is True:
logging.info('Pickme detected')
self.pick['count'] = self.pick['count'] + 1
self.pick['pickme'].append(user)
logging.debug("pickme %s added", user)
return
if msg.startswith('#') and self.quickvote['status'] is True:
logging.info('Quickvote: Cast detected')
self.quickvote['count'] += 1
self.quickvote['data'][user] = msg.lower()
logging.debug("quickvote: %s", self.quickvote)
return
if msg.startswith('!tts'):
logging.info('!tts command detected')
self.__ttscmd(message, tags)
return
if msg.startswith('!wiki') and CONF['FEATURE']['WIKI']:
logging.debug("!wiki command detected")
self.__wikicmd(tags, msg)
return
if CONF['FEATURE']['QUOTE']:
if msg.startswith('!addquote'):
logging.debug("!addquote command detected")
self.__addquotecmd(tags, msg)
return
if msg.startswith('!smartquote') or msg.startswith('!sq'):
logging.debug("!smartquote command detected")
self.__quotecmd(msg)
return
def __priviledged_commands(self, message, tags):
""" Process priviledged commands
:param dict message: Message
:param dict tags: Message metadata (tags)
"""
msg = message['message']
badges = tags['badges']
user = tags['user']
if 'broadcaster' in badges or 'moderator' in badges:
if msg.startswith('!ping') and CONF['FEATURE']['PING']:
logging.debug("Ping check received.")
self.sendmsg(
CONF['IRC_CHANNEL'],
"@"+str(user),
"Pong!"
)
elif msg.startswith('!version') and CONF['FEATURE']['VERSION']:
logging.debug("!version command detected")
self.sendmsg(
CONF['IRC_CHANNEL'],
"@"+str(user),
VERSION
)
elif msg.startswith('!toff'):
logging.info('TTS is now turned off')
msg_queue.clear()
MSG_QUEUE_RAW.clear()
self.tts['status'] = False
self.sendmsg(
CONF['IRC_CHANNEL'],
"@"+str(user),
CONF['MESSAGE']['TOFF']
)
elif msg.startswith('!ton'):
logging.info('TTS is now turned on')
msg_queue.clear()
MSG_QUEUE_RAW.clear()
self.tts['status'] = True
self.sendmsg(
CONF['IRC_CHANNEL'],
"@"+str(user),
CONF['MESSAGE']['TON']
)
elif msg.startswith('!wton'):
logging.info('Wiki TTS is now turned on')
self.wikitts['status'] = True
self.sendmsg(
CONF['IRC_CHANNEL'],
"@"+str(user),
CONF['MESSAGE']['WIKITON']
)
elif msg.startswith('!wtoff'):
logging.info('Wiki TTS is now turned off')
self.wikitts['status'] = False
self.sendmsg(
CONF['IRC_CHANNEL'],
"@"+str(user),
CONF['MESSAGE']['WIKITOFF']
)
elif msg.startswith('!pick') and CONF['FEATURE']['PICK']:
logging.debug("!pick command detected")
self.__pickcmd(msg)
elif msg.startswith('!random') and CONF['FEATURE']['RANDOM']:
logging.info('!random command detected')
self.__randomcmd(msg)
elif msg.startswith('!quickvote') and CONF['FEATURE']['QUOTE']:
logging.info("!quickvote command detected")
self.__quickvotecmd(msg)
elif msg.startswith('!ptts'):
logging.debug("!ptts command detected")
self.__ptts(msg)
elif msg.startswith('!dtts'):
logging.debug("!dtts command detected")
self.__dtts(msg)
elif msg.startswith('!usermap'):
logging.info('!usermap command detected')
self.__usermap(msg)
elif msg.startswith('!delay'):
logging.info('!delay command detected')
self.__delay(msg)
def check_subonly(self, tags):
    """ Check whether sub-only mode blocks the sender

    Subscribers, moderators and the broadcaster pass. Everybody else
    gets the SUBONLY message while sub-only mode is enabled.

    :param dict tags: Message metadata (tags)
    :return bool: True if the sender is blocked, False otherwise
    """
    if not CONF['IRC_SUBONLY']:
        return False

    subscriber = tags['subscriber']
    badges = tags['badges']
    user = tags['user']

    has_allowance = (
        subscriber != "0"
        or 'moderator' in badges
        or 'broadcaster' in badges
    )
    if has_allowance:
        logging.info('TTS is sub-only and user has allowance')
        return False

    logging.debug('TTS is sub-only')
    self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['SUBONLY'])
    return True
def check_modonly(self, tags):
    """ Check whether mod-only mode blocks the sender

    Moderators and the broadcaster pass. Everybody else gets the MODONLY
    message while mod-only mode is enabled.

    :param dict tags: Message metadata (tags)
    :return bool: True if the sender is blocked, False otherwise
    """
    if not CONF['IRC_MODONLY']:
        return False

    badges = tags['badges']
    user = tags['user']

    if any(role in badges for role in ('moderator', 'broadcaster')):
        logging.info('TTS is mod-only and user has allowance')
        return False

    logging.debug('TTS is mod-only')
    self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['MODONLY'])
    return True
def check_tts_disabled(self, user):
    """ Check whether TTS is currently switched off

    The user is told about it if TTS is disabled.

    :param str user: Username
    :return bool: True if TTS is disabled, False if it is enabled
    """
    if self.tts['status']:
        logging.debug('TTS is enabled')
        return False

    self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['DISABLED'])
    return True
def check_msg_too_long(self, message, user):
    """ Check whether a message exceeds the configured length limit

    The user is told about it if the message is too long.

    :param dict message: Message (its "length" entry is compared)
    :param str user: Username
    :return bool: True if the message is too long, False otherwise
    """
    within_limit = not message['length'] > CONF['IRC_TTS_LEN']
    if within_limit:
        logging.debug('Check length: Message is ok')
        return False

    self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['TOO_LONG'])
    return True
def check_user_denied(self, user):
    """ Check if user is on denied list

    Checks if the given user is on the TTS blacklist, i.e. the
    "blacklist" entry of self.tts that is maintained by !dtts and !ptts.
    A denied user is told about it.

    :param str user: Username
    :return bool: True if the user is denied, False otherwise
    """
    # self.tts has no "denied" entry, the deny list is stored as "blacklist"
    if user in self.tts['blacklist']:
        logging.info("%s is not allowed to use TTS", user)
        self.sendmsg(
            CONF['IRC_CHANNEL'],
            "@"+str(user),
            CONF['MESSAGE']['DENIED']
        )
        return True

    logging.debug("%s is allowed to use TTS", user)
    return False
def check_whitelist(self, user):
    """ Check Whitelist

    Checks if the given user is on the TTS whitelist. Both the configured
    list (tts_allowed) and the users added at runtime via !ptts (the
    "whitelist" entry of self.tts) are honoured. A user who is not listed
    gets the WHITELISTONLY message.

    :param str user: Username
    :return bool: True if whitelist mode is enabled and the user is
                  listed. False if the user is not listed or whitelist
                  mode is disabled.
    """
    if not CONF['WHITELIST']:
        return False

    # tts_allowed may be missing or empty if no whitelist user is configured
    configured = getattr(self, 'tts_allowed', None) or []
    runtime = self.tts.get('whitelist', [])

    if user in configured or user in runtime:
        return True

    logging.debug("tts_allowed: %s", configured)
    self.sendmsg(
        CONF['IRC_CHANNEL'],
        "@"+str(user),
        CONF['MESSAGE']['WHITELISTONLY']
    )
    return False
def send_tts_msg(self, message, tags):
    """ Append a chat message to the raw TTS queue

    The first "!tts" is removed from the text before it is queued.

    :param dict message: Message ("message" and "length")
    :param dict tags: Message metadata (tags)
    """
    logging.info('Valid TTS message, adding to raw queue')

    queued_at = datetime.datetime.now()
    stamp = str(time.time_ns())

    entry = {
        "TTS": True,
        "msg": message['message'].replace('!tts','',1),
        "badges": tags['badges'],
        "subscriber": tags['subscriber'],
        "msgid": tags['msgid'],
        "user": tags['user'],
        "length": message['length'],
        "queuetime": queued_at,
        "timestamp": stamp
    }

    MSG_QUEUE_RAW.append(entry)
def get_response(self):
    """ Read from the IRC socket and dispatch what was received

    A socket timeout is not an error, the method just returns. Any other
    error while receiving terminates the program with exit code 255.
    """
    try:
        resp = self.irc.recv(2048).decode("UTF-8")
        logging.debug('resp: %s', resp)
    except socket.timeout:
        return
    except Exception:
        logging.exception('An unknown error occured while getting a IRC response.')
        sys.exit(255)

    # Every keyword found in the chunk triggers its handler, in this order
    if 'PING' in resp:
        self.__resp_ping()

    for keyword, handler in (
            ('CLEARMSG', self.__resp_clearmsg),
            ('NOTICE', self.__resp_notice),
            ('PRIVMSG', self.__resp_privmsg)
    ):
        if keyword in resp:
            handler(resp)
def __ttscmd(self, msg, tags):
    """ !tts command

    Check if message is valid and send it to queue. The first failing
    check stops the processing.

    :param dict msg: The IRC message triggering the command
    :param dict tags: The message metadata
    """
    user = tags['user']

    if self.check_tts_disabled(user):
        logging.info('TTS is disabled')
    elif self.check_msg_too_long(msg, user):
        logging.info('TTS message is too long')
    elif self.check_user_denied(user):
        logging.info('User is not allowed to use TTS')
    elif self.check_subonly(tags):
        logging.info('TTS is sub-only')
    elif self.check_modonly(tags):
        logging.info('TTS is mod-only')
    # check_whitelist() returns True for a whitelisted user and False for
    # everybody else, so the message is blocked on False - but only while
    # whitelist mode is enabled at all.
    elif CONF['WHITELIST'] and not self.check_whitelist(user):
        logging.info('User is not on whitelist')
    else:
        logging.info('Sending TTS message to raw_queue')
        self.send_tts_msg(msg, tags)
def __addquotecmd(self, tags, msg):
    """ !addquote command

    Adds a new line to quotes.txt. The expected form of the command is
    "!addquote <username> <quote text>"; it is ignored if username or
    text is missing. The name of the game currently set for the channel
    is fetched from the Twitch API and added to the quote if possible.

    :param dict tags: Message metadata (tags)
    :param str msg: IRC message triggering the command
    """
    user = tags['user']

    if self.check_user_denied(user):
        logging.info('User is not allowed to use TTS')
        return
    if self.check_subonly(tags):
        logging.info('TTS is sub-only')
        return

    parts = msg.replace("!addquote", "", 1).strip().split(" ", 1)
    if len(parts) < 2 or not parts[1].strip():
        logging.warning('Ignoring !addquote without username and quote text')
        return
    username, text = parts

    # The number of the new quote is the number of existing lines plus one
    try:
        with open("quotes.txt", "rb") as file:
            nol = sum(1 for _ in file)
    except FileNotFoundError:
        logging.warning("quotes.txt does not exists, will create")
        nol = 0
    nol += 1

    date = time.strftime("%d.%m.%Y")
    game = None

    try:
        token = CONF['IRC_OAUTH_TOKEN'].replace('oauth:','')
        login = CONF['IRC_CHANNEL'].replace('#','')
        headers = {
            'Content-type': 'application/x-form-urlencoded',
            'Authorization': 'Bearer '+token,
            'Client-Id': 'ebo548vs6tq54c9zlrgin2yfzzlrrs'
        }

        # A timeout keeps an unresponsive API from blocking the bot
        api_endpoint = f"https://api.twitch.tv/helix/users?login={login}"
        req = requests.get(url=api_endpoint, headers=headers, timeout=10)
        user_id = req.json()['data'][0]['id']

        api_endpoint = f"https://api.twitch.tv/helix/channels?broadcaster_id={user_id}"
        req = requests.get(url=api_endpoint, headers=headers, timeout=10)
        game = req.json()['data'][0]['game_name']
    except Exception:
        logging.warning('Could not get metadata for quote')

    if game is None:
        quote = f"#{nol}: \"{text}\" -{username} ({date})\n"
    else:
        quote = f"#{nol}: \"{text}\" -{username}/{game} ({date})\n"

    logging.info('Adding quote %s', quote)
    with open("quotes.txt", "ab") as file:
        file.write(quote.encode('utf-8'))

    message = (
        f"{CONF['MESSAGE']['QUOTE_ADDED_PREFIX']} "
        f"#{nol} {CONF['MESSAGE']['QUOTE_ADDED_SUFFIX']}"
    )

    # Announce the new quote via TTS and in chat
    msg_queue[str(time.time_ns())] = [CONF['IRC_USERNAME'], message]
    self.sendmsg(
        CONF['IRC_CHANNEL'],
        "@"+str(user),
        message
    )
def __wikicmd(self, tags, msg):
    """ !wiki command

    Looks up the search term on Wikipedia and posts the summary and the
    URL of the article to the chat. The summary is additionally queued
    for TTS if Wiki TTS is enabled.

    :param dict tags: Message metadata (tags)
    :param str msg: IRC message triggering the command
    """
    try:
        user = tags['user']
        wikipedia.set_lang(CONF['WIKI_LANG'])

        term = msg.replace('!wiki', '').strip()
        summary = wikipedia.summary(term, sentences=CONF['FEATURE']['WIKISENTENCES'])

        self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), summary)
        self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), wikipedia.page(term).url)

        spoken = summary.replace('==', '')
        stamp = str(time.time_ns())

        if self.wikitts['status']:
            msg_queue[stamp] = ['wikipedia', spoken]

    except wikipedia.exceptions.DisambiguationError:
        user = f"@{user}"
        self.sendmsg(CONF['IRC_CHANNEL'], user, CONF['MESSAGE']['WIKI_TOO_MANY'])
    except Exception:
        user = f"@{user}"
        self.sendmsg(CONF['IRC_CHANNEL'], user, CONF['MESSAGE']['WIKI_NO_RESULT'])
def __quotecmd(self, msg = False):
    """ !smartquote command

    Gets a line from quotes.txt. If a number is given as msg
    it fetches the quote with that number. If a string is given
    it fetches the best matching line. If nothing is given
    it fetches a random line.

    The quote is posted to the chat and queued for TTS. If no quote was
    found, the QUOTE_NOT_FOUND message is posted instead.

    :param str msg: IRC message triggering the command
    :return bool: True if a quote was found, False otherwise
    """
    quote = None

    try:
        # Strip either spelling of the command; msg may be the default False
        query = (msg or '').replace('!smartquote', '').replace('!sq', '').strip()

        if query.isdigit():
            logging.info('Fetching quote #%s', query)
            wanted = "#"+str(query)+":"
            with open("quotes.txt", "rb") as file:
                for line in file:
                    if line.decode('utf-8').startswith(wanted):
                        quote = line
                        break

        elif query != "":
            logging.info('Fetching match for %s', query)
            # Decode first so that the fuzzy matcher compares text with text
            with open("quotes.txt", "rb") as file:
                candidates = [line.decode('utf-8') for line in file]

            matches = process.extract(query, candidates, limit=10)
            quotes = [match for match, score in matches if score >= 60]
            logging.debug('Quotes: %s', quotes)

            # With many good matches pick one of them, else the best one
            if len(quotes) >= 5:
                quote = random.choice(quotes)
            else:
                quote = quotes[0]

        else:
            logging.info('Fetching random quote')
            with open("quotes.txt", "rb") as file:
                lines = file.read().splitlines()
            quote = random.choice(lines)

    except FileNotFoundError:
        logging.error('"quotes.txt does not exists.')
    except IndexError:
        logging.error('Error fetching quote.')

    if quote is None:
        logging.info('No quote found.')
        self.sendmsg(
            CONF['IRC_CHANNEL'],
            "",
            CONF['MESSAGE']['QUOTE_NOT_FOUND']
        )
        return False

    if not isinstance(quote, str):
        quote = quote.decode('utf-8')
    # Lines read from the file still carry their line ending
    quote = quote.rstrip('\r\n')

    logging.info('Sending quote to TTS')
    logging.debug("Quote: %s", quote)
    self.sendmsg(
        CONF['IRC_CHANNEL'],
        "",
        quote
    )

    # Everything from the last "(" on (the date) is not read out loud
    message = quote.rsplit('(', 1)[0]
    msg_queue[str(time.time_ns())] = [CONF['IRC_USERNAME'], message]

    return True
def __delay(self, msg):
""" !delay command
Adjust the delay setting in config.yml
:param str msg: The IRC message triggering the command
"""
try:
delay = msg.split(' ')[1]
except Exception:
delay = False
if delay:
with open('config.yml','r', encoding='utf-8') as yamlfile:
cur_yaml = yaml.safe_load(yamlfile)
cur_yaml['irc']['clearmsg_timeout'] = int(delay)
if cur_yaml:
with open('config.yml','w', encoding='utf-8') as yamlfile:
yaml.safe_dump(cur_yaml, yamlfile)
load_config()
def __usermap(self, msg):
""" !usermap command
Adds a new entry to usermapping in config.yml
:param str msg: The IRC message triggering the command
"""
try:
msg = msg.replace('!usermap ', '')
splitmsg = msg.split(" ")
username, *mappingname = splitmsg
mappingname = ' '.join(mappingname)
except Exception:
username = False
mappingname = False
if username and mappingname:
with open('config.yml','r', encoding='utf-8') as yamlfile:
cur_yaml = yaml.safe_load(yamlfile)
cur_yaml['usermapping'].update({username: mappingname})
if cur_yaml:
with open('config.yml','w', encoding='utf-8') as yamlfile:
yaml.safe_dump(cur_yaml, yamlfile)
load_config()
def __pickcmd(self, msg):
    """ !pick command

    The first call starts a pick: users can take part by typing #pickme
    in the chat. The second call stops it and picks the requested number
    of participants at random (or all of them, if there are not more
    participants than requested).

    :param str msg: IRC message triggering the command, optionally
                    followed by the number of users to pick
    """
    if not self.pick['status']:
        logging.debug('Pick started')
        self.pick['status'] = True

        # Keep the previous number if none or an invalid one is given
        try:
            number = int(msg.split(' ')[1].strip())
        except (IndexError, ValueError):
            number = None
        if number is not None and number > 0:
            self.pick['number'] = number

        logging.info("Will pick %s participants", self.pick['number'])
        self.sendmsg(
            CONF['IRC_CHANNEL'], "@chat",
            CONF['MESSAGE']['PICKSTART']
        )
        return

    logging.info('Pick stopped')
    logging.debug("Got %s participats, wanted %s",
        self.pick['count'],
        self.pick['number']
    )

    try:
        # random.sample() needs an integer, the number may be stored as str
        wanted = int(self.pick['number'])
        if int(self.pick['count']) > wanted:
            picks = random.sample(self.pick['pickme'], wanted)
            logging.info('Got more than the requested number of participants')
        else:
            picks = self.pick['pickme']
            logging.info('Got less than or exactly the requested number of participants')

        joined_picks = " ".join(str(element) for element in picks)
    except (TypeError, ValueError):
        logging.error("There was an error during picking.")
        joined_picks = False

    if joined_picks:
        message = f"{CONF['MESSAGE']['PICKRESULT']} {joined_picks}"
        msg_queue[str(time.time_ns())] = [CONF['IRC_USERNAME'], message]

        self.sendmsg(
            CONF['IRC_CHANNEL'], "",
            CONF['MESSAGE']['PICKRESULT']
        )
        self.sendmsg(
            CONF['IRC_CHANNEL'], "*",
            joined_picks
        )
    else:
        # Nobody took part or picking failed
        self.sendmsg(
            CONF['IRC_CHANNEL'], "*",
            CONF['MESSAGE']['PICKNONE']
        )

    self.pick['status'] = False
    self.pick['pickme'] = []
    self.pick['count'] = 0
def __quickvotecmd(self, msg):
    """ !quickvote command

    Starts or stops the !quickvote function. On stop calculates the 5 most casted
    votes and send them to chat. The highest vote is send to msg_queue.

    :param str msg: The IRC message triggering the command, optionally
                    followed by a text describing the vote
    """
    if not self.quickvote['status']:
        logging.debug('Quickvote started')
        self.quickvote['status'] = True
        self.quickvote['message'] = msg.split('!quickvote', 1)[1].strip()

        announcement = CONF['MESSAGE']['VOTESTART']
        if self.quickvote['message']:
            announcement = announcement + " (" + str(self.quickvote['message']) + ")"
        self.sendmsg(CONF['IRC_CHANNEL'], "@chat", announcement)
        return

    logging.debug('Quickvote stopped')

    if self.quickvote['count'] == 0:
        logging.info("Nobody voted")
        self.sendmsg(
            CONF['IRC_CHANNEL'],
            "@chat",
            CONF['MESSAGE']['VOTEEND']
        )
        self.sendmsg(
            CONF['IRC_CHANNEL'],
            "*",
            CONF['MESSAGE']['VOTENOBODY']
        )
        msg_queue[str(time.time_ns())] = [
            CONF['IRC_USERNAME'],
            CONF['MESSAGE']['VOTENOBODY']
        ]

        logging.info('The result is: %s', CONF['MESSAGE']['VOTENOBODY'])
        logging.debug('Votemsg: %s', msg)
    else:
        logging.info("Counting votes")
        # Every user has one vote, the last one cast counts
        count = Counter(self.quickvote['data'].values()).most_common(5)

        self.sendmsg(
            CONF['IRC_CHANNEL'],
            "@chat",
            CONF['MESSAGE']['VOTEEND']
        )
        logging.debug(count)

        # Only the winning option is read out loud
        result = str(count[0][0].replace('#',''))
        message = f"{CONF['MESSAGE']['VOTERESULT']} {result}"
        msg_queue[str(time.time_ns())] = [CONF['IRC_USERNAME'], message]

        logging.info('The result is: %s', CONF['MESSAGE']['VOTERESULT'] +" "+ str(count[0]))
        logging.debug('Votemsg: %s', msg)

        for key, value in count:
            self.sendmsg(
                CONF['IRC_CHANNEL'], "*",
                f"{key} ({value} {CONF['MESSAGE']['VOTES']})"
            )

    self.quickvote['status'] = False
    self.quickvote['data'] = {}
    self.quickvote['count'] = 0
def __randomcmd(self, msg):
""" !random command
Read a random line from randomfile and put it into msg_queue
If no file is given in msg a standard file will be used
:param str msg: The IRC message triggering the command
:return bool:
"""
randomfile = msg.replace('!random', '').strip().lower()
if randomfile:
randomfile = f"random_{os.path.basename(randomfile)}.txt"
else:
randomfile = "random.txt"
try:
with open(randomfile,"r", encoding="utf-8") as file:
lines = file.read().splitlines()
random_msg = random.choice(lines)
except FileNotFoundError:
logging.error('%s not found', randomfile)
return False
raw_msg = {
"TTS": True,
"msg": random_msg,
"badges": True,
"subscriber": True,
"msgid": True,
"user": CONF['IRC_USERNAME'],
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
return True
def __ptts(self, msg):
""" !ptts command
Add user to tts_allowed list and remove user from tts_denied list
:param str msg: The IRC message triggering the command
"""
user = msg.replace('!ptts', '').strip().lower()
if user.startswith('@'):
logging.debug('Removing "@" from username')
user = user.replace('@', '')
logging.info("Adding %s to whitelist", user)
self.tts['whitelist'].append(user)
if user in self.tts['blacklist']:
logging.info("Removing %s from deny list", user)
self.tts['blacklist'].remove(user)
def __dtts(self, msg):
""" !dtts command
Add user to tts_denied list and remove user from tts_allowed list
:param str msg: The IRC message triggering the command
"""
user = msg.replace('!dtts', '').strip().lower()
if user.startswith('@'):
logging.debug('Removing "@" from username')
user = user.replace('@', '')
if user not in self.tts['blacklist']:
logging.info("Adding %s to deny list", user)
self.tts['blacklist'].append(user)
if user in self.tts['whitelist']:
logging.info("Removing %s from allowed list", user)
self.tts['whitelist'].remove(user)
class ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
    """ HTTPServer combined with ThreadingMixIn, adds nothing of its own """
class HTTPserv(BaseHTTPRequestHandler):
    """ Simple HTTP Server

    Serves the static front end files, the TTS message queue and a
    helper page to generate an OAuth token.
    """

    # Request path -> (file name, Content-type) of the static files
    STATIC_FILES = {
        '/': ("tts.html", 'text/html'),
        '/favicon.ico': ("favicon.ico", 'image/x-icon'),
        '/tts.js': ("tts.js", 'text/javascript'),
        '/jquery.js': ("jquery.js", 'text/javascript'),
        '/bootstrap.min.css': ("bootstrap.min.css", 'text/css')
    }

    # HTTP_PORT -> redirect URI sent with the OAuth request
    TOKEN_REDIRECTS = {
        80: "http://localhost/token",
        8080: "http://localhost:8080/token",
        3000: "http://localhost:3000/token"
    }

    def log_message(self, format, *args): # pylint: disable=redefined-builtin
        """ Suppress HTTP log messages """
        return

    def _respond(self, status, content_type, body):
        """ Send a complete response with the given status, type and body

        :param int status: HTTP status code
        :param str content_type: Value of the Content-type header
        :param bytes body: Response body
        """
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(body)

    def _not_found(self):
        """ Send the 404 response """
        self.send_response(404)
        self.send_header('Server', 'TTS')
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(bytes("File not found.\n", "utf-8"))

    def _serve_static(self, filename, content_type):
        """ Send a static file, or 404 if it does not exist

        The file is read before any header is sent, so a missing file
        does not end in a broken "200" response.

        :param str filename: File to send
        :param str content_type: Value of the Content-type header
        """
        try:
            with open(filename, "rb") as file:
                body = file.read()
        except FileNotFoundError:
            logging.error('%s not found', filename)
            self._not_found()
            return
        self._respond(200, content_type, body)

    def _serve_tts_queue(self):
        """ Send the oldest message that has not been marked as done yet

        The response is a JSON object with the message key as its only
        member, or an empty object if nothing is pending.
        """
        usermap = CONF['USERMAP']
        logging.debug(usermap)

        tts = {}
        pending = [key for key in sorted(msg_queue) if key not in tts_done]
        if pending:
            key = pending[0]
            speaker = msg_queue[key][0]

            if speaker.lower() in usermap:
                logging.debug('Using usermap for user: %s (%s)',
                    speaker,
                    usermap[speaker.lower()]
                )
                name = usermap[speaker.lower()]
            else:
                logging.debug('No usermap entry found for user: %s', speaker)
                name = speaker

            tts = {str(key): f"{name} {CONF['MESSAGE']['SAYS']}: {msg_queue[key][1]}"}

        self._respond(200, 'text/json', bytes(json.dumps(tts)+"\n", "utf-8"))

    def _serve_tts_done(self):
        """ Mark the message given by the "id" query parameter as done """
        params = parse_qs(urllib.parse.urlsplit(self.path).query)

        if 'id' in params:
            logging.info("Removing message from queue")
            tts_done.append(params['id'][0])
            self._respond(200, 'text/plain', bytes("OK\n", "utf-8"))
        else:
            self._respond(500, 'text/plain', bytes("Internal Server error\n", "utf-8"))

    def _serve_token(self):
        """ Send the helper page used to generate an OAuth token

        :return bool: False if HTTP_PORT is not supported, True otherwise
        """
        redirect_uri = self.TOKEN_REDIRECTS.get(CONF['HTTP_PORT'])
        if redirect_uri is None:
            self._respond(500, 'text/plain', bytes(
                "You can only use this function "
                "if HTTP_PORT is 80, 8080 or 3000. "
                "Please change your port, or use "
                "https://www.21x9.org/twitch instead.\n",
                "utf-8")
            )
            return False

        data = {
            'client_id': "ebo548vs6tq54c9zlrgin2yfzzlrrs",
            'response_type': "token",
            'scope': "chat:edit chat:read",
            'redirect_uri': redirect_uri
        }
        full_url = "https://id.twitch.tv/oauth2/authorize" + "?" + urllib.parse.urlencode(data)

        # Twitch may redirect; the final URL is the one offered to the user
        oauth_url = None
        try:
            with urllib.request.urlopen(full_url, timeout=10) as response:
                oauth_url = response.geturl()
        except Exception:
            logging.error('Could not fetch OAuth-URL from Twitch.')

        if not oauth_url:
            # Always answer, otherwise the browser is left without a response
            self._respond(500, 'text/plain',
                bytes("Could not get OAuth-URL from Twitch\n", "utf-8")
            )
            return True

        page_head = (
            '<html>'
            '<head>'
            '<title>OAuth Token Generator</title>'
            '</head>'
            '<body onload="displayCode();">'
            '<div id="code">'
            '<a href="'
        )
        page_tail = (
            '">'
            'Click to start the OAuth process.'
            '</a>'
            '</div>'
            '<script>'
            'function displayCode() {'
            'var url = window.location.href;'
            'var test = url.indexOf("access_token");'
            'var qs = new URL(window.location.href.replace(/#/g,"?"));'
            'var token = qs.searchParams.get("access_token");'
            'var scope = qs.searchParams.get("scope");'
            'if (test != -1) { '
            'document.getElementById("code").innerHTML = "'
            '<p>Token:<br />oauth:" + token + "</p><p>Scope:<br />" + scope + "</p><p>URL:<br />"+ url +"</p>'
            '<p>Copy the token into your config.yml and restart '
            'the bot.</p>";}}'
            '</script>'
            '</body>'
            '</html>\n'
        )
        self._respond(200, 'text/html',
            bytes(page_head + str(oauth_url) + page_tail, "utf-8")
        )
        return True

    def do_GET(self): # pylint: disable=invalid-name
        """ Process GET requests

        :return bool: False if /token was requested with an unsupported
                      HTTP_PORT, True otherwise
        """
        path = self.path

        if path in self.STATIC_FILES:
            filename, content_type = self.STATIC_FILES[path]
            self._serve_static(filename, content_type)
        elif path.startswith('/tts_queue'):
            self._serve_tts_queue()
        elif path.startswith('/tts_done'):
            self._serve_tts_done()
        elif path.startswith('/token'):
            return self._serve_token()
        else:
            self._not_found()

        return True
def http_serve_forever(httpd):
    """ Run the request loop of the given server

    :param httpd: Server object providing serve_forever()
    """
    httpd.serve_forever()
def send_tts_queue():
    """ Move messages from the raw queue to the TTS queue

    A message is moved once it has been in the raw queue for longer than
    IRC_CLEARMSG_TIMEOUT seconds and is not part of msg_queue yet.
    """
    for entry in MSG_QUEUE_RAW:
        logging.debug('Raw msg: %s', MSG_QUEUE_RAW)

        age = datetime.datetime.now() - entry['queuetime']
        if age <= datetime.timedelta(seconds=CONF['IRC_CLEARMSG_TIMEOUT']):
            # Still inside the period in which the message may be deleted
            continue

        logging.debug('clearmsg_timeout reached')
        if entry['timestamp'] in msg_queue:
            logging.debug('Msg is already in queue')
            continue

        logging.info('Sending TTS message')
        msg_queue[entry['timestamp']] = [entry['user'], entry['msg']]
        logging.debug("msg_queue: %s", msg_queue)
def get_url(path=False):
    """ Build the URL of the HTTP server from the config values

    "localhost" is used as host name if the server is bound to 0.0.0.0.

    :param str path: Optional path to append to the URL
    :return str url: The URL
    """
    host = "localhost" if CONF['HTTP_BIND'] == "0.0.0.0" else CONF['HTTP_BIND']
    base = f"http://{host}:{CONF['HTTP_PORT']}/"

    return f"{base}{path}" if path else base
def check_oauth_token():
    """ Check for valid authentication via Twitch API

    Sends the configured token to Twitch's validate endpoint. When Twitch
    answers with an HTTP error, the token generator page is opened in the
    browser, the function waits five minutes, reloads the config and
    checks again. This repeats until the token is accepted.

    :return: The (possibly reloaded) global configuration
    """
    global CONF
    validate_url = 'https://id.twitch.tv/oauth2/validate'

    # Retry in a loop instead of recursing: no growing call stack and the
    # success message below is logged exactly once
    while True:
        logging.debug('Checking OAuth Token')
        token = CONF['IRC_OAUTH_TOKEN'].replace('oauth:', '')
        request = urllib.request.Request(validate_url)
        request.add_header('Authorization', f"OAuth {token}")

        try:
            # Only success or failure matters; the context manager closes
            # the response so the connection is not leaked
            with urllib.request.urlopen(request):
                pass
        except HTTPError:
            logging.fatal(
                'Twitch rejected your OAuth Token. Please check and generate a new one.'
            )
            # Use get_url() for the hint as well, so a wildcard bind address
            # is shown as localhost like everywhere else
            token_url = get_url("token")
            logging.info('Please open %s to generate your OAuth-Token.', token_url)
            webbrowser.open_new_tab(token_url)
            logging.info(
                'Please complete the OAuth process and add the token into your '
                '"config.yml" within the next 5 minutes.'
            )
            time.sleep(300)
            CONF = load_config()
        else:
            break

    logging.info('OAuth Token is valid')
    return CONF
# (CONF key, config section, option name, default) of the top-level settings
_CONFIG_OPTIONS = (
    ('IRC_CHANNEL', 'irc', 'channel', False),
    ('IRC_USERNAME', 'irc', 'username', False),
    ('IRC_OAUTH_TOKEN', 'irc', 'oauth_token', False),
    ('IRC_SERVER', 'irc', 'server', "irc.chat.twitch.tv"),
    ('IRC_CLEARMSG_TIMEOUT', 'irc', 'clearmsg_timeout', 60),
    ('IRC_SUBONLY', 'bot', 'subonly', False),
    ('IRC_MODONLY', 'bot', 'modonly', False),
    ('IRC_TTS_LEN', 'bot', 'message_length', 200),
    ('TTS_STARTENABLED', 'bot', 'start_enabled', True),
    ('WIKI_LANG', 'bot', 'language', 'en'),
    ('LOG_LEVEL', 'log', 'level', "INFO"),
    ('HTTP_PORT', 'http', 'port', 80),
    ('HTTP_BIND', 'http', 'bind', "localhost"),
)

# (CONF['MESSAGE'] key, option name in section "messages", default)
_MESSAGE_OPTIONS = (
    ('TOFF', 'toff', "TTS is now disabled."),
    ('TON', 'ton', "TTS is now active."),
    ('WIKITOFF', 'wikitoff', "Wiki TTS is now disabled."),
    ('WIKITON', 'wikiton', "Wiki TTS is now active."),
    ('TOO_LONG', 'too_long', "Sorry, your message is too long."),
    ('DISABLED', 'disabled', "Sorry, TTS is disabled."),
    ('DENIED', 'denied', "Sorry, you're not allowed to use TTS."),
    ('SUBONLY', 'subonly', "Sorry, TTS is sub-only."),
    ('MODONLY', 'modonly', "Sorry, TTS is mod-only."),
    ('READY', 'ready', "TTS bot is ready."),
    ('WHITELISTONLY', 'whitelist', False),
    ('SAYS', 'says', "says"),
    ('VOTESTART', 'votestart', "Quickvote started. Send #yourchoice to participate."),
    ('VOTEEND', 'voteend', "Quickvote ended. The results are:"),
    ('VOTENOBODY', 'votenobody', "Nobody casted a vote. :("),
    ('VOTERESULT', 'voteresult', "Voting has ended. The result is:"),
    ('VOTES', 'votes', "Votes"),
    ('PICKSTART', 'pickstart', "Pick started. Send #pickme to participate."),
    ('PICKRESULT', 'pickresult', "Pick ended. The results are:"),
    ('PICKNONE', 'picknone', "Pick ended. Nobody was picked."),
    ('QUOTE_NOT_FOUND', 'quotenotfound', "Sorry, no quote found."),
    ('QUOTE_ADDED_PREFIX', 'quoteaddedprefix', "Quote:"),
    ('QUOTE_ADDED_SUFFIX', 'quoteaddedsuffix', "added."),
    (
        'WIKI_TOO_MANY', 'wiki_too_many',
        "Sorry, there are too many possible results. Try a more narrow search."
    ),
    (
        'WIKI_NO_RESULT', 'wiki_no_result',
        "Sorry, there was an error fetching the wikipedia answer."
    ),
)

# (CONF['FEATURE'] key, option name in section "features", default)
_FEATURE_OPTIONS = (
    ('WIKI', 'wiki', True),
    ('WIKITTS', 'wikitts', True),
    ('WIKISENTENCES', 'wikisentences', 3),
    ('PICK', 'pick', True),
    ('VOTE', 'vote', True),
    ('QUOTE', 'quote', True),
    ('RANDOM', 'random', True),
    ('VERSION', 'version', True),
    ('PING', 'ping', True),
)


def _config_section(cfg, name):
    """ Return section `name` of the parsed config as a dict

    A missing or empty section yields an empty dict, so that every option
    falls back to its default. Anything else that is not a mapping is
    treated as an invalid config file (exit code 254).
    """
    section = cfg.get(name)
    if section is None:
        return {}
    if not isinstance(section, dict):
        logging.fatal('Your config file is invalid, please check and try again.')
        sys.exit(254)
    return section


def load_config():
    """ Load config.yml into the global CONF dict

    Reads "config.yml" from the current working directory, fills CONF with
    the configured values (or their defaults) and validates the mandatory
    IRC settings. A missing OAuth token is replaced by the placeholder
    "Invalid".

    Exits with code 253 when config.yml does not exist and with code 254
    when its content is not a mapping.

    :return: The updated CONF dict
    :raises ValueError: If the twitch channel or the bot username is missing
    """
    logging.info("Loading configfile")

    try:
        with open("config.yml", "r", encoding="UTF-8") as ymlfile:
            # NOTE(review): yaml.Loader can construct arbitrary Python
            # objects. That is acceptable only while config.yml is a trusted
            # local file; consider yaml.safe_load if no custom tags are used.
            cfg = yaml.load(ymlfile, Loader=yaml.Loader)
    except FileNotFoundError:
        logging.fatal(
            'Your config file is missing, please copy config-dist.yml '
            'to config.yml and review your settings.'
        )
        sys.exit(253)

    # An empty file is parsed as None, a bare scalar or list is no config
    if not isinstance(cfg, dict):
        logging.fatal('Your config file is invalid, please check and try again.')
        sys.exit(254)

    for section in cfg:
        logging.debug('Fetching config: %s', section)

    for conf_key, section, option, default in _CONFIG_OPTIONS:
        CONF[conf_key] = _config_section(cfg, section).get(option, default)

    # Build the nested dicts completely before storing them, so CONF never
    # holds a half-filled MESSAGE or FEATURE table
    messages = _config_section(cfg, 'messages')
    CONF['MESSAGE'] = {
        key: messages.get(option, default)
        for key, option, default in _MESSAGE_OPTIONS
    }

    features = _config_section(cfg, 'features')
    CONF['FEATURE'] = {
        key: features.get(option, default)
        for key, option, default in _FEATURE_OPTIONS
    }

    CONF['USERMAP'] = cfg.get('usermapping', [])

    if 'whitelist' in cfg:
        CONF['WHITELIST'] = True
        CONF['WHITELIST_USER'] = cfg['whitelist']
    else:
        CONF['WHITELIST'] = False

    if CONF['WHITELIST']:
        logging.info('Whitelist mode enabled')
        logging.debug('Whitelist: %s', CONF['WHITELIST_USER'])

    if not CONF['IRC_CHANNEL']:
        raise ValueError('Please add your twitch channel to config.yml.')
    if not CONF['IRC_USERNAME']:
        raise ValueError('Please add the bots username to config.yml.')
    if not CONF['IRC_OAUTH_TOKEN']:
        # Placeholder value instead of False for a missing token
        CONF['IRC_OAUTH_TOKEN'] = "Invalid"

    return CONF
def exceptionlog(etype, evalue, traceback):
    """ Write an uncaught exception to the log

    Takes the three arguments Python passes to sys.excepthook and logs
    them as one error record including the exception details.

    :param etype: Exception class
    :param evalue: Exception instance
    :param traceback: Traceback object of the exception
    """
    exc_details = (etype, evalue, traceback)
    logging.error("Logging an uncaught exception", exc_info=exc_details)
def main():
""" Main loop

Loads the configuration, starts the webserver in a daemon thread,
checks the OAuth token, connects the IRC bot to the configured channel
and then calls irc.get_response() in an endless loop. The loop also
reloads the config once more than 60 seconds have passed since the last
load and forwards queued messages via send_tts_queue().
"""
global CONF
CONF = load_config()
lastreload = datetime.datetime.now()
# Apply the configured log level to the stream-based handlers of the root logger
for handler in rootLogger.handlers:
if isinstance(handler, logging.StreamHandler):
handler.setLevel(CONF['LOG_LEVEL'])
# Uncaught exceptions are routed through exceptionlog()
sys.excepthook = exceptionlog
if CONF['LOG_LEVEL'] == 'DEBUG':
sys.tracebacklimit = 5
logging.info("Starting Webserver")
httpd = ThreadingSimpleServer(
(CONF['HTTP_BIND'],
CONF['HTTP_PORT']),
HTTPserv
)
# The server loop runs in a daemon thread next to the IRC loop below
http_thread = Thread(target=http_serve_forever, daemon=True, args=(httpd, ))
http_thread.start()
# The return value is not needed here: check_oauth_token() rebinds the
# global CONF itself when it reloads the config
check_oauth_token()
logging.info("Starting IRC bot")
irc = IRC()
irc.connect(
CONF['IRC_SERVER'],
6667,
CONF['IRC_CHANNEL'],
CONF['IRC_USERNAME'],
CONF['IRC_OAUTH_TOKEN']
)
# Send the configured READY message to the channel
irc.sendmsg(
CONF['IRC_CHANNEL'],
'MrDestructoid',
CONF['MESSAGE']['READY']
)
logging.info('Connected and joined')
url = get_url()
logging.info("Please open your browser and visit: %s", url)
webbrowser.open_new_tab(url)
while True:
# In DEBUG mode each iteration is preceded by a one second pause
if CONF['LOG_LEVEL'] == "DEBUG":
time.sleep(1)
try:
irc.get_response()
# Reload config.yml once more than 60 seconds have passed since the last load
confreload = datetime.datetime.now()
if confreload - lastreload > datetime.timedelta(seconds=60):
CONF = load_config()
lastreload = datetime.datetime.now()
# NOTE(review): the nesting is not visible in this listing; the quickvote
# reminder presumably belongs inside the 60 second reload branch above,
# otherwise it would be sent on every loop iteration - confirm
if irc.quickvote['status'] and irc.quickvote['message']:
logging.info('Quickvote is active')
irc.sendmsg(
CONF['IRC_CHANNEL'],
"@chat",
f"{CONF['MESSAGE']['VOTESTART']} ({irc.quickvote['message']})"
)
# While TTS is switched off the raw queue is not forwarded
if not irc.tts['status']:
continue
logging.debug('MSG_QUEUE_RAW: %s', MSG_QUEUE_RAW)
send_tts_queue()
except KeyboardInterrupt:
# Stop the webserver, then terminate the own process via SIGTERM
httpd.shutdown()
logging.info('Exiting...')
os.kill(os.getpid(), signal.SIGTERM)
if __name__ == "__main__":
    VERSION = "1.8.0"

    # Answer --version before any logging setup, so that asking for the
    # version neither creates nor rotates tts.log
    if sys.argv[1:] and sys.argv[1] == "--version":
        print('Simple TTS Bot')
        # print() does no %-interpolation, so the version is formatted here
        print(f'Version {VERSION}')
        # Printing the version is a successful run
        sys.exit(0)

    logFormatter = logging.Formatter(
        "%(asctime)s %(module)s %(threadName)s %(levelname)s: %(message)s"
    )
    rootLogger = logging.getLogger()

    # Every start rotates tts.log, keeping up to nine older log files
    fileHandler = RotatingFileHandler("tts.log", backupCount=9)
    fileHandler.setFormatter(logFormatter)
    rootLogger.addHandler(fileHandler)
    fileHandler.doRollover()

    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(logFormatter)
    rootLogger.addHandler(consoleHandler)

    # The root logger lets everything through; main() sets the handler
    # levels from the config afterwards
    rootLogger.setLevel(logging.DEBUG)

    sys.tracebacklimit = 3

    # Module-wide state shared by the bot, the webserver and the main loop
    CONF = {}
    tts_done = []
    MSG_QUEUE_RAW = []
    msg_queue = {}

    main()