#!/usr/bin/python3 # -*- coding: utf-8 -*- # pylint: disable=line-too-long,too-many-lines """ TwitchTTS Copyright (C) 2022 gpkvt This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import os import sys import time import json import signal import socket import random import logging import datetime import webbrowser import urllib.request from threading import Thread from collections import Counter from urllib.parse import parse_qs from urllib.error import HTTPError from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn import yaml import requests import wikipedia from fuzzywuzzy import process class IRC: """ IRC bot """ irc = socket.socket() def __init__(self): self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tts_denied = [] self.tts_allowed = [] self.tts_status = conf['TTS_STARTENABLED'] self.quickvote_status = False self.pick_status = False self.votemsg = False self.poll = {} self.pollcount = 0 self.pickme = [] self.picknumber = 1 self.pickcount = 0 if 'WHITELIST_USER' in conf: self.tts_allowed = conf['WHITELIST_USER'] def connect(self, server, port, channel, botnick, botpass): """ Connect to Twitch IRC servers """ logging.info("Connecting to: %s", server) logging.info('Waiting...') try: self.irc.connect((server, port)) except ConnectionResetError: logging.fatal('Twitch refused to connect, please check your settings and try again.') sys.exit(252) self.irc.settimeout(1) 
self.irc.send(bytes("PASS " + botpass + "\r\n", "UTF-8")) self.irc.send(bytes( "USER " + botnick + " " + botnick +" " + botnick + " :python\r\n", "UTF-8") ) self.irc.send(bytes("NICK " + botnick + "\r\n", "UTF-8")) self.irc.send(bytes("CAP REQ :twitch.tv/commands twitch.tv/tags \r\n", "UTF-8")) time.sleep(5) try: self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8")) except ConnectionResetError: logging.warning('JOIN was refused, will try again in 30 seconds.') time.sleep(30) logging.warning('Please check your credentials, if this error persists.') self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8")) def sendmsg(self, channel, user, msg): """ Send (a public) message to IRC channel Parameters: channel (str): Channel to post msg to user (str): User to address message to msg (str): Message to send """ self.irc.send(bytes("PRIVMSG "+channel+" :"+user+" "+msg+"\r\n", "UTF-8")) def resp_ping(self): """ Respond to PING """ logging.debug('PING received') self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8")) def resp_notice(self, resp): """ Respond to NOTICE """ if 'Login authentication failed' in resp: try: raise RuntimeError() except RuntimeError: logging.exception('Login failed, please check your credentials and try again.') sys.exit(251) def resp_clearmsg(self, resp): """ Respond to CLEARMSG """ logging.info('CLEARMSG received') msgid = False global msg_queue_raw # pylint: disable=global-statement,invalid-name filtered_msg_queue = [] tags = resp.split(';') for tag in tags: if "target-msg-id=" in tag: msgid = tag.rsplit('target-msg-id=',1)[1] logging.debug('Trying to suppress message') logging.debug(msgid) for msg in list(msg_queue_raw): if msg['msgid'] == msgid: logging.info('Suppressing message %s', msgid) else: filtered_msg_queue.append(msg) msg_queue_raw = filtered_msg_queue def resp_privmsg(self, resp): """ Respond to PRIVMSG """ logging.debug('PRIVMSG received') tags = self.get_tags(resp) message = self.get_message(resp) user = tags['user'] msg = 
message['message'] msglen = message['length'] logging.debug('Msg: %s', msg) logging.debug('Msg length: %s', msglen) logging.debug('Deny List:') logging.debug(self.tts_denied) self.priviledged_commands(message, tags) if msg.startswith('#pickme') and self.pick_status is True: logging.info('Pickme detected') self.pickcount = self.pickcount + 1 self.pickme.append(user) logging.debug("pickme %s added", user) return if msg.startswith('#') and self.quickvote_status is True: logging.info('Quickvote: Cast detected') self.pollcount += 1 self.poll[user] = msg.lower() logging.debug("poll: %s", self.poll) return if msg.startswith('!tts'): logging.info('!tts command detected') self.Commands.tts(self, message, tags) return if msg.startswith('!addquote'): logging.debug("!addquote command detected") self.Commands.addquote(self, tags, msg) return if msg.startswith('!wiki'): logging.debug("!wiki command detected") self.Commands.wiki(self, tags, msg) return if msg.startswith('!smartquote') or msg.startswith('!sq'): logging.debug("!smartquote command detected") self.Commands.quote(self, tags, msg) return def get_tags(self, resp): """ Strip tags from response """ tags = resp.split(';') for tag in tags: if tag.startswith('badges='): badges = tag.rsplit('badges=',1)[1] if tag.startswith('subscriber='): subscriber = tag.rsplit('subscriber=',1)[1] logging.debug('Subscriber: %s', subscriber) if tag.startswith('id='): msgid = tag.rsplit('id=',1)[1] logging.debug('Message ID: %s', msgid) if tag.startswith('display-name='): user = tag.rsplit('display-name=',1)[1].lower() logging.debug('Username: %s', user) tags = {} tags['badges'] = badges tags['subscriber'] = subscriber tags['msgid'] = msgid tags['user'] = user return tags def get_message(self, resp): """ Process message """ msg = {} msg['message'] = resp.rsplit('PRIVMSG #',1)[1].split(':',1)[1].replace('\r\n','') msg['length'] = len(msg['message']) return msg def priviledged_commands(self, message, tags): """ Process priviledged commands """ 
msg = message['message'] badges = tags['badges'] user = tags['user'] if 'broadcaster' in badges or 'moderator' in badges: if msg.startswith('!ping'): logging.debug("Ping check received.") self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), "Pong!") elif msg.startswith('!version'): logging.debug("!version command detected") self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), VERSION) elif msg.startswith('!pick'): logging.debug("!pick command detected") self.Commands.pick(self, msg) elif msg.startswith('!dtts'): logging.debug("!dtts command detected") self.Commands.dtts(self, msg) elif msg.startswith('!random'): logging.info('!random command detected') self.Commands.random(self, msg) elif msg.startswith('!quickvote'): logging.info("!quickvote command detected") self.Commands.quickvote(self, msg) elif msg.startswith('!ptts'): logging.debug("!ptts command detected") self.Commands.ptts(self, msg) elif msg.startswith('!usermap'): logging.info('!usermap command detected') self.Commands.usermap(self, msg) elif msg.startswith('!delay'): logging.info('!delay command detected') self.Commands.delay(self, msg) elif msg.startswith('!toff'): logging.info('TTS is now turned off') msg_queue.clear() msg_queue_raw.clear() self.tts_status = False self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOFF']) elif msg.startswith('!ton'): logging.info('TTS is now turned on') msg_queue.clear() msg_queue_raw.clear() self.tts_status = True self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TON']) def check_subonly(self, tags): """ subonly """ if not conf['IRC_SUBONLY']: return False subscriber = tags['subscriber'] badges = tags['badges'] user = tags['user'] if subscriber != "0" or 'moderator' in badges or 'broadcaster' in badges: logging.info('TTS is sub-only and user has allowance') return False logging.debug('TTS is sub-only') self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['SUBONLY']) return True def check_modonly(self, tags): """ modonly """ if not 
conf['IRC_MODONLY']: return False badges = tags['badges'] user = tags['user'] if 'moderator' in badges or 'broadcaster' in badges: logging.info('TTS is mod-only and user has allowance') return False logging.debug('TTS is mod-only') self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['MODONLY']) return True def check_tts_disabled(self, user): """ Check if TTS is disabled """ if not self.tts_status: self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DISABLED']) return True logging.debug('TTS is enabled') return False def check_msg_too_long(self, message, user): """ Check if message is too long """ if message['length'] > conf['IRC_TTS_LEN']: self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOO_LONG']) return True logging.debug('Check length: Message is ok') return False def check_user_denied(self, user): """ Check if user is on denied list """ if user in self.tts_denied: logging.info("%s is not allowed to use TTS", user) self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DENIED']) return True logging.debug("%s is allowed to use TTS", user) return False def check_whitelist(self, user): """ Check Whitelist """ if conf['WHITELIST']: if user not in self.tts_allowed: logging.debug("tts_allowed: %s", self.tts_allowed) self.sendmsg( conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['WHITELISTONLY'] ) return False return True return False def send_tts_msg(self, message, tags): """ Send message to TTS queue """ logging.info('Valid TTS message, adding to raw queue') tts = True now = datetime.datetime.now() timestamp = str(time.time_ns()) user = tags['user'] msgid = tags['msgid'] badges = tags['badges'] subscriber = tags['subscriber'] msg = message['message'] msglen = message['length'] msg = msg.replace('!tts','',1) msg = { "TTS": tts, "msg": msg, "badges": badges, "subscriber": subscriber, "msgid": msgid, "user": user, "length": msglen, "queuetime": now, "timestamp": timestamp } msg_queue_raw.append(msg) def 
get_response(self): """Get and process response from IRC""" try: resp = self.irc.recv(2048).decode("UTF-8") logging.debug('resp:') logging.debug(resp) except socket.timeout: return except Exception: # pylint: disable=broad-except logging.exception('An unknown error occured while getting a IRC response.') sys.exit(255) if resp.find('PING') != -1: self.resp_ping() if resp.find('CLEARMSG') != -1: self.resp_clearmsg(resp) if resp.find('NOTICE') != -1: self.resp_notice(resp) if resp.find('PRIVMSG') != -1: self.resp_privmsg(resp) class Commands(): """ Bot commands """ def __init__(self): self.tts_denied = [] self.tts_allowed = [] self.quickvote_status = self.quickvote_status self.pick_status = self.pick_status self.votemsg = self.votemsg self.poll = self.poll self.pollcount = self.pollcount self.pickme = self.pickme self.picknumber = self.picknumber self.pickcount = self.pickcount def tts(self, msg, tags): """ !tts command Check if message is valid and send it to queue :param str msg: The IRC message triggering the command :param dict tags: The message metadata """ user = tags['user'] if IRC.check_tts_disabled(self, user): logging.info('TTS is disabled') elif IRC.check_msg_too_long(self, msg, user): logging.info('TTS message is too long') elif IRC.check_user_denied(self, user): logging.info('User is not allowed to use TTS') elif IRC.check_subonly(self, tags): logging.info('TTS is sub-only') elif IRC.check_modonly(self, tags): logging.info('TTS is mod-only') elif IRC.check_whitelist(self, user): logging.info('User is not on whitelist') else: logging.info('Sending TTS message to raw_queue') IRC.send_tts_msg(self, msg, tags) def addquote(self, tags, msg): """ !addquote command Adds a newline to quotes.txt """ user = tags['user'] if IRC.check_user_denied(self, user): logging.info('User is not allowed to use TTS') elif IRC.check_subonly(self, tags): logging.info('TTS is sub-only') else: try: with open("quotes.txt", "rb") as file: nol = len(file.readlines()) file.close() 
except FileNotFoundError: logging.warning("quotes.txt does not exists, will create") nol = 0 nol = nol + 1 quote = msg.replace("!addquote ", "").strip() quote = quote.split(" ",1) username = quote[0] date = time.strftime("%d.%m.%Y") try: token = conf['IRC_OAUTH_TOKEN'].replace('oauth:','') login = conf['IRC_CHANNEL'].replace('#','') api_endpoint = "https://api.twitch.tv/helix/users?login="+str(login) headers = { 'Content-type': 'application/x-form-urlencoded', 'Authorization': 'Bearer '+token, 'Client-Id': 'ebo548vs6tq54c9zlrgin2yfzzlrrs' } req = requests.get(url=api_endpoint, headers=headers) data = req.json() user_id = data['data'][0]['id'] api_endpoint = "https://api.twitch.tv/helix/channels?broadcaster_id="+str(user_id) headers = { 'Content-type': 'application/x-form-urlencoded', 'Authorization': 'Bearer '+token, 'Client-Id': 'ebo548vs6tq54c9zlrgin2yfzzlrrs' } req = requests.get(url=api_endpoint, headers=headers) data = req.json() game = data['data'][0]['game_name'] quote = f"#{nol}: \"{quote[1]}\" -{username}/{game} ({date})\n" except: # pylint: disable=bare-except logging.warning('Could not get metadata for quote') quote = f"#{nol}: \"{quote[1]}\" -{username} ({date})\n" logging.info('Adding quote %s', quote) with open("quotes.txt", "ab") as file: file.write(quote.encode('utf-8')) msg = f"{conf['MESSAGE']['QUOTE_ADDED_PREFIX']} #{nol} {conf['MESSAGE']['QUOTE_ADDED_SUFFIX']}" raw_msg = { "TTS": True, "msg": msg, "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] IRC.sendmsg(self, conf['IRC_CHANNEL'], "@"+str(user), msg) def wiki(self, tags, msg): """ !wiki command """ try: user = tags['user'] wikipedia.set_lang(conf['WIKI_LANG']) msg = msg.replace('!wiki', '').strip() wikiresult = wikipedia.summary(msg, sentences=3) IRC.sendmsg(self, conf['IRC_CHANNEL'], 
"@"+str(user), wikiresult) IRC.sendmsg(self, conf['IRC_CHANNEL'], "@"+str(user), wikipedia.page(msg).url) wikiresult = wikiresult.replace('==', '') raw_msg = { "TTS": True, "msg": wikiresult, "badges": True, "subscriber": True, "msgid": True, "user": 'wikipedia', "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] except wikipedia.exceptions.DisambiguationError: IRC.sendmsg(self, conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['WIKI_TOO_MANY']) except: # pylint: disable=bare-except IRC.sendmsg(self, conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['WIKI_NO_RESULT']) def quote(self, tags, msg = False): """ !smartquote command Gets a line from quotes.txt. If a number if given as msg it fetch the given line number. If a string is given it fetch the best matching line. If nothing is given it fetch a random line. """ try: user = tags['user'] query = msg.replace('!smartquote', '').strip() query = msg.replace('!sq', '').strip() if query.isdigit(): logging.info('Fetching quote #%s', query) file = open("quotes.txt", "rb") quotes = file.readlines() for line in quotes: if line.decode('utf-8').startswith("#"+str(query)+":"): quote = line break file.close() elif query != "": logging.info('Fetching match for %s', query) file = open("quotes.txt", "rb") quotes = file.readlines() matches = process.extract(query, quotes, limit=10) quotes = [] for match, score in matches: if score >= 60: quotes.append(match) logging.debug('Quotes: %s', quotes) if len(quotes) >= 5: quote = random.choice(quotes) else: quote = quotes[0] else: logging.info('Fetching random quote') with open("quotes.txt", "rb") as file: lines = file.read().splitlines() quote = random.choice(lines) except FileNotFoundError: logging.error('"quotes.txt does not exists.') except IndexError: logging.error('Error fetching quote.') if not 'quote' in vars(): logging.info('No quote found.') quote = 
conf['MESSAGE']['QUOTE_NOT_FOUND'] IRC.sendmsg(self, conf['IRC_CHANNEL'], "", quote) return False if not isinstance(quote, str): quote = quote.decode('utf-8') if IRC.check_tts_disabled(self, user): logging.info('TTS is disabled') elif IRC.check_user_denied(self, user): logging.info('User is not allowed to use TTS') elif IRC.check_subonly(self, tags): logging.info('TTS is sub-only') elif IRC.check_modonly(self, tags): logging.info('TTS is mod-only') elif IRC.check_whitelist(self, user): logging.info('User is not on whitelist') else: logging.info('Sending quote to TTS') logging.debug("Quote: %s", quote) IRC.sendmsg(self, conf['IRC_CHANNEL'], "", quote) quote = quote.rsplit('(', 1)[0] raw_msg = { "TTS": True, "msg": quote, "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] return True def delay(self, msg): """ !delay command Adjust the delay setting in config.yml :param str msg: The IRC message triggering the command """ try: delay = msg.split(' ')[1] except: # pylint: disable=bare-except delay = False if delay: with open('config.yml','r', encoding='utf-8') as yamlfile: cur_yaml = yaml.safe_load(yamlfile) cur_yaml['irc']['clearmsg_timeout'] = int(delay) if cur_yaml: with open('config.yml','w', encoding='utf-8') as yamlfile: yaml.safe_dump(cur_yaml, yamlfile) load_config() def usermap(self, msg): """ !usermap command Adds new entries to usermapping in config.yml :param str msg: The IRC message triggering the command """ try: msg = msg.replace('!usermap ', '') splitmsg = msg.split(" ") username, *mappingname = splitmsg mappingname = ' '.join(mappingname) except: # pylint: disable=bare-except username = False mappingname = False if username and mappingname: with open('config.yml','r', encoding='utf-8') as yamlfile: cur_yaml = yaml.safe_load(yamlfile) 
cur_yaml['usermapping'].update({username: mappingname}) if cur_yaml: with open('config.yml','w', encoding='utf-8') as yamlfile: yaml.safe_dump(cur_yaml, yamlfile) load_config() def pick(self, msg): """ !pick command """ if self.pick_status: logging.info('Pick stopped') logging.debug("Got %s participats, wanted %s", self.pickcount, self.picknumber) try: if int(self.pickcount) > int(self.picknumber): picks = random.sample(self.pickme, self.picknumber) logging.info('Got more than the requested number of participants') else: picks = self.pickme logging.info('Got less than or exactly the requested number of participants') converted_picks = [str(element) for element in picks] joined_picks = " ".join(converted_picks) except: # pylint: disable=bare-except logging.error("There was an error during picking.") joined_picks = False if joined_picks: raw_msg = { "TTS": True, "msg": conf['MESSAGE']['PICKRESULT'] +" "+ str(joined_picks), "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] IRC.sendmsg(self, conf['IRC_CHANNEL'], "", conf['MESSAGE']['PICKRESULT'] ) IRC.sendmsg(self, conf['IRC_CHANNEL'], "*", joined_picks ) else: IRC.sendmsg(self, conf['IRC_CHANNEL'], "*", conf['MESSAGE']['PICKNONE'] ) self.pick_status = False self.pickme = [] self.pickcount = 0 return logging.debug('Pick started') self.pick_status = True try: msg = msg.split(' ')[1].strip() self.picknumber = msg except IndexError: self.picknumber = self.picknumber logging.info("Will pick %s participants", self.picknumber) IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['PICKSTART'] ) return def quickvote(self, msg): """ !quickvote command Starts or stops the !quickvote function. On stop calculates the 5 most casted votes and send them to chat. The highest vote is send to msg_queue. 
:param str msg: The IRC message triggering the command """ if self.quickvote_status: logging.debug('Quickvote stopped') if self.pollcount == 0: logging.info("Nobody voted") IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTEEND']) IRC.sendmsg(self, conf['IRC_CHANNEL'], "*", conf['MESSAGE']['VOTENOBODY']) raw_msg = { "TTS": True, "msg": conf['MESSAGE']['VOTENOBODY'], "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.info('The result is: %s', conf['MESSAGE']['VOTENOBODY']) logging.debug('Votemsg: %s', msg) self.quickvote_status = False self.poll = {} return logging.info("Counting votes") count = 0 count = Counter(self.poll.values()).most_common(5) IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTEEND']) logging.debug(count) raw_msg = { "TTS": True, "msg": conf['MESSAGE']['VOTERESULT'] +" "+ str(count[0][0].replace('#','')), "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.info('The result is: %s', conf['MESSAGE']['VOTERESULT'] +" "+ str(count[0])) logging.debug('Votemsg: %s', msg) for key, value in count: IRC.sendmsg( self, conf['IRC_CHANNEL'], "*", str(key)+" ("+str(value)+ " "+ conf['MESSAGE']['VOTES'] + ")" ) self.quickvote_status = False self.poll = {} self.pollcount = 0 return logging.debug('Quickvote started') self.quickvote_status = True self.votemsg = msg.split('!quickvote', 1)[1].strip() if self.votemsg: IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'] + " (" + str(self.votemsg) + ")" ) else: IRC.sendmsg(self, conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART']) return def 
random(self, msg): """ !random command Read a random line from randomfile and put it into msg_queue If no file is given in msg a standard file will be used :param str msg: The IRC message triggering the command :raise: FileNotFoundError if randomfile does not exists :return: True if line was successfully read and added to msg_queue :rtype: bool """ randomfile = msg.replace('!random', '').strip().lower() if randomfile: randomfile = "random_"+str(os.path.basename(randomfile))+".txt" else: randomfile = "random.txt" try: with open(randomfile,"r", encoding="utf-8") as file: lines = file.read().splitlines() random_msg = random.choice(lines) except FileNotFoundError: logging.error('%s not found', randomfile) return False raw_msg = { "TTS": True, "msg": random_msg, "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] return True def ptts(self, msg): """ !ptts command Add user to tts_allowed list and remove user from tts_denied list :param str msg: The IRC message triggering the command """ user = msg.replace('!ptts', '').strip().lower() if user.startswith('@'): logging.debug('Removing "@" from username') user = user.replace('@', '') logging.info("Adding %s to whitelist", user) self.tts_allowed.append(user) if user in self.tts_denied: logging.info("Removing %s from deny list", user) self.tts_denied.remove(user) return def dtts(self, msg): """ !dtts command Add user to tts_denied list and remove user from tts_allowed list :param str msg: The IRC message triggering the command """ user = msg.replace('!dtts', '').strip().lower() if user.startswith('@'): logging.debug('Removing "@" from username') user = user.replace('@', '') if user not in self.tts_denied: logging.info("Adding %s to deny list", user) self.tts_denied.append(user) if user in self.tts_allowed: logging.info("Removing 
%s from allowed list", user) self.tts_allowed.remove(user) return class ThreadingSimpleServer(ThreadingMixIn, HTTPServer): """ Threaded HTTP Server """ class HTTPserv(BaseHTTPRequestHandler): """Simple HTTP Server""" def log_message(self, format, *args): # pylint: disable=redefined-builtin """Suppress HTTP log messages""" return def do_GET(self): # pylint: disable=invalid-name """Process GET requests""" if self.path == '/': self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() with open("tts.html", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/favicon.ico': self.send_response(200) self.send_header('Content-type', 'image/x-icon') self.end_headers() with open("favicon.ico", "rb") as file: icon = file.read() self.wfile.write(icon) elif self.path == '/tts.js': self.send_response(200) self.send_header('Content-type', 'text/javascript') self.end_headers() with open("tts.js", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/jquery.js': self.send_response(200) self.send_header('Content-type', 'text/javascript') self.end_headers() with open("jquery.js", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/bootstrap.min.css': self.send_response(200) self.send_header('Content-type', 'text/css') self.end_headers() with open("bootstrap.min.css", "rb") as file: html = file.read() self.wfile.write(html) elif self.path.startswith('/tts_queue'): tts_json = "" tts = {} self.send_response(200) self.send_header('Content-type', 'text/json') self.end_headers() usermap = conf['USERMAP'] sorted_tts = {k: msg_queue[k] for k in sorted(msg_queue, reverse=True)} logging.debug(usermap) for key in list(sorted_tts.keys()): if key not in tts_done: if msg_queue[key][0].lower() in usermap: logging.debug('Using usermap for user: %s (%s)', msg_queue[key][0], usermap[msg_queue[key][0].lower()]) tts = {str(key): str(usermap[msg_queue[key][0].lower()]) + " " + 
str(conf['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])} else: logging.debug('No usermap entry found for user: %s', msg_queue[key][0]) tts = {str(key): str(msg_queue[key][0]) + " " + str(conf['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])} tts_json = json.dumps(tts) self.wfile.write(bytes(str(tts_json)+"\n", "utf-8")) elif self.path.startswith('/tts_done'): get_params = parse_qs(self.path) if '/tts_done?id' in get_params: logging.info("Removing message from queue") tts_done.append(get_params['/tts_done?id'][0]) self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("OK\n", "utf-8")) else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("Internal Server error\n", "utf-8")) elif self.path.startswith('/token'): data = {} data['client_id'] = "ebo548vs6tq54c9zlrgin2yfzzlrrs" data['response_type'] = "token" data['scope'] = "chat:edit chat:read" if conf['HTTP_PORT'] == 80: data['redirect_uri'] = "http://localhost/token" elif conf['HTTP_PORT'] == 8080: data['redirect_uri'] = "http://localhost:8080/token" elif conf['HTTP_PORT'] == 3000: data['redirect_uri'] = "http://localhost:3000/token" else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("You can only use this function if HTTP_PORT is 80, 8080 or 3000. Please change your port, or use https://www.21x9.org/twitch instead.\n", "utf-8")) return False try: url_values = urllib.parse.urlencode(data) url = "https://id.twitch.tv/oauth2/authorize" full_url = url + "?" + url_values data = urllib.request.urlopen(full_url) if data: self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(bytes("OAuth Token Generator
Click to start the OAuth process.
\n", "utf-8")) else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("Could not get OAuth-URL from Twitch\n", "utf-8")) except Exception: # pylint: disable=broad-except logging.error('Could not fetch OAuth-URL from Twitch.') else: self.send_response(404) self.send_header('Server', 'TTS') self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("File not found.\n", "utf-8")) return def http_serve_forever(httpd): """httpd loop""" httpd.serve_forever() def load_config(): """Loading config variables""" logging.info("Loading configfile") try: with open("config.yml", "r", encoding="UTF-8") as ymlfile: cfg = yaml.load(ymlfile, Loader=yaml.Loader) except FileNotFoundError: logging.fatal('Your config file is missing, please copy config-dist.yml to config.yml and review your settings.') sys.exit(253) for section in cfg: try: logging.debug('Fetching config: %s', section) conf['IRC_CHANNEL'] = cfg.get('irc', {}).get('channel', False) conf['IRC_USERNAME'] = cfg.get('irc', {}).get('username', False) conf['IRC_OAUTH_TOKEN'] = cfg.get('irc', {}).get('oauth_token', False) conf['IRC_SERVER'] = cfg.get('irc', {}).get('server', "irc.chat.twitch.tv") conf['IRC_CLEARMSG_TIMEOUT'] = cfg.get('irc', {}).get('clearmsg_timeout', 60) conf['IRC_SUBONLY'] = cfg.get('bot', {}).get('subonly', False) conf['IRC_MODONLY'] = cfg.get('bot', {}).get('modonly', False) conf['IRC_TTS_LEN'] = cfg.get('bot', {}).get('message_length', 200) conf['TTS_STARTENABLED'] = cfg.get('bot', {}).get('start_enabled', True) conf['WIKI_LANG'] = cfg.get('bot', {}).get('language', 'en') conf['LOG_LEVEL'] = cfg.get('log', {}).get('level', "INFO") conf['HTTP_PORT'] = cfg.get('http', {}).get('port', 80) conf['HTTP_BIND'] = cfg.get('http', {}).get('bind', "localhost") conf['MESSAGE'] = {} conf['MESSAGE']['TOFF'] = cfg.get('messages', {}).get('toff', "TTS is now disabled.") conf['MESSAGE']['TON'] = cfg.get('messages', {}).get('ton', 
"TTS is now active.") conf['MESSAGE']['TOO_LONG'] = cfg.get('messages', {}).get('too_long', "Sorry, your message is too long.") conf['MESSAGE']['DISABLED'] = cfg.get('messages', {}).get('disabled', "Sorry, TTS is disabled.") conf['MESSAGE']['DENIED'] = cfg.get('messages', {}).get('denied', "Sorry, you're not allowed to use TTS.") conf['MESSAGE']['SUBONLY'] = cfg.get('messages', {}).get('subonly', "Sorry, TTS is sub-only.") conf['MESSAGE']['MODONLY'] = cfg.get('messages', {}).get('modonly', "Sorry, TTS is mod-only.") conf['MESSAGE']['READY'] = cfg.get('messages', {}).get('ready', "TTS bot is ready.") conf['MESSAGE']['WHITELISTONLY'] = cfg.get('messages', {}).get('whitelist', False) conf['MESSAGE']['SAYS'] = cfg.get('messages', {}).get('says', "says") conf['MESSAGE']['VOTESTART'] = cfg.get('messages', {}).get('votestart', "Quickvote started. Send #yourchoice to participate.") conf['MESSAGE']['VOTEEND'] = cfg.get('messages', {}).get('voteend', "Quickvote ended. The results are:") conf['MESSAGE']['VOTENOBODY'] = cfg.get('messages', {}).get('votenobody', "Nobody casted a vote. :(") conf['MESSAGE']['VOTERESULT'] = cfg.get('messages', {}).get('voteresult', "Voting has ended. The result is:") conf['MESSAGE']['VOTES'] = cfg.get('messages', {}).get('votes', "Votes") conf['MESSAGE']['PICKSTART'] = cfg.get('messages', {}).get('pickstart', "Pick started. Send #pickme to participate.") conf['MESSAGE']['PICKRESULT'] = cfg.get('messages', {}).get('pickresult', "Pick ended. The results are:") conf['MESSAGE']['PICKNONE'] = cfg.get('messages', {}).get('picknone', "Pick ended. 
Nobody was picked.") conf['MESSAGE']['QUOTE_NOT_FOUND'] = cfg.get('messages', {}).get('quotenotfound', "Sorry, no quote found.") conf['MESSAGE']['QUOTE_ADDED_PREFIX'] = cfg.get('messages', {}).get('quoteaddedprefix', "Quote:") conf['MESSAGE']['QUOTE_ADDED_SUFFIX'] = cfg.get('messages', {}).get('quoteaddedsuffix', "added.") conf['MESSAGE']['WIKI_TOO_MANY'] = cfg.get('messages', {}).get('wiki_too_many', "Sorry, there are too many possible results. Try a more narrow search.") conf['MESSAGE']['WIKI_NO_RESULT'] = cfg.get('messages', {}).get('wiki_no_result', "Sorry, there was an error fetching the wikipedia answer.") conf['USERMAP'] = cfg.get('usermapping', []) if 'whitelist' in cfg: conf['WHITELIST'] = True conf['WHITELIST_USER'] = cfg['whitelist'] else: conf['WHITELIST'] = False except KeyError: logging.exception('Your config file is invalid, please check and try again.') sys.exit(254) if conf['WHITELIST']: logging.info('Whitelist mode enabled') logging.debug('Whitelist: %s', conf['WHITELIST_USER']) if not conf['IRC_CHANNEL']: raise ValueError('Please add your twitch channel to config.yml.') if not conf['IRC_USERNAME']: raise ValueError('Please add the bots username to config.yml.') if not conf['IRC_OAUTH_TOKEN']: conf['IRC_OAUTH_TOKEN'] = "Invalid" return conf if not conf['IRC_OAUTH_TOKEN'].startswith('oauth:'): raise ValueError('Your oauth-token is invalid, it has to start with: "oauth:"') return conf def send_tts_queue(): """ Send messages to TTS """ for raw_msg in msg_queue_raw: logging.debug('Raw msg: %s', msg_queue_raw) now = datetime.datetime.now() if now - raw_msg['queuetime'] > datetime.timedelta(seconds=conf['IRC_CLEARMSG_TIMEOUT']): logging.debug('clearmsg_timeout reached') if raw_msg['timestamp'] not in msg_queue: logging.info('Sending TTS message') msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.debug("msg_queue: %s", msg_queue) else: logging.debug('Msg is already in queue') def get_url(path=False): """ Generate a valid URL 
def check_oauth_token():
    """ Check for valid authentication via Twitch API

    Sends the configured token to the Twitch validation endpoint. While Twitch
    answers with an HTTP error, the token helper page is opened in the
    browser, the function waits five minutes, reloads the configuration and
    tries again. It therefore only returns once a token was accepted.

    Returns:
    dict: The (possibly reloaded) global configuration
    """

    global conf # pylint: disable=global-statement,invalid-name

    validate_url = 'https://id.twitch.tv/oauth2/validate'

    # Retry in a loop instead of recursing: the stack does not grow with every
    # failed attempt and the success message below is logged exactly once.
    while True:
        logging.debug('Checking OAuth Token')

        oauth = "OAuth "+str(conf['IRC_OAUTH_TOKEN'].replace('oauth:',''))
        request = urllib.request.Request(validate_url)
        request.add_header('Authorization', oauth)

        try:
            # Only the status matters; the context manager closes the response.
            with urllib.request.urlopen(request):
                break
        except HTTPError:
            logging.fatal('Twitch rejected your OAuth Token. Please check and generate a new one.')

            # Show the very same address that is opened in the browser.
            # get_url() replaces a 0.0.0.0 bind address with localhost, the
            # raw bind address would not be reachable from a browser.
            token_url = get_url("token")
            logging.info('Please open %s to generate your OAuth-Token.', token_url)
            webbrowser.open_new_tab(token_url)

            logging.info('Please complete the OAuth process and add the token into your "config.yml" within the next 5 minutes.')
            time.sleep(300)
            conf = load_config()

    logging.info('OAuth Token is valid')
    return conf


