twitchtts/tts.py

1328 lines
50 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# pylint: disable=bare-except
"""
TwitchTTS
Copyright (C) 2022 gpkvt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import sys
import time
import json
import signal
import socket
import random
import logging
import datetime
import webbrowser
import urllib.request
from threading import Thread
from collections import Counter
from urllib.parse import parse_qs
from urllib.error import HTTPError
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
import yaml
import requests
import wikipedia
from fuzzywuzzy import process
class IRC:
""" IRC bot """
irc = socket.socket()
def __init__(self):
self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tts_denied = []
self.tts_allowed = []
self.tts_status = CONF['TTS_STARTENABLED']
self.quickvote_status = False
self.pick_status = False
self.votemsg = False
self.poll = {}
self.pollcount = 0
self.pickme = []
self.picknumber = 1
self.pickcount = 0
if 'WHITELIST_USER' in CONF:
self.tts_allowed = CONF['WHITELIST_USER']
def connect(self, server, port, channel, botnick, botpass):
""" Connect to Twitch IRC servers """
logging.info("Connecting to: %s", server)
logging.info('Waiting...')
try:
self.irc.connect((server, port))
except ConnectionResetError:
logging.fatal('Twitch refused to connect, please check your settings and try again.')
sys.exit(252)
self.irc.settimeout(1)
self.irc.send(bytes("PASS " + botpass + "\r\n", "UTF-8"))
self.irc.send(bytes(
"USER " + botnick + " " + botnick +" " + botnick + " :python\r\n", "UTF-8")
)
self.irc.send(bytes("NICK " + botnick + "\r\n", "UTF-8"))
self.irc.send(bytes("CAP REQ :twitch.tv/commands twitch.tv/tags \r\n", "UTF-8"))
time.sleep(5)
try:
self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8"))
except ConnectionResetError:
logging.warning('JOIN was refused, will try again in 30 seconds.')
time.sleep(30)
logging.warning('Please check your credentials, if this error persists.')
self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8"))
def sendmsg(self, channel, user, msg):
"""
Send (a public) message to IRC channel
Parameters:
channel (str): Channel to post msg to
user (str): User to address message to
msg (str): Message to send
"""
self.irc.send(bytes("PRIVMSG "+channel+" :"+user+" "+msg+"\r\n", "UTF-8"))
def resp_ping(self):
""" Respond to PING """
logging.debug('PING received')
self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8"))
def resp_notice(self, resp):
""" Respond to NOTICE """
if 'Login authentication failed' in resp:
try:
raise RuntimeError()
except RuntimeError:
logging.exception('Login failed, please check your credentials and try again.')
sys.exit(251)
def resp_clearmsg(self, resp):
""" Respond to CLEARMSG """
logging.info('CLEARMSG received')
msgid = False
global msg_queue_raw # pylint: disable=global-statement,invalid-name
filtered_msg_queue = []
tags = resp.split(';')
for tag in tags:
if "target-msg-id=" in tag:
msgid = tag.rsplit('target-msg-id=',1)[1]
logging.debug('Trying to suppress message')
logging.debug(msgid)
for msg in list(msg_queue_raw):
if msg['msgid'] == msgid:
logging.info('Suppressing message %s', msgid)
else:
filtered_msg_queue.append(msg)
msg_queue_raw = filtered_msg_queue
def resp_privmsg(self, resp):
""" Respond to PRIVMSG """
logging.debug('PRIVMSG received')
tags = self.get_tags(resp)
message = self.get_message(resp)
user = tags['user']
msg = message['message']
msglen = message['length']
logging.debug('Msg: %s', msg)
logging.debug('Msg length: %s', msglen)
logging.debug('Deny List:')
logging.debug(self.tts_denied)
self.priviledged_commands(message, tags)
if msg.startswith('#pickme') and self.pick_status is True:
logging.info('Pickme detected')
self.pickcount = self.pickcount + 1
self.pickme.append(user)
logging.debug("pickme %s added", user)
return
if msg.startswith('#') and self.quickvote_status is True:
logging.info('Quickvote: Cast detected')
self.pollcount += 1
self.poll[user] = msg.lower()
logging.debug("poll: %s", self.poll)
return
if msg.startswith('!tts'):
logging.info('!tts command detected')
self.Commands.tts(self, message, tags)
return
if msg.startswith('!wiki') and CONF['FEATURE']['WIKI']:
logging.debug("!wiki command detected")
self.Commands.wiki(self, tags, msg)
return
if CONF['FEATURE']['QUOTE']:
if msg.startswith('!addquote'):
logging.debug("!addquote command detected")
self.Commands.addquote(self, tags, msg)
return
if msg.startswith('!smartquote') or msg.startswith('!sq'):
logging.debug("!smartquote command detected")
self.Commands.quote(self, tags, msg)
return
def get_tags(self, resp):
""" Strip tags from response """
tags = resp.split(';')
for tag in tags:
if tag.startswith('badges='):
badges = tag.rsplit('badges=',1)[1]
if tag.startswith('subscriber='):
subscriber = tag.rsplit('subscriber=',1)[1]
logging.debug('Subscriber: %s', subscriber)
if tag.startswith('id='):
msgid = tag.rsplit('id=',1)[1]
logging.debug('Message ID: %s', msgid)
if tag.startswith('display-name='):
user = tag.rsplit('display-name=',1)[1].lower()
logging.debug('Username: %s', user)
tags = {}
tags['badges'] = badges
tags['subscriber'] = subscriber
tags['msgid'] = msgid
tags['user'] = user
return tags
def get_message(self, resp):
""" Process message """
msg = {}
msg['message'] = resp.rsplit('PRIVMSG #',1)[1].split(':',1)[1].replace('\r\n','')
msg['length'] = len(msg['message'])
return msg
def priviledged_commands(self, message, tags):
""" Process priviledged commands """
msg = message['message']
badges = tags['badges']
user = tags['user']
if 'broadcaster' in badges or 'moderator' in badges:
if msg.startswith('!ping') and CONF['FEATURE']['PING']:
logging.debug("Ping check received.")
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), "Pong!")
elif msg.startswith('!version') and CONF['FEATURE']['VERSION']:
logging.debug("!version command detected")
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), VERSION)
elif msg.startswith('!pick') and CONF['FEATURE']['PICK']:
logging.debug("!pick command detected")
self.Commands.pick(self, msg)
elif msg.startswith('!random') and CONF['FEATURE']['RANDOM']:
logging.info('!random command detected')
self.Commands.random(self, msg)
elif msg.startswith('!quickvote') and CONF['FEATURE']['QUOTE']:
logging.info("!quickvote command detected")
self.Commands.quickvote(self, msg)
elif msg.startswith('!ptts'):
logging.debug("!ptts command detected")
self.Commands.ptts(self, msg)
elif msg.startswith('!dtts'):
logging.debug("!dtts command detected")
self.Commands.dtts(self, msg)
elif msg.startswith('!usermap'):
logging.info('!usermap command detected')
self.Commands.usermap(self, msg)
elif msg.startswith('!delay'):
logging.info('!delay command detected')
self.Commands.delay(self, msg)
elif msg.startswith('!toff'):
logging.info('TTS is now turned off')
msg_queue.clear()
msg_queue_raw.clear()
self.tts_status = False
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['TOFF'])
elif msg.startswith('!ton'):
logging.info('TTS is now turned on')
msg_queue.clear()
msg_queue_raw.clear()
self.tts_status = True
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['TON'])
def check_subonly(self, tags):
""" subonly """
if not CONF['IRC_SUBONLY']:
return False
subscriber = tags['subscriber']
badges = tags['badges']
user = tags['user']
if subscriber != "0" or 'moderator' in badges or 'broadcaster' in badges:
logging.info('TTS is sub-only and user has allowance')
return False
logging.debug('TTS is sub-only')
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['SUBONLY'])
return True
def check_modonly(self, tags):
""" modonly """
if not CONF['IRC_MODONLY']:
return False
badges = tags['badges']
user = tags['user']
if 'moderator' in badges or 'broadcaster' in badges:
logging.info('TTS is mod-only and user has allowance')
return False
logging.debug('TTS is mod-only')
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['MODONLY'])
return True
def check_tts_disabled(self, user):
""" Check if TTS is disabled """
if not self.tts_status:
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['DISABLED'])
return True
logging.debug('TTS is enabled')
return False
def check_msg_too_long(self, message, user):
""" Check if message is too long """
if message['length'] > CONF['IRC_TTS_LEN']:
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['TOO_LONG'])
return True
logging.debug('Check length: Message is ok')
return False
def check_user_denied(self, user):
""" Check if user is on denied list """
if user in self.tts_denied:
logging.info("%s is not allowed to use TTS", user)
self.sendmsg(CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['DENIED'])
return True
logging.debug("%s is allowed to use TTS", user)
return False
def check_whitelist(self, user):
""" Check Whitelist """
if CONF['WHITELIST']:
if user not in self.tts_allowed:
logging.debug("tts_allowed: %s", self.tts_allowed)
self.sendmsg(
CONF['IRC_CHANNEL'],
"@"+str(user), CONF['MESSAGE']['WHITELISTONLY']
)
return False
return True
return False
def send_tts_msg(self, message, tags):
""" Send message to TTS queue """
logging.info('Valid TTS message, adding to raw queue')
tts = True
now = datetime.datetime.now()
timestamp = str(time.time_ns())
user = tags['user']
msgid = tags['msgid']
badges = tags['badges']
subscriber = tags['subscriber']
msg = message['message']
msglen = message['length']
msg = msg.replace('!tts','',1)
msg = {
"TTS": tts,
"msg": msg,
"badges": badges,
"subscriber": subscriber,
"msgid": msgid,
"user": user,
"length": msglen,
"queuetime": now,
"timestamp": timestamp
}
msg_queue_raw.append(msg)
def get_response(self):
"""Get and process response from IRC"""
try:
resp = self.irc.recv(2048).decode("UTF-8")
logging.debug('resp:')
logging.debug(resp)
except socket.timeout:
return
except Exception: # pylint: disable=broad-except
logging.exception('An unknown error occured while getting a IRC response.')
sys.exit(255)
if resp.find('PING') != -1:
self.resp_ping()
if resp.find('CLEARMSG') != -1:
self.resp_clearmsg(resp)
if resp.find('NOTICE') != -1:
self.resp_notice(resp)
if resp.find('PRIVMSG') != -1:
self.resp_privmsg(resp)
class Commands():
""" Bot commands """
def __init__(self):
self.tts_denied = []
self.tts_allowed = []
self.quickvote_status = self.quickvote_status
self.pick_status = self.pick_status
self.votemsg = self.votemsg
self.poll = self.poll
self.pollcount = self.pollcount
self.pickme = self.pickme
self.picknumber = self.picknumber
self.pickcount = self.pickcount
def tts(self, msg, tags):
""" !tts command
Check if message is valid and send it to queue
:param str msg: The IRC message triggering the command
:param dict tags: The message metadata
"""
user = tags['user']
if IRC.check_tts_disabled(self, user):
logging.info('TTS is disabled')
elif IRC.check_msg_too_long(self, msg, user):
logging.info('TTS message is too long')
elif IRC.check_user_denied(self, user):
logging.info('User is not allowed to use TTS')
elif IRC.check_subonly(self, tags):
logging.info('TTS is sub-only')
elif IRC.check_modonly(self, tags):
logging.info('TTS is mod-only')
elif IRC.check_whitelist(self, user):
logging.info('User is not on whitelist')
else:
logging.info('Sending TTS message to raw_queue')
IRC.send_tts_msg(self, msg, tags)
def addquote(self, tags, msg):
""" !addquote command
Adds a newline to quotes.txt
"""
user = tags['user']
if IRC.check_user_denied(self, user):
logging.info('User is not allowed to use TTS')
elif IRC.check_subonly(self, tags):
logging.info('TTS is sub-only')
else:
try:
with open("quotes.txt", "rb") as file:
nol = len(file.readlines())
file.close()
except FileNotFoundError:
logging.warning("quotes.txt does not exists, will create")
nol = 0
nol = nol + 1
quote = msg.replace("!addquote ", "").strip()
quote = quote.split(" ",1)
username = quote[0]
date = time.strftime("%d.%m.%Y")
try:
token = CONF['IRC_OAUTH_TOKEN'].replace('oauth:','')
login = CONF['IRC_CHANNEL'].replace('#','')
api_endpoint = "https://api.twitch.tv/helix/users?login="+str(login)
headers = {
'Content-type': 'application/x-form-urlencoded',
'Authorization': 'Bearer '+token,
'Client-Id': 'ebo548vs6tq54c9zlrgin2yfzzlrrs'
}
req = requests.get(url=api_endpoint, headers=headers)
data = req.json()
user_id = data['data'][0]['id']
api_endpoint = "https://api.twitch.tv/helix/channels?broadcaster_id="+str(user_id)
headers = {
'Content-type': 'application/x-form-urlencoded',
'Authorization': 'Bearer '+token,
'Client-Id': 'ebo548vs6tq54c9zlrgin2yfzzlrrs'
}
req = requests.get(url=api_endpoint, headers=headers)
data = req.json()
game = data['data'][0]['game_name']
quote = f"#{nol}: \"{quote[1]}\" -{username}/{game} ({date})\n"
except:
logging.warning('Could not get metadata for quote')
quote = f"#{nol}: \"{quote[1]}\" -{username} ({date})\n"
logging.info('Adding quote %s', quote)
with open("quotes.txt", "ab") as file:
file.write(quote.encode('utf-8'))
msg = f"{CONF['MESSAGE']['QUOTE_ADDED_PREFIX']} #{nol} {CONF['MESSAGE']['QUOTE_ADDED_SUFFIX']}"
raw_msg = {
"TTS": True,
"msg": msg,
"badges": True,
"subscriber": True,
"msgid": True,
"user": CONF['IRC_USERNAME'],
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), msg)
def wiki(self, tags, msg):
""" !wiki command """
try:
user = tags['user']
wikipedia.set_lang(CONF['WIKI_LANG'])
msg = msg.replace('!wiki', '').strip()
wikiresult = wikipedia.summary(msg, sentences=3)
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), wikiresult)
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), wikipedia.page(msg).url)
wikiresult = wikiresult.replace('==', '')
raw_msg = {
"TTS": True,
"msg": wikiresult,
"badges": True,
"subscriber": True,
"msgid": True,
"user": 'wikipedia',
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
except wikipedia.exceptions.DisambiguationError:
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['WIKI_TOO_MANY'])
except:
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['WIKI_NO_RESULT'])
def quote(self, tags, msg = False):
""" !smartquote command
Gets a line from quotes.txt. If a number if given as msg
it fetch the given line number. If a string is given
it fetch the best matching line. If nothing is given
it fetch a random line.
"""
try:
user = tags['user']
query = msg.replace('!smartquote', '').strip()
query = msg.replace('!sq', '').strip()
if query.isdigit():
logging.info('Fetching quote #%s', query)
file = open("quotes.txt", "rb")
quotes = file.readlines()
for line in quotes:
if line.decode('utf-8').startswith("#"+str(query)+":"):
quote = line
break
file.close()
elif query != "":
logging.info('Fetching match for %s', query)
file = open("quotes.txt", "rb")
quotes = file.readlines()
matches = process.extract(query, quotes, limit=10)
quotes = []
for match, score in matches:
if score >= 60:
quotes.append(match)
logging.debug('Quotes: %s', quotes)
if len(quotes) >= 5:
quote = random.choice(quotes)
else:
quote = quotes[0]
else:
logging.info('Fetching random quote')
with open("quotes.txt", "rb") as file:
lines = file.read().splitlines()
quote = random.choice(lines)
except FileNotFoundError:
logging.error('"quotes.txt does not exists.')
except IndexError:
logging.error('Error fetching quote.')
if not 'quote' in vars():
logging.info('No quote found.')
quote = CONF['MESSAGE']['QUOTE_NOT_FOUND']
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "", quote)
return False
if not isinstance(quote, str):
quote = quote.decode('utf-8')
if IRC.check_tts_disabled(self, user):
logging.info('TTS is disabled')
elif IRC.check_user_denied(self, user):
logging.info('User is not allowed to use TTS')
elif IRC.check_subonly(self, tags):
logging.info('TTS is sub-only')
elif IRC.check_modonly(self, tags):
logging.info('TTS is mod-only')
elif IRC.check_whitelist(self, user):
logging.info('User is not on whitelist')
else:
logging.info('Sending quote to TTS')
logging.debug("Quote: %s", quote)
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "", quote)
quote = quote.rsplit('(', 1)[0]
raw_msg = {
"TTS": True,
"msg": quote,
"badges": True,
"subscriber": True,
"msgid": True,
"user": CONF['IRC_USERNAME'],
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
return True
def delay(self, msg):
""" !delay command
Adjust the delay setting in config.yml
:param str msg: The IRC message triggering the command
"""
try:
delay = msg.split(' ')[1]
except:
delay = False
if delay:
with open('config.yml','r', encoding='utf-8') as yamlfile:
cur_yaml = yaml.safe_load(yamlfile)
cur_yaml['irc']['clearmsg_timeout'] = int(delay)
if cur_yaml:
with open('config.yml','w', encoding='utf-8') as yamlfile:
yaml.safe_dump(cur_yaml, yamlfile)
load_config()
def usermap(self, msg):
""" !usermap command
Adds new entries to usermapping in config.yml
:param str msg: The IRC message triggering the command
"""
try:
msg = msg.replace('!usermap ', '')
splitmsg = msg.split(" ")
username, *mappingname = splitmsg
mappingname = ' '.join(mappingname)
except:
username = False
mappingname = False
if username and mappingname:
with open('config.yml','r', encoding='utf-8') as yamlfile:
cur_yaml = yaml.safe_load(yamlfile)
cur_yaml['usermapping'].update({username: mappingname})
if cur_yaml:
with open('config.yml','w', encoding='utf-8') as yamlfile:
yaml.safe_dump(cur_yaml, yamlfile)
load_config()
def pick(self, msg):
""" !pick command """
if self.pick_status:
logging.info('Pick stopped')
logging.debug("Got %s participats, wanted %s", self.pickcount, self.picknumber)
try:
if int(self.pickcount) > int(self.picknumber):
picks = random.sample(self.pickme, self.picknumber)
logging.info('Got more than the requested number of participants')
else:
picks = self.pickme
logging.info('Got less than or exactly the requested number of participants')
converted_picks = [str(element) for element in picks]
joined_picks = " ".join(converted_picks)
except:
logging.error("There was an error during picking.")
joined_picks = False
if joined_picks:
raw_msg = {
"TTS": True,
"msg": CONF['MESSAGE']['PICKRESULT'] +" "+ str(joined_picks),
"badges": True,
"subscriber": True,
"msgid": True,
"user": CONF['IRC_USERNAME'],
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
IRC.sendmsg(self,
CONF['IRC_CHANNEL'], "",
CONF['MESSAGE']['PICKRESULT']
)
IRC.sendmsg(self,
CONF['IRC_CHANNEL'], "*",
joined_picks
)
else:
IRC.sendmsg(self,
CONF['IRC_CHANNEL'], "*",
CONF['MESSAGE']['PICKNONE']
)
self.pick_status = False
self.pickme = []
self.pickcount = 0
return
logging.debug('Pick started')
self.pick_status = True
try:
msg = msg.split(' ')[1].strip()
self.picknumber = msg
except IndexError:
self.picknumber = self.picknumber
logging.info("Will pick %s participants", self.picknumber)
IRC.sendmsg(self,
CONF['IRC_CHANNEL'], "@chat",
CONF['MESSAGE']['PICKSTART']
)
return
def quickvote(self, msg):
""" !quickvote command
Starts or stops the !quickvote function. On stop calculates the 5 most casted
votes and send them to chat. The highest vote is send to msg_queue.
:param str msg: The IRC message triggering the command
"""
if self.quickvote_status:
logging.debug('Quickvote stopped')
if self.pollcount == 0:
logging.info("Nobody voted")
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTEEND'])
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "*", CONF['MESSAGE']['VOTENOBODY'])
raw_msg = {
"TTS": True,
"msg": CONF['MESSAGE']['VOTENOBODY'],
"badges": True,
"subscriber": True,
"msgid": True,
"user": CONF['IRC_USERNAME'],
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
logging.info('The result is: %s', CONF['MESSAGE']['VOTENOBODY'])
logging.debug('Votemsg: %s', msg)
self.quickvote_status = False
self.poll = {}
return
logging.info("Counting votes")
count = 0
count = Counter(self.poll.values()).most_common(5)
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTEEND'])
logging.debug(count)
raw_msg = {
"TTS": True,
"msg": CONF['MESSAGE']['VOTERESULT'] +" "+ str(count[0][0].replace('#','')),
"badges": True,
"subscriber": True,
"msgid": True,
"user": CONF['IRC_USERNAME'],
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
logging.info('The result is: %s', CONF['MESSAGE']['VOTERESULT'] +" "+ str(count[0]))
logging.debug('Votemsg: %s', msg)
for key, value in count:
IRC.sendmsg(
self,
CONF['IRC_CHANNEL'], "*",
str(key)+" ("+str(value)+ " "+ CONF['MESSAGE']['VOTES'] + ")"
)
self.quickvote_status = False
self.poll = {}
self.pollcount = 0
return
logging.debug('Quickvote started')
self.quickvote_status = True
self.votemsg = msg.split('!quickvote', 1)[1].strip()
if self.votemsg:
IRC.sendmsg(self,
CONF['IRC_CHANNEL'], "@chat",
CONF['MESSAGE']['VOTESTART'] + " (" + str(self.votemsg) + ")"
)
else:
IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTESTART'])
return
def random(self, msg):
""" !random command
Read a random line from randomfile and put it into msg_queue
If no file is given in msg a standard file will be used
:param str msg: The IRC message triggering the command
:raise: FileNotFoundError if randomfile does not exists
:return: True if line was successfully read and added to msg_queue
:rtype: bool
"""
randomfile = msg.replace('!random', '').strip().lower()
if randomfile:
randomfile = "random_"+str(os.path.basename(randomfile))+".txt"
else:
randomfile = "random.txt"
try:
with open(randomfile,"r", encoding="utf-8") as file:
lines = file.read().splitlines()
random_msg = random.choice(lines)
except FileNotFoundError:
logging.error('%s not found', randomfile)
return False
raw_msg = {
"TTS": True,
"msg": random_msg,
"badges": True,
"subscriber": True,
"msgid": True,
"user": CONF['IRC_USERNAME'],
"length": CONF['IRC_TTS_LEN'],
"queuetime": datetime.datetime.now(),
"timestamp": str(time.time_ns())
}
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
return True
def ptts(self, msg):
""" !ptts command
Add user to tts_allowed list and remove user from tts_denied list
:param str msg: The IRC message triggering the command
"""
user = msg.replace('!ptts', '').strip().lower()
if user.startswith('@'):
logging.debug('Removing "@" from username')
user = user.replace('@', '')
logging.info("Adding %s to whitelist", user)
self.tts_allowed.append(user)
if user in self.tts_denied:
logging.info("Removing %s from deny list", user)
self.tts_denied.remove(user)
return
def dtts(self, msg):
""" !dtts command
Add user to tts_denied list and remove user from tts_allowed list
:param str msg: The IRC message triggering the command
"""
user = msg.replace('!dtts', '').strip().lower()
if user.startswith('@'):
logging.debug('Removing "@" from username')
user = user.replace('@', '')
if user not in self.tts_denied:
logging.info("Adding %s to deny list", user)
self.tts_denied.append(user)
if user in self.tts_allowed:
logging.info("Removing %s from allowed list", user)
self.tts_allowed.remove(user)
return
class ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
""" Threaded HTTP Server """
class HTTPserv(BaseHTTPRequestHandler):
"""Simple HTTP Server"""
def log_message(self, format, *args): # pylint: disable=redefined-builtin
"""Suppress HTTP log messages"""
return
def do_GET(self): # pylint: disable=invalid-name
"""Process GET requests"""
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
with open("tts.html", "rb") as file:
html = file.read()
self.wfile.write(html)
elif self.path == '/favicon.ico':
self.send_response(200)
self.send_header('Content-type', 'image/x-icon')
self.end_headers()
with open("favicon.ico", "rb") as file:
icon = file.read()
self.wfile.write(icon)
elif self.path == '/tts.js':
self.send_response(200)
self.send_header('Content-type', 'text/javascript')
self.end_headers()
with open("tts.js", "rb") as file:
html = file.read()
self.wfile.write(html)
elif self.path == '/jquery.js':
self.send_response(200)
self.send_header('Content-type', 'text/javascript')
self.end_headers()
with open("jquery.js", "rb") as file:
html = file.read()
self.wfile.write(html)
elif self.path == '/bootstrap.min.css':
self.send_response(200)
self.send_header('Content-type', 'text/css')
self.end_headers()
with open("bootstrap.min.css", "rb") as file:
html = file.read()
self.wfile.write(html)
elif self.path.startswith('/tts_queue'):
tts_json = ""
tts = {}
self.send_response(200)
self.send_header('Content-type', 'text/json')
self.end_headers()
usermap = CONF['USERMAP']
sorted_tts = {k: msg_queue[k] for k in sorted(msg_queue, reverse=True)}
logging.debug(usermap)
for key in list(sorted_tts.keys()):
if key not in tts_done:
if msg_queue[key][0].lower() in usermap:
logging.debug('Using usermap for user: %s (%s)', msg_queue[key][0], usermap[msg_queue[key][0].lower()])
tts = {str(key): str(usermap[msg_queue[key][0].lower()]) + " " + str(CONF['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])}
else:
logging.debug('No usermap entry found for user: %s', msg_queue[key][0])
tts = {str(key): str(msg_queue[key][0]) + " " + str(CONF['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])}
tts_json = json.dumps(tts)
self.wfile.write(bytes(str(tts_json)+"\n", "utf-8"))
elif self.path.startswith('/tts_done'):
get_params = parse_qs(self.path)
if '/tts_done?id' in get_params:
logging.info("Removing message from queue")
tts_done.append(get_params['/tts_done?id'][0])
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes("OK\n", "utf-8"))
else:
self.send_response(500)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes("Internal Server error\n", "utf-8"))
elif self.path.startswith('/token'):
data = {}
data['client_id'] = "ebo548vs6tq54c9zlrgin2yfzzlrrs"
data['response_type'] = "token"
data['scope'] = "chat:edit chat:read"
if CONF['HTTP_PORT'] == 80:
data['redirect_uri'] = "http://localhost/token"
elif CONF['HTTP_PORT'] == 8080:
data['redirect_uri'] = "http://localhost:8080/token"
elif CONF['HTTP_PORT'] == 3000:
data['redirect_uri'] = "http://localhost:3000/token"
else:
self.send_response(500)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes("You can only use this function if HTTP_PORT is 80, 8080 or 3000. Please change your port, or use https://www.21x9.org/twitch instead.\n", "utf-8"))
return False
try:
url_values = urllib.parse.urlencode(data)
url = "https://id.twitch.tv/oauth2/authorize"
full_url = url + "?" + url_values
data = urllib.request.urlopen(full_url)
if data:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(bytes("<html><head><title>OAuth Token Generator</title></head><body onload=\"displayCode();\"><div id=\"code\"><a href=\""+str(data.geturl())+"\">Click to start the OAuth process.</a></div><script>function displayCode() { var url = window.location.href; var test = url.indexOf(\"access_token\");if (test != -1) { token = url.substring(42,72); document.getElementById(\"code\").innerHTML = \"<p>oauth:\" + token + \"</p><p>Copy the token into your config.yml and restart the bot.</p>\";}}</script></body></html>\n", "utf-8"))
else:
self.send_response(500)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes("Could not get OAuth-URL from Twitch\n", "utf-8"))
except Exception: # pylint: disable=broad-except
logging.error('Could not fetch OAuth-URL from Twitch.')
else:
self.send_response(404)
self.send_header('Server', 'TTS')
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes("File not found.\n", "utf-8"))
return
def http_serve_forever(httpd):
"""httpd loop"""
httpd.serve_forever()
def load_config():
"""Loading config variables"""
logging.info("Loading configfile")
try:
with open("config.yml", "r", encoding="UTF-8") as ymlfile:
cfg = yaml.load(ymlfile, Loader=yaml.Loader)
except FileNotFoundError:
logging.fatal('Your config file is missing, please copy config-dist.yml to config.yml and review your settings.')
sys.exit(253)
for section in cfg:
try:
logging.debug('Fetching config: %s', section)
CONF['IRC_CHANNEL'] = cfg.get('irc', {}).get('channel', False)
CONF['IRC_USERNAME'] = cfg.get('irc', {}).get('username', False)
CONF['IRC_OAUTH_TOKEN'] = cfg.get('irc', {}).get('oauth_token', False)
CONF['IRC_SERVER'] = cfg.get('irc', {}).get('server', "irc.chat.twitch.tv")
CONF['IRC_CLEARMSG_TIMEOUT'] = cfg.get('irc', {}).get('clearmsg_timeout', 60)
CONF['IRC_SUBONLY'] = cfg.get('bot', {}).get('subonly', False)
CONF['IRC_MODONLY'] = cfg.get('bot', {}).get('modonly', False)
CONF['IRC_TTS_LEN'] = cfg.get('bot', {}).get('message_length', 200)
CONF['TTS_STARTENABLED'] = cfg.get('bot', {}).get('start_enabled', True)
CONF['WIKI_LANG'] = cfg.get('bot', {}).get('language', 'en')
CONF['LOG_LEVEL'] = cfg.get('log', {}).get('level', "INFO")
CONF['HTTP_PORT'] = cfg.get('http', {}).get('port', 80)
CONF['HTTP_BIND'] = cfg.get('http', {}).get('bind', "localhost")
CONF['MESSAGE'] = {}
CONF['MESSAGE']['TOFF'] = cfg.get('messages', {}).get('toff', "TTS is now disabled.")
CONF['MESSAGE']['TON'] = cfg.get('messages', {}).get('ton', "TTS is now active.")
CONF['MESSAGE']['TOO_LONG'] = cfg.get('messages', {}).get('too_long', "Sorry, your message is too long.")
CONF['MESSAGE']['DISABLED'] = cfg.get('messages', {}).get('disabled', "Sorry, TTS is disabled.")
CONF['MESSAGE']['DENIED'] = cfg.get('messages', {}).get('denied', "Sorry, you're not allowed to use TTS.")
CONF['MESSAGE']['SUBONLY'] = cfg.get('messages', {}).get('subonly', "Sorry, TTS is sub-only.")
CONF['MESSAGE']['MODONLY'] = cfg.get('messages', {}).get('modonly', "Sorry, TTS is mod-only.")
CONF['MESSAGE']['READY'] = cfg.get('messages', {}).get('ready', "TTS bot is ready.")
CONF['MESSAGE']['WHITELISTONLY'] = cfg.get('messages', {}).get('whitelist', False)
CONF['MESSAGE']['SAYS'] = cfg.get('messages', {}).get('says', "says")
CONF['MESSAGE']['VOTESTART'] = cfg.get('messages', {}).get('votestart', "Quickvote started. Send #yourchoice to participate.")
CONF['MESSAGE']['VOTEEND'] = cfg.get('messages', {}).get('voteend', "Quickvote ended. The results are:")
CONF['MESSAGE']['VOTENOBODY'] = cfg.get('messages', {}).get('votenobody', "Nobody casted a vote. :(")
CONF['MESSAGE']['VOTERESULT'] = cfg.get('messages', {}).get('voteresult', "Voting has ended. The result is:")
CONF['MESSAGE']['VOTES'] = cfg.get('messages', {}).get('votes', "Votes")
CONF['MESSAGE']['PICKSTART'] = cfg.get('messages', {}).get('pickstart', "Pick started. Send #pickme to participate.")
CONF['MESSAGE']['PICKRESULT'] = cfg.get('messages', {}).get('pickresult', "Pick ended. The results are:")
CONF['MESSAGE']['PICKNONE'] = cfg.get('messages', {}).get('picknone', "Pick ended. Nobody was picked.")
CONF['MESSAGE']['QUOTE_NOT_FOUND'] = cfg.get('messages', {}).get('quotenotfound', "Sorry, no quote found.")
CONF['MESSAGE']['QUOTE_ADDED_PREFIX'] = cfg.get('messages', {}).get('quoteaddedprefix', "Quote:")
CONF['MESSAGE']['QUOTE_ADDED_SUFFIX'] = cfg.get('messages', {}).get('quoteaddedsuffix', "added.")
CONF['MESSAGE']['WIKI_TOO_MANY'] = cfg.get('messages', {}).get('wiki_too_many', "Sorry, there are too many possible results. Try a more narrow search.")
CONF['MESSAGE']['WIKI_NO_RESULT'] = cfg.get('messages', {}).get('wiki_no_result', "Sorry, there was an error fetching the wikipedia answer.")
CONF['FEATURE'] = {}
CONF['FEATURE']['WIKI'] = cfg.get('features', {}).get('wiki', True)
CONF['FEATURE']['PICK'] = cfg.get('features', {}).get('pick', True)
CONF['FEATURE']['VOTE'] = cfg.get('features', {}).get('vote', True)
CONF['FEATURE']['QUOTE'] = cfg.get('features', {}).get('quote', True)
CONF['FEATURE']['RANDOM'] = cfg.get('features', {}).get('random', True)
CONF['FEATURE']['VERSION'] = cfg.get('features', {}).get('version', True)
CONF['FEATURE']['PING'] = cfg.get('features', {}).get('ping', True)
CONF['USERMAP'] = cfg.get('usermapping', [])
if 'whitelist' in cfg:
CONF['WHITELIST'] = True
CONF['WHITELIST_USER'] = cfg['whitelist']
else:
CONF['WHITELIST'] = False
except KeyError:
logging.exception('Your config file is invalid, please check and try again.')
sys.exit(254)
if CONF['WHITELIST']:
logging.info('Whitelist mode enabled')
logging.debug('Whitelist: %s', CONF['WHITELIST_USER'])
if not CONF['IRC_CHANNEL']:
raise ValueError('Please add your twitch channel to config.yml.')
if not CONF['IRC_USERNAME']:
raise ValueError('Please add the bots username to config.yml.')
if not CONF['IRC_OAUTH_TOKEN']:
CONF['IRC_OAUTH_TOKEN'] = "Invalid"
return CONF
if not CONF['IRC_OAUTH_TOKEN'].startswith('oauth:'):
raise ValueError('Your oauth-token is invalid, it has to start with: "oauth:"')
return CONF
def send_tts_queue():
""" Send messages to TTS """
for raw_msg in msg_queue_raw:
logging.debug('Raw msg: %s', msg_queue_raw)
now = datetime.datetime.now()
if now - raw_msg['queuetime'] > datetime.timedelta(seconds=CONF['IRC_CLEARMSG_TIMEOUT']):
logging.debug('clearmsg_timeout reached')
if raw_msg['timestamp'] not in msg_queue:
logging.info('Sending TTS message')
msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
logging.debug("msg_queue: %s", msg_queue)
else:
logging.debug('Msg is already in queue')
def get_url(path=False):
""" Generate a valid URL from config values """
if CONF['HTTP_BIND'] == "0.0.0.0":
url = "localhost"
else:
url = CONF['HTTP_BIND']
url = "http://"+str(url)+":"+str(CONF['HTTP_PORT'])+"/"
if path:
url = url+str(path)
return url
def check_oauth_token():
""" Check for valid authentication via Twitch API """
global CONF # pylint: disable=global-statement
logging.debug('Checking OAuth Token')
try:
url = 'https://id.twitch.tv/oauth2/validate'
oauth = "OAuth "+str(CONF['IRC_OAUTH_TOKEN'].replace('oauth:',''))
request = urllib.request.Request(url)
request.add_header('Authorization', oauth)
urllib.request.urlopen(request)
except HTTPError:
logging.fatal('Twitch rejected your OAuth Token. Please check and generate a new one.')
logging.info('Please open http://%s:%s/token to generate your OAuth-Token.', CONF['HTTP_BIND'], CONF['HTTP_PORT'])
url = get_url("token")
webbrowser.open_new_tab(url)
logging.info('Please complete the OAuth process and add the token into your "config.yml" within the next 5 minutes.')
time.sleep(300)
CONF = load_config()
check_oauth_token()
logging.info('OAuth Token is valid')
return CONF
def main():
"""Main loop"""
global CONF # pylint: disable=global-statement
CONF = load_config()
lastreload = datetime.datetime.now()
logging.getLogger().setLevel(CONF['LOG_LEVEL'])
if CONF['LOG_LEVEL'] == 'DEBUG':
sys.tracebacklimit = 5
logging.info("Starting Webserver")
httpd = ThreadingSimpleServer((CONF['HTTP_BIND'], CONF['HTTP_PORT']), HTTPserv)
http_thread = Thread(target=http_serve_forever, daemon=True, args=(httpd, ))
http_thread.start()
check_oauth_token()
logging.info("Starting IRC bot")
irc = IRC()
irc.connect(CONF['IRC_SERVER'], 6667, CONF['IRC_CHANNEL'], CONF['IRC_USERNAME'], CONF['IRC_OAUTH_TOKEN'])
irc.sendmsg(CONF['IRC_CHANNEL'], 'MrDestructoid', CONF['MESSAGE']['READY'])
logging.info('Connected and joined')
url = get_url()
logging.info("Please open your browser and visit: %s", url)
webbrowser.open_new_tab(url)
while True:
if CONF['LOG_LEVEL'] == "DEBUG":
time.sleep(1)
try:
irc.get_response()
confreload = datetime.datetime.now()
if confreload - lastreload > datetime.timedelta(seconds=60):
CONF = load_config()
lastreload = datetime.datetime.now()
if irc.quickvote_status and irc.votemsg:
logging.info('Quickvote is active')
irc.sendmsg(CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTESTART'] + " (" + str(irc.votemsg) + ")")
if not irc.tts_status:
continue
logging.debug('msg_queue_raw: %s', msg_queue_raw)
send_tts_queue()
except KeyboardInterrupt:
httpd.shutdown()
logging.info('Exiting...')
os.kill(os.getpid(), signal.SIGTERM)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(module)s %(threadName)s %(levelname)s: %(message)s')
sys.tracebacklimit = 3
VERSION = "1.7.1"
CONF = {}
tts_done = []
msg_queue_raw = []
msg_queue = {}
if sys.argv[1:]:
if sys.argv[1] == "--version":
print('Simple TTS Bot')
print('Version %s', VERSION)
sys.exit(1)
main()