def main():
    """ Main loop

    Loads the configuration, starts the webserver in a daemon thread,
    validates the OAuth token, connects the IRC bot and opens the web
    frontend in the browser. Afterwards it polls IRC forever, reloads the
    configuration once a minute and hands due messages to the TTS queue.
    Ctrl+C shuts the webserver down and terminates the process.
    """

    global conf # pylint: disable=global-statement,invalid-name

    conf = load_config()
    lastreload = datetime.datetime.now()

    logging.getLogger().setLevel(conf['LOG_LEVEL'])
    if conf['LOG_LEVEL'] == 'DEBUG':
        sys.tracebacklimit = 5

    logging.info("Starting Webserver")
    httpd = ThreadingSimpleServer((conf['HTTP_BIND'], conf['HTTP_PORT']), HTTPserv)
    http_thread = Thread(target=http_serve_forever, daemon=True, args=(httpd, ))
    http_thread.start()

    # Blocks until Twitch accepted the token.
    check_oauth_token()

    logging.info("Starting IRC bot")
    irc = IRC()
    irc.connect(
        conf['IRC_SERVER'],
        6667,
        conf['IRC_CHANNEL'],
        conf['IRC_USERNAME'],
        conf['IRC_OAUTH_TOKEN']
    )
    irc.sendmsg(conf['IRC_CHANNEL'], 'MrDestructoid', conf['MESSAGE']['READY'])

    logging.info('Connected and joined')

    url = get_url()
    logging.info("Please open your browser and visit: %s", url)
    webbrowser.open_new_tab(url)

    reload_interval = datetime.timedelta(seconds=60)

    while True:
        if conf['LOG_LEVEL'] == "DEBUG":
            # Slow the loop down so the debug output stays readable.
            time.sleep(1)

        try:
            irc.get_response()

            if datetime.datetime.now() - lastreload > reload_interval:
                conf = load_config()
                lastreload = datetime.datetime.now()

                # Remind the chat of a running quickvote on every reload.
                if irc.quickvote_status and irc.votemsg:
                    logging.info('Quickvote is active')
                    irc.sendmsg(
                        conf['IRC_CHANNEL'],
                        "@chat",
                        conf['MESSAGE']['VOTESTART'] + " (" + str(irc.votemsg) + ")"
                    )

            if not irc.tts_status:
                continue

            logging.debug('msg_queue_raw: %s', msg_queue_raw)
            send_tts_queue()

        except KeyboardInterrupt:
            httpd.shutdown()
            logging.info('Exiting...')
            os.kill(os.getpid(), signal.SIGTERM)


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(module)s %(threadName)s %(levelname)s: %(message)s'
    )
    sys.tracebacklimit = 3

    VERSION = "1.6.1"

    # Module level state shared by the IRC bot, the webserver and the helpers.
    conf = {}
    tts_done = []
    msg_queue_raw = []
    msg_queue = {}

    if sys.argv[1:] and sys.argv[1] == "--version":
        print('Simple TTS Bot')
        # print() does not interpolate logging-style arguments, so format the
        # string explicitly.
        print('Version %s' % VERSION)
        # Printing the version is a successful run.
        sys.exit(0)

    main()